Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: device/serial/data_sink_receiver.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: address comments Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_sink_receiver.h" 5 #include "device/serial/data_sink_receiver.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "device/serial/async_waiter.h" 10 #include "base/message_loop/message_loop.h"
11 11
12 namespace device { 12 namespace device {
13 13
14 // Represents a flush of data that has not been completed. 14 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
15 class DataSinkReceiver::PendingFlush { 15 // DataSinkReceiver.
16 public:
17 PendingFlush();
18
19 // Initializes this PendingFlush with |num_bytes|, the number of bytes to
20 // flush.
21 void SetNumBytesToFlush(uint32_t num_bytes);
22
23 // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
24 // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
25 // |bytes_to_flush_| bytes were flushed or the error if one is encountered
26 // discarding data from |handle|.
27 MojoResult Flush(mojo::DataPipeConsumerHandle handle);
28
29 // Whether this PendingFlush has received the number of bytes to flush.
30 bool received_flush() { return received_flush_; }
31
32 private:
33 // Whether this PendingFlush has received the number of bytes to flush.
34 bool received_flush_;
35
36 // The remaining number of bytes to flush.
37 uint32_t bytes_to_flush_;
38 };
39
40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
41 // a DataSinkReceiver.
42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { 16 class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
43 public: 17 public:
44 Buffer(scoped_refptr<DataSinkReceiver> receiver, 18 Buffer(scoped_refptr<DataSinkReceiver> receiver,
45 const char* buffer, 19 const char* buffer,
46 uint32_t buffer_size); 20 uint32_t buffer_size);
47 virtual ~Buffer(); 21 virtual ~Buffer();
48 22
49 void Cancel(int32_t error); 23 void Cancel(int32_t error);
50 24
51 // ReadOnlyBuffer overrides. 25 // ReadOnlyBuffer overrides.
52 virtual const char* GetData() override; 26 virtual const char* GetData() override;
53 virtual uint32_t GetSize() override; 27 virtual uint32_t GetSize() override;
54 virtual void Done(uint32_t bytes_read) override; 28 virtual void Done(uint32_t bytes_read) override;
55 virtual void DoneWithError(uint32_t bytes_read, int32_t error) override; 29 virtual void DoneWithError(uint32_t bytes_read, int32_t error) override;
56 30
57 private: 31 private:
58 // The DataSinkReceiver whose data pipe we are providing a view. 32 // The DataSinkReceiver of whose buffer we are providing a view.
59 scoped_refptr<DataSinkReceiver> receiver_; 33 scoped_refptr<DataSinkReceiver> receiver_;
60 34
61 const char* buffer_; 35 const char* buffer_;
62 uint32_t buffer_size_; 36 uint32_t buffer_size_;
63 37
64 // Whether this receive has been cancelled. 38 // Whether this receive has been cancelled.
65 bool cancelled_; 39 bool cancelled_;
66 40
67 // If |cancelled_|, contains the cancellation error to report. 41 // If |cancelled_|, contains the cancellation error to report.
68 int32_t cancellation_error_; 42 int32_t cancellation_error_;
69 }; 43 };
70 44
45 // A frame of data received from the client.
46 class DataSinkReceiver::DataFrame {
47 public:
48 explicit DataFrame(mojo::Array<uint8_t> data);
49
50 // Returns the number of uncomsumed bytes remaining of this data frame.
51 uint32_t remaining_bytes() {
52 return static_cast<uint32_t>(data_.size() - offset_);
53 }
54
55 // Returns a pointer to the remaining data to be consumed.
56 const char* data() {
57 DCHECK_LT(offset_, data_.size());
58 return reinterpret_cast<const char*>(&data_[0]) + offset_;
59 }
raymes 2014/10/27 03:02:22 nit: since we're not inlining OnDataConsumed we mi
Sam McNally 2014/10/27 05:39:14 Done.
60
61 // Reports that |bytes_read| bytes have been consumed.
62 void OnDataConsumed(uint32_t bytes_read);
63
64 private:
65 mojo::Array<uint8_t> data_;
66 uint32_t offset_;
67 };
68
71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, 69 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
72 const CancelCallback& cancel_callback, 70 const CancelCallback& cancel_callback,
73 const ErrorCallback& error_callback) 71 const ErrorCallback& error_callback)
74 : ready_callback_(ready_callback), 72 : ready_callback_(ready_callback),
75 cancel_callback_(cancel_callback), 73 cancel_callback_(cancel_callback),
76 error_callback_(error_callback), 74 error_callback_(error_callback),
75 flush_pending_(false),
77 buffer_in_use_(NULL), 76 buffer_in_use_(NULL),
77 initialized_(false),
78 available_buffer_capacity_(0),
78 shut_down_(false), 79 shut_down_(false),
79 weak_factory_(this) { 80 weak_factory_(this) {
80 } 81 }
81 82
82 void DataSinkReceiver::ShutDown() { 83 void DataSinkReceiver::ShutDown() {
83 shut_down_ = true; 84 shut_down_ = true;
84 if (waiter_)
85 waiter_.reset();
86 } 85 }
87 86
88 DataSinkReceiver::~DataSinkReceiver() { 87 DataSinkReceiver::~DataSinkReceiver() {
89 } 88 }
90 89
91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { 90 void DataSinkReceiver::Init(uint32_t buffer_size) {
92 if (handle_.is_valid()) { 91 if (initialized_) {
93 DispatchFatalError(); 92 ShutDown();
94 return; 93 return;
95 } 94 }
96 95 initialized_ = true;
97 handle_ = handle.Pass(); 96 available_buffer_capacity_ = buffer_size;
98 StartWaiting();
99 } 97 }
100 98
101 void DataSinkReceiver::Cancel(int32_t error) { 99 void DataSinkReceiver::Cancel(int32_t error) {
102 // If we have sent a ReportBytesSentAndError but have not received the 100 // If we have sent a ReportBytesSentAndError but have not received the
103 // response, that ReportBytesSentAndError message will appear to the 101 // response, that ReportBytesSentAndError message will appear to the
104 // DataSinkClient to be caused by this Cancel message. In that case, we ignore 102 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
105 // the cancel. 103 // the cancel.
106 if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) 104 if (flush_pending_)
107 return; 105 return;
108 106
109 // If there is a buffer is in use, mark the buffer as cancelled and notify the 107 // If there is a buffer is in use, mark the buffer as cancelled and notify the
110 // client by calling |cancel_callback_|. The sink implementation may or may 108 // client by calling |cancel_callback_|. The sink implementation may or may
111 // not take the cancellation into account when deciding what error (if any) to 109 // not take the cancellation into account when deciding what error (if any) to
112 // return. If the sink returns an error, we ignore the cancellation error. 110 // return. If the sink returns an error, we ignore the cancellation error.
113 // Otherwise, if the sink does not report an error, we override that with the 111 // Otherwise, if the sink does not report an error, we override that with the
114 // cancellation error. Once a cancellation has been received, the next report 112 // cancellation error. Once a cancellation has been received, the next report
115 // sent to the client will always contain an error; the error returned by the 113 // sent to the client will always contain an error; the error returned by the
116 // sink or the cancellation error if the sink does not return an error. 114 // sink or the cancellation error if the sink does not return an error.
117 if (buffer_in_use_) { 115 if (buffer_in_use_) {
118 buffer_in_use_->Cancel(error); 116 buffer_in_use_->Cancel(error);
119 if (!cancel_callback_.is_null()) 117 if (!cancel_callback_.is_null())
120 cancel_callback_.Run(error); 118 cancel_callback_.Run(error);
121 return; 119 return;
122 } 120 }
123 // If there is no buffer in use, immediately report the error and cancel the
124 // waiting for the data pipe if one exists. This transitions straight into the
125 // state after the sink has returned an error.
126 waiter_.reset();
127 ReportBytesSentAndError(0, error); 121 ReportBytesSentAndError(0, error);
128 } 122 }
129 123
124 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) {
125 if (!initialized_) {
126 ShutDown();
127 return;
128 }
129 if (data.size() > available_buffer_capacity_) {
130 ShutDown();
131 return;
132 }
133 available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
134 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
135 if (!buffer_in_use_ && !flush_pending_)
136 RunReadyCallback();
137 }
138
130 void DataSinkReceiver::OnConnectionError() { 139 void DataSinkReceiver::OnConnectionError() {
131 DispatchFatalError(); 140 DispatchFatalError();
132 } 141 }
133 142
134 void DataSinkReceiver::StartWaiting() { 143 void DataSinkReceiver::RunReadyCallback() {
135 DCHECK(!waiter_ && !shut_down_); 144 DCHECK(!shut_down_ && !flush_pending_);
136 waiter_.reset( 145 // If data arrives while a call to RunReadyCallback() is posted, we can be
137 new AsyncWaiter(handle_.get(), 146 // called with buffer_in_use_ already set.
138 MOJO_HANDLE_SIGNAL_READABLE, 147 if (buffer_in_use_)
139 base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
140 }
141
142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
143 DCHECK(waiter_ && !shut_down_);
144 waiter_.reset();
145 if (result != MOJO_RESULT_OK) {
146 DispatchFatalError();
147 return; 148 return;
148 } 149 buffer_in_use_ = new Buffer(this,
149 // If there are any queued flushes (from ReportBytesSentAndError()), let them 150 pending_data_buffers_.front()->data(),
150 // flush data from the data pipe. 151 pending_data_buffers_.front()->remaining_bytes());
151 if (!pending_flushes_.empty()) {
152 MojoResult result = pending_flushes_.front()->Flush(handle_.get());
153 if (result == MOJO_RESULT_OK) {
154 pending_flushes_.pop();
155 } else if (result != MOJO_RESULT_SHOULD_WAIT) {
156 DispatchFatalError();
157 return;
158 }
159 StartWaiting();
160 return;
161 }
162 const void* data = NULL;
163 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
164 result = mojo::BeginReadDataRaw(
165 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
166 if (result != MOJO_RESULT_OK) {
167 DispatchFatalError();
168 return;
169 }
170 buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
171 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); 152 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
172 } 153 }
173 154
174 void DataSinkReceiver::Done(uint32_t bytes_read) { 155 void DataSinkReceiver::Done(uint32_t bytes_read) {
175 if (!DoneInternal(bytes_read)) 156 if (!DoneInternal(bytes_read))
176 return; 157 return;
177 client()->ReportBytesSent(bytes_read); 158 client()->ReportBytesSent(bytes_read);
178 StartWaiting(); 159 if (!pending_data_buffers_.empty()) {
160 base::MessageLoop::current()->PostTask(
161 FROM_HERE,
162 base::Bind(&DataSinkReceiver::RunReadyCallback,
163 weak_factory_.GetWeakPtr()));
164 }
179 } 165 }
180 166
181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { 167 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
182 if (!DoneInternal(bytes_read)) 168 if (!DoneInternal(bytes_read))
183 return; 169 return;
184 ReportBytesSentAndError(bytes_read, error); 170 ReportBytesSentAndError(bytes_read, error);
185 } 171 }
186 172
187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { 173 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
188 if (shut_down_) 174 if (shut_down_)
189 return false; 175 return false;
190 176
191 DCHECK(buffer_in_use_); 177 DCHECK(buffer_in_use_);
192 buffer_in_use_ = NULL; 178 buffer_in_use_ = NULL;
193 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); 179 available_buffer_capacity_ += bytes_read;
194 if (result != MOJO_RESULT_OK) { 180 pending_data_buffers_.front()->OnDataConsumed(bytes_read);
195 DispatchFatalError(); 181 if (pending_data_buffers_.front()->remaining_bytes() == 0)
196 return false; 182 pending_data_buffers_.pop();
197 }
198 return true; 183 return true;
199 } 184 }
200 185
201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, 186 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
202 int32_t error) { 187 int32_t error) {
203 // When we encounter an error, we must discard the data from any sends already 188 // When we encounter an error, we must discard the data from any send buffers
204 // in the data pipe before we can resume dispatching data to the sink. We add 189 // transmitted by the DataSinkClient before it receives this error.
205 // a pending flush here. The response containing the number of bytes to flush 190 flush_pending_ = true;
206 // is handled in SetNumBytesToFlush(). The actual flush is handled in
207 // OnDoneWaiting().
208 pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
209 client()->ReportBytesSentAndError( 191 client()->ReportBytesSentAndError(
210 bytes_read, 192 bytes_read,
211 error, 193 error,
212 base::Bind(&DataSinkReceiver::SetNumBytesToFlush, 194 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
213 weak_factory_.GetWeakPtr()));
214 } 195 }
215 196
216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { 197 void DataSinkReceiver::DoFlush() {
217 DCHECK(!pending_flushes_.empty()); 198 DCHECK(flush_pending_);
218 DCHECK(!pending_flushes_.back()->received_flush()); 199 flush_pending_ = false;
219 pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); 200 while (!pending_data_buffers_.empty()) {
220 if (!waiter_) 201 available_buffer_capacity_ +=
221 StartWaiting(); 202 pending_data_buffers_.front()->remaining_bytes();
203 pending_data_buffers_.pop();
204 }
222 } 205 }
223 206
224 void DataSinkReceiver::DispatchFatalError() { 207 void DataSinkReceiver::DispatchFatalError() {
225 if (shut_down_) 208 if (shut_down_)
226 return; 209 return;
227 210
228 ShutDown(); 211 ShutDown();
229 if (!error_callback_.is_null()) 212 if (!error_callback_.is_null())
230 error_callback_.Run(); 213 error_callback_.Run();
231 } 214 }
(...skipping 24 matching lines...) Expand all
256 239
257 const char* DataSinkReceiver::Buffer::GetData() { 240 const char* DataSinkReceiver::Buffer::GetData() {
258 return buffer_; 241 return buffer_;
259 } 242 }
260 243
261 uint32_t DataSinkReceiver::Buffer::GetSize() { 244 uint32_t DataSinkReceiver::Buffer::GetSize() {
262 return buffer_size_; 245 return buffer_size_;
263 } 246 }
264 247
265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { 248 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
249 scoped_refptr<DataSinkReceiver> receiver = receiver_;
250 receiver_ = nullptr;
266 if (cancelled_) 251 if (cancelled_)
267 receiver_->DoneWithError(bytes_read, cancellation_error_); 252 receiver->DoneWithError(bytes_read, cancellation_error_);
268 else 253 else
269 receiver_->Done(bytes_read); 254 receiver->Done(bytes_read);
270 receiver_ = NULL;
271 buffer_ = NULL; 255 buffer_ = NULL;
272 buffer_size_ = 0; 256 buffer_size_ = 0;
273 } 257 }
274 258
275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, 259 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
276 int32_t error) { 260 int32_t error) {
277 receiver_->DoneWithError(bytes_read, error); 261 scoped_refptr<DataSinkReceiver> receiver = receiver_;
278 receiver_ = NULL; 262 receiver_ = nullptr;
263 receiver->DoneWithError(bytes_read, error);
279 buffer_ = NULL; 264 buffer_ = NULL;
280 buffer_size_ = 0; 265 buffer_size_ = 0;
281 } 266 }
282 267
283 DataSinkReceiver::PendingFlush::PendingFlush() 268 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data)
284 : received_flush_(false), bytes_to_flush_(0) { 269 : data_(data.Pass()), offset_(0) {
270 DCHECK_LT(0u, data_.size());
285 } 271 }
286 272
287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { 273 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
288 DCHECK(!received_flush_); 274 offset_ += bytes_read;
289 received_flush_ = true; 275 DCHECK_LE(offset_, data_.size());
290 bytes_to_flush_ = num_bytes;
291 }
292
293 MojoResult DataSinkReceiver::PendingFlush::Flush(
294 mojo::DataPipeConsumerHandle handle) {
295 DCHECK(received_flush_);
296 uint32_t num_bytes = bytes_to_flush_;
297 MojoResult result =
298 mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
299 if (result != MOJO_RESULT_OK)
300 return result;
301 DCHECK(num_bytes <= bytes_to_flush_);
302 bytes_to_flush_ -= num_bytes;
303 return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
304 } 276 }
305 277
306 } // namespace device 278 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698