OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "device/serial/data_sink_receiver.h" | 5 #include "device/serial/data_sink_receiver.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "device/serial/async_waiter.h" | 10 #include "base/message_loop/message_loop.h" |
11 | 11 |
12 namespace device { | 12 namespace device { |
13 | 13 |
14 // Represents a flush of data that has not been completed. | 14 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
15 class DataSinkReceiver::PendingFlush { | 15 // DataSinkReceiver. |
16 public: | |
17 PendingFlush(); | |
18 | |
19 // Initializes this PendingFlush with |num_bytes|, the number of bytes to | |
20 // flush. | |
21 void SetNumBytesToFlush(uint32_t num_bytes); | |
22 | |
23 // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns | |
24 // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than | |
25 // |bytes_to_flush_| bytes were flushed or the error if one is encountered | |
26 // discarding data from |handle|. | |
27 MojoResult Flush(mojo::DataPipeConsumerHandle handle); | |
28 | |
29 // Whether this PendingFlush has received the number of bytes to flush. | |
30 bool received_flush() { return received_flush_; } | |
31 | |
32 private: | |
33 // Whether this PendingFlush has received the number of bytes to flush. | |
34 bool received_flush_; | |
35 | |
36 // The remaining number of bytes to flush. | |
37 uint32_t bytes_to_flush_; | |
38 }; | |
39 | |
40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by | |
41 // a DataSinkReceiver. | |
42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { | 16 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
43 public: | 17 public: |
44 Buffer(scoped_refptr<DataSinkReceiver> receiver, | 18 Buffer(scoped_refptr<DataSinkReceiver> receiver, |
45 const char* buffer, | 19 const char* buffer, |
46 uint32_t buffer_size); | 20 uint32_t buffer_size); |
47 virtual ~Buffer(); | 21 virtual ~Buffer(); |
48 | 22 |
49 void Cancel(int32_t error); | 23 void Cancel(int32_t error); |
50 | 24 |
51 // ReadOnlyBuffer overrides. | 25 // ReadOnlyBuffer overrides. |
52 virtual const char* GetData() override; | 26 virtual const char* GetData() override; |
53 virtual uint32_t GetSize() override; | 27 virtual uint32_t GetSize() override; |
54 virtual void Done(uint32_t bytes_read) override; | 28 virtual void Done(uint32_t bytes_read) override; |
55 virtual void DoneWithError(uint32_t bytes_read, int32_t error) override; | 29 virtual void DoneWithError(uint32_t bytes_read, int32_t error) override; |
56 | 30 |
57 private: | 31 private: |
58 // The DataSinkReceiver whose data pipe we are providing a view. | 32 // The DataSinkReceiver of whose buffer we are providing a view. |
59 scoped_refptr<DataSinkReceiver> receiver_; | 33 scoped_refptr<DataSinkReceiver> receiver_; |
60 | 34 |
61 const char* buffer_; | 35 const char* buffer_; |
62 uint32_t buffer_size_; | 36 uint32_t buffer_size_; |
63 | 37 |
64 // Whether this receive has been cancelled. | 38 // Whether this receive has been cancelled. |
65 bool cancelled_; | 39 bool cancelled_; |
66 | 40 |
67 // If |cancelled_|, contains the cancellation error to report. | 41 // If |cancelled_|, contains the cancellation error to report. |
68 int32_t cancellation_error_; | 42 int32_t cancellation_error_; |
69 }; | 43 }; |
70 | 44 |
45 // A frame of data received from the client. | |
46 class DataSinkReceiver::DataFrame { | |
47 public: | |
48 explicit DataFrame(mojo::Array<uint8_t> data); | |
49 | |
50 // Returns the number of uncomsumed bytes remaining of this data frame. | |
51 uint32_t remaining_bytes() { | |
52 return static_cast<uint32_t>(data_.size() - offset_); | |
53 } | |
54 | |
55 // Returns a pointer to the remaining data to be consumed. | |
56 const char* data() { | |
57 DCHECK_LT(offset_, data_.size()); | |
58 return reinterpret_cast<const char*>(&data_[0]) + offset_; | |
59 } | |
raymes
2014/10/27 03:02:22
nit: since we're not inlining OnDataConsumed we mi
Sam McNally
2014/10/27 05:39:14
Done.
| |
60 | |
61 // Reports that |bytes_read| bytes have been consumed. | |
62 void OnDataConsumed(uint32_t bytes_read); | |
63 | |
64 private: | |
65 mojo::Array<uint8_t> data_; | |
66 uint32_t offset_; | |
67 }; | |
68 | |
71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, | 69 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, |
72 const CancelCallback& cancel_callback, | 70 const CancelCallback& cancel_callback, |
73 const ErrorCallback& error_callback) | 71 const ErrorCallback& error_callback) |
74 : ready_callback_(ready_callback), | 72 : ready_callback_(ready_callback), |
75 cancel_callback_(cancel_callback), | 73 cancel_callback_(cancel_callback), |
76 error_callback_(error_callback), | 74 error_callback_(error_callback), |
75 flush_pending_(false), | |
77 buffer_in_use_(NULL), | 76 buffer_in_use_(NULL), |
77 initialized_(false), | |
78 available_buffer_capacity_(0), | |
78 shut_down_(false), | 79 shut_down_(false), |
79 weak_factory_(this) { | 80 weak_factory_(this) { |
80 } | 81 } |
81 | 82 |
82 void DataSinkReceiver::ShutDown() { | 83 void DataSinkReceiver::ShutDown() { |
83 shut_down_ = true; | 84 shut_down_ = true; |
84 if (waiter_) | |
85 waiter_.reset(); | |
86 } | 85 } |
87 | 86 |
88 DataSinkReceiver::~DataSinkReceiver() { | 87 DataSinkReceiver::~DataSinkReceiver() { |
89 } | 88 } |
90 | 89 |
91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { | 90 void DataSinkReceiver::Init(uint32_t buffer_size) { |
92 if (handle_.is_valid()) { | 91 if (initialized_) { |
93 DispatchFatalError(); | 92 ShutDown(); |
94 return; | 93 return; |
95 } | 94 } |
96 | 95 initialized_ = true; |
97 handle_ = handle.Pass(); | 96 available_buffer_capacity_ = buffer_size; |
98 StartWaiting(); | |
99 } | 97 } |
100 | 98 |
101 void DataSinkReceiver::Cancel(int32_t error) { | 99 void DataSinkReceiver::Cancel(int32_t error) { |
102 // If we have sent a ReportBytesSentAndError but have not received the | 100 // If we have sent a ReportBytesSentAndError but have not received the |
103 // response, that ReportBytesSentAndError message will appear to the | 101 // response, that ReportBytesSentAndError message will appear to the |
104 // DataSinkClient to be caused by this Cancel message. In that case, we ignore | 102 // DataSinkClient to be caused by this Cancel message. In that case, we ignore |
105 // the cancel. | 103 // the cancel. |
106 if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) | 104 if (flush_pending_) |
107 return; | 105 return; |
108 | 106 |
109 // If there is a buffer is in use, mark the buffer as cancelled and notify the | 107 // If there is a buffer is in use, mark the buffer as cancelled and notify the |
110 // client by calling |cancel_callback_|. The sink implementation may or may | 108 // client by calling |cancel_callback_|. The sink implementation may or may |
111 // not take the cancellation into account when deciding what error (if any) to | 109 // not take the cancellation into account when deciding what error (if any) to |
112 // return. If the sink returns an error, we ignore the cancellation error. | 110 // return. If the sink returns an error, we ignore the cancellation error. |
113 // Otherwise, if the sink does not report an error, we override that with the | 111 // Otherwise, if the sink does not report an error, we override that with the |
114 // cancellation error. Once a cancellation has been received, the next report | 112 // cancellation error. Once a cancellation has been received, the next report |
115 // sent to the client will always contain an error; the error returned by the | 113 // sent to the client will always contain an error; the error returned by the |
116 // sink or the cancellation error if the sink does not return an error. | 114 // sink or the cancellation error if the sink does not return an error. |
117 if (buffer_in_use_) { | 115 if (buffer_in_use_) { |
118 buffer_in_use_->Cancel(error); | 116 buffer_in_use_->Cancel(error); |
119 if (!cancel_callback_.is_null()) | 117 if (!cancel_callback_.is_null()) |
120 cancel_callback_.Run(error); | 118 cancel_callback_.Run(error); |
121 return; | 119 return; |
122 } | 120 } |
123 // If there is no buffer in use, immediately report the error and cancel the | |
124 // waiting for the data pipe if one exists. This transitions straight into the | |
125 // state after the sink has returned an error. | |
126 waiter_.reset(); | |
127 ReportBytesSentAndError(0, error); | 121 ReportBytesSentAndError(0, error); |
128 } | 122 } |
129 | 123 |
124 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { | |
125 if (!initialized_) { | |
126 ShutDown(); | |
127 return; | |
128 } | |
129 if (data.size() > available_buffer_capacity_) { | |
130 ShutDown(); | |
131 return; | |
132 } | |
133 available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); | |
134 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); | |
135 if (!buffer_in_use_ && !flush_pending_) | |
136 RunReadyCallback(); | |
137 } | |
138 | |
130 void DataSinkReceiver::OnConnectionError() { | 139 void DataSinkReceiver::OnConnectionError() { |
131 DispatchFatalError(); | 140 DispatchFatalError(); |
132 } | 141 } |
133 | 142 |
134 void DataSinkReceiver::StartWaiting() { | 143 void DataSinkReceiver::RunReadyCallback() { |
135 DCHECK(!waiter_ && !shut_down_); | 144 DCHECK(!shut_down_ && !flush_pending_); |
136 waiter_.reset( | 145 // If data arrives while a call to RunReadyCallback() is posted, we can be |
137 new AsyncWaiter(handle_.get(), | 146 // called with buffer_in_use_ already set. |
138 MOJO_HANDLE_SIGNAL_READABLE, | 147 if (buffer_in_use_) |
139 base::Bind(&DataSinkReceiver::OnDoneWaiting, this))); | |
140 } | |
141 | |
142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) { | |
143 DCHECK(waiter_ && !shut_down_); | |
144 waiter_.reset(); | |
145 if (result != MOJO_RESULT_OK) { | |
146 DispatchFatalError(); | |
147 return; | 148 return; |
148 } | 149 buffer_in_use_ = new Buffer(this, |
149 // If there are any queued flushes (from ReportBytesSentAndError()), let them | 150 pending_data_buffers_.front()->data(), |
150 // flush data from the data pipe. | 151 pending_data_buffers_.front()->remaining_bytes()); |
151 if (!pending_flushes_.empty()) { | |
152 MojoResult result = pending_flushes_.front()->Flush(handle_.get()); | |
153 if (result == MOJO_RESULT_OK) { | |
154 pending_flushes_.pop(); | |
155 } else if (result != MOJO_RESULT_SHOULD_WAIT) { | |
156 DispatchFatalError(); | |
157 return; | |
158 } | |
159 StartWaiting(); | |
160 return; | |
161 } | |
162 const void* data = NULL; | |
163 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); | |
164 result = mojo::BeginReadDataRaw( | |
165 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); | |
166 if (result != MOJO_RESULT_OK) { | |
167 DispatchFatalError(); | |
168 return; | |
169 } | |
170 buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes); | |
171 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); | 152 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
172 } | 153 } |
173 | 154 |
174 void DataSinkReceiver::Done(uint32_t bytes_read) { | 155 void DataSinkReceiver::Done(uint32_t bytes_read) { |
175 if (!DoneInternal(bytes_read)) | 156 if (!DoneInternal(bytes_read)) |
176 return; | 157 return; |
177 client()->ReportBytesSent(bytes_read); | 158 client()->ReportBytesSent(bytes_read); |
178 StartWaiting(); | 159 if (!pending_data_buffers_.empty()) { |
160 base::MessageLoop::current()->PostTask( | |
161 FROM_HERE, | |
162 base::Bind(&DataSinkReceiver::RunReadyCallback, | |
163 weak_factory_.GetWeakPtr())); | |
164 } | |
179 } | 165 } |
180 | 166 |
181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { | 167 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
182 if (!DoneInternal(bytes_read)) | 168 if (!DoneInternal(bytes_read)) |
183 return; | 169 return; |
184 ReportBytesSentAndError(bytes_read, error); | 170 ReportBytesSentAndError(bytes_read, error); |
185 } | 171 } |
186 | 172 |
187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { | 173 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
188 if (shut_down_) | 174 if (shut_down_) |
189 return false; | 175 return false; |
190 | 176 |
191 DCHECK(buffer_in_use_); | 177 DCHECK(buffer_in_use_); |
192 buffer_in_use_ = NULL; | 178 buffer_in_use_ = NULL; |
193 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); | 179 available_buffer_capacity_ += bytes_read; |
194 if (result != MOJO_RESULT_OK) { | 180 pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
195 DispatchFatalError(); | 181 if (pending_data_buffers_.front()->remaining_bytes() == 0) |
196 return false; | 182 pending_data_buffers_.pop(); |
197 } | |
198 return true; | 183 return true; |
199 } | 184 } |
200 | 185 |
201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, | 186 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, |
202 int32_t error) { | 187 int32_t error) { |
203 // When we encounter an error, we must discard the data from any sends already | 188 // When we encounter an error, we must discard the data from any send buffers |
204 // in the data pipe before we can resume dispatching data to the sink. We add | 189 // transmitted by the DataSinkClient before it receives this error. |
205 // a pending flush here. The response containing the number of bytes to flush | 190 flush_pending_ = true; |
206 // is handled in SetNumBytesToFlush(). The actual flush is handled in | |
207 // OnDoneWaiting(). | |
208 pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush())); | |
209 client()->ReportBytesSentAndError( | 191 client()->ReportBytesSentAndError( |
210 bytes_read, | 192 bytes_read, |
211 error, | 193 error, |
212 base::Bind(&DataSinkReceiver::SetNumBytesToFlush, | 194 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); |
213 weak_factory_.GetWeakPtr())); | |
214 } | 195 } |
215 | 196 |
216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { | 197 void DataSinkReceiver::DoFlush() { |
217 DCHECK(!pending_flushes_.empty()); | 198 DCHECK(flush_pending_); |
218 DCHECK(!pending_flushes_.back()->received_flush()); | 199 flush_pending_ = false; |
219 pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); | 200 while (!pending_data_buffers_.empty()) { |
220 if (!waiter_) | 201 available_buffer_capacity_ += |
221 StartWaiting(); | 202 pending_data_buffers_.front()->remaining_bytes(); |
203 pending_data_buffers_.pop(); | |
204 } | |
222 } | 205 } |
223 | 206 |
224 void DataSinkReceiver::DispatchFatalError() { | 207 void DataSinkReceiver::DispatchFatalError() { |
225 if (shut_down_) | 208 if (shut_down_) |
226 return; | 209 return; |
227 | 210 |
228 ShutDown(); | 211 ShutDown(); |
229 if (!error_callback_.is_null()) | 212 if (!error_callback_.is_null()) |
230 error_callback_.Run(); | 213 error_callback_.Run(); |
231 } | 214 } |
(...skipping 24 matching lines...) Expand all Loading... | |
256 | 239 |
257 const char* DataSinkReceiver::Buffer::GetData() { | 240 const char* DataSinkReceiver::Buffer::GetData() { |
258 return buffer_; | 241 return buffer_; |
259 } | 242 } |
260 | 243 |
261 uint32_t DataSinkReceiver::Buffer::GetSize() { | 244 uint32_t DataSinkReceiver::Buffer::GetSize() { |
262 return buffer_size_; | 245 return buffer_size_; |
263 } | 246 } |
264 | 247 |
265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { | 248 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { |
249 scoped_refptr<DataSinkReceiver> receiver = receiver_; | |
250 receiver_ = nullptr; | |
266 if (cancelled_) | 251 if (cancelled_) |
267 receiver_->DoneWithError(bytes_read, cancellation_error_); | 252 receiver->DoneWithError(bytes_read, cancellation_error_); |
268 else | 253 else |
269 receiver_->Done(bytes_read); | 254 receiver->Done(bytes_read); |
270 receiver_ = NULL; | |
271 buffer_ = NULL; | 255 buffer_ = NULL; |
272 buffer_size_ = 0; | 256 buffer_size_ = 0; |
273 } | 257 } |
274 | 258 |
275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, | 259 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
276 int32_t error) { | 260 int32_t error) { |
277 receiver_->DoneWithError(bytes_read, error); | 261 scoped_refptr<DataSinkReceiver> receiver = receiver_; |
278 receiver_ = NULL; | 262 receiver_ = nullptr; |
263 receiver->DoneWithError(bytes_read, error); | |
279 buffer_ = NULL; | 264 buffer_ = NULL; |
280 buffer_size_ = 0; | 265 buffer_size_ = 0; |
281 } | 266 } |
282 | 267 |
283 DataSinkReceiver::PendingFlush::PendingFlush() | 268 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) |
284 : received_flush_(false), bytes_to_flush_(0) { | 269 : data_(data.Pass()), offset_(0) { |
270 DCHECK_LT(0u, data_.size()); | |
285 } | 271 } |
286 | 272 |
287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { | 273 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
288 DCHECK(!received_flush_); | 274 offset_ += bytes_read; |
289 received_flush_ = true; | 275 DCHECK_LE(offset_, data_.size()); |
290 bytes_to_flush_ = num_bytes; | |
291 } | |
292 | |
293 MojoResult DataSinkReceiver::PendingFlush::Flush( | |
294 mojo::DataPipeConsumerHandle handle) { | |
295 DCHECK(received_flush_); | |
296 uint32_t num_bytes = bytes_to_flush_; | |
297 MojoResult result = | |
298 mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD); | |
299 if (result != MOJO_RESULT_OK) | |
300 return result; | |
301 DCHECK(num_bytes <= bytes_to_flush_); | |
302 bytes_to_flush_ -= num_bytes; | |
303 return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; | |
304 } | 276 } |
305 | 277 |
306 } // namespace device | 278 } // namespace device |
OLD | NEW |