| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "device/serial/data_sender.h" | 5 #include "device/serial/data_sender.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| 11 | 11 |
| 12 namespace device { | 12 namespace device { |
| 13 | 13 |
| 14 // Represents a send that is not yet fulfilled. | 14 // Represents a send that is not yet fulfilled. |
| 15 class DataSender::PendingSend { | 15 class DataSender::PendingSend { |
| 16 public: | 16 public: |
| 17 PendingSend(const base::StringPiece& data, | 17 PendingSend(const base::StringPiece& data, |
| 18 const DataSentCallback& callback, | 18 const DataSentCallback& callback, |
| 19 const SendErrorCallback& error_callback, | 19 const SendErrorCallback& error_callback, |
| 20 int32_t fatal_error_value); | 20 DataSender* sender); |
| 21 | 21 |
| 22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the | 22 // Reports |fatal_error_value_| to |receive_error_callback_|. |
| 23 // number of bytes that were part of this send from |num_bytes|. Returns | 23 void DispatchFatalError(); |
| 24 // whether this send has been completed. If this send has been completed, this | |
| 25 // calls |callback_|. | |
| 26 bool ReportBytesSent(uint32_t* num_bytes); | |
| 27 | 24 |
| 25 // Attempts to send any data not yet sent to |sink|. |
| 26 void SendData(); |
| 27 |
| 28 private: |
| 28 // Invoked to report that |num_bytes| of data have been sent and then an | 29 // Invoked to report that |num_bytes| of data have been sent and then an |
| 29 // error, |error| was encountered. Subtracts the number of bytes that were | 30 // error, |error| was encountered. Subtracts the number of bytes that were |
| 30 // part of this send from |num_bytes|. If this send was not completed before | 31 // part of this send from |num_bytes|. If this send was not completed before |
| 31 // the error, this calls |error_callback_| to report the error. Otherwise, | 32 // the error, this calls |error_callback_| to report the error. Otherwise, |
| 32 // this calls |callback_|. Returns the number of bytes sent but not acked. | 33 // this calls |callback_|. Returns the number of bytes sent but not acked. |
| 33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); | 34 void OnDataSent(uint32_t num_bytes, int32_t error); |
| 34 | |
| 35 // Reports |fatal_error_value_| to |receive_error_callback_|. | |
| 36 void DispatchFatalError(); | |
| 37 | |
| 38 // Attempts to send any data not yet sent to |sink|. | |
| 39 bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size); | |
| 40 | |
| 41 private: | |
| 42 // Invoked to update |bytes_acked_| and |num_bytes|. | |
| 43 void ReportBytesSentInternal(uint32_t* num_bytes); | |
| 44 | 35 |
| 45 // The data to send. | 36 // The data to send. |
| 46 const base::StringPiece data_; | 37 const base::StringPiece data_; |
| 47 | 38 |
| 48 // The callback to report success. | 39 // The callback to report success. |
| 49 const DataSentCallback callback_; | 40 const DataSentCallback callback_; |
| 50 | 41 |
| 51 // The callback to report errors. | 42 // The callback to report errors. |
| 52 const SendErrorCallback error_callback_; | 43 const SendErrorCallback error_callback_; |
| 53 | 44 |
| 54 // The error value to report when DispatchFatalError() is called. | 45 // The DataSender that owns this PendingSend. |
| 55 const int32_t fatal_error_value_; | 46 DataSender* sender_; |
| 56 | |
| 57 // The number of bytes sent to the DataSink. | |
| 58 uint32_t bytes_sent_; | |
| 59 | |
| 60 // The number of bytes acked. | |
| 61 uint32_t bytes_acked_; | |
| 62 }; | 47 }; |
| 63 | 48 |
| 64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, | 49 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
| 65 uint32_t buffer_size, | 50 uint32_t buffer_size, |
| 66 int32_t fatal_error_value) | 51 int32_t fatal_error_value) |
| 67 : sink_(sink.Pass()), | 52 : sink_(sink.Pass()), |
| 68 fatal_error_value_(fatal_error_value), | 53 fatal_error_value_(fatal_error_value), |
| 69 available_buffer_capacity_(buffer_size), | |
| 70 shut_down_(false) { | 54 shut_down_(false) { |
| 71 sink_.set_error_handler(this); | 55 sink_.set_error_handler(this); |
| 72 sink_.set_client(this); | |
| 73 sink_->Init(buffer_size); | |
| 74 } | 56 } |
| 75 | 57 |
| 76 DataSender::~DataSender() { | 58 DataSender::~DataSender() { |
| 77 ShutDown(); | 59 ShutDown(); |
| 78 } | 60 } |
| 79 | 61 |
| 80 bool DataSender::Send(const base::StringPiece& data, | 62 bool DataSender::Send(const base::StringPiece& data, |
| 81 const DataSentCallback& callback, | 63 const DataSentCallback& callback, |
| 82 const SendErrorCallback& error_callback) { | 64 const SendErrorCallback& error_callback) { |
| 83 DCHECK(!callback.is_null() && !error_callback.is_null()); | 65 DCHECK(!callback.is_null() && !error_callback.is_null()); |
| 84 if (!pending_cancel_.is_null() || shut_down_) | 66 if (!pending_cancel_.is_null() || shut_down_) |
| 85 return false; | 67 return false; |
| 86 | 68 |
| 87 pending_sends_.push(linked_ptr<PendingSend>( | 69 linked_ptr<PendingSend> pending_send( |
| 88 new PendingSend(data, callback, error_callback, fatal_error_value_))); | 70 new PendingSend(data, callback, error_callback, this)); |
| 89 SendInternal(); | 71 pending_send->SendData(); |
| 72 sends_awaiting_ack_.push(pending_send); |
| 90 return true; | 73 return true; |
| 91 } | 74 } |
| 92 | 75 |
| 93 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { | 76 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { |
| 94 DCHECK(!callback.is_null()); | 77 DCHECK(!callback.is_null()); |
| 95 if (!pending_cancel_.is_null() || shut_down_) | 78 if (!pending_cancel_.is_null() || shut_down_) |
| 96 return false; | 79 return false; |
| 97 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) { | 80 if (sends_awaiting_ack_.empty()) { |
| 98 base::MessageLoop::current()->PostTask(FROM_HERE, callback); | 81 base::MessageLoop::current()->PostTask(FROM_HERE, callback); |
| 99 return true; | 82 return true; |
| 100 } | 83 } |
| 101 | 84 |
| 102 pending_cancel_ = callback; | 85 pending_cancel_ = callback; |
| 103 sink_->Cancel(error); | 86 sink_->Cancel(error); |
| 104 return true; | 87 return true; |
| 105 } | 88 } |
| 106 | 89 |
| 107 void DataSender::ReportBytesSent(uint32_t bytes_sent) { | 90 void DataSender::SendComplete() { |
| 108 if (shut_down_) | 91 if (shut_down_) |
| 109 return; | 92 return; |
| 110 | 93 |
| 111 available_buffer_capacity_ += bytes_sent; | 94 DCHECK(!sends_awaiting_ack_.empty()); |
| 112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && | 95 sends_awaiting_ack_.pop(); |
| 113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { | 96 if (sends_awaiting_ack_.empty()) |
| 114 sends_awaiting_ack_.pop(); | |
| 115 } | |
| 116 if (bytes_sent > 0 && !pending_sends_.empty()) { | |
| 117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); | |
| 118 DCHECK(!finished); | |
| 119 if (finished) { | |
| 120 ShutDown(); | |
| 121 return; | |
| 122 } | |
| 123 } | |
| 124 if (bytes_sent != 0) { | |
| 125 ShutDown(); | |
| 126 return; | |
| 127 } | |
| 128 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) | |
| 129 RunCancelCallback(); | 97 RunCancelCallback(); |
| 130 SendInternal(); | |
| 131 } | 98 } |
| 132 | 99 |
| 133 void DataSender::ReportBytesSentAndError( | 100 void DataSender::SendFailed(int32_t error) { |
| 134 uint32_t bytes_sent, | |
| 135 int32_t error, | |
| 136 const mojo::Callback<void()>& callback) { | |
| 137 if (shut_down_) | 101 if (shut_down_) |
| 138 return; | 102 return; |
| 139 | 103 |
| 140 available_buffer_capacity_ += bytes_sent; | 104 DCHECK(!sends_awaiting_ack_.empty()); |
| 141 while (!sends_awaiting_ack_.empty()) { | 105 sends_awaiting_ack_.pop(); |
| 142 available_buffer_capacity_ += | 106 if (!sends_awaiting_ack_.empty()) |
| 143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, | 107 return; |
| 144 error); | 108 sink_->ClearError(); |
| 145 sends_awaiting_ack_.pop(); | |
| 146 } | |
| 147 while (!pending_sends_.empty()) { | |
| 148 available_buffer_capacity_ += | |
| 149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); | |
| 150 pending_sends_.pop(); | |
| 151 } | |
| 152 callback.Run(); | |
| 153 RunCancelCallback(); | 109 RunCancelCallback(); |
| 154 } | 110 } |
| 155 | 111 |
| 156 void DataSender::OnConnectionError() { | 112 void DataSender::OnConnectionError() { |
| 157 ShutDown(); | 113 ShutDown(); |
| 158 } | 114 } |
| 159 | 115 |
| 160 void DataSender::SendInternal() { | |
| 161 while (!pending_sends_.empty() && available_buffer_capacity_) { | |
| 162 if (pending_sends_.front()->SendData(sink_.get(), | |
| 163 &available_buffer_capacity_)) { | |
| 164 sends_awaiting_ack_.push(pending_sends_.front()); | |
| 165 pending_sends_.pop(); | |
| 166 } | |
| 167 } | |
| 168 } | |
| 169 | |
| 170 void DataSender::RunCancelCallback() { | 116 void DataSender::RunCancelCallback() { |
| 171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); | 117 DCHECK(sends_awaiting_ack_.empty()); |
| 172 if (pending_cancel_.is_null()) | 118 if (pending_cancel_.is_null()) |
| 173 return; | 119 return; |
| 174 | 120 |
| 175 base::MessageLoop::current()->PostTask(FROM_HERE, | 121 base::MessageLoop::current()->PostTask(FROM_HERE, |
| 176 base::Bind(pending_cancel_)); | 122 base::Bind(pending_cancel_)); |
| 177 pending_cancel_.Reset(); | 123 pending_cancel_.Reset(); |
| 178 } | 124 } |
| 179 | 125 |
| 180 void DataSender::ShutDown() { | 126 void DataSender::ShutDown() { |
| 181 shut_down_ = true; | 127 shut_down_ = true; |
| 182 while (!pending_sends_.empty()) { | |
| 183 pending_sends_.front()->DispatchFatalError(); | |
| 184 pending_sends_.pop(); | |
| 185 } | |
| 186 while (!sends_awaiting_ack_.empty()) { | 128 while (!sends_awaiting_ack_.empty()) { |
| 187 sends_awaiting_ack_.front()->DispatchFatalError(); | 129 sends_awaiting_ack_.front()->DispatchFatalError(); |
| 188 sends_awaiting_ack_.pop(); | 130 sends_awaiting_ack_.pop(); |
| 189 } | 131 } |
| 190 RunCancelCallback(); | 132 RunCancelCallback(); |
| 191 } | 133 } |
| 192 | 134 |
| 193 DataSender::PendingSend::PendingSend(const base::StringPiece& data, | 135 DataSender::PendingSend::PendingSend(const base::StringPiece& data, |
| 194 const DataSentCallback& callback, | 136 const DataSentCallback& callback, |
| 195 const SendErrorCallback& error_callback, | 137 const SendErrorCallback& error_callback, |
| 196 int32_t fatal_error_value) | 138 DataSender* sender) |
| 197 : data_(data), | 139 : data_(data), |
| 198 callback_(callback), | 140 callback_(callback), |
| 199 error_callback_(error_callback), | 141 error_callback_(error_callback), |
| 200 fatal_error_value_(fatal_error_value), | 142 sender_(sender) { |
| 201 bytes_sent_(0), | |
| 202 bytes_acked_(0) { | |
| 203 } | 143 } |
| 204 | 144 |
| 205 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) { | 145 void DataSender::PendingSend::OnDataSent(uint32_t num_bytes, int32_t error) { |
| 206 ReportBytesSentInternal(num_bytes); | 146 if (error) { |
| 207 if (bytes_acked_ < data_.size()) | 147 base::MessageLoop::current()->PostTask( |
| 208 return false; | 148 FROM_HERE, base::Bind(error_callback_, num_bytes, error)); |
| 209 | 149 sender_->SendFailed(error); |
| 210 base::MessageLoop::current()->PostTask(FROM_HERE, | 150 } else { |
| 211 base::Bind(callback_, bytes_acked_)); | 151 DCHECK(num_bytes == data_.size()); |
| 212 return true; | |
| 213 } | |
| 214 | |
| 215 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes, | |
| 216 int32_t error) { | |
| 217 ReportBytesSentInternal(num_bytes); | |
| 218 if (*num_bytes > 0) { | |
| 219 base::MessageLoop::current()->PostTask(FROM_HERE, | 152 base::MessageLoop::current()->PostTask(FROM_HERE, |
| 220 base::Bind(callback_, bytes_acked_)); | 153 base::Bind(callback_, num_bytes)); |
| 221 return 0; | 154 sender_->SendComplete(); |
| 222 } | 155 } |
| 223 base::MessageLoop::current()->PostTask( | |
| 224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); | |
| 225 return bytes_sent_ - bytes_acked_; | |
| 226 } | 156 } |
| 227 | 157 |
| 228 void DataSender::PendingSend::DispatchFatalError() { | 158 void DataSender::PendingSend::DispatchFatalError() { |
| 229 base::MessageLoop::current()->PostTask( | 159 base::MessageLoop::current()->PostTask( |
| 230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); | 160 FROM_HERE, base::Bind(error_callback_, 0, sender_->fatal_error_value_)); |
| 231 } | 161 } |
| 232 | 162 |
| 233 bool DataSender::PendingSend::SendData(serial::DataSink* sink, | 163 void DataSender::PendingSend::SendData() { |
| 234 uint32_t* available_buffer_size) { | 164 uint32_t num_bytes_to_send = static_cast<uint32_t>(data_.size()); |
| 235 uint32_t num_bytes_to_send = | |
| 236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), | |
| 237 *available_buffer_size); | |
| 238 mojo::Array<uint8_t> bytes(num_bytes_to_send); | 165 mojo::Array<uint8_t> bytes(num_bytes_to_send); |
| 239 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); | 166 memcpy(&bytes[0], data_.data(), num_bytes_to_send); |
| 240 bytes_sent_ += num_bytes_to_send; | 167 sender_->sink_->OnData( |
| 241 *available_buffer_size -= num_bytes_to_send; | 168 bytes.Pass(), |
| 242 sink->OnData(bytes.Pass()); | 169 base::Bind(&DataSender::PendingSend::OnDataSent, base::Unretained(this))); |
| 243 return bytes_sent_ == data_.size(); | |
| 244 } | |
| 245 | |
| 246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { | |
| 247 bytes_acked_ += *num_bytes; | |
| 248 if (bytes_acked_ > bytes_sent_) { | |
| 249 *num_bytes = bytes_acked_ - bytes_sent_; | |
| 250 bytes_acked_ = bytes_sent_; | |
| 251 } else { | |
| 252 *num_bytes = 0; | |
| 253 } | |
| 254 } | 170 } |
| 255 | 171 |
| 256 } // namespace device | 172 } // namespace device |
| OLD | NEW |