Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(171)

Side by Side Diff: device/serial/data_sink_receiver.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: split out bug fix Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_sink_receiver.h" 5 #include "device/serial/data_sink_receiver.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "device/serial/async_waiter.h" 10 #include "base/message_loop/message_loop.h"
11 11
12 namespace device { 12 namespace device {
13 13
14 // Represents a flush of data that has not been completed. 14 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
15 class DataSinkReceiver::PendingFlush { 15 // DataSinkReceiver.
16 public:
17 PendingFlush();
18
19 // Initializes this PendingFlush with |num_bytes|, the number of bytes to
20 // flush.
21 void SetNumBytesToFlush(uint32_t num_bytes);
22
23 // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
24 // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
25 // |bytes_to_flush_| bytes were flushed or the error if one is encountered
26 // discarding data from |handle|.
27 MojoResult Flush(mojo::DataPipeConsumerHandle handle);
28
29 // Whether this PendingFlush has received the number of bytes to flush.
30 bool received_flush() { return received_flush_; }
31
32 private:
33 // Whether this PendingFlush has received the number of bytes to flush.
34 bool received_flush_;
35
36 // The remaining number of bytes to flush.
37 uint32_t bytes_to_flush_;
38 };
39
40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
41 // a DataSinkReceiver.
42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { 16 class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
43 public: 17 public:
44 Buffer(scoped_refptr<DataSinkReceiver> receiver, 18 Buffer(scoped_refptr<DataSinkReceiver> receiver,
45 const char* buffer, 19 const char* buffer,
46 uint32_t buffer_size); 20 uint32_t buffer_size);
47 ~Buffer() override; 21 ~Buffer() override;
48 22
49 void Cancel(int32_t error); 23 void Cancel(int32_t error);
50 24
51 // ReadOnlyBuffer overrides. 25 // ReadOnlyBuffer overrides.
52 const char* GetData() override; 26 const char* GetData() override;
53 uint32_t GetSize() override; 27 uint32_t GetSize() override;
54 void Done(uint32_t bytes_read) override; 28 void Done(uint32_t bytes_read) override;
55 void DoneWithError(uint32_t bytes_read, int32_t error) override; 29 void DoneWithError(uint32_t bytes_read, int32_t error) override;
56 30
57 private: 31 private:
58 // The DataSinkReceiver whose data pipe we are providing a view. 32 // The DataSinkReceiver of whose buffer we are providing a view.
59 scoped_refptr<DataSinkReceiver> receiver_; 33 scoped_refptr<DataSinkReceiver> receiver_;
60 34
61 const char* buffer_; 35 const char* buffer_;
62 uint32_t buffer_size_; 36 uint32_t buffer_size_;
63 37
64 // Whether this receive has been cancelled. 38 // Whether this receive has been cancelled.
65 bool cancelled_; 39 bool cancelled_;
66 40
67 // If |cancelled_|, contains the cancellation error to report. 41 // If |cancelled_|, contains the cancellation error to report.
68 int32_t cancellation_error_; 42 int32_t cancellation_error_;
69 }; 43 };
70 44
45 // A frame of data received from the client.
46 class DataSinkReceiver::DataFrame {
47 public:
48 explicit DataFrame(mojo::Array<uint8_t> data);
49
50 // Returns the number of uncomsumed bytes remaining of this data frame.
51 uint32_t GetRemainingBytes();
52
53 // Returns a pointer to the remaining data to be consumed.
54 const char* GetData();
55
56 // Reports that |bytes_read| bytes have been consumed.
57 void OnDataConsumed(uint32_t bytes_read);
58
59 private:
60 mojo::Array<uint8_t> data_;
61 uint32_t offset_;
62 };
63
71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, 64 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
72 const CancelCallback& cancel_callback, 65 const CancelCallback& cancel_callback,
73 const ErrorCallback& error_callback) 66 const ErrorCallback& error_callback)
74 : ready_callback_(ready_callback), 67 : ready_callback_(ready_callback),
75 cancel_callback_(cancel_callback), 68 cancel_callback_(cancel_callback),
76 error_callback_(error_callback), 69 error_callback_(error_callback),
70 flush_pending_(false),
77 buffer_in_use_(NULL), 71 buffer_in_use_(NULL),
72 initialized_(false),
73 available_buffer_capacity_(0),
78 shut_down_(false), 74 shut_down_(false),
79 weak_factory_(this) { 75 weak_factory_(this) {
80 } 76 }
81 77
82 void DataSinkReceiver::ShutDown() { 78 void DataSinkReceiver::ShutDown() {
83 shut_down_ = true; 79 shut_down_ = true;
84 if (waiter_)
85 waiter_.reset();
86 } 80 }
87 81
88 DataSinkReceiver::~DataSinkReceiver() { 82 DataSinkReceiver::~DataSinkReceiver() {
89 } 83 }
90 84
91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { 85 void DataSinkReceiver::Init(uint32_t buffer_size) {
92 if (handle_.is_valid()) { 86 if (initialized_) {
93 DispatchFatalError(); 87 ShutDown();
94 return; 88 return;
95 } 89 }
96 90 initialized_ = true;
97 handle_ = handle.Pass(); 91 available_buffer_capacity_ = buffer_size;
98 StartWaiting();
99 } 92 }
100 93
101 void DataSinkReceiver::Cancel(int32_t error) { 94 void DataSinkReceiver::Cancel(int32_t error) {
102 // If we have sent a ReportBytesSentAndError but have not received the 95 // If we have sent a ReportBytesSentAndError but have not received the
103 // response, that ReportBytesSentAndError message will appear to the 96 // response, that ReportBytesSentAndError message will appear to the
104 // DataSinkClient to be caused by this Cancel message. In that case, we ignore 97 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
105 // the cancel. 98 // the cancel.
106 if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) 99 if (flush_pending_)
107 return; 100 return;
108 101
109 // If there is a buffer is in use, mark the buffer as cancelled and notify the 102 // If there is a buffer is in use, mark the buffer as cancelled and notify the
110 // client by calling |cancel_callback_|. The sink implementation may or may 103 // client by calling |cancel_callback_|. The sink implementation may or may
111 // not take the cancellation into account when deciding what error (if any) to 104 // not take the cancellation into account when deciding what error (if any) to
112 // return. If the sink returns an error, we ignore the cancellation error. 105 // return. If the sink returns an error, we ignore the cancellation error.
113 // Otherwise, if the sink does not report an error, we override that with the 106 // Otherwise, if the sink does not report an error, we override that with the
114 // cancellation error. Once a cancellation has been received, the next report 107 // cancellation error. Once a cancellation has been received, the next report
115 // sent to the client will always contain an error; the error returned by the 108 // sent to the client will always contain an error; the error returned by the
116 // sink or the cancellation error if the sink does not return an error. 109 // sink or the cancellation error if the sink does not return an error.
117 if (buffer_in_use_) { 110 if (buffer_in_use_) {
118 buffer_in_use_->Cancel(error); 111 buffer_in_use_->Cancel(error);
119 if (!cancel_callback_.is_null()) 112 if (!cancel_callback_.is_null())
120 cancel_callback_.Run(error); 113 cancel_callback_.Run(error);
121 return; 114 return;
122 } 115 }
123 // If there is no buffer in use, immediately report the error and cancel the
124 // waiting for the data pipe if one exists. This transitions straight into the
125 // state after the sink has returned an error.
126 waiter_.reset();
127 ReportBytesSentAndError(0, error); 116 ReportBytesSentAndError(0, error);
128 } 117 }
129 118
119 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) {
120 if (!initialized_) {
121 ShutDown();
122 return;
123 }
124 if (data.size() > available_buffer_capacity_) {
125 ShutDown();
126 return;
127 }
128 available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
129 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
130 if (!buffer_in_use_ && !flush_pending_)
131 RunReadyCallback();
132 }
133
130 void DataSinkReceiver::OnConnectionError() { 134 void DataSinkReceiver::OnConnectionError() {
131 DispatchFatalError(); 135 DispatchFatalError();
132 } 136 }
133 137
134 void DataSinkReceiver::StartWaiting() { 138 void DataSinkReceiver::RunReadyCallback() {
135 DCHECK(!waiter_ && !shut_down_); 139 DCHECK(!shut_down_ && !flush_pending_);
136 waiter_.reset( 140 // If data arrives while a call to RunReadyCallback() is posted, we can be
137 new AsyncWaiter(handle_.get(), 141 // called with buffer_in_use_ already set.
138 MOJO_HANDLE_SIGNAL_READABLE, 142 if (buffer_in_use_)
139 base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
140 }
141
142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
143 DCHECK(waiter_ && !shut_down_);
144 waiter_.reset();
145 if (result != MOJO_RESULT_OK) {
146 DispatchFatalError();
147 return; 143 return;
148 } 144 buffer_in_use_ =
149 // If there are any queued flushes (from ReportBytesSentAndError()), let them 145 new Buffer(this,
150 // flush data from the data pipe. 146 pending_data_buffers_.front()->GetData(),
151 if (!pending_flushes_.empty()) { 147 pending_data_buffers_.front()->GetRemainingBytes());
152 MojoResult result = pending_flushes_.front()->Flush(handle_.get());
153 if (result == MOJO_RESULT_OK) {
154 pending_flushes_.pop();
155 } else if (result != MOJO_RESULT_SHOULD_WAIT) {
156 DispatchFatalError();
157 return;
158 }
159 StartWaiting();
160 return;
161 }
162 const void* data = NULL;
163 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
164 result = mojo::BeginReadDataRaw(
165 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
166 if (result != MOJO_RESULT_OK) {
167 DispatchFatalError();
168 return;
169 }
170 buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
171 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); 148 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
172 } 149 }
173 150
174 void DataSinkReceiver::Done(uint32_t bytes_read) { 151 void DataSinkReceiver::Done(uint32_t bytes_read) {
175 if (!DoneInternal(bytes_read)) 152 if (!DoneInternal(bytes_read))
176 return; 153 return;
177 client()->ReportBytesSent(bytes_read); 154 client()->ReportBytesSent(bytes_read);
178 StartWaiting(); 155 if (!pending_data_buffers_.empty()) {
156 base::MessageLoop::current()->PostTask(
157 FROM_HERE,
158 base::Bind(&DataSinkReceiver::RunReadyCallback,
159 weak_factory_.GetWeakPtr()));
160 }
179 } 161 }
180 162
181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { 163 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
182 if (!DoneInternal(bytes_read)) 164 if (!DoneInternal(bytes_read))
183 return; 165 return;
184 ReportBytesSentAndError(bytes_read, error); 166 ReportBytesSentAndError(bytes_read, error);
185 } 167 }
186 168
187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { 169 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
188 if (shut_down_) 170 if (shut_down_)
189 return false; 171 return false;
190 172
191 DCHECK(buffer_in_use_); 173 DCHECK(buffer_in_use_);
192 buffer_in_use_ = NULL; 174 buffer_in_use_ = NULL;
193 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); 175 available_buffer_capacity_ += bytes_read;
194 if (result != MOJO_RESULT_OK) { 176 pending_data_buffers_.front()->OnDataConsumed(bytes_read);
195 DispatchFatalError(); 177 if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
196 return false; 178 pending_data_buffers_.pop();
197 }
198 return true; 179 return true;
199 } 180 }
200 181
201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, 182 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
202 int32_t error) { 183 int32_t error) {
203 // When we encounter an error, we must discard the data from any sends already 184 // When we encounter an error, we must discard the data from any send buffers
204 // in the data pipe before we can resume dispatching data to the sink. We add 185 // transmitted by the DataSinkClient before it receives this error.
205 // a pending flush here. The response containing the number of bytes to flush 186 flush_pending_ = true;
206 // is handled in SetNumBytesToFlush(). The actual flush is handled in
207 // OnDoneWaiting().
208 pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
209 client()->ReportBytesSentAndError( 187 client()->ReportBytesSentAndError(
210 bytes_read, 188 bytes_read,
211 error, 189 error,
212 base::Bind(&DataSinkReceiver::SetNumBytesToFlush, 190 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
213 weak_factory_.GetWeakPtr()));
214 } 191 }
215 192
216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { 193 void DataSinkReceiver::DoFlush() {
217 DCHECK(!pending_flushes_.empty()); 194 DCHECK(flush_pending_);
218 DCHECK(!pending_flushes_.back()->received_flush()); 195 flush_pending_ = false;
219 pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); 196 while (!pending_data_buffers_.empty()) {
220 if (!waiter_) 197 available_buffer_capacity_ +=
221 StartWaiting(); 198 pending_data_buffers_.front()->GetRemainingBytes();
199 pending_data_buffers_.pop();
200 }
222 } 201 }
223 202
224 void DataSinkReceiver::DispatchFatalError() { 203 void DataSinkReceiver::DispatchFatalError() {
225 if (shut_down_) 204 if (shut_down_)
226 return; 205 return;
227 206
228 ShutDown(); 207 ShutDown();
229 if (!error_callback_.is_null()) 208 if (!error_callback_.is_null())
230 error_callback_.Run(); 209 error_callback_.Run();
231 } 210 }
(...skipping 24 matching lines...) Expand all
256 235
257 const char* DataSinkReceiver::Buffer::GetData() { 236 const char* DataSinkReceiver::Buffer::GetData() {
258 return buffer_; 237 return buffer_;
259 } 238 }
260 239
261 uint32_t DataSinkReceiver::Buffer::GetSize() { 240 uint32_t DataSinkReceiver::Buffer::GetSize() {
262 return buffer_size_; 241 return buffer_size_;
263 } 242 }
264 243
265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { 244 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
245 scoped_refptr<DataSinkReceiver> receiver = receiver_;
246 receiver_ = nullptr;
266 if (cancelled_) 247 if (cancelled_)
267 receiver_->DoneWithError(bytes_read, cancellation_error_); 248 receiver->DoneWithError(bytes_read, cancellation_error_);
268 else 249 else
269 receiver_->Done(bytes_read); 250 receiver->Done(bytes_read);
270 receiver_ = NULL;
271 buffer_ = NULL; 251 buffer_ = NULL;
272 buffer_size_ = 0; 252 buffer_size_ = 0;
273 } 253 }
274 254
275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, 255 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
276 int32_t error) { 256 int32_t error) {
277 receiver_->DoneWithError(bytes_read, error); 257 scoped_refptr<DataSinkReceiver> receiver = receiver_;
278 receiver_ = NULL; 258 receiver_ = nullptr;
259 receiver->DoneWithError(bytes_read, error);
279 buffer_ = NULL; 260 buffer_ = NULL;
280 buffer_size_ = 0; 261 buffer_size_ = 0;
281 } 262 }
282 263
283 DataSinkReceiver::PendingFlush::PendingFlush() 264 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data)
284 : received_flush_(false), bytes_to_flush_(0) { 265 : data_(data.Pass()), offset_(0) {
266 DCHECK_LT(0u, data_.size());
285 } 267 }
286 268
287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { 269 // Returns the number of uncomsumed bytes remaining of this data frame.
288 DCHECK(!received_flush_); 270 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() {
289 received_flush_ = true; 271 return static_cast<uint32_t>(data_.size() - offset_);
290 bytes_to_flush_ = num_bytes;
291 } 272 }
292 273
293 MojoResult DataSinkReceiver::PendingFlush::Flush( 274 // Returns a pointer to the remaining data to be consumed.
294 mojo::DataPipeConsumerHandle handle) { 275 const char* DataSinkReceiver::DataFrame::GetData() {
295 DCHECK(received_flush_); 276 DCHECK_LT(offset_, data_.size());
296 uint32_t num_bytes = bytes_to_flush_; 277 return reinterpret_cast<const char*>(&data_[0]) + offset_;
297 MojoResult result = 278 }
298 mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD); 279
299 if (result != MOJO_RESULT_OK) 280 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
300 return result; 281 offset_ += bytes_read;
301 DCHECK(num_bytes <= bytes_to_flush_); 282 DCHECK_LE(offset_, data_.size());
302 bytes_to_flush_ -= num_bytes;
303 return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
304 } 283 }
305 284
306 } // namespace device 285 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698