Chromium Code Reviews| Index: device/serial/data_source_sender.cc |
| diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc |
| index 4eaee758b987b337710244dc38ec0ac24fb46821..d9372808478bf01a9a7c469e164d794d8c3a3338 100644 |
| --- a/device/serial/data_source_sender.cc |
| +++ b/device/serial/data_source_sender.cc |
| @@ -5,10 +5,10 @@ |
| #include "device/serial/data_source_sender.h" |
| #include <limits> |
| +#include <vector> |
| #include "base/bind.h" |
| #include "base/message_loop/message_loop.h" |
| -#include "device/serial/async_waiter.h" |
| namespace device { |
| @@ -19,7 +19,9 @@ class DataSourceSender::PendingSend { |
| // Asynchronously fills |data| with up to |num_bytes| of data. Following this, |
|
raymes
2014/10/17 04:04:45
|data| is gone!
Sam McNally
2014/10/20 05:12:59
Done.
|
| // one of Done() and DoneWithError() will be called with the result. |
| - void GetData(void* data, uint32_t num_bytes); |
| + void GetData(uint32_t num_bytes); |
| + |
| + void DispatchData(serial::DataSourceClient* client); |
|
raymes
2014/10/17 04:04:45
Please add a comment here.
Sam McNally
2014/10/20 05:12:59
Done.
|
| private: |
| class Buffer; |
| @@ -39,6 +41,8 @@ class DataSourceSender::PendingSend { |
| // Whether the buffer specified by GetData() has been passed to |callback_|, |
| // but has not yet called Done() or DoneWithError(). |
| bool buffer_in_use_; |
| + |
| + std::vector<char> data_; |
| }; |
| // A Writable implementation that provides a view of a data pipe owned by a |
| @@ -72,14 +76,14 @@ DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, |
| const ErrorCallback& error_callback) |
| : ready_callback_(ready_callback), |
| error_callback_(error_callback), |
| - bytes_sent_(0), |
| - shut_down_(false) { |
| + available_buffer_capacity_(0), |
| + shut_down_(false), |
| + weak_factory_(this) { |
| DCHECK(!ready_callback.is_null() && !error_callback.is_null()); |
| } |
| void DataSourceSender::ShutDown() { |
| shut_down_ = true; |
| - waiter_.reset(); |
| ready_callback_.Reset(); |
| error_callback_.Reset(); |
| } |
| @@ -88,70 +92,49 @@ DataSourceSender::~DataSourceSender() { |
| DCHECK(shut_down_); |
| } |
| -void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { |
| - // This should never occur. |handle_| is only valid and |pending_send_| is |
| - // only set after Init is called. |
| - if (pending_send_ || handle_.is_valid() || shut_down_) { |
| - DispatchFatalError(); |
| - return; |
| - } |
| - handle_ = handle.Pass(); |
| - pending_send_.reset(new PendingSend(this, ready_callback_)); |
| - StartWaiting(); |
| +void DataSourceSender::Init(uint32_t buffer_size) { |
| + available_buffer_capacity_ = buffer_size; |
| + GetMoreData(); |
| } |
| void DataSourceSender::Resume() { |
| - if (pending_send_ || !handle_.is_valid()) { |
| + if (pending_send_) { |
| DispatchFatalError(); |
| return; |
| } |
| - pending_send_.reset(new PendingSend(this, ready_callback_)); |
| - StartWaiting(); |
| + GetMoreData(); |
| } |
| -void DataSourceSender::OnConnectionError() { |
| - DispatchFatalError(); |
| +void DataSourceSender::AckData(uint32_t bytes_dispatched) { |
| + available_buffer_capacity_ += bytes_dispatched; |
| } |
| -void DataSourceSender::StartWaiting() { |
| - DCHECK(pending_send_ && !waiter_); |
| - waiter_.reset( |
| - new AsyncWaiter(handle_.get(), |
| - MOJO_HANDLE_SIGNAL_WRITABLE, |
| - base::Bind(&DataSourceSender::OnDoneWaiting, this))); |
| +void DataSourceSender::OnConnectionError() { |
| + DispatchFatalError(); |
| } |
| -void DataSourceSender::OnDoneWaiting(MojoResult result) { |
| - DCHECK(pending_send_ && !shut_down_ && waiter_); |
| - waiter_.reset(); |
| - if (result != MOJO_RESULT_OK) { |
| - DispatchFatalError(); |
| - return; |
| - } |
| - void* data = NULL; |
| - uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| - result = mojo::BeginWriteDataRaw( |
| - handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
| - if (result != MOJO_RESULT_OK) { |
| - DispatchFatalError(); |
| - return; |
| - } |
| - pending_send_->GetData(static_cast<char*>(data), num_bytes); |
| +void DataSourceSender::GetMoreData() { |
| + DCHECK(!pending_send_ && !shut_down_); |
| + pending_send_.reset(new PendingSend(this, ready_callback_)); |
| + pending_send_->GetData(available_buffer_capacity_); |
|
raymes
2014/10/17 04:04:45
What if the available buffer capacity is 0? Don't
Sam McNally
2014/10/20 05:12:59
Done.
|
| } |
| void DataSourceSender::Done(uint32_t bytes_written) { |
| DoneInternal(bytes_written); |
| - if (!shut_down_) |
| - StartWaiting(); |
| + if (!shut_down_) { |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr())); |
| + } |
| } |
| void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { |
| DoneInternal(bytes_written); |
| pending_send_.reset(); |
| if (!shut_down_) |
| - client()->OnError(bytes_sent_, error); |
| - // We don't call StartWaiting here so we don't send any additional data until |
| + client()->OnError(error); |
| + // We don't call GetMoreData here so we don't send any additional data until |
| // Resume() is called. |
| } |
| @@ -160,12 +143,9 @@ void DataSourceSender::DoneInternal(uint32_t bytes_written) { |
| if (shut_down_) |
| return; |
| - bytes_sent_ += bytes_written; |
| - MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); |
| - if (result != MOJO_RESULT_OK) { |
| - DispatchFatalError(); |
| - return; |
| - } |
| + available_buffer_capacity_ -= bytes_written; |
| + pending_send_->DispatchData(client()); |
|
raymes
2014/10/17 04:04:45
Rather than passing the client to the pending send
Sam McNally
2014/10/20 05:12:59
Done.
|
| + pending_send_.reset(); |
| } |
| void DataSourceSender::DispatchFatalError() { |
| @@ -181,22 +161,37 @@ DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, |
| : sender_(sender), callback_(callback), buffer_in_use_(false) { |
| } |
| -void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { |
| +void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) { |
| DCHECK(!buffer_in_use_); |
| buffer_in_use_ = true; |
| + data_.resize(num_bytes); |
| callback_.Run(scoped_ptr<WritableBuffer>( |
| - new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); |
| + new Buffer(sender_, this, &data_[0], num_bytes))); |
|
raymes
2014/10/17 04:04:45
We should be careful of a 0 buffer size here.
Sam McNally
2014/10/20 05:12:59
Done.
|
| +} |
| + |
| +void DataSourceSender::PendingSend::DispatchData( |
| + serial::DataSourceClient* client) { |
| + if (data_.empty()) |
| + return; |
| + |
| + mojo::Array<uint8_t> data(data_.size()); |
| + memcpy(&data[0], reinterpret_cast<uint8_t*>(&data_[0]), data.size()); |
| + client->OnData(data.Pass()); |
| } |
| void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { |
| DCHECK(buffer_in_use_); |
| + DCHECK_LE(bytes_written, data_.size()); |
| buffer_in_use_ = false; |
| + data_.resize(bytes_written); |
| sender_->Done(bytes_written); |
| } |
| void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, |
| int32_t error) { |
| DCHECK(buffer_in_use_); |
| + DCHECK_LE(bytes_written, data_.size()); |
| + data_.resize(bytes_written); |
| buffer_in_use_ = false; |
|
raymes
2014/10/17 04:04:45
nit: ordering of these 3 lines should be the same
Sam McNally
2014/10/20 05:12:59
Done.
|
| sender_->DoneWithError(bytes_written, error); |
| } |
| @@ -227,9 +222,10 @@ uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { |
| void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { |
| DCHECK(sender_.get()); |
| - pending_send_->Done(bytes_written); |
| - sender_ = NULL; |
| - pending_send_ = NULL; |
| + PendingSend* send = pending_send_; |
| + pending_send_ = nullptr; |
| + sender_ = nullptr; |
| + send->Done(bytes_written); |
| buffer_ = NULL; |
| buffer_size_ = 0; |
| } |
| @@ -238,9 +234,10 @@ void DataSourceSender::PendingSend::Buffer::DoneWithError( |
| uint32_t bytes_written, |
| int32_t error) { |
| DCHECK(sender_.get()); |
| - pending_send_->DoneWithError(bytes_written, error); |
| - sender_ = NULL; |
| - pending_send_ = NULL; |
| + PendingSend* send = pending_send_; |
| + pending_send_ = nullptr; |
| + sender_ = nullptr; |
| + send->DoneWithError(bytes_written, error); |
| buffer_ = NULL; |
| buffer_size_ = 0; |
| } |