| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "device/serial/data_sender.h" | 5 #include "device/serial/data_sender.h" |
| 6 | 6 |
| 7 #include <algorithm> |
| 8 |
| 7 #include "base/bind.h" | 9 #include "base/bind.h" |
| 8 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| 9 #include "device/serial/async_waiter.h" | |
| 10 | 11 |
| 11 namespace device { | 12 namespace device { |
| 12 | 13 |
| 13 // Represents a send that is not yet fulfilled. | 14 // Represents a send that is not yet fulfilled. |
| 14 class DataSender::PendingSend { | 15 class DataSender::PendingSend { |
| 15 public: | 16 public: |
| 16 PendingSend(const base::StringPiece& data, | 17 PendingSend(const base::StringPiece& data, |
| 17 const DataSentCallback& callback, | 18 const DataSentCallback& callback, |
| 18 const SendErrorCallback& error_callback, | 19 const SendErrorCallback& error_callback, |
| 19 int32_t fatal_error_value); | 20 int32_t fatal_error_value); |
| 20 | 21 |
| 21 // Invoked to report that |num_bytes| of data have been sent. Subtracts the | 22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the |
| 22 // number of bytes that were part of this send from |num_bytes|. Returns | 23 // number of bytes that were part of this send from |num_bytes|. Returns |
| 23 // whether this send has been completed. If this send has been completed, this | 24 // whether this send has been completed. If this send has been completed, this |
| 24 // calls |callback_|. | 25 // calls |callback_|. |
| 25 bool ReportBytesSent(uint32_t* num_bytes); | 26 bool ReportBytesSent(uint32_t* num_bytes); |
| 26 | 27 |
| 27 // Invoked to report that |num_bytes| of data have been sent and then an | 28 // Invoked to report that |num_bytes| of data have been sent and then an |
| 28 // error, |error| was encountered. Subtracts the number of bytes that were | 29 // error, |error| was encountered. Subtracts the number of bytes that were |
| 29 // part of this send from |num_bytes|. If this send was not completed before | 30 // part of this send from |num_bytes|. If this send was not completed before |
| 30 // the error, this calls |error_callback_| to report the error. Otherwise, | 31 // the error, this calls |error_callback_| to report the error. Otherwise, |
| 31 // this calls |callback_|. Returns the number of bytes sent but not acked. | 32 // this calls |callback_|. Returns the number of bytes sent but not acked. |
| 32 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); | 33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); |
| 33 | 34 |
| 34 // Reports |fatal_error_value_| to |receive_error_callback_|. | 35 // Reports |fatal_error_value_| to |receive_error_callback_|. |
| 35 void DispatchFatalError(); | 36 void DispatchFatalError(); |
| 36 | 37 |
| 37 // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK | 38 // Attempts to send any data not yet sent to |sink|. |
| 38 // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent | 39 bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size); |
| 39 // or the error if one is encountered writing to |handle|. | |
| 40 MojoResult SendData(mojo::DataPipeProducerHandle handle); | |
| 41 | 40 |
| 42 private: | 41 private: |
| 43 // Invoked to update |bytes_acked_| and |num_bytes|. | 42 // Invoked to update |bytes_acked_| and |num_bytes|. |
| 44 void ReportBytesSentInternal(uint32_t* num_bytes); | 43 void ReportBytesSentInternal(uint32_t* num_bytes); |
| 45 | 44 |
| 46 // The data to send. | 45 // The data to send. |
| 47 const base::StringPiece data_; | 46 const base::StringPiece data_; |
| 48 | 47 |
| 49 // The callback to report success. | 48 // The callback to report success. |
| 50 const DataSentCallback callback_; | 49 const DataSentCallback callback_; |
| 51 | 50 |
| 52 // The callback to report errors. | 51 // The callback to report errors. |
| 53 const SendErrorCallback error_callback_; | 52 const SendErrorCallback error_callback_; |
| 54 | 53 |
| 55 // The error value to report when DispatchFatalError() is called. | 54 // The error value to report when DispatchFatalError() is called. |
| 56 const int32_t fatal_error_value_; | 55 const int32_t fatal_error_value_; |
| 57 | 56 |
| 58 // The number of bytes sent to the data pipe. | 57 // The number of bytes sent to the DataSink. |
| 59 uint32_t bytes_sent_; | 58 uint32_t bytes_sent_; |
| 60 | 59 |
| 61 // The number of bytes acked. | 60 // The number of bytes acked. |
| 62 uint32_t bytes_acked_; | 61 uint32_t bytes_acked_; |
| 63 }; | 62 }; |
| 64 | 63 |
| 65 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, | 64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
| 66 uint32_t buffer_size, | 65 uint32_t buffer_size, |
| 67 int32_t fatal_error_value) | 66 int32_t fatal_error_value) |
| 68 : sink_(sink.Pass()), | 67 : sink_(sink.Pass()), |
| 69 fatal_error_value_(fatal_error_value), | 68 fatal_error_value_(fatal_error_value), |
| 69 available_buffer_capacity_(buffer_size), |
| 70 shut_down_(false) { | 70 shut_down_(false) { |
| 71 sink_.set_error_handler(this); | 71 sink_.set_error_handler(this); |
| 72 MojoCreateDataPipeOptions options = { | |
| 73 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, | |
| 74 }; | |
| 75 options.struct_size = sizeof(options); | |
| 76 mojo::ScopedDataPipeConsumerHandle remote_handle; | |
| 77 MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle); | |
| 78 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 79 sink_->Init(remote_handle.Pass()); | |
| 80 sink_.set_client(this); | 72 sink_.set_client(this); |
| 73 sink_->Init(buffer_size); |
| 81 } | 74 } |
| 82 | 75 |
| 83 DataSender::~DataSender() { | 76 DataSender::~DataSender() { |
| 84 ShutDown(); | 77 ShutDown(); |
| 85 } | 78 } |
| 86 | 79 |
| 87 bool DataSender::Send(const base::StringPiece& data, | 80 bool DataSender::Send(const base::StringPiece& data, |
| 88 const DataSentCallback& callback, | 81 const DataSentCallback& callback, |
| 89 const SendErrorCallback& error_callback) { | 82 const SendErrorCallback& error_callback) { |
| 90 DCHECK(!callback.is_null() && !error_callback.is_null()); | 83 DCHECK(!callback.is_null() && !error_callback.is_null()); |
| (...skipping 17 matching lines...) Expand all Loading... |
| 108 | 101 |
| 109 pending_cancel_ = callback; | 102 pending_cancel_ = callback; |
| 110 sink_->Cancel(error); | 103 sink_->Cancel(error); |
| 111 return true; | 104 return true; |
| 112 } | 105 } |
| 113 | 106 |
| 114 void DataSender::ReportBytesSent(uint32_t bytes_sent) { | 107 void DataSender::ReportBytesSent(uint32_t bytes_sent) { |
| 115 if (shut_down_) | 108 if (shut_down_) |
| 116 return; | 109 return; |
| 117 | 110 |
| 111 available_buffer_capacity_ += bytes_sent; |
| 118 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && | 112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && |
| 119 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { | 113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { |
| 120 sends_awaiting_ack_.pop(); | 114 sends_awaiting_ack_.pop(); |
| 121 } | 115 } |
| 122 if (bytes_sent > 0 && !pending_sends_.empty()) { | 116 if (bytes_sent > 0 && !pending_sends_.empty()) { |
| 123 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); | 117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); |
| 124 DCHECK(!finished); | 118 DCHECK(!finished); |
| 125 if (finished) { | 119 if (finished) { |
| 126 ShutDown(); | 120 ShutDown(); |
| 127 return; | 121 return; |
| 128 } | 122 } |
| 129 } | 123 } |
| 130 if (bytes_sent != 0) { | 124 if (bytes_sent != 0) { |
| 131 ShutDown(); | 125 ShutDown(); |
| 132 return; | 126 return; |
| 133 } | 127 } |
| 134 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) | 128 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) |
| 135 RunCancelCallback(); | 129 RunCancelCallback(); |
| 130 SendInternal(); |
| 136 } | 131 } |
| 137 | 132 |
| 138 void DataSender::ReportBytesSentAndError( | 133 void DataSender::ReportBytesSentAndError( |
| 139 uint32_t bytes_sent, | 134 uint32_t bytes_sent, |
| 140 int32_t error, | 135 int32_t error, |
| 141 const mojo::Callback<void(uint32_t)>& callback) { | 136 const mojo::Callback<void()>& callback) { |
| 142 if (shut_down_) | 137 if (shut_down_) |
| 143 return; | 138 return; |
| 144 | 139 |
| 145 uint32_t bytes_to_flush = 0; | 140 available_buffer_capacity_ += bytes_sent; |
| 146 while (!sends_awaiting_ack_.empty()) { | 141 while (!sends_awaiting_ack_.empty()) { |
| 147 bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError( | 142 available_buffer_capacity_ += |
| 148 &bytes_sent, error); | 143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, |
| 144 error); |
| 149 sends_awaiting_ack_.pop(); | 145 sends_awaiting_ack_.pop(); |
| 150 } | 146 } |
| 151 while (!pending_sends_.empty()) { | 147 while (!pending_sends_.empty()) { |
| 152 bytes_to_flush += | 148 available_buffer_capacity_ += |
| 153 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); | 149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); |
| 154 pending_sends_.pop(); | 150 pending_sends_.pop(); |
| 155 } | 151 } |
| 156 callback.Run(bytes_to_flush); | 152 callback.Run(); |
| 157 RunCancelCallback(); | 153 RunCancelCallback(); |
| 158 } | 154 } |
| 159 | 155 |
| 160 void DataSender::OnConnectionError() { | 156 void DataSender::OnConnectionError() { |
| 161 ShutDown(); | 157 ShutDown(); |
| 162 } | 158 } |
| 163 | 159 |
| 164 void DataSender::SendInternal() { | 160 void DataSender::SendInternal() { |
| 165 while (!pending_sends_.empty()) { | 161 while (!pending_sends_.empty() && available_buffer_capacity_) { |
| 166 MojoResult result = pending_sends_.front()->SendData(handle_.get()); | 162 if (pending_sends_.front()->SendData(sink_.get(), |
| 167 if (result == MOJO_RESULT_OK) { | 163 &available_buffer_capacity_)) { |
| 168 sends_awaiting_ack_.push(pending_sends_.front()); | 164 sends_awaiting_ack_.push(pending_sends_.front()); |
| 169 pending_sends_.pop(); | 165 pending_sends_.pop(); |
| 170 } else if (result == MOJO_RESULT_SHOULD_WAIT) { | |
| 171 waiter_.reset(new AsyncWaiter( | |
| 172 handle_.get(), | |
| 173 MOJO_HANDLE_SIGNAL_WRITABLE, | |
| 174 base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this)))); | |
| 175 return; | |
| 176 } else { | |
| 177 ShutDown(); | |
| 178 return; | |
| 179 } | 166 } |
| 180 } | 167 } |
| 181 } | 168 } |
| 182 | 169 |
| 183 void DataSender::OnDoneWaiting(MojoResult result) { | |
| 184 waiter_.reset(); | |
| 185 if (result != MOJO_RESULT_OK) { | |
| 186 ShutDown(); | |
| 187 return; | |
| 188 } | |
| 189 SendInternal(); | |
| 190 } | |
| 191 | |
| 192 void DataSender::RunCancelCallback() { | 170 void DataSender::RunCancelCallback() { |
| 193 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); | 171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); |
| 194 if (pending_cancel_.is_null()) | 172 if (pending_cancel_.is_null()) |
| 195 return; | 173 return; |
| 196 | 174 |
| 197 base::MessageLoop::current()->PostTask(FROM_HERE, | 175 base::MessageLoop::current()->PostTask(FROM_HERE, |
| 198 base::Bind(pending_cancel_)); | 176 base::Bind(pending_cancel_)); |
| 199 pending_cancel_.Reset(); | 177 pending_cancel_.Reset(); |
| 200 } | 178 } |
| 201 | 179 |
| 202 void DataSender::ShutDown() { | 180 void DataSender::ShutDown() { |
| 203 waiter_.reset(); | |
| 204 shut_down_ = true; | 181 shut_down_ = true; |
| 205 while (!pending_sends_.empty()) { | 182 while (!pending_sends_.empty()) { |
| 206 pending_sends_.front()->DispatchFatalError(); | 183 pending_sends_.front()->DispatchFatalError(); |
| 207 pending_sends_.pop(); | 184 pending_sends_.pop(); |
| 208 } | 185 } |
| 209 while (!sends_awaiting_ack_.empty()) { | 186 while (!sends_awaiting_ack_.empty()) { |
| 210 sends_awaiting_ack_.front()->DispatchFatalError(); | 187 sends_awaiting_ack_.front()->DispatchFatalError(); |
| 211 sends_awaiting_ack_.pop(); | 188 sends_awaiting_ack_.pop(); |
| 212 } | 189 } |
| 213 RunCancelCallback(); | 190 RunCancelCallback(); |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 246 base::MessageLoop::current()->PostTask( | 223 base::MessageLoop::current()->PostTask( |
| 247 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); | 224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); |
| 248 return bytes_sent_ - bytes_acked_; | 225 return bytes_sent_ - bytes_acked_; |
| 249 } | 226 } |
| 250 | 227 |
| 251 void DataSender::PendingSend::DispatchFatalError() { | 228 void DataSender::PendingSend::DispatchFatalError() { |
| 252 base::MessageLoop::current()->PostTask( | 229 base::MessageLoop::current()->PostTask( |
| 253 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); | 230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); |
| 254 } | 231 } |
| 255 | 232 |
| 256 MojoResult DataSender::PendingSend::SendData( | 233 bool DataSender::PendingSend::SendData(serial::DataSink* sink, |
| 257 mojo::DataPipeProducerHandle handle) { | 234 uint32_t* available_buffer_size) { |
| 258 uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_; | 235 uint32_t num_bytes_to_send = |
| 259 MojoResult result = mojo::WriteDataRaw(handle, | 236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), |
| 260 data_.data() + bytes_sent_, | 237 *available_buffer_size); |
| 261 &bytes_to_send, | 238 mojo::Array<uint8_t> bytes(num_bytes_to_send); |
| 262 MOJO_WRITE_DATA_FLAG_NONE); | 239 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); |
| 263 if (result != MOJO_RESULT_OK) | 240 bytes_sent_ += num_bytes_to_send; |
| 264 return result; | 241 *available_buffer_size -= num_bytes_to_send; |
| 265 | 242 sink->OnData(bytes.Pass()); |
| 266 bytes_sent_ += bytes_to_send; | 243 return bytes_sent_ == data_.size(); |
| 267 return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; | |
| 268 } | 244 } |
| 269 | 245 |
| 270 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { | 246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { |
| 271 bytes_acked_ += *num_bytes; | 247 bytes_acked_ += *num_bytes; |
| 272 if (bytes_acked_ > bytes_sent_) { | 248 if (bytes_acked_ > bytes_sent_) { |
| 273 *num_bytes = bytes_acked_ - bytes_sent_; | 249 *num_bytes = bytes_acked_ - bytes_sent_; |
| 274 bytes_acked_ = bytes_sent_; | 250 bytes_acked_ = bytes_sent_; |
| 275 } else { | 251 } else { |
| 276 *num_bytes = 0; | 252 *num_bytes = 0; |
| 277 } | 253 } |
| 278 } | 254 } |
| 279 | 255 |
| 280 } // namespace device | 256 } // namespace device |
| OLD | NEW |