OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "device/serial/data_sender.h" | 5 #include "device/serial/data_sender.h" |
6 | 6 |
| 7 #include <algorithm> |
| 8 |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
8 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
9 #include "device/serial/async_waiter.h" | |
10 | 11 |
11 namespace device { | 12 namespace device { |
12 | 13 |
13 // Represents a send that is not yet fulfilled. | 14 // Represents a send that is not yet fulfilled. |
14 class DataSender::PendingSend { | 15 class DataSender::PendingSend { |
15 public: | 16 public: |
16 PendingSend(const base::StringPiece& data, | 17 PendingSend(const base::StringPiece& data, |
17 const DataSentCallback& callback, | 18 const DataSentCallback& callback, |
18 const SendErrorCallback& error_callback, | 19 const SendErrorCallback& error_callback, |
19 int32_t fatal_error_value); | 20 int32_t fatal_error_value); |
20 | 21 |
21 // Invoked to report that |num_bytes| of data have been sent. Subtracts the | 22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the |
22 // number of bytes that were part of this send from |num_bytes|. Returns | 23 // number of bytes that were part of this send from |num_bytes|. Returns |
23 // whether this send has been completed. If this send has been completed, this | 24 // whether this send has been completed. If this send has been completed, this |
24 // calls |callback_|. | 25 // calls |callback_|. |
25 bool ReportBytesSent(uint32_t* num_bytes); | 26 bool ReportBytesSent(uint32_t* num_bytes); |
26 | 27 |
27 // Invoked to report that |num_bytes| of data have been sent and then an | 28 // Invoked to report that |num_bytes| of data have been sent and then an |
28 // error, |error| was encountered. Subtracts the number of bytes that were | 29 // error, |error| was encountered. Subtracts the number of bytes that were |
29 // part of this send from |num_bytes|. If this send was not completed before | 30 // part of this send from |num_bytes|. If this send was not completed before |
30 // the error, this calls |error_callback_| to report the error. Otherwise, | 31 // the error, this calls |error_callback_| to report the error. Otherwise, |
31 // this calls |callback_|. Returns the number of bytes sent but not acked. | 32 // this calls |callback_|. Returns the number of bytes sent but not acked. |
32 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); | 33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); |
33 | 34 |
34 // Reports |fatal_error_value_| to |receive_error_callback_|. | 35 // Reports |fatal_error_value_| to |receive_error_callback_|. |
35 void DispatchFatalError(); | 36 void DispatchFatalError(); |
36 | 37 |
37 // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK | 38 // Attempts to send any data not yet sent to |sink|. |
38 // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent | 39 bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size); |
39 // or the error if one is encountered writing to |handle|. | |
40 MojoResult SendData(mojo::DataPipeProducerHandle handle); | |
41 | 40 |
42 private: | 41 private: |
43 // Invoked to update |bytes_acked_| and |num_bytes|. | 42 // Invoked to update |bytes_acked_| and |num_bytes|. |
44 void ReportBytesSentInternal(uint32_t* num_bytes); | 43 void ReportBytesSentInternal(uint32_t* num_bytes); |
45 | 44 |
46 // The data to send. | 45 // The data to send. |
47 const base::StringPiece data_; | 46 const base::StringPiece data_; |
48 | 47 |
49 // The callback to report success. | 48 // The callback to report success. |
50 const DataSentCallback callback_; | 49 const DataSentCallback callback_; |
51 | 50 |
52 // The callback to report errors. | 51 // The callback to report errors. |
53 const SendErrorCallback error_callback_; | 52 const SendErrorCallback error_callback_; |
54 | 53 |
55 // The error value to report when DispatchFatalError() is called. | 54 // The error value to report when DispatchFatalError() is called. |
56 const int32_t fatal_error_value_; | 55 const int32_t fatal_error_value_; |
57 | 56 |
58 // The number of bytes sent to the data pipe. | 57 // The number of bytes sent to the DataSink. |
59 uint32_t bytes_sent_; | 58 uint32_t bytes_sent_; |
60 | 59 |
61 // The number of bytes acked. | 60 // The number of bytes acked. |
62 uint32_t bytes_acked_; | 61 uint32_t bytes_acked_; |
63 }; | 62 }; |
64 | 63 |
65 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, | 64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
66 uint32_t buffer_size, | 65 uint32_t buffer_size, |
67 int32_t fatal_error_value) | 66 int32_t fatal_error_value) |
68 : sink_(sink.Pass()), | 67 : sink_(sink.Pass()), |
69 fatal_error_value_(fatal_error_value), | 68 fatal_error_value_(fatal_error_value), |
| 69 available_buffer_capacity_(buffer_size), |
70 shut_down_(false) { | 70 shut_down_(false) { |
71 sink_.set_error_handler(this); | 71 sink_.set_error_handler(this); |
72 MojoCreateDataPipeOptions options = { | |
73 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, | |
74 }; | |
75 options.struct_size = sizeof(options); | |
76 mojo::ScopedDataPipeConsumerHandle remote_handle; | |
77 MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle); | |
78 DCHECK_EQ(MOJO_RESULT_OK, result); | |
79 sink_->Init(remote_handle.Pass()); | |
80 sink_.set_client(this); | 72 sink_.set_client(this); |
| 73 sink_->Init(buffer_size); |
81 } | 74 } |
82 | 75 |
83 DataSender::~DataSender() { | 76 DataSender::~DataSender() { |
84 ShutDown(); | 77 ShutDown(); |
85 } | 78 } |
86 | 79 |
87 bool DataSender::Send(const base::StringPiece& data, | 80 bool DataSender::Send(const base::StringPiece& data, |
88 const DataSentCallback& callback, | 81 const DataSentCallback& callback, |
89 const SendErrorCallback& error_callback) { | 82 const SendErrorCallback& error_callback) { |
90 DCHECK(!callback.is_null() && !error_callback.is_null()); | 83 DCHECK(!callback.is_null() && !error_callback.is_null()); |
(...skipping 17 matching lines...) Expand all Loading... |
108 | 101 |
109 pending_cancel_ = callback; | 102 pending_cancel_ = callback; |
110 sink_->Cancel(error); | 103 sink_->Cancel(error); |
111 return true; | 104 return true; |
112 } | 105 } |
113 | 106 |
114 void DataSender::ReportBytesSent(uint32_t bytes_sent) { | 107 void DataSender::ReportBytesSent(uint32_t bytes_sent) { |
115 if (shut_down_) | 108 if (shut_down_) |
116 return; | 109 return; |
117 | 110 |
| 111 available_buffer_capacity_ += bytes_sent; |
118 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && | 112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && |
119 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { | 113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { |
120 sends_awaiting_ack_.pop(); | 114 sends_awaiting_ack_.pop(); |
121 } | 115 } |
122 if (bytes_sent > 0 && !pending_sends_.empty()) { | 116 if (bytes_sent > 0 && !pending_sends_.empty()) { |
123 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); | 117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); |
124 DCHECK(!finished); | 118 DCHECK(!finished); |
125 if (finished) { | 119 if (finished) { |
126 ShutDown(); | 120 ShutDown(); |
127 return; | 121 return; |
128 } | 122 } |
129 } | 123 } |
130 if (bytes_sent != 0) { | 124 if (bytes_sent != 0) { |
131 ShutDown(); | 125 ShutDown(); |
132 return; | 126 return; |
133 } | 127 } |
134 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) | 128 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) |
135 RunCancelCallback(); | 129 RunCancelCallback(); |
| 130 SendInternal(); |
136 } | 131 } |
137 | 132 |
138 void DataSender::ReportBytesSentAndError( | 133 void DataSender::ReportBytesSentAndError( |
139 uint32_t bytes_sent, | 134 uint32_t bytes_sent, |
140 int32_t error, | 135 int32_t error, |
141 const mojo::Callback<void(uint32_t)>& callback) { | 136 const mojo::Callback<void()>& callback) { |
142 if (shut_down_) | 137 if (shut_down_) |
143 return; | 138 return; |
144 | 139 |
145 uint32_t bytes_to_flush = 0; | 140 available_buffer_capacity_ += bytes_sent; |
146 while (!sends_awaiting_ack_.empty()) { | 141 while (!sends_awaiting_ack_.empty()) { |
147 bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError( | 142 available_buffer_capacity_ += |
148 &bytes_sent, error); | 143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, |
| 144 error); |
149 sends_awaiting_ack_.pop(); | 145 sends_awaiting_ack_.pop(); |
150 } | 146 } |
151 while (!pending_sends_.empty()) { | 147 while (!pending_sends_.empty()) { |
152 bytes_to_flush += | 148 available_buffer_capacity_ += |
153 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); | 149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); |
154 pending_sends_.pop(); | 150 pending_sends_.pop(); |
155 } | 151 } |
156 callback.Run(bytes_to_flush); | 152 callback.Run(); |
157 RunCancelCallback(); | 153 RunCancelCallback(); |
158 } | 154 } |
159 | 155 |
160 void DataSender::OnConnectionError() { | 156 void DataSender::OnConnectionError() { |
161 ShutDown(); | 157 ShutDown(); |
162 } | 158 } |
163 | 159 |
164 void DataSender::SendInternal() { | 160 void DataSender::SendInternal() { |
165 while (!pending_sends_.empty()) { | 161 while (!pending_sends_.empty() && available_buffer_capacity_) { |
166 MojoResult result = pending_sends_.front()->SendData(handle_.get()); | 162 if (pending_sends_.front()->SendData(sink_.get(), |
167 if (result == MOJO_RESULT_OK) { | 163 &available_buffer_capacity_)) { |
168 sends_awaiting_ack_.push(pending_sends_.front()); | 164 sends_awaiting_ack_.push(pending_sends_.front()); |
169 pending_sends_.pop(); | 165 pending_sends_.pop(); |
170 } else if (result == MOJO_RESULT_SHOULD_WAIT) { | |
171 waiter_.reset(new AsyncWaiter( | |
172 handle_.get(), | |
173 MOJO_HANDLE_SIGNAL_WRITABLE, | |
174 base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this)))); | |
175 return; | |
176 } else { | |
177 ShutDown(); | |
178 return; | |
179 } | 166 } |
180 } | 167 } |
181 } | 168 } |
182 | 169 |
183 void DataSender::OnDoneWaiting(MojoResult result) { | |
184 waiter_.reset(); | |
185 if (result != MOJO_RESULT_OK) { | |
186 ShutDown(); | |
187 return; | |
188 } | |
189 SendInternal(); | |
190 } | |
191 | |
192 void DataSender::RunCancelCallback() { | 170 void DataSender::RunCancelCallback() { |
193 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); | 171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); |
194 if (pending_cancel_.is_null()) | 172 if (pending_cancel_.is_null()) |
195 return; | 173 return; |
196 | 174 |
197 base::MessageLoop::current()->PostTask(FROM_HERE, | 175 base::MessageLoop::current()->PostTask(FROM_HERE, |
198 base::Bind(pending_cancel_)); | 176 base::Bind(pending_cancel_)); |
199 pending_cancel_.Reset(); | 177 pending_cancel_.Reset(); |
200 } | 178 } |
201 | 179 |
202 void DataSender::ShutDown() { | 180 void DataSender::ShutDown() { |
203 waiter_.reset(); | |
204 shut_down_ = true; | 181 shut_down_ = true; |
205 while (!pending_sends_.empty()) { | 182 while (!pending_sends_.empty()) { |
206 pending_sends_.front()->DispatchFatalError(); | 183 pending_sends_.front()->DispatchFatalError(); |
207 pending_sends_.pop(); | 184 pending_sends_.pop(); |
208 } | 185 } |
209 while (!sends_awaiting_ack_.empty()) { | 186 while (!sends_awaiting_ack_.empty()) { |
210 sends_awaiting_ack_.front()->DispatchFatalError(); | 187 sends_awaiting_ack_.front()->DispatchFatalError(); |
211 sends_awaiting_ack_.pop(); | 188 sends_awaiting_ack_.pop(); |
212 } | 189 } |
213 RunCancelCallback(); | 190 RunCancelCallback(); |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
246 base::MessageLoop::current()->PostTask( | 223 base::MessageLoop::current()->PostTask( |
247 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); | 224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); |
248 return bytes_sent_ - bytes_acked_; | 225 return bytes_sent_ - bytes_acked_; |
249 } | 226 } |
250 | 227 |
251 void DataSender::PendingSend::DispatchFatalError() { | 228 void DataSender::PendingSend::DispatchFatalError() { |
252 base::MessageLoop::current()->PostTask( | 229 base::MessageLoop::current()->PostTask( |
253 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); | 230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); |
254 } | 231 } |
255 | 232 |
256 MojoResult DataSender::PendingSend::SendData( | 233 bool DataSender::PendingSend::SendData(serial::DataSink* sink, |
257 mojo::DataPipeProducerHandle handle) { | 234 uint32_t* available_buffer_size) { |
258 uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_; | 235 uint32_t num_bytes_to_send = |
259 MojoResult result = mojo::WriteDataRaw(handle, | 236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), |
260 data_.data() + bytes_sent_, | 237 *available_buffer_size); |
261 &bytes_to_send, | 238 mojo::Array<uint8_t> bytes(num_bytes_to_send); |
262 MOJO_WRITE_DATA_FLAG_NONE); | 239 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); |
263 if (result != MOJO_RESULT_OK) | 240 bytes_sent_ += num_bytes_to_send; |
264 return result; | 241 *available_buffer_size -= num_bytes_to_send; |
265 | 242 sink->OnData(bytes.Pass()); |
266 bytes_sent_ += bytes_to_send; | 243 return bytes_sent_ == data_.size(); |
267 return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; | |
268 } | 244 } |
269 | 245 |
270 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { | 246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { |
271 bytes_acked_ += *num_bytes; | 247 bytes_acked_ += *num_bytes; |
272 if (bytes_acked_ > bytes_sent_) { | 248 if (bytes_acked_ > bytes_sent_) { |
273 *num_bytes = bytes_acked_ - bytes_sent_; | 249 *num_bytes = bytes_acked_ - bytes_sent_; |
274 bytes_acked_ = bytes_sent_; | 250 bytes_acked_ = bytes_sent_; |
275 } else { | 251 } else { |
276 *num_bytes = 0; | 252 *num_bytes = 0; |
277 } | 253 } |
278 } | 254 } |
279 | 255 |
280 } // namespace device | 256 } // namespace device |
OLD | NEW |