| Index: device/serial/data_source_sender.cc
|
| diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc
|
| index 4eaee758b987b337710244dc38ec0ac24fb46821..ac568bd318a83fd528dbdaa84f41f9a21458c0ac 100644
|
| --- a/device/serial/data_source_sender.cc
|
| +++ b/device/serial/data_source_sender.cc
|
| @@ -4,11 +4,11 @@
|
|
|
| #include "device/serial/data_source_sender.h"
|
|
|
| +#include <algorithm>
|
| #include <limits>
|
|
|
| #include "base/bind.h"
|
| #include "base/message_loop/message_loop.h"
|
| -#include "device/serial/async_waiter.h"
|
|
|
| namespace device {
|
|
|
| @@ -17,9 +17,9 @@ class DataSourceSender::PendingSend {
|
| public:
|
| PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
|
|
|
| - // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
|
| - // one of Done() and DoneWithError() will be called with the result.
|
| - void GetData(void* data, uint32_t num_bytes);
|
| + // Asynchronously fills |data_| with up to |num_bytes| of data. Following
|
| + // this, one of Done() and DoneWithError() will be called with the result.
|
| + void GetData(uint32_t num_bytes);
|
|
|
| private:
|
| class Buffer;
|
| @@ -39,9 +39,12 @@ class DataSourceSender::PendingSend {
|
| // Whether the buffer specified by GetData() has been passed to |callback_|,
|
| // but has not yet called Done() or DoneWithError().
|
| bool buffer_in_use_;
|
| +
|
| + // The data obtained using |callback_| to be dispatched to the client.
|
| + std::vector<char> data_;
|
| };
|
|
|
| -// A Writable implementation that provides a view of a data pipe owned by a
|
| +// A Writable implementation that provides a view of a buffer owned by a
|
| // DataSourceSender.
|
| class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
|
| public:
|
| @@ -58,7 +61,7 @@ class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
|
| virtual void DoneWithError(uint32_t bytes_written, int32_t error) override;
|
|
|
| private:
|
| - // The DataSourceSender whose data pipe we are providing a view.
|
| + // The DataSourceSender of whose buffer we are providing a view.
|
| scoped_refptr<DataSourceSender> sender_;
|
|
|
| // The PendingSend to which this buffer has been created in response.
|
| @@ -72,14 +75,15 @@ DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
|
| const ErrorCallback& error_callback)
|
| : ready_callback_(ready_callback),
|
| error_callback_(error_callback),
|
| - bytes_sent_(0),
|
| - shut_down_(false) {
|
| + available_buffer_capacity_(0),
|
| + paused_(false),
|
| + shut_down_(false),
|
| + weak_factory_(this) {
|
| DCHECK(!ready_callback.is_null() && !error_callback.is_null());
|
| }
|
|
|
| void DataSourceSender::ShutDown() {
|
| shut_down_ = true;
|
| - waiter_.reset();
|
| ready_callback_.Reset();
|
| error_callback_.Reset();
|
| }
|
| @@ -88,84 +92,70 @@ DataSourceSender::~DataSourceSender() {
|
| DCHECK(shut_down_);
|
| }
|
|
|
| -void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
|
| - // This should never occur. |handle_| is only valid and |pending_send_| is
|
| - // only set after Init is called.
|
| - if (pending_send_ || handle_.is_valid() || shut_down_) {
|
| - DispatchFatalError();
|
| - return;
|
| - }
|
| - handle_ = handle.Pass();
|
| - pending_send_.reset(new PendingSend(this, ready_callback_));
|
| - StartWaiting();
|
| +void DataSourceSender::Init(uint32_t buffer_size) {
|
| + available_buffer_capacity_ = buffer_size;
|
| + GetMoreData();
|
| }
|
|
|
| void DataSourceSender::Resume() {
|
| - if (pending_send_ || !handle_.is_valid()) {
|
| + if (pending_send_) {
|
| DispatchFatalError();
|
| return;
|
| }
|
|
|
| - pending_send_.reset(new PendingSend(this, ready_callback_));
|
| - StartWaiting();
|
| + paused_ = false;
|
| + GetMoreData();
|
| }
|
|
|
| -void DataSourceSender::OnConnectionError() {
|
| - DispatchFatalError();
|
| +void DataSourceSender::ReportBytesSent(uint32_t bytes_sent) {
|
| + available_buffer_capacity_ += bytes_sent;
|
| + if (!pending_send_ && !paused_)
|
| + GetMoreData();
|
| }
|
|
|
| -void DataSourceSender::StartWaiting() {
|
| - DCHECK(pending_send_ && !waiter_);
|
| - waiter_.reset(
|
| - new AsyncWaiter(handle_.get(),
|
| - MOJO_HANDLE_SIGNAL_WRITABLE,
|
| - base::Bind(&DataSourceSender::OnDoneWaiting, this)));
|
| +void DataSourceSender::OnConnectionError() {
|
| + DispatchFatalError();
|
| }
|
|
|
| -void DataSourceSender::OnDoneWaiting(MojoResult result) {
|
| - DCHECK(pending_send_ && !shut_down_ && waiter_);
|
| - waiter_.reset();
|
| - if (result != MOJO_RESULT_OK) {
|
| - DispatchFatalError();
|
| +void DataSourceSender::GetMoreData() {
|
| + if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_)
|
| return;
|
| - }
|
| - void* data = NULL;
|
| - uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
|
| - result = mojo::BeginWriteDataRaw(
|
| - handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
|
| - if (result != MOJO_RESULT_OK) {
|
| - DispatchFatalError();
|
| - return;
|
| - }
|
| - pending_send_->GetData(static_cast<char*>(data), num_bytes);
|
| +
|
| + pending_send_.reset(new PendingSend(this, ready_callback_));
|
| + pending_send_->GetData(available_buffer_capacity_);
|
| }
|
|
|
| -void DataSourceSender::Done(uint32_t bytes_written) {
|
| - DoneInternal(bytes_written);
|
| - if (!shut_down_)
|
| - StartWaiting();
|
| +void DataSourceSender::Done(const std::vector<char>& data) {
|
| + DoneInternal(data);
|
| + if (!shut_down_ && available_buffer_capacity_) {
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
|
| + }
|
| }
|
|
|
| -void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
|
| - DoneInternal(bytes_written);
|
| - pending_send_.reset();
|
| +void DataSourceSender::DoneWithError(const std::vector<char>& data,
|
| + int32_t error) {
|
| + DoneInternal(data);
|
| if (!shut_down_)
|
| - client()->OnError(bytes_sent_, error);
|
| - // We don't call StartWaiting here so we don't send any additional data until
|
| + client()->OnError(error);
|
| + paused_ = true;
|
| + // We don't call GetMoreData here so we don't send any additional data until
|
| // Resume() is called.
|
| }
|
|
|
| -void DataSourceSender::DoneInternal(uint32_t bytes_written) {
|
| +void DataSourceSender::DoneInternal(const std::vector<char>& data) {
|
| DCHECK(pending_send_);
|
| if (shut_down_)
|
| return;
|
|
|
| - bytes_sent_ += bytes_written;
|
| - MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
|
| - if (result != MOJO_RESULT_OK) {
|
| - DispatchFatalError();
|
| - return;
|
| + available_buffer_capacity_ -= data.size();
|
| + if (!data.empty()) {
|
| + mojo::Array<uint8_t> data_to_send(data.size());
|
| + std::copy(data.begin(), data.end(), &data_to_send[0]);
|
| + client()->OnData(data_to_send.Pass());
|
| }
|
| + pending_send_.reset();
|
| }
|
|
|
| void DataSourceSender::DispatchFatalError() {
|
| @@ -181,24 +171,30 @@ DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
|
| : sender_(sender), callback_(callback), buffer_in_use_(false) {
|
| }
|
|
|
| -void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
|
| +void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
|
| + DCHECK(num_bytes);
|
| DCHECK(!buffer_in_use_);
|
| buffer_in_use_ = true;
|
| + data_.resize(num_bytes);
|
| callback_.Run(scoped_ptr<WritableBuffer>(
|
| - new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
|
| + new Buffer(sender_, this, &data_[0], num_bytes)));
|
| }
|
|
|
| void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
|
| DCHECK(buffer_in_use_);
|
| + DCHECK_LE(bytes_written, data_.size());
|
| buffer_in_use_ = false;
|
| - sender_->Done(bytes_written);
|
| + data_.resize(bytes_written);
|
| + sender_->Done(data_);
|
| }
|
|
|
| void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
|
| int32_t error) {
|
| DCHECK(buffer_in_use_);
|
| + DCHECK_LE(bytes_written, data_.size());
|
| buffer_in_use_ = false;
|
| - sender_->DoneWithError(bytes_written, error);
|
| + data_.resize(bytes_written);
|
| + sender_->DoneWithError(data_, error);
|
| }
|
|
|
| DataSourceSender::PendingSend::Buffer::Buffer(
|
| @@ -227,22 +223,20 @@ uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
|
|
|
| void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
|
| DCHECK(sender_.get());
|
| - pending_send_->Done(bytes_written);
|
| - sender_ = NULL;
|
| - pending_send_ = NULL;
|
| - buffer_ = NULL;
|
| - buffer_size_ = 0;
|
| + PendingSend* send = pending_send_;
|
| + pending_send_ = nullptr;
|
| + sender_ = nullptr;
|
| + send->Done(bytes_written);
|
| }
|
|
|
| void DataSourceSender::PendingSend::Buffer::DoneWithError(
|
| uint32_t bytes_written,
|
| int32_t error) {
|
| DCHECK(sender_.get());
|
| - pending_send_->DoneWithError(bytes_written, error);
|
| - sender_ = NULL;
|
| - pending_send_ = NULL;
|
| - buffer_ = NULL;
|
| - buffer_size_ = 0;
|
| + PendingSend* send = pending_send_;
|
| + pending_send_ = nullptr;
|
| + sender_ = nullptr;
|
| + send->DoneWithError(bytes_written, error);
|
| }
|
|
|
| } // namespace device
|
|
|