Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(171)

Side by Side Diff: device/serial/data_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_receiver.h"
6
7 #include <limits>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12
13 namespace device {
14
15 class DataReceiver::PendingReceive {
raymes 2014/08/06 05:46:43 Please add some comments for these helper classes
Sam McNally 2014/08/06 08:28:14 Done.
16 public:
17 PendingReceive(DataReceiver* receiver,
18 const ReceiveDataCallback& callback,
19 const ReceiveErrorCallback& error_callback,
20 int32_t fatal_error_value);
21 void DispatchData(const void* data, uint32_t num_bytes);
22 bool DispatchError(DataReceiver::PendingError* error,
23 uint32_t bytes_received);
24 void DispatchFatalError();
25
26 private:
27 class Buffer;
raymes 2014/08/06 05:46:43 since this class is only used in the .cc file, you
Sam McNally 2014/08/06 08:28:14 It's nested so it can see DataReceiver::PendingRec
28 void Done(uint32_t num_bytes);
29
30 DataReceiver* receiver_;
31 ReceiveDataCallback receive_callback_;
32 ReceiveErrorCallback receive_error_callback_;
33 const int32_t fatal_error_value_;
34 bool buffer_in_use_;
35 };
36
37 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
38 public:
39 Buffer(scoped_refptr<DataReceiver> pipe,
40 PendingReceive* receive,
41 const char* buffer,
42 uint32_t buffer_size);
43 virtual ~Buffer();
44 virtual const char* GetData() OVERRIDE;
45 virtual uint32_t GetSize() OVERRIDE;
46 virtual void Done(uint32_t bytes_consumed) OVERRIDE;
47 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
48
49 private:
50 scoped_refptr<DataReceiver> receiver_;
51 PendingReceive* receive_;
52 const char* buffer_;
53 uint32_t buffer_size_;
54 };
55
56 struct DataReceiver::PendingError {
57 PendingError(uint32_t offset, int32_t error)
58 : offset(offset), error(error), dispatched(false) {}
59
60 const uint32_t offset;
61 const int32_t error;
62 bool dispatched;
63 };
64
65 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
66 uint32_t buffer_size,
67 int32_t fatal_error_value)
68 : source_(source.Pass()),
69 fatal_error_value_(fatal_error_value),
70 bytes_received_(0),
71 shut_down_(false),
72 weak_factory_(this) {
73 MojoCreateDataPipeOptions options = {
74 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
75 };
76 mojo::ScopedDataPipeProducerHandle remote_handle;
77 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
78 DCHECK_EQ(MOJO_RESULT_OK, result);
79 source_->Init(remote_handle.Pass());
80 source_.set_client(this);
81 }
82
83 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
84 const ReceiveErrorCallback& error_callback) {
85 DCHECK(!callback.is_null() && !error_callback.is_null());
86 if (receive_ || shut_down_)
raymes 2014/08/06 05:46:43 probably pending_receive_ is a more descriptive na
Sam McNally 2014/08/06 08:28:14 Done.
87 return false;
88 if (error_ && error_->dispatched) {
raymes 2014/08/06 05:46:43 similarly, pending_error_
Sam McNally 2014/08/06 08:28:14 Done.
89 source_->Resume();
raymes 2014/08/06 05:46:43 A comment here describing what's going on would be
Sam McNally 2014/08/06 08:28:14 Done.
90 error_.reset();
91 }
92
93 receive_.reset(
94 new PendingReceive(this, callback, error_callback, fatal_error_value_));
95 base::MessageLoop::current()->PostTask(
96 FROM_HERE,
97 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
98 return true;
99 }
100
101 DataReceiver::~DataReceiver() {
102 ShutDown();
103 }
104
105 void DataReceiver::OnError(uint32_t offset, int32_t error) {
106 if (shut_down_)
107 return;
108
109 if (error_) {
110 OnConnectionError();
raymes 2014/08/06 05:46:43 Change this to call ShutDown instead of OnConnecti
Sam McNally 2014/08/06 08:28:14 Done.
111 return;
112 }
113 error_.reset(new PendingError(offset, error));
114 if (receive_ && receive_->DispatchError(error_.get(), bytes_received_)) {
115 receive_.reset();
116 waiter_.reset();
117 }
118 }
119
120 void DataReceiver::OnConnectionError() {
121 ShutDown();
122 }
123
124 void DataReceiver::Done(uint32_t bytes_consumed) {
125 if (shut_down_)
126 return;
127
128 DCHECK(receive_);
129 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
130 DCHECK_EQ(MOJO_RESULT_OK, result);
131 receive_.reset();
132 bytes_received_ += bytes_consumed;
133 }
134
135 void DataReceiver::OnDoneWaiting(MojoResult result) {
136 DCHECK(receive_ && !shut_down_);
raymes 2014/08/06 05:46:43 we can DCHECK waiter_ and cal waiter_.reset() here
Sam McNally 2014/08/06 08:28:14 Done.
137 if (result != MOJO_RESULT_OK) {
138 ShutDown();
139 return;
140 }
141 ReceiveInternal();
142 }
143
144 void DataReceiver::ReceiveInternal() {
145 if (shut_down_)
146 return;
147 DCHECK(receive_);
148 if (error_ && receive_->DispatchError(error_.get(), bytes_received_)) {
149 receive_.reset();
150 waiter_.reset();
151 return;
152 }
153
154 const void* data;
155 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
156 MojoResult result = mojo::BeginReadDataRaw(
157 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
158 if (result == MOJO_RESULT_OK) {
159 if (error_ && !CheckBytesReceived(num_bytes)) {
raymes 2014/08/06 05:46:43 Maybe rename this function to something like Check
Sam McNally 2014/08/06 08:28:14 Done.
160 ShutDown();
161 return;
162 }
163
164 receive_->DispatchData(data, num_bytes);
165 return;
166 }
167 if (result == MOJO_RESULT_SHOULD_WAIT) {
168 waiter_.reset(new AsyncWaiter(
169 handle_.get(),
170 MOJO_HANDLE_SIGNAL_READABLE,
171 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
172 return;
173 }
174 ShutDown();
175 }
176
177 bool DataReceiver::CheckBytesReceived(uint32_t num_bytes) {
178 DCHECK(receive_);
179 DCHECK_NE(bytes_received_, error_->offset);
180 uint32_t potential_bytes_received = bytes_received_ + num_bytes;
181 if ((bytes_received_ < error_->offset &&
182 (potential_bytes_received > error_->offset ||
183 potential_bytes_received < bytes_received_)) ||
184 (bytes_received_ > error_->offset &&
185 potential_bytes_received > error_->offset &&
186 potential_bytes_received < bytes_received_)) {
187 return false;
188 }
raymes 2014/08/06 05:46:43 Mention the fact that this is complicated because
Sam McNally 2014/08/06 08:28:14 Done.
189 return true;
190 }
191
192 void DataReceiver::ShutDown() {
193 shut_down_ = true;
194 if (receive_)
195 receive_->DispatchFatalError();
196 error_.reset();
197 waiter_.reset();
198 }
199
200 DataReceiver::PendingReceive::PendingReceive(
201 DataReceiver* receiver,
202 const ReceiveDataCallback& callback,
203 const ReceiveErrorCallback& error_callback,
204 int32_t fatal_error_value)
205 : receiver_(receiver),
206 receive_callback_(callback),
207 receive_error_callback_(error_callback),
208 fatal_error_value_(fatal_error_value),
209 buffer_in_use_(false) {
210 }
211
212 void DataReceiver::PendingReceive::DispatchData(const void* data,
213 uint32_t num_bytes) {
214 DCHECK(!buffer_in_use_);
215 buffer_in_use_ = true;
216 receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
217 new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
218 }
219
220 bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
221 uint32_t bytes_received) {
222 if (buffer_in_use_ || bytes_received != error->offset || error->dispatched)
raymes 2014/08/06 05:46:43 Probably if error->dispatched is already true, we
Sam McNally 2014/08/06 08:28:14 I don't think this should be called if error->disp
223 return false;
224
225 error->dispatched = true;
226 receive_error_callback_.Run(error->error);
227 return true;
228 }
229
230 void DataReceiver::PendingReceive::DispatchFatalError() {
231 receive_error_callback_.Run(fatal_error_value_);
232 }
233
234 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
235 DCHECK(buffer_in_use_);
236 buffer_in_use_ = false;
237 receiver_->Done(bytes_consumed);
238 }
239
240 DataReceiver::PendingReceive::Buffer::Buffer(
241 scoped_refptr<DataReceiver> receiver,
242 PendingReceive* receive,
243 const char* buffer,
244 uint32_t buffer_size)
245 : receiver_(receiver),
246 receive_(receive),
247 buffer_(buffer),
248 buffer_size_(buffer_size) {
249 }
250
251 DataReceiver::PendingReceive::Buffer::~Buffer() {
252 if (receive_)
253 receive_->Done(0);
254 }
255
256 const char* DataReceiver::PendingReceive::Buffer::GetData() {
257 return buffer_;
258 }
259
260 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
261 return buffer_size_;
262 }
263
264 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
265 receive_->Done(bytes_consumed);
266 receive_ = NULL;
267 receiver_ = NULL;
268 buffer_ = NULL;
269 buffer_size_ = 0;
270 }
271
272 void DataReceiver::PendingReceive::Buffer::DoneWithError(
273 uint32_t bytes_consumed,
274 int32_t error) {
275 Done(bytes_consumed);
276 }
277
278 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698