Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(497)

Side by Side Diff: device/serial/data_receiver.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: address comments Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_receiver.h" 5 #include "device/serial/data_receiver.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12 11
13 namespace device { 12 namespace device {
14 13
15 // Represents a receive that is not yet fulfilled. 14 // Represents a receive that is not yet fulfilled.
16 class DataReceiver::PendingReceive { 15 class DataReceiver::PendingReceive {
17 public: 16 public:
18 PendingReceive(DataReceiver* receiver, 17 PendingReceive(DataReceiver* receiver,
19 const ReceiveDataCallback& callback, 18 const ReceiveDataCallback& callback,
20 const ReceiveErrorCallback& error_callback, 19 const ReceiveErrorCallback& error_callback,
21 int32_t fatal_error_value); 20 int32_t fatal_error_value);
22 21
23 // Dispatches |data| to |receive_callback_|. 22 // Dispatches |data| to |receive_callback_|.
24 void DispatchData(const void* data, uint32_t num_bytes); 23 bool DispatchDataFrame(DataReceiver::DataFrame* data);
25
26 // Reports |error| to |receive_error_callback_| if it is an appropriate time.
27 // Returns whether it dispatched |error|.
28 bool DispatchError(DataReceiver::PendingError* error,
29 uint32_t bytes_received);
30 24
31 // Reports |fatal_error_value_| to |receive_error_callback_|. 25 // Reports |fatal_error_value_| to |receive_error_callback_|.
32 void DispatchFatalError(); 26 void DispatchFatalError();
33 27
28 bool buffer_in_use() { return buffer_in_use_; }
29
34 private: 30 private:
35 class Buffer; 31 class Buffer;
36 32
37 // Invoked when the user is finished with the ReadOnlyBuffer provided to 33 // Invoked when the user is finished with the ReadOnlyBuffer provided to
38 // |receive_callback_|. 34 // |receive_callback_|.
39 void Done(uint32_t num_bytes); 35 void Done(uint32_t num_bytes);
40 36
41 // The DataReceiver that owns this. 37 // The DataReceiver that owns this.
42 DataReceiver* receiver_; 38 DataReceiver* receiver_;
43 39
44 // The callback to dispatch data. 40 // The callback to dispatch data.
45 ReceiveDataCallback receive_callback_; 41 ReceiveDataCallback receive_callback_;
46 42
47 // The callback to report errors. 43 // The callback to report errors.
48 ReceiveErrorCallback receive_error_callback_; 44 ReceiveErrorCallback receive_error_callback_;
49 45
50 // The error value to report when DispatchFatalError() is called. 46 // The error value to report when DispatchFatalError() is called.
51 const int32_t fatal_error_value_; 47 const int32_t fatal_error_value_;
52 48
53 // True if the user owns a buffer passed to |receive_callback_| as part of 49 // True if the user owns a buffer passed to |receive_callback_| as part of
54 // DispatchData(). 50 // DispatchDataFrame().
55 bool buffer_in_use_; 51 bool buffer_in_use_;
56 }; 52 };
57 53
58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by 54 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
59 // a DataReceiver. 55 // DataReceiver.
60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { 56 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
61 public: 57 public:
62 Buffer(scoped_refptr<DataReceiver> pipe, 58 Buffer(scoped_refptr<DataReceiver> pipe,
63 PendingReceive* receive, 59 PendingReceive* receive,
64 const char* buffer, 60 const char* buffer,
65 uint32_t buffer_size); 61 uint32_t buffer_size);
66 virtual ~Buffer(); 62 virtual ~Buffer();
67 63
68 // ReadOnlyBuffer overrides. 64 // ReadOnlyBuffer overrides.
69 virtual const char* GetData() override; 65 virtual const char* GetData() override;
70 virtual uint32_t GetSize() override; 66 virtual uint32_t GetSize() override;
71 virtual void Done(uint32_t bytes_consumed) override; 67 virtual void Done(uint32_t bytes_consumed) override;
72 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) override; 68 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) override;
73 69
74 private: 70 private:
75 // The DataReceiver whose data pipe we are providing a view. 71 // The DataReceiver of whose buffer we are providing a view.
76 scoped_refptr<DataReceiver> receiver_; 72 scoped_refptr<DataReceiver> receiver_;
77 73
78 // The PendingReceive to which this buffer has been created in response. 74 // The PendingReceive to which this buffer has been created in response.
79 PendingReceive* pending_receive_; 75 PendingReceive* pending_receive_;
80 76
81 const char* buffer_; 77 const char* buffer_;
82 uint32_t buffer_size_; 78 uint32_t buffer_size_;
83 }; 79 };
84 80
85 // Represents an error received from the DataSource. 81 // A buffer of data or an error received from the DataSource.
86 struct DataReceiver::PendingError { 82 struct DataReceiver::DataFrame {
87 PendingError(uint32_t offset, int32_t error) 83 explicit DataFrame(mojo::Array<uint8_t> data)
88 : offset(offset), error(error), dispatched(false) {} 84 : is_error(false),
85 data(data.Pass()),
86 offset(0),
87 error(0),
88 dispatched(false) {}
89 89
90 // The location within the data stream where the error occurred. 90 explicit DataFrame(int32_t error)
91 const uint32_t offset; 91 : is_error(true), offset(0), error(error), dispatched(false) {}
92
93 // Whether this DataFrame represents an error.
94 bool is_error;
95
96 // The data received from the DataSource.
97 mojo::Array<uint8_t> data;
98
99 // The offset within |data| at which the next read should begin.
100 uint32_t offset;
92 101
93 // The value of the error that occurred. 102 // The value of the error that occurred.
94 const int32_t error; 103 const int32_t error;
95 104
96 // Whether the error has been dispatched to the user. 105 // Whether the error has been dispatched to the user.
97 bool dispatched; 106 bool dispatched;
98 }; 107 };
99 108
100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, 109 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
101 uint32_t buffer_size, 110 uint32_t buffer_size,
102 int32_t fatal_error_value) 111 int32_t fatal_error_value)
103 : source_(source.Pass()), 112 : source_(source.Pass()),
104 fatal_error_value_(fatal_error_value), 113 fatal_error_value_(fatal_error_value),
105 bytes_received_(0),
106 shut_down_(false), 114 shut_down_(false),
107 weak_factory_(this) { 115 weak_factory_(this) {
108 MojoCreateDataPipeOptions options = {
109 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
110 };
111 mojo::ScopedDataPipeProducerHandle remote_handle;
112 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
113 DCHECK_EQ(MOJO_RESULT_OK, result);
114 source_->Init(remote_handle.Pass());
115 source_.set_client(this); 116 source_.set_client(this);
117 source_.set_error_handler(this);
118 source_->Init(buffer_size);
116 } 119 }
117 120
118 bool DataReceiver::Receive(const ReceiveDataCallback& callback, 121 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
119 const ReceiveErrorCallback& error_callback) { 122 const ReceiveErrorCallback& error_callback) {
120 DCHECK(!callback.is_null() && !error_callback.is_null()); 123 DCHECK(!callback.is_null() && !error_callback.is_null());
121 if (pending_receive_ || shut_down_) 124 if (pending_receive_ || shut_down_)
122 return false; 125 return false;
123 // When the DataSource encounters an error, it pauses transmission. When the 126 // When the DataSource encounters an error, it pauses transmission. When the
124 // user starts a new receive following notification of the error (via 127 // user starts a new receive following notification of the error (via
125 // |error_callback| of the previous Receive call) of the error we can tell the 128 // |error_callback| of the previous Receive call) of the error we can tell the
126 // DataSource to resume transmission of data. 129 // DataSource to resume transmission of data.
127 if (pending_error_ && pending_error_->dispatched) { 130 if (!pending_data_buffers_.empty() &&
131 pending_data_buffers_.front()->is_error &&
132 pending_data_buffers_.front()->dispatched) {
128 source_->Resume(); 133 source_->Resume();
129 pending_error_.reset(); 134 pending_data_buffers_.pop();
130 } 135 }
131 136
132 pending_receive_.reset( 137 pending_receive_.reset(
133 new PendingReceive(this, callback, error_callback, fatal_error_value_)); 138 new PendingReceive(this, callback, error_callback, fatal_error_value_));
134 base::MessageLoop::current()->PostTask( 139 ReceiveInternal();
135 FROM_HERE,
136 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
137 return true; 140 return true;
138 } 141 }
139 142
140 DataReceiver::~DataReceiver() { 143 DataReceiver::~DataReceiver() {
141 ShutDown(); 144 ShutDown();
142 } 145 }
143 146
144 void DataReceiver::OnError(uint32_t offset, int32_t error) { 147 void DataReceiver::OnError(int32_t error) {
145 if (shut_down_) 148 if (shut_down_)
146 return; 149 return;
147 150
148 if (pending_error_) { 151 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(error)));
149 // When OnError is called by the DataSource, transmission of data is 152 if (pending_receive_)
150 // suspended. Thus we shouldn't receive another call to OnError until we 153 ReceiveInternal();
151 // have fully dealt with the error and called Resume to resume transmission 154 }
152 // (see Receive()). Under normal operation we should never get here, but if 155
153 // we do (e.g. in the case of a hijacked service process) just shut down. 156 void DataReceiver::OnData(mojo::Array<uint8_t> data) {
154 ShutDown(); 157 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
155 return; 158 if (pending_receive_)
156 } 159 ReceiveInternal();
157 pending_error_.reset(new PendingError(offset, error));
158 if (pending_receive_ &&
159 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
160 pending_receive_.reset();
161 waiter_.reset();
162 }
163 } 160 }
164 161
165 void DataReceiver::OnConnectionError() { 162 void DataReceiver::OnConnectionError() {
166 ShutDown(); 163 ShutDown();
167 } 164 }
168 165
169 void DataReceiver::Done(uint32_t bytes_consumed) { 166 void DataReceiver::Done(uint32_t bytes_consumed) {
170 if (shut_down_) 167 if (shut_down_)
171 return; 168 return;
172 169
173 DCHECK(pending_receive_); 170 DCHECK(pending_receive_);
174 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); 171 DataFrame& pending_data = *pending_data_buffers_.front();
175 DCHECK_EQ(MOJO_RESULT_OK, result); 172 pending_data.offset += bytes_consumed;
173 DCHECK_LE(pending_data.offset, pending_data.data.size());
174 if (pending_data.offset == pending_data.data.size()) {
175 source_->ReportBytesSent(pending_data.data.size());
176 pending_data_buffers_.pop();
177 }
176 pending_receive_.reset(); 178 pending_receive_.reset();
177 bytes_received_ += bytes_consumed;
178 }
179
180 void DataReceiver::OnDoneWaiting(MojoResult result) {
181 DCHECK(pending_receive_ && !shut_down_ && waiter_);
182 waiter_.reset();
183 if (result != MOJO_RESULT_OK) {
184 ShutDown();
185 return;
186 }
187 ReceiveInternal();
188 } 179 }
189 180
190 void DataReceiver::ReceiveInternal() { 181 void DataReceiver::ReceiveInternal() {
191 if (shut_down_) 182 if (shut_down_)
192 return; 183 return;
193 DCHECK(pending_receive_); 184 DCHECK(pending_receive_);
194 if (pending_error_ && 185 if (pending_receive_->buffer_in_use())
195 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { 186 return;
187
188 if (!pending_data_buffers_.empty() &&
189 pending_receive_->DispatchDataFrame(
190 pending_data_buffers_.front().get())) {
196 pending_receive_.reset(); 191 pending_receive_.reset();
197 waiter_.reset();
198 return;
199 } 192 }
200
201 const void* data;
202 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
203 MojoResult result = mojo::BeginReadDataRaw(
204 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
205 if (result == MOJO_RESULT_OK) {
206 if (!CheckErrorNotInReadRange(num_bytes)) {
207 ShutDown();
208 return;
209 }
210
211 pending_receive_->DispatchData(data, num_bytes);
212 return;
213 }
214 if (result == MOJO_RESULT_SHOULD_WAIT) {
215 waiter_.reset(new AsyncWaiter(
216 handle_.get(),
217 MOJO_HANDLE_SIGNAL_READABLE,
218 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
219 return;
220 }
221 ShutDown();
222 }
223
224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
225 DCHECK(pending_receive_);
226 if (!pending_error_)
227 return true;
228
229 DCHECK_NE(bytes_received_, pending_error_->offset);
230 DCHECK_NE(num_bytes, 0u);
231 uint32_t potential_bytes_received = bytes_received_ + num_bytes;
232 // bytes_received_ can overflow so we must consider two cases:
233 // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
234 // equal number of times. In this case, |potential_bytes_received| must
235 // be in the range (|bytes_received|, |pending_error_->offset|]. Below
236 // this range can only occur if |bytes_received_| overflows before
237 // |pending_error_->offset|. Above can only occur if |bytes_received_|
238 // overtakes |pending_error_->offset|.
239 // 2. |pending_error_->offset| has overflowed once more than
240 // |bytes_received_|. In this case, |potential_bytes_received| must not
241 // be in the range (|pending_error_->offset|, |bytes_received_|].
242 if ((bytes_received_ < pending_error_->offset &&
243 (potential_bytes_received > pending_error_->offset ||
244 potential_bytes_received <= bytes_received_)) ||
245 (bytes_received_ > pending_error_->offset &&
246 potential_bytes_received > pending_error_->offset &&
247 potential_bytes_received <= bytes_received_)) {
248 return false;
249 }
250 return true;
251 } 193 }
252 194
253 void DataReceiver::ShutDown() { 195 void DataReceiver::ShutDown() {
254 shut_down_ = true; 196 shut_down_ = true;
255 if (pending_receive_) 197 if (pending_receive_)
256 pending_receive_->DispatchFatalError(); 198 pending_receive_->DispatchFatalError();
257 pending_error_.reset();
258 waiter_.reset();
259 } 199 }
260 200
261 DataReceiver::PendingReceive::PendingReceive( 201 DataReceiver::PendingReceive::PendingReceive(
262 DataReceiver* receiver, 202 DataReceiver* receiver,
263 const ReceiveDataCallback& callback, 203 const ReceiveDataCallback& callback,
264 const ReceiveErrorCallback& error_callback, 204 const ReceiveErrorCallback& error_callback,
265 int32_t fatal_error_value) 205 int32_t fatal_error_value)
266 : receiver_(receiver), 206 : receiver_(receiver),
267 receive_callback_(callback), 207 receive_callback_(callback),
268 receive_error_callback_(error_callback), 208 receive_error_callback_(error_callback),
269 fatal_error_value_(fatal_error_value), 209 fatal_error_value_(fatal_error_value),
270 buffer_in_use_(false) { 210 buffer_in_use_(false) {
271 } 211 }
272 212
273 void DataReceiver::PendingReceive::DispatchData(const void* data, 213 bool DataReceiver::PendingReceive::DispatchDataFrame(
274 uint32_t num_bytes) { 214 DataReceiver::DataFrame* data) {
275 DCHECK(!buffer_in_use_); 215 DCHECK(!buffer_in_use_);
216 DCHECK(!data->dispatched);
217
218 if (data->is_error) {
219 data->dispatched = true;
220 base::MessageLoop::current()->PostTask(
221 FROM_HERE, base::Bind(receive_error_callback_, data->error));
222 return true;
223 }
276 buffer_in_use_ = true; 224 buffer_in_use_ = true;
277 receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( 225 base::MessageLoop::current()->PostTask(
278 new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); 226 FROM_HERE,
279 } 227 base::Bind(
280 228 receive_callback_,
281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error, 229 base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer(
282 uint32_t bytes_received) { 230 receiver_,
283 DCHECK(!error->dispatched); 231 this,
284 if (buffer_in_use_ || bytes_received != error->offset) 232 reinterpret_cast<char*>(&data->data[0]) + data->offset,
285 return false; 233 static_cast<uint32_t>(data->data.size() - data->offset))))));
286 234 return false;
287 error->dispatched = true;
288 receive_error_callback_.Run(error->error);
289 return true;
290 } 235 }
291 236
292 void DataReceiver::PendingReceive::DispatchFatalError() { 237 void DataReceiver::PendingReceive::DispatchFatalError() {
293 receive_error_callback_.Run(fatal_error_value_); 238 receive_error_callback_.Run(fatal_error_value_);
294 } 239 }
295 240
296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { 241 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
297 DCHECK(buffer_in_use_); 242 DCHECK(buffer_in_use_);
298 buffer_in_use_ = false; 243 buffer_in_use_ = false;
299 receiver_->Done(bytes_consumed); 244 receiver_->Done(bytes_consumed);
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
331 buffer_size_ = 0; 276 buffer_size_ = 0;
332 } 277 }
333 278
334 void DataReceiver::PendingReceive::Buffer::DoneWithError( 279 void DataReceiver::PendingReceive::Buffer::DoneWithError(
335 uint32_t bytes_consumed, 280 uint32_t bytes_consumed,
336 int32_t error) { 281 int32_t error) {
337 Done(bytes_consumed); 282 Done(bytes_consumed);
338 } 283 }
339 284
340 } // namespace device 285 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698