Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1939)

Side by Side Diff: device/serial/data_sender.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_sender.h" 5 #include "device/serial/data_sender.h"
6 6
7 #include <algorithm>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
9 #include "device/serial/async_waiter.h"
10 11
11 namespace device { 12 namespace device {
12 13
13 // Represents a send that is not yet fulfilled. 14 // Represents a send that is not yet fulfilled.
14 class DataSender::PendingSend { 15 class DataSender::PendingSend {
15 public: 16 public:
16 PendingSend(const base::StringPiece& data, 17 PendingSend(const base::StringPiece& data,
17 const DataSentCallback& callback, 18 const DataSentCallback& callback,
18 const SendErrorCallback& error_callback, 19 const SendErrorCallback& error_callback,
19 int32_t fatal_error_value); 20 int32_t fatal_error_value);
20 21
21 // Invoked to report that |num_bytes| of data have been sent. Subtracts the 22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the
22 // number of bytes that were part of this send from |num_bytes|. Returns 23 // number of bytes that were part of this send from |num_bytes|. Returns
23 // whether this send has been completed. If this send has been completed, this 24 // whether this send has been completed. If this send has been completed, this
24 // calls |callback_|. 25 // calls |callback_|.
25 bool ReportBytesSent(uint32_t* num_bytes); 26 bool ReportBytesSent(uint32_t* num_bytes);
26 27
27 // Invoked to report that |num_bytes| of data have been sent and then an 28 // Invoked to report that |num_bytes| of data have been sent and then an
28 // error, |error| was encountered. Subtracts the number of bytes that were 29 // error, |error| was encountered. Subtracts the number of bytes that were
29 // part of this send from |num_bytes|. If this send was not completed before 30 // part of this send from |num_bytes|. If this send was not completed before
30 // the error, this calls |error_callback_| to report the error. Otherwise, 31 // the error, this calls |error_callback_| to report the error. Otherwise,
31 // this calls |callback_|. Returns the number of bytes sent but not acked. 32 // this calls |callback_|. Returns the number of bytes sent but not acked.
32 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); 33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
33 34
34 // Reports |fatal_error_value_| to |receive_error_callback_|. 35 // Reports |fatal_error_value_| to |receive_error_callback_|.
35 void DispatchFatalError(); 36 void DispatchFatalError();
36 37
37 // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK 38 // Attempts to send any data not yet sent to |sink|.
38 // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent 39 bool SendData(serial::DataSink* sink, uint32_t* capacity);
39 // or the error if one is encountered writing to |handle|.
40 MojoResult SendData(mojo::DataPipeProducerHandle handle);
41 40
42 private: 41 private:
43 // Invoked to update |bytes_acked_| and |num_bytes|. 42 // Invoked to update |bytes_acked_| and |num_bytes|.
44 void ReportBytesSentInternal(uint32_t* num_bytes); 43 void ReportBytesSentInternal(uint32_t* num_bytes);
45 44
46 // The data to send. 45 // The data to send.
47 const base::StringPiece data_; 46 const base::StringPiece data_;
48 47
49 // The callback to report success. 48 // The callback to report success.
50 const DataSentCallback callback_; 49 const DataSentCallback callback_;
51 50
52 // The callback to report errors. 51 // The callback to report errors.
53 const SendErrorCallback error_callback_; 52 const SendErrorCallback error_callback_;
54 53
55 // The error value to report when DispatchFatalError() is called. 54 // The error value to report when DispatchFatalError() is called.
56 const int32_t fatal_error_value_; 55 const int32_t fatal_error_value_;
57 56
58 // The number of bytes sent to the data pipe. 57 // The number of bytes sent to the data pipe.
59 uint32_t bytes_sent_; 58 uint32_t bytes_sent_;
60 59
61 // The number of bytes acked. 60 // The number of bytes acked.
62 uint32_t bytes_acked_; 61 uint32_t bytes_acked_;
63 }; 62 };
64 63
65 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, 64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
66 uint32_t buffer_size, 65 uint32_t buffer_size,
67 int32_t fatal_error_value) 66 int32_t fatal_error_value)
68 : sink_(sink.Pass()), 67 : sink_(sink.Pass()),
69 fatal_error_value_(fatal_error_value), 68 fatal_error_value_(fatal_error_value),
69 available_buffer_capacity_(buffer_size),
70 shut_down_(false) { 70 shut_down_(false) {
71 sink_.set_error_handler(this); 71 sink_.set_error_handler(this);
72 MojoCreateDataPipeOptions options = {
73 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
74 };
75 options.struct_size = sizeof(options);
76 mojo::ScopedDataPipeConsumerHandle remote_handle;
77 MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
78 DCHECK_EQ(MOJO_RESULT_OK, result);
79 sink_->Init(remote_handle.Pass());
80 sink_.set_client(this); 72 sink_.set_client(this);
73 sink_->Init(buffer_size);
81 } 74 }
82 75
83 DataSender::~DataSender() { 76 DataSender::~DataSender() {
84 ShutDown(); 77 ShutDown();
85 } 78 }
86 79
87 bool DataSender::Send(const base::StringPiece& data, 80 bool DataSender::Send(const base::StringPiece& data,
88 const DataSentCallback& callback, 81 const DataSentCallback& callback,
89 const SendErrorCallback& error_callback) { 82 const SendErrorCallback& error_callback) {
90 DCHECK(!callback.is_null() && !error_callback.is_null()); 83 DCHECK(!callback.is_null() && !error_callback.is_null());
(...skipping 17 matching lines...) Expand all
108 101
109 pending_cancel_ = callback; 102 pending_cancel_ = callback;
110 sink_->Cancel(error); 103 sink_->Cancel(error);
111 return true; 104 return true;
112 } 105 }
113 106
114 void DataSender::ReportBytesSent(uint32_t bytes_sent) { 107 void DataSender::ReportBytesSent(uint32_t bytes_sent) {
115 if (shut_down_) 108 if (shut_down_)
116 return; 109 return;
117 110
111 available_buffer_capacity_ += bytes_sent;
118 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && 112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
119 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { 113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
120 sends_awaiting_ack_.pop(); 114 sends_awaiting_ack_.pop();
121 } 115 }
122 if (bytes_sent > 0 && !pending_sends_.empty()) { 116 if (bytes_sent > 0 && !pending_sends_.empty()) {
123 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); 117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
124 DCHECK(!finished); 118 DCHECK(!finished);
125 if (finished) { 119 if (finished) {
126 ShutDown(); 120 ShutDown();
127 return; 121 return;
128 } 122 }
129 } 123 }
130 if (bytes_sent != 0) { 124 if (bytes_sent != 0) {
131 ShutDown(); 125 ShutDown();
132 return; 126 return;
133 } 127 }
134 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) 128 if (pending_sends_.empty() && sends_awaiting_ack_.empty())
135 RunCancelCallback(); 129 RunCancelCallback();
130 SendInternal();
136 } 131 }
137 132
138 void DataSender::ReportBytesSentAndError( 133 void DataSender::ReportBytesSentAndError(
139 uint32_t bytes_sent, 134 uint32_t bytes_sent,
140 int32_t error, 135 int32_t error,
141 const mojo::Callback<void(uint32_t)>& callback) { 136 const mojo::Callback<void()>& callback) {
142 if (shut_down_) 137 if (shut_down_)
143 return; 138 return;
144 139
145 uint32_t bytes_to_flush = 0; 140 available_buffer_capacity_ += bytes_sent;
146 while (!sends_awaiting_ack_.empty()) { 141 while (!sends_awaiting_ack_.empty()) {
147 bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError( 142 available_buffer_capacity_ +=
148 &bytes_sent, error); 143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
144 error);
149 sends_awaiting_ack_.pop(); 145 sends_awaiting_ack_.pop();
150 } 146 }
151 while (!pending_sends_.empty()) { 147 while (!pending_sends_.empty()) {
152 bytes_to_flush += 148 available_buffer_capacity_ +=
153 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); 149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
154 pending_sends_.pop(); 150 pending_sends_.pop();
155 } 151 }
156 callback.Run(bytes_to_flush); 152 callback.Run();
157 RunCancelCallback(); 153 RunCancelCallback();
158 } 154 }
159 155
160 void DataSender::OnConnectionError() { 156 void DataSender::OnConnectionError() {
161 ShutDown(); 157 ShutDown();
162 } 158 }
163 159
164 void DataSender::SendInternal() { 160 void DataSender::SendInternal() {
165 while (!pending_sends_.empty()) { 161 while (!pending_sends_.empty() && available_buffer_capacity_) {
166 MojoResult result = pending_sends_.front()->SendData(handle_.get()); 162 if (pending_sends_.front()->SendData(sink_.get(),
167 if (result == MOJO_RESULT_OK) { 163 &available_buffer_capacity_)) {
168 sends_awaiting_ack_.push(pending_sends_.front()); 164 sends_awaiting_ack_.push(pending_sends_.front());
169 pending_sends_.pop(); 165 pending_sends_.pop();
170 } else if (result == MOJO_RESULT_SHOULD_WAIT) {
171 waiter_.reset(new AsyncWaiter(
172 handle_.get(),
173 MOJO_HANDLE_SIGNAL_WRITABLE,
174 base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
175 return;
176 } else {
177 ShutDown();
178 return;
179 } 166 }
180 } 167 }
181 } 168 }
182 169
183 void DataSender::OnDoneWaiting(MojoResult result) {
184 waiter_.reset();
185 if (result != MOJO_RESULT_OK) {
186 ShutDown();
187 return;
188 }
189 SendInternal();
190 }
191
192 void DataSender::RunCancelCallback() { 170 void DataSender::RunCancelCallback() {
193 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); 171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
194 if (pending_cancel_.is_null()) 172 if (pending_cancel_.is_null())
195 return; 173 return;
196 174
197 base::MessageLoop::current()->PostTask(FROM_HERE, 175 base::MessageLoop::current()->PostTask(FROM_HERE,
198 base::Bind(pending_cancel_)); 176 base::Bind(pending_cancel_));
199 pending_cancel_.Reset(); 177 pending_cancel_.Reset();
200 } 178 }
201 179
202 void DataSender::ShutDown() { 180 void DataSender::ShutDown() {
203 waiter_.reset();
204 shut_down_ = true; 181 shut_down_ = true;
205 while (!pending_sends_.empty()) { 182 while (!pending_sends_.empty()) {
206 pending_sends_.front()->DispatchFatalError(); 183 pending_sends_.front()->DispatchFatalError();
207 pending_sends_.pop(); 184 pending_sends_.pop();
208 } 185 }
209 while (!sends_awaiting_ack_.empty()) { 186 while (!sends_awaiting_ack_.empty()) {
210 sends_awaiting_ack_.front()->DispatchFatalError(); 187 sends_awaiting_ack_.front()->DispatchFatalError();
211 sends_awaiting_ack_.pop(); 188 sends_awaiting_ack_.pop();
212 } 189 }
213 RunCancelCallback(); 190 RunCancelCallback();
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
246 base::MessageLoop::current()->PostTask( 223 base::MessageLoop::current()->PostTask(
247 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); 224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
248 return bytes_sent_ - bytes_acked_; 225 return bytes_sent_ - bytes_acked_;
249 } 226 }
250 227
251 void DataSender::PendingSend::DispatchFatalError() { 228 void DataSender::PendingSend::DispatchFatalError() {
252 base::MessageLoop::current()->PostTask( 229 base::MessageLoop::current()->PostTask(
253 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); 230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
254 } 231 }
255 232
256 MojoResult DataSender::PendingSend::SendData( 233 bool DataSender::PendingSend::SendData(serial::DataSink* sink,
257 mojo::DataPipeProducerHandle handle) { 234 uint32_t* capacity) {
raymes 2014/10/17 01:55:42 I think calling this available_buffer_size to matc
Sam McNally 2014/10/20 05:12:58 Done.
258 uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_; 235 uint32_t num_bytes_to_send =
259 MojoResult result = mojo::WriteDataRaw(handle, 236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), *capacity);
260 data_.data() + bytes_sent_, 237 mojo::Array<uint8_t> bytes(num_bytes_to_send);
261 &bytes_to_send, 238 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
262 MOJO_WRITE_DATA_FLAG_NONE); 239 bytes_sent_ += num_bytes_to_send;
263 if (result != MOJO_RESULT_OK) 240 *capacity -= num_bytes_to_send;
264 return result; 241 sink->AcceptData(bytes.Pass());
265 242 return bytes_sent_ == data_.size();
266 bytes_sent_ += bytes_to_send;
267 return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
268 } 243 }
269 244
270 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { 245 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
271 bytes_acked_ += *num_bytes; 246 bytes_acked_ += *num_bytes;
272 if (bytes_acked_ > bytes_sent_) { 247 if (bytes_acked_ > bytes_sent_) {
273 *num_bytes = bytes_acked_ - bytes_sent_; 248 *num_bytes = bytes_acked_ - bytes_sent_;
274 bytes_acked_ = bytes_sent_; 249 bytes_acked_ = bytes_sent_;
275 } else { 250 } else {
276 *num_bytes = 0; 251 *num_bytes = 0;
277 } 252 }
278 } 253 }
279 254
280 } // namespace device 255 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698