| Index: device/serial/data_sink_receiver.cc
|
| diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc
|
| index 3d52e48587932b58713f121ff9fa6a90da718d4b..170f8b2c5b052e80d1c9be8a945a2d3492da3326 100644
|
| --- a/device/serial/data_sink_receiver.cc
|
| +++ b/device/serial/data_sink_receiver.cc
|
| @@ -7,38 +7,12 @@
|
| #include <limits>
|
|
|
| #include "base/bind.h"
|
| -#include "device/serial/async_waiter.h"
|
| +#include "base/message_loop/message_loop.h"
|
|
|
| namespace device {
|
|
|
| -// Represents a flush of data that has not been completed.
|
| -class DataSinkReceiver::PendingFlush {
|
| - public:
|
| - PendingFlush();
|
| -
|
| - // Initializes this PendingFlush with |num_bytes|, the number of bytes to
|
| - // flush.
|
| - void SetNumBytesToFlush(uint32_t num_bytes);
|
| -
|
| - // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
|
| - // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
|
| - // |bytes_to_flush_| bytes were flushed or the error if one is encountered
|
| - // discarding data from |handle|.
|
| - MojoResult Flush(mojo::DataPipeConsumerHandle handle);
|
| -
|
| - // Whether this PendingFlush has received the number of bytes to flush.
|
| - bool received_flush() { return received_flush_; }
|
| -
|
| - private:
|
| - // Whether this PendingFlush has received the number of bytes to flush.
|
| - bool received_flush_;
|
| -
|
| - // The remaining number of bytes to flush.
|
| - uint32_t bytes_to_flush_;
|
| -};
|
| -
|
| -// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
|
| -// a DataSinkReceiver.
|
| +// A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
|
| +// DataSinkReceiver.
|
| class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
|
| public:
|
| Buffer(scoped_refptr<DataSinkReceiver> receiver,
|
| @@ -55,7 +29,7 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
|
| void DoneWithError(uint32_t bytes_read, int32_t error) override;
|
|
|
| private:
|
| - // The DataSinkReceiver whose data pipe we are providing a view.
|
| + // The DataSinkReceiver of whose buffer we are providing a view.
|
| scoped_refptr<DataSinkReceiver> receiver_;
|
|
|
| const char* buffer_;
|
| @@ -68,34 +42,53 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
|
| int32_t cancellation_error_;
|
| };
|
|
|
| +// A frame of data received from the client.
|
| +class DataSinkReceiver::DataFrame {
|
| + public:
|
| + explicit DataFrame(mojo::Array<uint8_t> data);
|
| +
|
| + // Returns the number of uncomsumed bytes remaining of this data frame.
|
| + uint32_t GetRemainingBytes();
|
| +
|
| + // Returns a pointer to the remaining data to be consumed.
|
| + const char* GetData();
|
| +
|
| + // Reports that |bytes_read| bytes have been consumed.
|
| + void OnDataConsumed(uint32_t bytes_read);
|
| +
|
| + private:
|
| + mojo::Array<uint8_t> data_;
|
| + uint32_t offset_;
|
| +};
|
| +
|
| DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
|
| const CancelCallback& cancel_callback,
|
| const ErrorCallback& error_callback)
|
| : ready_callback_(ready_callback),
|
| cancel_callback_(cancel_callback),
|
| error_callback_(error_callback),
|
| + flush_pending_(false),
|
| buffer_in_use_(NULL),
|
| + initialized_(false),
|
| + available_buffer_capacity_(0),
|
| shut_down_(false),
|
| weak_factory_(this) {
|
| }
|
|
|
| void DataSinkReceiver::ShutDown() {
|
| shut_down_ = true;
|
| - if (waiter_)
|
| - waiter_.reset();
|
| }
|
|
|
| DataSinkReceiver::~DataSinkReceiver() {
|
| }
|
|
|
| -void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) {
|
| - if (handle_.is_valid()) {
|
| - DispatchFatalError();
|
| +void DataSinkReceiver::Init(uint32_t buffer_size) {
|
| + if (initialized_) {
|
| + ShutDown();
|
| return;
|
| }
|
| -
|
| - handle_ = handle.Pass();
|
| - StartWaiting();
|
| + initialized_ = true;
|
| + available_buffer_capacity_ = buffer_size;
|
| }
|
|
|
| void DataSinkReceiver::Cancel(int32_t error) {
|
| @@ -103,7 +96,7 @@ void DataSinkReceiver::Cancel(int32_t error) {
|
| // response, that ReportBytesSentAndError message will appear to the
|
| // DataSinkClient to be caused by this Cancel message. In that case, we ignore
|
| // the cancel.
|
| - if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush())
|
| + if (flush_pending_)
|
| return;
|
|
|
| // If there is a buffer is in use, mark the buffer as cancelled and notify the
|
| @@ -120,54 +113,38 @@ void DataSinkReceiver::Cancel(int32_t error) {
|
| cancel_callback_.Run(error);
|
| return;
|
| }
|
| - // If there is no buffer in use, immediately report the error and cancel the
|
| - // waiting for the data pipe if one exists. This transitions straight into the
|
| - // state after the sink has returned an error.
|
| - waiter_.reset();
|
| ReportBytesSentAndError(0, error);
|
| }
|
|
|
| -void DataSinkReceiver::OnConnectionError() {
|
| - DispatchFatalError();
|
| -}
|
| -
|
| -void DataSinkReceiver::StartWaiting() {
|
| - DCHECK(!waiter_ && !shut_down_);
|
| - waiter_.reset(
|
| - new AsyncWaiter(handle_.get(),
|
| - MOJO_HANDLE_SIGNAL_READABLE,
|
| - base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
|
| -}
|
| -
|
| -void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
|
| - DCHECK(waiter_ && !shut_down_);
|
| - waiter_.reset();
|
| - if (result != MOJO_RESULT_OK) {
|
| - DispatchFatalError();
|
| +void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) {
|
| + if (!initialized_) {
|
| + ShutDown();
|
| return;
|
| }
|
| - // If there are any queued flushes (from ReportBytesSentAndError()), let them
|
| - // flush data from the data pipe.
|
| - if (!pending_flushes_.empty()) {
|
| - MojoResult result = pending_flushes_.front()->Flush(handle_.get());
|
| - if (result == MOJO_RESULT_OK) {
|
| - pending_flushes_.pop();
|
| - } else if (result != MOJO_RESULT_SHOULD_WAIT) {
|
| - DispatchFatalError();
|
| - return;
|
| - }
|
| - StartWaiting();
|
| + if (data.size() > available_buffer_capacity_) {
|
| + ShutDown();
|
| return;
|
| }
|
| - const void* data = NULL;
|
| - uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
|
| - result = mojo::BeginReadDataRaw(
|
| - handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
|
| - if (result != MOJO_RESULT_OK) {
|
| - DispatchFatalError();
|
| + available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
|
| + pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
|
| + if (!buffer_in_use_ && !flush_pending_)
|
| + RunReadyCallback();
|
| +}
|
| +
|
| +void DataSinkReceiver::OnConnectionError() {
|
| + DispatchFatalError();
|
| +}
|
| +
|
| +void DataSinkReceiver::RunReadyCallback() {
|
| + DCHECK(!shut_down_ && !flush_pending_);
|
| + // If data arrives while a call to RunReadyCallback() is posted, we can be
|
| + // called with buffer_in_use_ already set.
|
| + if (buffer_in_use_)
|
| return;
|
| - }
|
| - buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
|
| + buffer_in_use_ =
|
| + new Buffer(this,
|
| + pending_data_buffers_.front()->GetData(),
|
| + pending_data_buffers_.front()->GetRemainingBytes());
|
| ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
|
| }
|
|
|
| @@ -175,7 +152,12 @@ void DataSinkReceiver::Done(uint32_t bytes_read) {
|
| if (!DoneInternal(bytes_read))
|
| return;
|
| client()->ReportBytesSent(bytes_read);
|
| - StartWaiting();
|
| + if (!pending_data_buffers_.empty()) {
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&DataSinkReceiver::RunReadyCallback,
|
| + weak_factory_.GetWeakPtr()));
|
| + }
|
| }
|
|
|
| void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
|
| @@ -190,35 +172,32 @@ bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
|
|
|
| DCHECK(buffer_in_use_);
|
| buffer_in_use_ = NULL;
|
| - MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read);
|
| - if (result != MOJO_RESULT_OK) {
|
| - DispatchFatalError();
|
| - return false;
|
| - }
|
| + available_buffer_capacity_ += bytes_read;
|
| + pending_data_buffers_.front()->OnDataConsumed(bytes_read);
|
| + if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
|
| + pending_data_buffers_.pop();
|
| return true;
|
| }
|
|
|
| void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
|
| int32_t error) {
|
| - // When we encounter an error, we must discard the data from any sends already
|
| - // in the data pipe before we can resume dispatching data to the sink. We add
|
| - // a pending flush here. The response containing the number of bytes to flush
|
| - // is handled in SetNumBytesToFlush(). The actual flush is handled in
|
| - // OnDoneWaiting().
|
| - pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
|
| + // When we encounter an error, we must discard the data from any send buffers
|
| + // transmitted by the DataSinkClient before it receives this error.
|
| + flush_pending_ = true;
|
| client()->ReportBytesSentAndError(
|
| bytes_read,
|
| error,
|
| - base::Bind(&DataSinkReceiver::SetNumBytesToFlush,
|
| - weak_factory_.GetWeakPtr()));
|
| + base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
|
| }
|
|
|
| -void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) {
|
| - DCHECK(!pending_flushes_.empty());
|
| - DCHECK(!pending_flushes_.back()->received_flush());
|
| - pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush);
|
| - if (!waiter_)
|
| - StartWaiting();
|
| +void DataSinkReceiver::DoFlush() {
|
| + DCHECK(flush_pending_);
|
| + flush_pending_ = false;
|
| + while (!pending_data_buffers_.empty()) {
|
| + available_buffer_capacity_ +=
|
| + pending_data_buffers_.front()->GetRemainingBytes();
|
| + pending_data_buffers_.pop();
|
| + }
|
| }
|
|
|
| void DataSinkReceiver::DispatchFatalError() {
|
| @@ -263,44 +242,44 @@ uint32_t DataSinkReceiver::Buffer::GetSize() {
|
| }
|
|
|
| void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
|
| + scoped_refptr<DataSinkReceiver> receiver = receiver_;
|
| + receiver_ = nullptr;
|
| if (cancelled_)
|
| - receiver_->DoneWithError(bytes_read, cancellation_error_);
|
| + receiver->DoneWithError(bytes_read, cancellation_error_);
|
| else
|
| - receiver_->Done(bytes_read);
|
| - receiver_ = NULL;
|
| + receiver->Done(bytes_read);
|
| buffer_ = NULL;
|
| buffer_size_ = 0;
|
| }
|
|
|
| void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
|
| int32_t error) {
|
| - receiver_->DoneWithError(bytes_read, error);
|
| - receiver_ = NULL;
|
| + scoped_refptr<DataSinkReceiver> receiver = receiver_;
|
| + receiver_ = nullptr;
|
| + receiver->DoneWithError(bytes_read, error);
|
| buffer_ = NULL;
|
| buffer_size_ = 0;
|
| }
|
|
|
| -DataSinkReceiver::PendingFlush::PendingFlush()
|
| - : received_flush_(false), bytes_to_flush_(0) {
|
| +DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data)
|
| + : data_(data.Pass()), offset_(0) {
|
| + DCHECK_LT(0u, data_.size());
|
| +}
|
| +
|
| +// Returns the number of uncomsumed bytes remaining of this data frame.
|
| +uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() {
|
| + return static_cast<uint32_t>(data_.size() - offset_);
|
| }
|
|
|
| -void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
|
| - DCHECK(!received_flush_);
|
| - received_flush_ = true;
|
| - bytes_to_flush_ = num_bytes;
|
| +// Returns a pointer to the remaining data to be consumed.
|
| +const char* DataSinkReceiver::DataFrame::GetData() {
|
| + DCHECK_LT(offset_, data_.size());
|
| + return reinterpret_cast<const char*>(&data_[0]) + offset_;
|
| }
|
|
|
| -MojoResult DataSinkReceiver::PendingFlush::Flush(
|
| - mojo::DataPipeConsumerHandle handle) {
|
| - DCHECK(received_flush_);
|
| - uint32_t num_bytes = bytes_to_flush_;
|
| - MojoResult result =
|
| - mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
|
| - if (result != MOJO_RESULT_OK)
|
| - return result;
|
| - DCHECK(num_bytes <= bytes_to_flush_);
|
| - bytes_to_flush_ -= num_bytes;
|
| - return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
|
| +void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
|
| + offset_ += bytes_read;
|
| + DCHECK_LE(offset_, data_.size());
|
| }
|
|
|
| } // namespace device
|
|
|