OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "device/serial/data_sink_receiver.h" | 5 #include "device/serial/data_sink_receiver.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
(...skipping 27 matching lines...) Expand all Loading... |
38 // Whether this receive has been cancelled. | 38 // Whether this receive has been cancelled. |
39 bool cancelled_; | 39 bool cancelled_; |
40 | 40 |
41 // If |cancelled_|, contains the cancellation error to report. | 41 // If |cancelled_|, contains the cancellation error to report. |
42 int32_t cancellation_error_; | 42 int32_t cancellation_error_; |
43 }; | 43 }; |
44 | 44 |
45 // A frame of data received from the client. | 45 // A frame of data received from the client. |
46 class DataSinkReceiver::DataFrame { | 46 class DataSinkReceiver::DataFrame { |
47 public: | 47 public: |
48 explicit DataFrame(mojo::Array<uint8_t> data); | 48 explicit DataFrame(mojo::Array<uint8_t> data, |
| 49 const mojo::Callback<void(uint32_t, int32_t)>& callback); |
49 | 50 |
50 // Returns the number of uncomsumed bytes remaining of this data frame. | 51 // Returns the number of unconsumed bytes remaining of this data frame. |
51 uint32_t GetRemainingBytes(); | 52 uint32_t GetRemainingBytes(); |
52 | 53 |
53 // Returns a pointer to the remaining data to be consumed. | 54 // Returns a pointer to the remaining data to be consumed. |
54 const char* GetData(); | 55 const char* GetData(); |
55 | 56 |
56 // Reports that |bytes_read| bytes have been consumed. | 57 // Reports that |bytes_read| bytes have been consumed. |
57 void OnDataConsumed(uint32_t bytes_read); | 58 void OnDataConsumed(uint32_t bytes_read); |
58 | 59 |
| 60 // Reports that an error occurred. |
| 61 void ReportError(uint32_t bytes_read, int32_t error); |
| 62 |
59 private: | 63 private: |
60 mojo::Array<uint8_t> data_; | 64 mojo::Array<uint8_t> data_; |
61 uint32_t offset_; | 65 uint32_t offset_; |
| 66 const mojo::Callback<void(uint32_t, int32_t)> callback_; |
62 }; | 67 }; |
63 | 68 |
64 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, | 69 DataSinkReceiver::DataSinkReceiver( |
65 const CancelCallback& cancel_callback, | 70 mojo::InterfaceRequest<serial::DataSink> request, |
66 const ErrorCallback& error_callback) | 71 const ReadyCallback& ready_callback, |
67 : ready_callback_(ready_callback), | 72 const CancelCallback& cancel_callback, |
| 73 const ErrorCallback& error_callback) |
| 74 : binding_(this, request.Pass()), |
| 75 ready_callback_(ready_callback), |
68 cancel_callback_(cancel_callback), | 76 cancel_callback_(cancel_callback), |
69 error_callback_(error_callback), | 77 error_callback_(error_callback), |
70 flush_pending_(false), | 78 current_error_(0), |
71 buffer_in_use_(NULL), | 79 buffer_in_use_(NULL), |
72 initialized_(false), | |
73 available_buffer_capacity_(0), | |
74 shut_down_(false), | 80 shut_down_(false), |
75 weak_factory_(this) { | 81 weak_factory_(this) { |
| 82 binding_.set_error_handler(this); |
76 } | 83 } |
77 | 84 |
78 void DataSinkReceiver::ShutDown() { | 85 void DataSinkReceiver::ShutDown() { |
79 shut_down_ = true; | 86 shut_down_ = true; |
80 } | 87 } |
81 | 88 |
82 DataSinkReceiver::~DataSinkReceiver() { | 89 DataSinkReceiver::~DataSinkReceiver() { |
83 } | 90 } |
84 | 91 |
85 void DataSinkReceiver::Init(uint32_t buffer_size) { | |
86 if (initialized_) { | |
87 ShutDown(); | |
88 return; | |
89 } | |
90 initialized_ = true; | |
91 available_buffer_capacity_ = buffer_size; | |
92 } | |
93 | |
94 void DataSinkReceiver::Cancel(int32_t error) { | 92 void DataSinkReceiver::Cancel(int32_t error) { |
95 // If we have sent a ReportBytesSentAndError but have not received the | 93 // If we have sent a ReportBytesSentAndError but have not received the |
96 // response, that ReportBytesSentAndError message will appear to the | 94 // response, that ReportBytesSentAndError message will appear to the |
97 // DataSinkClient to be caused by this Cancel message. In that case, we ignore | 95 // DataSinkClient to be caused by this Cancel message. In that case, we ignore |
98 // the cancel. | 96 // the cancel. |
99 if (flush_pending_) | 97 if (current_error_) |
100 return; | 98 return; |
101 | 99 |
102 // If there is a buffer is in use, mark the buffer as cancelled and notify the | 100 // If there is a buffer is in use, mark the buffer as cancelled and notify the |
103 // client by calling |cancel_callback_|. The sink implementation may or may | 101 // client by calling |cancel_callback_|. The sink implementation may or may |
104 // not take the cancellation into account when deciding what error (if any) to | 102 // not take the cancellation into account when deciding what error (if any) to |
105 // return. If the sink returns an error, we ignore the cancellation error. | 103 // return. If the sink returns an error, we ignore the cancellation error. |
106 // Otherwise, if the sink does not report an error, we override that with the | 104 // Otherwise, if the sink does not report an error, we override that with the |
107 // cancellation error. Once a cancellation has been received, the next report | 105 // cancellation error. Once a cancellation has been received, the next report |
108 // sent to the client will always contain an error; the error returned by the | 106 // sent to the client will always contain an error; the error returned by the |
109 // sink or the cancellation error if the sink does not return an error. | 107 // sink or the cancellation error if the sink does not return an error. |
110 if (buffer_in_use_) { | 108 if (buffer_in_use_) { |
111 buffer_in_use_->Cancel(error); | 109 buffer_in_use_->Cancel(error); |
112 if (!cancel_callback_.is_null()) | 110 if (!cancel_callback_.is_null()) |
113 cancel_callback_.Run(error); | 111 cancel_callback_.Run(error); |
114 return; | 112 return; |
115 } | 113 } |
116 ReportBytesSentAndError(0, error); | 114 ReportError(0, error); |
117 } | 115 } |
118 | 116 |
119 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { | 117 void DataSinkReceiver::OnData( |
120 if (!initialized_) { | 118 mojo::Array<uint8_t> data, |
121 ShutDown(); | 119 const mojo::Callback<void(uint32_t, int32_t)>& callback) { |
| 120 if (current_error_) { |
| 121 callback.Run(0, current_error_); |
122 return; | 122 return; |
123 } | 123 } |
124 if (data.size() > available_buffer_capacity_) { | 124 pending_data_buffers_.push( |
125 ShutDown(); | 125 linked_ptr<DataFrame>(new DataFrame(data.Pass(), callback))); |
126 return; | 126 if (!buffer_in_use_) |
127 } | |
128 available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); | |
129 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); | |
130 if (!buffer_in_use_ && !flush_pending_) | |
131 RunReadyCallback(); | 127 RunReadyCallback(); |
132 } | 128 } |
133 | 129 |
134 void DataSinkReceiver::OnConnectionError() { | 130 void DataSinkReceiver::OnConnectionError() { |
135 DispatchFatalError(); | 131 DispatchFatalError(); |
136 } | 132 } |
137 | 133 |
138 void DataSinkReceiver::RunReadyCallback() { | 134 void DataSinkReceiver::RunReadyCallback() { |
139 DCHECK(!shut_down_ && !flush_pending_); | 135 DCHECK(!shut_down_ && !current_error_); |
140 // If data arrives while a call to RunReadyCallback() is posted, we can be | 136 // If data arrives while a call to RunReadyCallback() is posted, we can be |
141 // called with buffer_in_use_ already set. | 137 // called with buffer_in_use_ already set. |
142 if (buffer_in_use_) | 138 if (buffer_in_use_) |
143 return; | 139 return; |
144 buffer_in_use_ = | 140 buffer_in_use_ = |
145 new Buffer(this, | 141 new Buffer(this, |
146 pending_data_buffers_.front()->GetData(), | 142 pending_data_buffers_.front()->GetData(), |
147 pending_data_buffers_.front()->GetRemainingBytes()); | 143 pending_data_buffers_.front()->GetRemainingBytes()); |
148 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); | 144 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
149 } | 145 } |
150 | 146 |
151 void DataSinkReceiver::Done(uint32_t bytes_read) { | 147 void DataSinkReceiver::Done(uint32_t bytes_read) { |
152 if (!DoneInternal(bytes_read)) | 148 if (!DoneInternal(bytes_read)) |
153 return; | 149 return; |
154 client()->ReportBytesSent(bytes_read); | 150 pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
| 151 if (pending_data_buffers_.front()->GetRemainingBytes() == 0) |
| 152 pending_data_buffers_.pop(); |
155 if (!pending_data_buffers_.empty()) { | 153 if (!pending_data_buffers_.empty()) { |
156 base::MessageLoop::current()->PostTask( | 154 base::MessageLoop::current()->PostTask( |
157 FROM_HERE, | 155 FROM_HERE, |
158 base::Bind(&DataSinkReceiver::RunReadyCallback, | 156 base::Bind(&DataSinkReceiver::RunReadyCallback, |
159 weak_factory_.GetWeakPtr())); | 157 weak_factory_.GetWeakPtr())); |
160 } | 158 } |
161 } | 159 } |
162 | 160 |
163 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { | 161 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
164 if (!DoneInternal(bytes_read)) | 162 if (!DoneInternal(bytes_read)) |
165 return; | 163 return; |
166 ReportBytesSentAndError(bytes_read, error); | 164 ReportError(bytes_read, error); |
167 } | 165 } |
168 | 166 |
169 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { | 167 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
170 if (shut_down_) | 168 if (shut_down_) |
171 return false; | 169 return false; |
172 | 170 |
173 DCHECK(buffer_in_use_); | 171 DCHECK(buffer_in_use_); |
174 buffer_in_use_ = NULL; | 172 buffer_in_use_ = NULL; |
175 available_buffer_capacity_ += bytes_read; | |
176 pending_data_buffers_.front()->OnDataConsumed(bytes_read); | |
177 if (pending_data_buffers_.front()->GetRemainingBytes() == 0) | |
178 pending_data_buffers_.pop(); | |
179 return true; | 173 return true; |
180 } | 174 } |
181 | 175 |
182 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, | 176 void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) { |
183 int32_t error) { | |
184 // When we encounter an error, we must discard the data from any send buffers | 177 // When we encounter an error, we must discard the data from any send buffers |
185 // transmitted by the DataSinkClient before it receives this error. | 178 // transmitted by the DataSink client before it receives this error. |
186 flush_pending_ = true; | 179 DCHECK(error); |
187 client()->ReportBytesSentAndError( | 180 current_error_ = error; |
188 bytes_read, | 181 while (!pending_data_buffers_.empty()) { |
189 error, | 182 pending_data_buffers_.front()->ReportError(bytes_read, error); |
190 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); | 183 pending_data_buffers_.pop(); |
| 184 bytes_read = 0; |
| 185 } |
191 } | 186 } |
192 | 187 |
193 void DataSinkReceiver::DoFlush() { | 188 void DataSinkReceiver::ClearError() { |
194 DCHECK(flush_pending_); | 189 current_error_ = 0; |
195 flush_pending_ = false; | |
196 while (!pending_data_buffers_.empty()) { | |
197 available_buffer_capacity_ += | |
198 pending_data_buffers_.front()->GetRemainingBytes(); | |
199 pending_data_buffers_.pop(); | |
200 } | |
201 } | 190 } |
202 | 191 |
203 void DataSinkReceiver::DispatchFatalError() { | 192 void DataSinkReceiver::DispatchFatalError() { |
204 if (shut_down_) | 193 if (shut_down_) |
205 return; | 194 return; |
206 | 195 |
207 ShutDown(); | 196 ShutDown(); |
208 if (!error_callback_.is_null()) | 197 if (!error_callback_.is_null()) |
209 error_callback_.Run(); | 198 error_callback_.Run(); |
210 } | 199 } |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
254 | 243 |
255 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, | 244 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
256 int32_t error) { | 245 int32_t error) { |
257 scoped_refptr<DataSinkReceiver> receiver = receiver_; | 246 scoped_refptr<DataSinkReceiver> receiver = receiver_; |
258 receiver_ = nullptr; | 247 receiver_ = nullptr; |
259 receiver->DoneWithError(bytes_read, error); | 248 receiver->DoneWithError(bytes_read, error); |
260 buffer_ = NULL; | 249 buffer_ = NULL; |
261 buffer_size_ = 0; | 250 buffer_size_ = 0; |
262 } | 251 } |
263 | 252 |
264 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) | 253 DataSinkReceiver::DataFrame::DataFrame( |
265 : data_(data.Pass()), offset_(0) { | 254 mojo::Array<uint8_t> data, |
| 255 const mojo::Callback<void(uint32_t, int32_t)>& callback) |
| 256 : data_(data.Pass()), offset_(0), callback_(callback) { |
266 DCHECK_LT(0u, data_.size()); | 257 DCHECK_LT(0u, data_.size()); |
267 } | 258 } |
268 | 259 |
269 // Returns the number of uncomsumed bytes remaining of this data frame. | 260 // Returns the number of uncomsumed bytes remaining of this data frame. |
270 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { | 261 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { |
271 return static_cast<uint32_t>(data_.size() - offset_); | 262 return static_cast<uint32_t>(data_.size() - offset_); |
272 } | 263 } |
273 | 264 |
274 // Returns a pointer to the remaining data to be consumed. | 265 // Returns a pointer to the remaining data to be consumed. |
275 const char* DataSinkReceiver::DataFrame::GetData() { | 266 const char* DataSinkReceiver::DataFrame::GetData() { |
276 DCHECK_LT(offset_, data_.size()); | 267 DCHECK_LT(offset_, data_.size()); |
277 return reinterpret_cast<const char*>(&data_[0]) + offset_; | 268 return reinterpret_cast<const char*>(&data_[0]) + offset_; |
278 } | 269 } |
279 | 270 |
280 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { | 271 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
281 offset_ += bytes_read; | 272 offset_ += bytes_read; |
282 DCHECK_LE(offset_, data_.size()); | 273 DCHECK_LE(offset_, data_.size()); |
| 274 if (offset_ == data_.size()) |
| 275 callback_.Run(offset_, 0); |
| 276 } |
| 277 void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read, |
| 278 int32_t error) { |
| 279 offset_ += bytes_read; |
| 280 DCHECK_LE(offset_, data_.size()); |
| 281 callback_.Run(offset_, error); |
283 } | 282 } |
284 | 283 |
285 } // namespace device | 284 } // namespace device |
OLD | NEW |