OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "device/serial/data_receiver.h" | 5 #include "device/serial/data_receiver.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
11 #include "device/serial/async_waiter.h" | |
12 | 11 |
13 namespace device { | 12 namespace device { |
14 | 13 |
15 // Represents a receive that is not yet fulfilled. | 14 // Represents a receive that is not yet fulfilled. |
16 class DataReceiver::PendingReceive { | 15 class DataReceiver::PendingReceive { |
17 public: | 16 public: |
18 PendingReceive(DataReceiver* receiver, | 17 PendingReceive(DataReceiver* receiver, |
19 const ReceiveDataCallback& callback, | 18 const ReceiveDataCallback& callback, |
20 const ReceiveErrorCallback& error_callback, | 19 const ReceiveErrorCallback& error_callback, |
21 int32_t fatal_error_value); | 20 int32_t fatal_error_value); |
22 | 21 |
23 // Dispatches |data| to |receive_callback_|. | 22 // Dispatches |data| to |receive_callback_|. |
24 void DispatchData(const void* data, uint32_t num_bytes); | 23 bool DispatchDataFrame(DataReceiver::DataFrame* data); |
Ken Rockot(use gerrit already)
2014/10/28 04:59:33
nit: Please document the return value meaning - it
Sam McNally
2014/10/28 05:11:10
Done.
| |
25 | |
26 // Reports |error| to |receive_error_callback_| if it is an appropriate time. | |
27 // Returns whether it dispatched |error|. | |
28 bool DispatchError(DataReceiver::PendingError* error, | |
29 uint32_t bytes_received); | |
30 | 24 |
31 // Reports |fatal_error_value_| to |receive_error_callback_|. | 25 // Reports |fatal_error_value_| to |receive_error_callback_|. |
32 void DispatchFatalError(); | 26 void DispatchFatalError(); |
33 | 27 |
28 bool buffer_in_use() { return buffer_in_use_; } | |
29 | |
34 private: | 30 private: |
35 class Buffer; | 31 class Buffer; |
36 | 32 |
37 // Invoked when the user is finished with the ReadOnlyBuffer provided to | 33 // Invoked when the user is finished with the ReadOnlyBuffer provided to |
38 // |receive_callback_|. | 34 // |receive_callback_|. |
39 void Done(uint32_t num_bytes); | 35 void Done(uint32_t num_bytes); |
40 | 36 |
41 // The DataReceiver that owns this. | 37 // The DataReceiver that owns this. |
42 DataReceiver* receiver_; | 38 DataReceiver* receiver_; |
43 | 39 |
44 // The callback to dispatch data. | 40 // The callback to dispatch data. |
45 ReceiveDataCallback receive_callback_; | 41 ReceiveDataCallback receive_callback_; |
46 | 42 |
47 // The callback to report errors. | 43 // The callback to report errors. |
48 ReceiveErrorCallback receive_error_callback_; | 44 ReceiveErrorCallback receive_error_callback_; |
49 | 45 |
50 // The error value to report when DispatchFatalError() is called. | 46 // The error value to report when DispatchFatalError() is called. |
51 const int32_t fatal_error_value_; | 47 const int32_t fatal_error_value_; |
52 | 48 |
53 // True if the user owns a buffer passed to |receive_callback_| as part of | 49 // True if the user owns a buffer passed to |receive_callback_| as part of |
54 // DispatchData(). | 50 // DispatchDataFrame(). |
55 bool buffer_in_use_; | 51 bool buffer_in_use_; |
56 }; | 52 }; |
57 | 53 |
58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by | 54 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
59 // a DataReceiver. | 55 // DataReceiver. |
60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { | 56 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
61 public: | 57 public: |
62 Buffer(scoped_refptr<DataReceiver> pipe, | 58 Buffer(scoped_refptr<DataReceiver> pipe, |
63 PendingReceive* receive, | 59 PendingReceive* receive, |
64 const char* buffer, | 60 const char* buffer, |
65 uint32_t buffer_size); | 61 uint32_t buffer_size); |
66 ~Buffer() override; | 62 ~Buffer() override; |
67 | 63 |
68 // ReadOnlyBuffer overrides. | 64 // ReadOnlyBuffer overrides. |
69 const char* GetData() override; | 65 const char* GetData() override; |
70 uint32_t GetSize() override; | 66 uint32_t GetSize() override; |
71 void Done(uint32_t bytes_consumed) override; | 67 void Done(uint32_t bytes_consumed) override; |
72 void DoneWithError(uint32_t bytes_consumed, int32_t error) override; | 68 void DoneWithError(uint32_t bytes_consumed, int32_t error) override; |
73 | 69 |
74 private: | 70 private: |
75 // The DataReceiver whose data pipe we are providing a view. | 71 // The DataReceiver of whose buffer we are providing a view. |
76 scoped_refptr<DataReceiver> receiver_; | 72 scoped_refptr<DataReceiver> receiver_; |
77 | 73 |
78 // The PendingReceive to which this buffer has been created in response. | 74 // The PendingReceive to which this buffer has been created in response. |
79 PendingReceive* pending_receive_; | 75 PendingReceive* pending_receive_; |
80 | 76 |
81 const char* buffer_; | 77 const char* buffer_; |
82 uint32_t buffer_size_; | 78 uint32_t buffer_size_; |
83 }; | 79 }; |
84 | 80 |
85 // Represents an error received from the DataSource. | 81 // A buffer of data or an error received from the DataSource. |
86 struct DataReceiver::PendingError { | 82 struct DataReceiver::DataFrame { |
87 PendingError(uint32_t offset, int32_t error) | 83 explicit DataFrame(mojo::Array<uint8_t> data) |
88 : offset(offset), error(error), dispatched(false) {} | 84 : is_error(false), |
85 data(data.Pass()), | |
86 offset(0), | |
87 error(0), | |
88 dispatched(false) {} | |
89 | 89 |
90 // The location within the data stream where the error occurred. | 90 explicit DataFrame(int32_t error) |
91 const uint32_t offset; | 91 : is_error(true), offset(0), error(error), dispatched(false) {} |
92 | |
93 // Whether this DataFrame represents an error. | |
94 bool is_error; | |
95 | |
96 // The data received from the DataSource. | |
97 mojo::Array<uint8_t> data; | |
98 | |
99 // The offset within |data| at which the next read should begin. | |
100 uint32_t offset; | |
92 | 101 |
93 // The value of the error that occurred. | 102 // The value of the error that occurred. |
94 const int32_t error; | 103 const int32_t error; |
95 | 104 |
96 // Whether the error has been dispatched to the user. | 105 // Whether the error has been dispatched to the user. |
97 bool dispatched; | 106 bool dispatched; |
98 }; | 107 }; |
99 | 108 |
100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, | 109 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, |
101 uint32_t buffer_size, | 110 uint32_t buffer_size, |
102 int32_t fatal_error_value) | 111 int32_t fatal_error_value) |
103 : source_(source.Pass()), | 112 : source_(source.Pass()), |
104 fatal_error_value_(fatal_error_value), | 113 fatal_error_value_(fatal_error_value), |
105 bytes_received_(0), | |
106 shut_down_(false), | 114 shut_down_(false), |
107 weak_factory_(this) { | 115 weak_factory_(this) { |
108 MojoCreateDataPipeOptions options = { | |
109 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, | |
110 }; | |
111 mojo::ScopedDataPipeProducerHandle remote_handle; | |
112 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); | |
113 DCHECK_EQ(MOJO_RESULT_OK, result); | |
114 source_->Init(remote_handle.Pass()); | |
115 source_.set_client(this); | 116 source_.set_client(this); |
117 source_.set_error_handler(this); | |
118 source_->Init(buffer_size); | |
116 } | 119 } |
117 | 120 |
118 bool DataReceiver::Receive(const ReceiveDataCallback& callback, | 121 bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
119 const ReceiveErrorCallback& error_callback) { | 122 const ReceiveErrorCallback& error_callback) { |
120 DCHECK(!callback.is_null() && !error_callback.is_null()); | 123 DCHECK(!callback.is_null() && !error_callback.is_null()); |
121 if (pending_receive_ || shut_down_) | 124 if (pending_receive_ || shut_down_) |
122 return false; | 125 return false; |
123 // When the DataSource encounters an error, it pauses transmission. When the | 126 // When the DataSource encounters an error, it pauses transmission. When the |
124 // user starts a new receive following notification of the error (via | 127 // user starts a new receive following notification of the error (via |
125 // |error_callback| of the previous Receive call) of the error we can tell the | 128 // |error_callback| of the previous Receive call) of the error we can tell the |
126 // DataSource to resume transmission of data. | 129 // DataSource to resume transmission of data. |
127 if (pending_error_ && pending_error_->dispatched) { | 130 if (!pending_data_frames_.empty() && pending_data_frames_.front()->is_error && |
131 pending_data_frames_.front()->dispatched) { | |
128 source_->Resume(); | 132 source_->Resume(); |
129 pending_error_.reset(); | 133 pending_data_frames_.pop(); |
130 } | 134 } |
131 | 135 |
132 pending_receive_.reset( | 136 pending_receive_.reset( |
133 new PendingReceive(this, callback, error_callback, fatal_error_value_)); | 137 new PendingReceive(this, callback, error_callback, fatal_error_value_)); |
134 base::MessageLoop::current()->PostTask( | 138 ReceiveInternal(); |
135 FROM_HERE, | |
136 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); | |
137 return true; | 139 return true; |
138 } | 140 } |
139 | 141 |
140 DataReceiver::~DataReceiver() { | 142 DataReceiver::~DataReceiver() { |
141 ShutDown(); | 143 ShutDown(); |
142 } | 144 } |
143 | 145 |
144 void DataReceiver::OnError(uint32_t offset, int32_t error) { | 146 void DataReceiver::OnError(int32_t error) { |
145 if (shut_down_) | 147 if (shut_down_) |
146 return; | 148 return; |
147 | 149 |
148 if (pending_error_) { | 150 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(error))); |
149 // When OnError is called by the DataSource, transmission of data is | 151 if (pending_receive_) |
150 // suspended. Thus we shouldn't receive another call to OnError until we | 152 ReceiveInternal(); |
151 // have fully dealt with the error and called Resume to resume transmission | 153 } |
152 // (see Receive()). Under normal operation we should never get here, but if | 154 |
153 // we do (e.g. in the case of a hijacked service process) just shut down. | 155 void DataReceiver::OnData(mojo::Array<uint8_t> data) { |
154 ShutDown(); | 156 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); |
155 return; | 157 if (pending_receive_) |
156 } | 158 ReceiveInternal(); |
157 pending_error_.reset(new PendingError(offset, error)); | |
158 if (pending_receive_ && | |
159 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { | |
160 pending_receive_.reset(); | |
161 waiter_.reset(); | |
162 } | |
163 } | 159 } |
164 | 160 |
165 void DataReceiver::OnConnectionError() { | 161 void DataReceiver::OnConnectionError() { |
166 ShutDown(); | 162 ShutDown(); |
167 } | 163 } |
168 | 164 |
169 void DataReceiver::Done(uint32_t bytes_consumed) { | 165 void DataReceiver::Done(uint32_t bytes_consumed) { |
170 if (shut_down_) | 166 if (shut_down_) |
171 return; | 167 return; |
172 | 168 |
173 DCHECK(pending_receive_); | 169 DCHECK(pending_receive_); |
174 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); | 170 DataFrame& pending_data = *pending_data_frames_.front(); |
175 DCHECK_EQ(MOJO_RESULT_OK, result); | 171 pending_data.offset += bytes_consumed; |
172 DCHECK_LE(pending_data.offset, pending_data.data.size()); | |
173 if (pending_data.offset == pending_data.data.size()) { | |
174 source_->ReportBytesReceived(pending_data.data.size()); | |
175 pending_data_frames_.pop(); | |
176 } | |
176 pending_receive_.reset(); | 177 pending_receive_.reset(); |
177 bytes_received_ += bytes_consumed; | |
178 } | |
179 | |
180 void DataReceiver::OnDoneWaiting(MojoResult result) { | |
181 DCHECK(pending_receive_ && !shut_down_ && waiter_); | |
182 waiter_.reset(); | |
183 if (result != MOJO_RESULT_OK) { | |
184 ShutDown(); | |
185 return; | |
186 } | |
187 ReceiveInternal(); | |
188 } | 178 } |
189 | 179 |
190 void DataReceiver::ReceiveInternal() { | 180 void DataReceiver::ReceiveInternal() { |
191 if (shut_down_) | 181 if (shut_down_) |
192 return; | 182 return; |
193 DCHECK(pending_receive_); | 183 DCHECK(pending_receive_); |
194 if (pending_error_ && | 184 if (pending_receive_->buffer_in_use()) |
195 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { | 185 return; |
186 | |
187 if (!pending_data_frames_.empty() && | |
188 pending_receive_->DispatchDataFrame(pending_data_frames_.front().get())) { | |
196 pending_receive_.reset(); | 189 pending_receive_.reset(); |
197 waiter_.reset(); | |
198 return; | |
199 } | 190 } |
200 | |
201 const void* data; | |
202 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); | |
203 MojoResult result = mojo::BeginReadDataRaw( | |
204 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); | |
205 if (result == MOJO_RESULT_OK) { | |
206 if (!CheckErrorNotInReadRange(num_bytes)) { | |
207 ShutDown(); | |
208 return; | |
209 } | |
210 | |
211 pending_receive_->DispatchData(data, num_bytes); | |
212 return; | |
213 } | |
214 if (result == MOJO_RESULT_SHOULD_WAIT) { | |
215 waiter_.reset(new AsyncWaiter( | |
216 handle_.get(), | |
217 MOJO_HANDLE_SIGNAL_READABLE, | |
218 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); | |
219 return; | |
220 } | |
221 ShutDown(); | |
222 } | |
223 | |
224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) { | |
225 DCHECK(pending_receive_); | |
226 if (!pending_error_) | |
227 return true; | |
228 | |
229 DCHECK_NE(bytes_received_, pending_error_->offset); | |
230 DCHECK_NE(num_bytes, 0u); | |
231 uint32_t potential_bytes_received = bytes_received_ + num_bytes; | |
232 // bytes_received_ can overflow so we must consider two cases: | |
233 // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an | |
234 // equal number of times. In this case, |potential_bytes_received| must | |
235 // be in the range (|bytes_received|, |pending_error_->offset|]. Below | |
236 // this range can only occur if |bytes_received_| overflows before | |
237 // |pending_error_->offset|. Above can only occur if |bytes_received_| | |
238 // overtakes |pending_error_->offset|. | |
239 // 2. |pending_error_->offset| has overflowed once more than | |
240 // |bytes_received_|. In this case, |potential_bytes_received| must not | |
241 // be in the range (|pending_error_->offset|, |bytes_received_|]. | |
242 if ((bytes_received_ < pending_error_->offset && | |
243 (potential_bytes_received > pending_error_->offset || | |
244 potential_bytes_received <= bytes_received_)) || | |
245 (bytes_received_ > pending_error_->offset && | |
246 potential_bytes_received > pending_error_->offset && | |
247 potential_bytes_received <= bytes_received_)) { | |
248 return false; | |
249 } | |
250 return true; | |
251 } | 191 } |
252 | 192 |
253 void DataReceiver::ShutDown() { | 193 void DataReceiver::ShutDown() { |
254 shut_down_ = true; | 194 shut_down_ = true; |
255 if (pending_receive_) | 195 if (pending_receive_) |
256 pending_receive_->DispatchFatalError(); | 196 pending_receive_->DispatchFatalError(); |
257 pending_error_.reset(); | |
258 waiter_.reset(); | |
259 } | 197 } |
260 | 198 |
261 DataReceiver::PendingReceive::PendingReceive( | 199 DataReceiver::PendingReceive::PendingReceive( |
262 DataReceiver* receiver, | 200 DataReceiver* receiver, |
263 const ReceiveDataCallback& callback, | 201 const ReceiveDataCallback& callback, |
264 const ReceiveErrorCallback& error_callback, | 202 const ReceiveErrorCallback& error_callback, |
265 int32_t fatal_error_value) | 203 int32_t fatal_error_value) |
266 : receiver_(receiver), | 204 : receiver_(receiver), |
267 receive_callback_(callback), | 205 receive_callback_(callback), |
268 receive_error_callback_(error_callback), | 206 receive_error_callback_(error_callback), |
269 fatal_error_value_(fatal_error_value), | 207 fatal_error_value_(fatal_error_value), |
270 buffer_in_use_(false) { | 208 buffer_in_use_(false) { |
271 } | 209 } |
272 | 210 |
273 void DataReceiver::PendingReceive::DispatchData(const void* data, | 211 bool DataReceiver::PendingReceive::DispatchDataFrame( |
274 uint32_t num_bytes) { | 212 DataReceiver::DataFrame* data) { |
275 DCHECK(!buffer_in_use_); | 213 DCHECK(!buffer_in_use_); |
214 DCHECK(!data->dispatched); | |
215 | |
216 if (data->is_error) { | |
217 data->dispatched = true; | |
218 base::MessageLoop::current()->PostTask( | |
219 FROM_HERE, base::Bind(receive_error_callback_, data->error)); | |
220 return true; | |
221 } | |
276 buffer_in_use_ = true; | 222 buffer_in_use_ = true; |
277 receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( | 223 base::MessageLoop::current()->PostTask( |
278 new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); | 224 FROM_HERE, |
279 } | 225 base::Bind( |
280 | 226 receive_callback_, |
281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error, | 227 base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer( |
282 uint32_t bytes_received) { | 228 receiver_, |
283 DCHECK(!error->dispatched); | 229 this, |
284 if (buffer_in_use_ || bytes_received != error->offset) | 230 reinterpret_cast<char*>(&data->data[0]) + data->offset, |
285 return false; | 231 static_cast<uint32_t>(data->data.size() - data->offset)))))); |
286 | 232 return false; |
287 error->dispatched = true; | |
288 receive_error_callback_.Run(error->error); | |
289 return true; | |
290 } | 233 } |
291 | 234 |
292 void DataReceiver::PendingReceive::DispatchFatalError() { | 235 void DataReceiver::PendingReceive::DispatchFatalError() { |
293 receive_error_callback_.Run(fatal_error_value_); | 236 receive_error_callback_.Run(fatal_error_value_); |
294 } | 237 } |
295 | 238 |
296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { | 239 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { |
297 DCHECK(buffer_in_use_); | 240 DCHECK(buffer_in_use_); |
298 buffer_in_use_ = false; | 241 buffer_in_use_ = false; |
299 receiver_->Done(bytes_consumed); | 242 receiver_->Done(bytes_consumed); |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
331 buffer_size_ = 0; | 274 buffer_size_ = 0; |
332 } | 275 } |
333 | 276 |
334 void DataReceiver::PendingReceive::Buffer::DoneWithError( | 277 void DataReceiver::PendingReceive::Buffer::DoneWithError( |
335 uint32_t bytes_consumed, | 278 uint32_t bytes_consumed, |
336 int32_t error) { | 279 int32_t error) { |
337 Done(bytes_consumed); | 280 Done(bytes_consumed); |
338 } | 281 } |
339 | 282 |
340 } // namespace device | 283 } // namespace device |
OLD | NEW |