| Index: device/serial/data_sender.cc
|
| diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc
|
| index 409f5f449d1003f45cee750039634bc2e53f9876..18b1a43ef3357543a32a99bd16d95843262e0a35 100644
|
| --- a/device/serial/data_sender.cc
|
| +++ b/device/serial/data_sender.cc
|
| @@ -4,9 +4,10 @@
|
|
|
| #include "device/serial/data_sender.h"
|
|
|
| +#include <algorithm>
|
| +
|
| #include "base/bind.h"
|
| #include "base/message_loop/message_loop.h"
|
| -#include "device/serial/async_waiter.h"
|
|
|
| namespace device {
|
|
|
| @@ -34,10 +35,8 @@ class DataSender::PendingSend {
|
| // Reports |fatal_error_value_| to |receive_error_callback_|.
|
| void DispatchFatalError();
|
|
|
| - // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
|
| - // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
|
| - // or the error if one is encountered writing to |handle|.
|
| - MojoResult SendData(mojo::DataPipeProducerHandle handle);
|
| + // Attempts to send any data not yet sent to |sink|.
|
| + bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
|
|
|
| private:
|
| // Invoked to update |bytes_acked_| and |num_bytes|.
|
| @@ -55,7 +54,7 @@ class DataSender::PendingSend {
|
| // The error value to report when DispatchFatalError() is called.
|
| const int32_t fatal_error_value_;
|
|
|
| - // The number of bytes sent to the data pipe.
|
| + // The number of bytes sent to the DataSink.
|
| uint32_t bytes_sent_;
|
|
|
| // The number of bytes acked.
|
| @@ -67,17 +66,11 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
|
| int32_t fatal_error_value)
|
| : sink_(sink.Pass()),
|
| fatal_error_value_(fatal_error_value),
|
| + available_buffer_capacity_(buffer_size),
|
| shut_down_(false) {
|
| sink_.set_error_handler(this);
|
| - MojoCreateDataPipeOptions options = {
|
| - sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
|
| - };
|
| - options.struct_size = sizeof(options);
|
| - mojo::ScopedDataPipeConsumerHandle remote_handle;
|
| - MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
|
| - DCHECK_EQ(MOJO_RESULT_OK, result);
|
| - sink_->Init(remote_handle.Pass());
|
| sink_.set_client(this);
|
| + sink_->Init(buffer_size);
|
| }
|
|
|
| DataSender::~DataSender() {
|
| @@ -115,6 +108,7 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) {
|
| if (shut_down_)
|
| return;
|
|
|
| + available_buffer_capacity_ += bytes_sent;
|
| while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
|
| sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
|
| sends_awaiting_ack_.pop();
|
| @@ -133,27 +127,29 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) {
|
| }
|
| if (pending_sends_.empty() && sends_awaiting_ack_.empty())
|
| RunCancelCallback();
|
| + SendInternal();
|
| }
|
|
|
| void DataSender::ReportBytesSentAndError(
|
| uint32_t bytes_sent,
|
| int32_t error,
|
| - const mojo::Callback<void(uint32_t)>& callback) {
|
| + const mojo::Callback<void()>& callback) {
|
| if (shut_down_)
|
| return;
|
|
|
| - uint32_t bytes_to_flush = 0;
|
| + available_buffer_capacity_ += bytes_sent;
|
| while (!sends_awaiting_ack_.empty()) {
|
| - bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
|
| - &bytes_sent, error);
|
| + available_buffer_capacity_ +=
|
| + sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
|
| + error);
|
| sends_awaiting_ack_.pop();
|
| }
|
| while (!pending_sends_.empty()) {
|
| - bytes_to_flush +=
|
| + available_buffer_capacity_ +=
|
| pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
|
| pending_sends_.pop();
|
| }
|
| - callback.Run(bytes_to_flush);
|
| + callback.Run();
|
| RunCancelCallback();
|
| }
|
|
|
| @@ -162,33 +158,15 @@ void DataSender::OnConnectionError() {
|
| }
|
|
|
| void DataSender::SendInternal() {
|
| - while (!pending_sends_.empty()) {
|
| - MojoResult result = pending_sends_.front()->SendData(handle_.get());
|
| - if (result == MOJO_RESULT_OK) {
|
| + while (!pending_sends_.empty() && available_buffer_capacity_) {
|
| + if (pending_sends_.front()->SendData(sink_.get(),
|
| + &available_buffer_capacity_)) {
|
| sends_awaiting_ack_.push(pending_sends_.front());
|
| pending_sends_.pop();
|
| - } else if (result == MOJO_RESULT_SHOULD_WAIT) {
|
| - waiter_.reset(new AsyncWaiter(
|
| - handle_.get(),
|
| - MOJO_HANDLE_SIGNAL_WRITABLE,
|
| - base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
|
| - return;
|
| - } else {
|
| - ShutDown();
|
| - return;
|
| }
|
| }
|
| }
|
|
|
| -void DataSender::OnDoneWaiting(MojoResult result) {
|
| - waiter_.reset();
|
| - if (result != MOJO_RESULT_OK) {
|
| - ShutDown();
|
| - return;
|
| - }
|
| - SendInternal();
|
| -}
|
| -
|
| void DataSender::RunCancelCallback() {
|
| DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
|
| if (pending_cancel_.is_null())
|
| @@ -200,7 +178,6 @@ void DataSender::RunCancelCallback() {
|
| }
|
|
|
| void DataSender::ShutDown() {
|
| - waiter_.reset();
|
| shut_down_ = true;
|
| while (!pending_sends_.empty()) {
|
| pending_sends_.front()->DispatchFatalError();
|
| @@ -253,18 +230,17 @@ void DataSender::PendingSend::DispatchFatalError() {
|
| FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
|
| }
|
|
|
| -MojoResult DataSender::PendingSend::SendData(
|
| - mojo::DataPipeProducerHandle handle) {
|
| - uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
|
| - MojoResult result = mojo::WriteDataRaw(handle,
|
| - data_.data() + bytes_sent_,
|
| - &bytes_to_send,
|
| - MOJO_WRITE_DATA_FLAG_NONE);
|
| - if (result != MOJO_RESULT_OK)
|
| - return result;
|
| -
|
| - bytes_sent_ += bytes_to_send;
|
| - return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
|
| +bool DataSender::PendingSend::SendData(serial::DataSink* sink,
|
| + uint32_t* available_buffer_size) {
|
| + uint32_t num_bytes_to_send =
|
| + std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
|
| + *available_buffer_size);
|
| + mojo::Array<uint8_t> bytes(num_bytes_to_send);
|
| + memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
|
| + bytes_sent_ += num_bytes_to_send;
|
| + *available_buffer_size -= num_bytes_to_send;
|
| + sink->OnData(bytes.Pass());
|
| + return bytes_sent_ == data_.size();
|
| }
|
|
|
| void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
|
|
|