| Index: device/serial/data_sender.cc
|
| diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc
|
| index 18b1a43ef3357543a32a99bd16d95843262e0a35..dbd17760161104010d90dd336ea33940c40e496c 100644
|
| --- a/device/serial/data_sender.cc
|
| +++ b/device/serial/data_sender.cc
|
| @@ -17,30 +17,21 @@ class DataSender::PendingSend {
|
| PendingSend(const base::StringPiece& data,
|
| const DataSentCallback& callback,
|
| const SendErrorCallback& error_callback,
|
| - int32_t fatal_error_value);
|
| -
|
| - // Invoked to report that |num_bytes| of data have been sent. Subtracts the
|
| - // number of bytes that were part of this send from |num_bytes|. Returns
|
| - // whether this send has been completed. If this send has been completed, this
|
| - // calls |callback_|.
|
| - bool ReportBytesSent(uint32_t* num_bytes);
|
| -
|
| - // Invoked to report that |num_bytes| of data have been sent and then an
|
| - // error, |error| was encountered. Subtracts the number of bytes that were
|
| - // part of this send from |num_bytes|. If this send was not completed before
|
| - // the error, this calls |error_callback_| to report the error. Otherwise,
|
| - // this calls |callback_|. Returns the number of bytes sent but not acked.
|
| - uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
|
| + DataSender* sender);
|
|
|
| // Reports |fatal_error_value_| to |receive_error_callback_|.
|
| void DispatchFatalError();
|
|
|
| // Attempts to send any data not yet sent to |sink|.
|
| - bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
|
| + void SendData();
|
|
|
| private:
|
| - // Invoked to update |bytes_acked_| and |num_bytes|.
|
| - void ReportBytesSentInternal(uint32_t* num_bytes);
|
| + // Invoked to report that |num_bytes| of data have been sent and then an
|
| + // error, |error| was encountered. Subtracts the number of bytes that were
|
| + // part of this send from |num_bytes|. If this send was not completed before
|
| + // the error, this calls |error_callback_| to report the error. Otherwise,
|
| + // this calls |callback_|. Returns the number of bytes sent but not acked.
|
| + void OnDataSent(uint32_t num_bytes, int32_t error);
|
|
|
| // The data to send.
|
| const base::StringPiece data_;
|
| @@ -51,14 +42,8 @@ class DataSender::PendingSend {
|
| // The callback to report errors.
|
| const SendErrorCallback error_callback_;
|
|
|
| - // The error value to report when DispatchFatalError() is called.
|
| - const int32_t fatal_error_value_;
|
| -
|
| - // The number of bytes sent to the DataSink.
|
| - uint32_t bytes_sent_;
|
| -
|
| - // The number of bytes acked.
|
| - uint32_t bytes_acked_;
|
| + // The DataSender that owns this PendingSend.
|
| + DataSender* sender_;
|
| };
|
|
|
| DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
|
| @@ -66,11 +51,8 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
|
| int32_t fatal_error_value)
|
| : sink_(sink.Pass()),
|
| fatal_error_value_(fatal_error_value),
|
| - available_buffer_capacity_(buffer_size),
|
| shut_down_(false) {
|
| sink_.set_error_handler(this);
|
| - sink_.set_client(this);
|
| - sink_->Init(buffer_size);
|
| }
|
|
|
| DataSender::~DataSender() {
|
| @@ -84,9 +66,10 @@ bool DataSender::Send(const base::StringPiece& data,
|
| if (!pending_cancel_.is_null() || shut_down_)
|
| return false;
|
|
|
| - pending_sends_.push(linked_ptr<PendingSend>(
|
| - new PendingSend(data, callback, error_callback, fatal_error_value_)));
|
| - SendInternal();
|
| + linked_ptr<PendingSend> pending_send(
|
| + new PendingSend(data, callback, error_callback, this));
|
| + pending_send->SendData();
|
| + sends_awaiting_ack_.push(pending_send);
|
| return true;
|
| }
|
|
|
| @@ -94,7 +77,7 @@ bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
|
| DCHECK(!callback.is_null());
|
| if (!pending_cancel_.is_null() || shut_down_)
|
| return false;
|
| - if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
|
| + if (sends_awaiting_ack_.empty()) {
|
| base::MessageLoop::current()->PostTask(FROM_HERE, callback);
|
| return true;
|
| }
|
| @@ -104,52 +87,25 @@ bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
|
| return true;
|
| }
|
|
|
| -void DataSender::ReportBytesSent(uint32_t bytes_sent) {
|
| +void DataSender::SendComplete() {
|
| if (shut_down_)
|
| return;
|
|
|
| - available_buffer_capacity_ += bytes_sent;
|
| - while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
|
| - sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
|
| - sends_awaiting_ack_.pop();
|
| - }
|
| - if (bytes_sent > 0 && !pending_sends_.empty()) {
|
| - bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
|
| - DCHECK(!finished);
|
| - if (finished) {
|
| - ShutDown();
|
| - return;
|
| - }
|
| - }
|
| - if (bytes_sent != 0) {
|
| - ShutDown();
|
| - return;
|
| - }
|
| - if (pending_sends_.empty() && sends_awaiting_ack_.empty())
|
| + DCHECK(!sends_awaiting_ack_.empty());
|
| + sends_awaiting_ack_.pop();
|
| + if (sends_awaiting_ack_.empty())
|
| RunCancelCallback();
|
| - SendInternal();
|
| }
|
|
|
| -void DataSender::ReportBytesSentAndError(
|
| - uint32_t bytes_sent,
|
| - int32_t error,
|
| - const mojo::Callback<void()>& callback) {
|
| +void DataSender::SendFailed(int32_t error) {
|
| if (shut_down_)
|
| return;
|
|
|
| - available_buffer_capacity_ += bytes_sent;
|
| - while (!sends_awaiting_ack_.empty()) {
|
| - available_buffer_capacity_ +=
|
| - sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
|
| - error);
|
| - sends_awaiting_ack_.pop();
|
| - }
|
| - while (!pending_sends_.empty()) {
|
| - available_buffer_capacity_ +=
|
| - pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
|
| - pending_sends_.pop();
|
| - }
|
| - callback.Run();
|
| + DCHECK(!sends_awaiting_ack_.empty());
|
| + sends_awaiting_ack_.pop();
|
| + if (!sends_awaiting_ack_.empty())
|
| + return;
|
| + sink_->ClearError();
|
| RunCancelCallback();
|
| }
|
|
|
| @@ -157,18 +113,8 @@ void DataSender::OnConnectionError() {
|
| ShutDown();
|
| }
|
|
|
| -void DataSender::SendInternal() {
|
| - while (!pending_sends_.empty() && available_buffer_capacity_) {
|
| - if (pending_sends_.front()->SendData(sink_.get(),
|
| - &available_buffer_capacity_)) {
|
| - sends_awaiting_ack_.push(pending_sends_.front());
|
| - pending_sends_.pop();
|
| - }
|
| - }
|
| -}
|
| -
|
| void DataSender::RunCancelCallback() {
|
| - DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
|
| + DCHECK(sends_awaiting_ack_.empty());
|
| if (pending_cancel_.is_null())
|
| return;
|
|
|
| @@ -179,10 +125,6 @@ void DataSender::RunCancelCallback() {
|
|
|
| void DataSender::ShutDown() {
|
| shut_down_ = true;
|
| - while (!pending_sends_.empty()) {
|
| - pending_sends_.front()->DispatchFatalError();
|
| - pending_sends_.pop();
|
| - }
|
| while (!sends_awaiting_ack_.empty()) {
|
| sends_awaiting_ack_.front()->DispatchFatalError();
|
| sends_awaiting_ack_.pop();
|
| @@ -193,64 +135,38 @@ void DataSender::ShutDown() {
|
| DataSender::PendingSend::PendingSend(const base::StringPiece& data,
|
| const DataSentCallback& callback,
|
| const SendErrorCallback& error_callback,
|
| - int32_t fatal_error_value)
|
| + DataSender* sender)
|
| : data_(data),
|
| callback_(callback),
|
| error_callback_(error_callback),
|
| - fatal_error_value_(fatal_error_value),
|
| - bytes_sent_(0),
|
| - bytes_acked_(0) {
|
| -}
|
| -
|
| -bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
|
| - ReportBytesSentInternal(num_bytes);
|
| - if (bytes_acked_ < data_.size())
|
| - return false;
|
| -
|
| - base::MessageLoop::current()->PostTask(FROM_HERE,
|
| - base::Bind(callback_, bytes_acked_));
|
| - return true;
|
| + sender_(sender) {
|
| }
|
|
|
| -uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
|
| - int32_t error) {
|
| - ReportBytesSentInternal(num_bytes);
|
| - if (*num_bytes > 0) {
|
| +void DataSender::PendingSend::OnDataSent(uint32_t num_bytes, int32_t error) {
|
| + if (error) {
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE, base::Bind(error_callback_, num_bytes, error));
|
| + sender_->SendFailed(error);
|
| + } else {
|
| + DCHECK(num_bytes == data_.size());
|
| base::MessageLoop::current()->PostTask(FROM_HERE,
|
| - base::Bind(callback_, bytes_acked_));
|
| - return 0;
|
| + base::Bind(callback_, num_bytes));
|
| + sender_->SendComplete();
|
| }
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
|
| - return bytes_sent_ - bytes_acked_;
|
| }
|
|
|
| void DataSender::PendingSend::DispatchFatalError() {
|
| base::MessageLoop::current()->PostTask(
|
| - FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
|
| + FROM_HERE, base::Bind(error_callback_, 0, sender_->fatal_error_value_));
|
| }
|
|
|
| -bool DataSender::PendingSend::SendData(serial::DataSink* sink,
|
| - uint32_t* available_buffer_size) {
|
| - uint32_t num_bytes_to_send =
|
| - std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
|
| - *available_buffer_size);
|
| +void DataSender::PendingSend::SendData() {
|
| + uint32_t num_bytes_to_send = static_cast<uint32_t>(data_.size());
|
| mojo::Array<uint8_t> bytes(num_bytes_to_send);
|
| - memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
|
| - bytes_sent_ += num_bytes_to_send;
|
| - *available_buffer_size -= num_bytes_to_send;
|
| - sink->OnData(bytes.Pass());
|
| - return bytes_sent_ == data_.size();
|
| -}
|
| -
|
| -void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
|
| - bytes_acked_ += *num_bytes;
|
| - if (bytes_acked_ > bytes_sent_) {
|
| - *num_bytes = bytes_acked_ - bytes_sent_;
|
| - bytes_acked_ = bytes_sent_;
|
| - } else {
|
| - *num_bytes = 0;
|
| - }
|
| + memcpy(&bytes[0], data_.data(), num_bytes_to_send);
|
| + sender_->sink_->OnData(
|
| + bytes.Pass(),
|
| + base::Bind(&DataSender::PendingSend::OnDataSent, base::Unretained(this)));
|
| }
|
|
|
| } // namespace device
|
|
|