Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(44)

Side by Side Diff: device/serial/data_sink_receiver.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_sink_receiver.h" 5 #include "device/serial/data_sink_receiver.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "device/serial/async_waiter.h" 10 #include "base/message_loop/message_loop.h"
11 11
12 namespace device { 12 namespace device {
13 13
14 // Represents a flush of data that has not been completed.
15 class DataSinkReceiver::PendingFlush {
16 public:
17 PendingFlush();
18
19 // Initializes this PendingFlush with |num_bytes|, the number of bytes to
20 // flush.
21 void SetNumBytesToFlush(uint32_t num_bytes);
22
23 // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
24 // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
25 // |bytes_to_flush_| bytes were flushed or the error if one is encountered
26 // discarding data from |handle|.
27 MojoResult Flush(mojo::DataPipeConsumerHandle handle);
28
29 // Whether this PendingFlush has received the number of bytes to flush.
30 bool received_flush() { return received_flush_; }
31
32 private:
33 // Whether this PendingFlush has received the number of bytes to flush.
34 bool received_flush_;
35
36 // The remaining number of bytes to flush.
37 uint32_t bytes_to_flush_;
38 };
39
40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by 14 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
41 // a DataSinkReceiver. 15 // a DataSinkReceiver.
42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { 16 class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
43 public: 17 public:
44 Buffer(scoped_refptr<DataSinkReceiver> receiver, 18 Buffer(scoped_refptr<DataSinkReceiver> receiver,
45 const char* buffer, 19 const char* buffer,
46 uint32_t buffer_size); 20 uint32_t buffer_size);
47 virtual ~Buffer(); 21 virtual ~Buffer();
48 22
49 void Cancel(int32_t error); 23 void Cancel(int32_t error);
(...skipping 17 matching lines...) Expand all
67 // If |cancelled_|, contains the cancellation error to report. 41 // If |cancelled_|, contains the cancellation error to report.
68 int32_t cancellation_error_; 42 int32_t cancellation_error_;
69 }; 43 };
70 44
71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, 45 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
72 const CancelCallback& cancel_callback, 46 const CancelCallback& cancel_callback,
73 const ErrorCallback& error_callback) 47 const ErrorCallback& error_callback)
74 : ready_callback_(ready_callback), 48 : ready_callback_(ready_callback),
75 cancel_callback_(cancel_callback), 49 cancel_callback_(cancel_callback),
76 error_callback_(error_callback), 50 error_callback_(error_callback),
51 flush_pending_(false),
77 buffer_in_use_(NULL), 52 buffer_in_use_(NULL),
53 waiting_for_data_(true),
54 initialized_(false),
55 available_buffer_capacity_(0),
56 pending_data_offset_(0),
78 shut_down_(false), 57 shut_down_(false),
79 weak_factory_(this) { 58 weak_factory_(this) {
80 } 59 }
81 60
82 void DataSinkReceiver::ShutDown() { 61 void DataSinkReceiver::ShutDown() {
83 shut_down_ = true; 62 shut_down_ = true;
84 if (waiter_)
85 waiter_.reset();
86 } 63 }
87 64
88 DataSinkReceiver::~DataSinkReceiver() { 65 DataSinkReceiver::~DataSinkReceiver() {
89 } 66 }
90 67
91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { 68 void DataSinkReceiver::Init(uint32_t buffer_size) {
92 if (handle_.is_valid()) { 69 if (initialized_) {
93 DispatchFatalError(); 70 ShutDown();
94 return; 71 return;
95 } 72 }
96 73 initialized_ = true;
97 handle_ = handle.Pass(); 74 available_buffer_capacity_ = buffer_size;
98 StartWaiting();
99 } 75 }
100 76
101 void DataSinkReceiver::Cancel(int32_t error) { 77 void DataSinkReceiver::Cancel(int32_t error) {
102 // If we have sent a ReportBytesSentAndError but have not received the 78 // If we have sent a ReportBytesSentAndError but have not received the
103 // response, that ReportBytesSentAndError message will appear to the 79 // response, that ReportBytesSentAndError message will appear to the
104 // DataSinkClient to be caused by this Cancel message. In that case, we ignore 80 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
105 // the cancel. 81 // the cancel.
106 if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) 82 if (flush_pending_)
107 return; 83 return;
108 84
109 // If there is a buffer is in use, mark the buffer as cancelled and notify the 85 // If there is a buffer is in use, mark the buffer as cancelled and notify the
110 // client by calling |cancel_callback_|. The sink implementation may or may 86 // client by calling |cancel_callback_|. The sink implementation may or may
111 // not take the cancellation into account when deciding what error (if any) to 87 // not take the cancellation into account when deciding what error (if any) to
112 // return. If the sink returns an error, we ignore the cancellation error. 88 // return. If the sink returns an error, we ignore the cancellation error.
113 // Otherwise, if the sink does not report an error, we override that with the 89 // Otherwise, if the sink does not report an error, we override that with the
114 // cancellation error. Once a cancellation has been received, the next report 90 // cancellation error. Once a cancellation has been received, the next report
115 // sent to the client will always contain an error; the error returned by the 91 // sent to the client will always contain an error; the error returned by the
116 // sink or the cancellation error if the sink does not return an error. 92 // sink or the cancellation error if the sink does not return an error.
117 if (buffer_in_use_) { 93 if (buffer_in_use_) {
118 buffer_in_use_->Cancel(error); 94 buffer_in_use_->Cancel(error);
119 if (!cancel_callback_.is_null()) 95 if (!cancel_callback_.is_null())
120 cancel_callback_.Run(error); 96 cancel_callback_.Run(error);
121 return; 97 return;
122 } 98 }
123 // If there is no buffer in use, immediately report the error and cancel the
124 // waiting for the data pipe if one exists. This transitions straight into the
125 // state after the sink has returned an error.
126 waiter_.reset();
127 ReportBytesSentAndError(0, error); 99 ReportBytesSentAndError(0, error);
128 } 100 }
129 101
102 void DataSinkReceiver::AcceptData(mojo::Array<uint8_t> data) {
103 if (flush_pending_)
104 return;
raymes 2014/10/17 03:10:12 Should this ever happen in normal operation? We sh
Sam McNally 2014/10/20 05:12:59 It can occur if the error notification and some da
105 if (data.size() > available_buffer_capacity_) {
106 ShutDown();
107 return;
108 }
109 available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
110 pending_data_buffers_.push(
111 linked_ptr<mojo::Array<uint8>>(new mojo::Array<uint8>(data.Pass())));
112 if (waiting_for_data_ && !buffer_in_use_)
raymes 2014/10/17 03:10:12 Hmm I can't work out what waiting_for_data_ is for
Sam McNally 2014/10/20 05:12:59 Removed it.
113 CheckForData();
114 }
115
130 void DataSinkReceiver::OnConnectionError() { 116 void DataSinkReceiver::OnConnectionError() {
131 DispatchFatalError(); 117 DispatchFatalError();
132 } 118 }
133 119
134 void DataSinkReceiver::StartWaiting() { 120 void DataSinkReceiver::CheckForData() {
raymes 2014/10/17 03:10:12 I feel like there might be a more descriptive name
Sam McNally 2014/10/20 05:12:59 Done.
135 DCHECK(!waiter_ && !shut_down_); 121 DCHECK(!shut_down_ && !buffer_in_use_ && !flush_pending_);
136 waiter_.reset( 122 if (pending_data_buffers_.empty()) {
137 new AsyncWaiter(handle_.get(), 123 waiting_for_data_ = true;
138 MOJO_HANDLE_SIGNAL_READABLE,
139 base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
140 }
141
142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
143 DCHECK(waiter_ && !shut_down_);
144 waiter_.reset();
145 if (result != MOJO_RESULT_OK) {
146 DispatchFatalError();
147 return; 124 return;
148 } 125 }
149 // If there are any queued flushes (from ReportBytesSentAndError()), let them 126 DCHECK_LT(pending_data_offset_, pending_data_buffers_.front()->size());
150 // flush data from the data pipe. 127 buffer_in_use_ = new Buffer(
151 if (!pending_flushes_.empty()) { 128 this,
152 MojoResult result = pending_flushes_.front()->Flush(handle_.get()); 129 reinterpret_cast<const char*>(&(*pending_data_buffers_.front())[0]),
153 if (result == MOJO_RESULT_OK) { 130 static_cast<uint32_t>(pending_data_buffers_.front()->size() -
154 pending_flushes_.pop(); 131 pending_data_offset_));
155 } else if (result != MOJO_RESULT_SHOULD_WAIT) {
156 DispatchFatalError();
157 return;
158 }
159 StartWaiting();
160 return;
161 }
162 const void* data = NULL;
163 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
164 result = mojo::BeginReadDataRaw(
165 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
166 if (result != MOJO_RESULT_OK) {
167 DispatchFatalError();
168 return;
169 }
170 buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
171 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); 132 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
172 } 133 }
173 134
174 void DataSinkReceiver::Done(uint32_t bytes_read) { 135 void DataSinkReceiver::Done(uint32_t bytes_read) {
175 if (!DoneInternal(bytes_read)) 136 if (!DoneInternal(bytes_read))
176 return; 137 return;
177 client()->ReportBytesSent(bytes_read); 138 client()->ReportBytesSent(bytes_read);
178 StartWaiting(); 139 waiting_for_data_ = false;
140 base::MessageLoop::current()->PostTask(
141 FROM_HERE,
142 base::Bind(&DataSinkReceiver::CheckForData, weak_factory_.GetWeakPtr()));
179 } 143 }
180 144
181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { 145 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
182 if (!DoneInternal(bytes_read)) 146 if (!DoneInternal(bytes_read))
183 return; 147 return;
184 ReportBytesSentAndError(bytes_read, error); 148 ReportBytesSentAndError(bytes_read, error);
185 } 149 }
186 150
187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { 151 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
188 if (shut_down_) 152 if (shut_down_)
189 return false; 153 return false;
190 154
191 DCHECK(buffer_in_use_); 155 DCHECK(buffer_in_use_);
192 buffer_in_use_ = NULL; 156 buffer_in_use_ = NULL;
193 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); 157 available_buffer_capacity_ += bytes_read;
194 if (result != MOJO_RESULT_OK) { 158 pending_data_offset_ += bytes_read;
raymes 2014/10/17 03:10:12 We could use a small struct for pending data buffe
Sam McNally 2014/10/20 05:12:59 Done.
195 DispatchFatalError(); 159 if (pending_data_offset_ == pending_data_buffers_.front()->size()) {
196 return false; 160 pending_data_buffers_.pop();
161 pending_data_offset_ = 0;
197 } 162 }
198 return true; 163 return true;
199 } 164 }
200 165
201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, 166 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
202 int32_t error) { 167 int32_t error) {
203 // When we encounter an error, we must discard the data from any sends already 168 // When we encounter an error, we must discard the data from any sends
raymes 2014/10/17 03:10:12 sends->send ?
Sam McNally 2014/10/20 05:12:59 Done.
204 // in the data pipe before we can resume dispatching data to the sink. We add 169 // buffers transmitted by the DataSinkClient before it receives this error.
205 // a pending flush here. The response containing the number of bytes to flush 170 flush_pending_ = true;
206 // is handled in SetNumBytesToFlush(). The actual flush is handled in
207 // OnDoneWaiting().
208 pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
209 client()->ReportBytesSentAndError( 171 client()->ReportBytesSentAndError(
210 bytes_read, 172 bytes_read,
211 error, 173 error,
212 base::Bind(&DataSinkReceiver::SetNumBytesToFlush, 174 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
213 weak_factory_.GetWeakPtr()));
214 } 175 }
215 176
216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { 177 void DataSinkReceiver::DoFlush() {
217 DCHECK(!pending_flushes_.empty()); 178 DCHECK(flush_pending_);
218 DCHECK(!pending_flushes_.back()->received_flush()); 179 flush_pending_ = false;
219 pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); 180 while (!pending_data_buffers_.empty()) {
220 if (!waiter_) 181 available_buffer_capacity_ += static_cast<uint32_t>(
221 StartWaiting(); 182 pending_data_buffers_.front()->size() - pending_data_offset_);
183 pending_data_buffers_.pop();
184 pending_data_offset_ = 0;
185 }
186 waiting_for_data_ = true;
222 } 187 }
223 188
224 void DataSinkReceiver::DispatchFatalError() { 189 void DataSinkReceiver::DispatchFatalError() {
225 if (shut_down_) 190 if (shut_down_)
226 return; 191 return;
227 192
228 ShutDown(); 193 ShutDown();
229 if (!error_callback_.is_null()) 194 if (!error_callback_.is_null())
230 error_callback_.Run(); 195 error_callback_.Run();
231 } 196 }
(...skipping 24 matching lines...) Expand all
256 221
257 const char* DataSinkReceiver::Buffer::GetData() { 222 const char* DataSinkReceiver::Buffer::GetData() {
258 return buffer_; 223 return buffer_;
259 } 224 }
260 225
261 uint32_t DataSinkReceiver::Buffer::GetSize() { 226 uint32_t DataSinkReceiver::Buffer::GetSize() {
262 return buffer_size_; 227 return buffer_size_;
263 } 228 }
264 229
265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { 230 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
231 scoped_refptr<DataSinkReceiver> receiver = receiver_;
232 receiver_ = nullptr;
266 if (cancelled_) 233 if (cancelled_)
267 receiver_->DoneWithError(bytes_read, cancellation_error_); 234 receiver->DoneWithError(bytes_read, cancellation_error_);
268 else 235 else
269 receiver_->Done(bytes_read); 236 receiver->Done(bytes_read);
270 receiver_ = NULL;
271 buffer_ = NULL; 237 buffer_ = NULL;
272 buffer_size_ = 0; 238 buffer_size_ = 0;
273 } 239 }
274 240
275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, 241 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
276 int32_t error) { 242 int32_t error) {
277 receiver_->DoneWithError(bytes_read, error); 243 scoped_refptr<DataSinkReceiver> receiver = receiver_;
278 receiver_ = NULL; 244 receiver_ = nullptr;
245 receiver->DoneWithError(bytes_read, error);
279 buffer_ = NULL; 246 buffer_ = NULL;
280 buffer_size_ = 0; 247 buffer_size_ = 0;
281 } 248 }
282 249
283 DataSinkReceiver::PendingFlush::PendingFlush()
284 : received_flush_(false), bytes_to_flush_(0) {
285 }
286
287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
288 DCHECK(!received_flush_);
289 received_flush_ = true;
290 bytes_to_flush_ = num_bytes;
291 }
292
293 MojoResult DataSinkReceiver::PendingFlush::Flush(
294 mojo::DataPipeConsumerHandle handle) {
295 DCHECK(received_flush_);
296 uint32_t num_bytes = bytes_to_flush_;
297 MojoResult result =
298 mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
299 if (result != MOJO_RESULT_OK)
300 return result;
301 DCHECK(num_bytes <= bytes_to_flush_);
302 bytes_to_flush_ -= num_bytes;
303 return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
304 }
305
306 } // namespace device 250 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698