Chromium Code Reviews| Index: device/serial/data_sink_receiver.cc |
| diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc |
| index 767f036ba4eb60c351a5c1f79e6c6d863ffb3615..55e53f72460c38269c13d1d675403deb2c750f49 100644 |
| --- a/device/serial/data_sink_receiver.cc |
| +++ b/device/serial/data_sink_receiver.cc |
| @@ -7,38 +7,12 @@ |
| #include <limits> |
| #include "base/bind.h" |
| -#include "device/serial/async_waiter.h" |
| +#include "base/message_loop/message_loop.h" |
| namespace device { |
| -// Represents a flush of data that has not been completed. |
| -class DataSinkReceiver::PendingFlush { |
| - public: |
| - PendingFlush(); |
| - |
| - // Initializes this PendingFlush with |num_bytes|, the number of bytes to |
| - // flush. |
| - void SetNumBytesToFlush(uint32_t num_bytes); |
| - |
| - // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns |
| - // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than |
| - // |bytes_to_flush_| bytes were flushed or the error if one is encountered |
| - // discarding data from |handle|. |
| - MojoResult Flush(mojo::DataPipeConsumerHandle handle); |
| - |
| - // Whether this PendingFlush has received the number of bytes to flush. |
| - bool received_flush() { return received_flush_; } |
| - |
| - private: |
| - // Whether this PendingFlush has received the number of bytes to flush. |
| - bool received_flush_; |
| - |
| - // The remaining number of bytes to flush. |
| - uint32_t bytes_to_flush_; |
| -}; |
| - |
| -// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by |
| -// a DataSinkReceiver. |
| +// A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
| +// DataSinkReceiver. |
| class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
| public: |
| Buffer(scoped_refptr<DataSinkReceiver> receiver, |
| @@ -55,7 +29,7 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
| virtual void DoneWithError(uint32_t bytes_read, int32_t error) override; |
| private: |
| - // The DataSinkReceiver whose data pipe we are providing a view. |
| + // The DataSinkReceiver of whose buffer we are providing a view. |
| scoped_refptr<DataSinkReceiver> receiver_; |
| const char* buffer_; |
| @@ -68,34 +42,58 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
| int32_t cancellation_error_; |
| }; |
| +// A frame of data received from the client. |
| +class DataSinkReceiver::DataFrame { |
| + public: |
| + explicit DataFrame(mojo::Array<uint8_t> data); |
| + |
| + // Returns the number of uncomsumed bytes remaining of this data frame. |
| + uint32_t remaining_bytes() { |
| + return static_cast<uint32_t>(data_.size() - offset_); |
| + } |
| + |
| + // Returns a pointer to the remaining data to be consumed. |
| + const char* data() { |
| + DCHECK_LT(offset_, data_.size()); |
| + return reinterpret_cast<const char*>(&data_[0]) + offset_; |
| + } |
|
raymes
2014/10/27 03:02:22
nit: since we're not inlining OnDataConsumed we mi
Sam McNally
2014/10/27 05:39:14
Done.
|
| + |
| + // Reports that |bytes_read| bytes have been consumed. |
| + void OnDataConsumed(uint32_t bytes_read); |
| + |
| + private: |
| + mojo::Array<uint8_t> data_; |
| + uint32_t offset_; |
| +}; |
| + |
| DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, |
| const CancelCallback& cancel_callback, |
| const ErrorCallback& error_callback) |
| : ready_callback_(ready_callback), |
| cancel_callback_(cancel_callback), |
| error_callback_(error_callback), |
| + flush_pending_(false), |
| buffer_in_use_(NULL), |
| + initialized_(false), |
| + available_buffer_capacity_(0), |
| shut_down_(false), |
| weak_factory_(this) { |
| } |
| void DataSinkReceiver::ShutDown() { |
| shut_down_ = true; |
| - if (waiter_) |
| - waiter_.reset(); |
| } |
| DataSinkReceiver::~DataSinkReceiver() { |
| } |
| -void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { |
| - if (handle_.is_valid()) { |
| - DispatchFatalError(); |
| +void DataSinkReceiver::Init(uint32_t buffer_size) { |
| + if (initialized_) { |
| + ShutDown(); |
| return; |
| } |
| - |
| - handle_ = handle.Pass(); |
| - StartWaiting(); |
| + initialized_ = true; |
| + available_buffer_capacity_ = buffer_size; |
| } |
| void DataSinkReceiver::Cancel(int32_t error) { |
| @@ -103,7 +101,7 @@ void DataSinkReceiver::Cancel(int32_t error) { |
| // response, that ReportBytesSentAndError message will appear to the |
| // DataSinkClient to be caused by this Cancel message. In that case, we ignore |
| // the cancel. |
| - if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) |
| + if (flush_pending_) |
| return; |
| // If there is a buffer is in use, mark the buffer as cancelled and notify the |
| @@ -120,54 +118,37 @@ void DataSinkReceiver::Cancel(int32_t error) { |
| cancel_callback_.Run(error); |
| return; |
| } |
| - // If there is no buffer in use, immediately report the error and cancel the |
| - // waiting for the data pipe if one exists. This transitions straight into the |
| - // state after the sink has returned an error. |
| - waiter_.reset(); |
| ReportBytesSentAndError(0, error); |
| } |
| -void DataSinkReceiver::OnConnectionError() { |
| - DispatchFatalError(); |
| -} |
| - |
| -void DataSinkReceiver::StartWaiting() { |
| - DCHECK(!waiter_ && !shut_down_); |
| - waiter_.reset( |
| - new AsyncWaiter(handle_.get(), |
| - MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&DataSinkReceiver::OnDoneWaiting, this))); |
| -} |
| - |
| -void DataSinkReceiver::OnDoneWaiting(MojoResult result) { |
| - DCHECK(waiter_ && !shut_down_); |
| - waiter_.reset(); |
| - if (result != MOJO_RESULT_OK) { |
| - DispatchFatalError(); |
| +void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { |
| + if (!initialized_) { |
| + ShutDown(); |
| return; |
| } |
| - // If there are any queued flushes (from ReportBytesSentAndError()), let them |
| - // flush data from the data pipe. |
| - if (!pending_flushes_.empty()) { |
| - MojoResult result = pending_flushes_.front()->Flush(handle_.get()); |
| - if (result == MOJO_RESULT_OK) { |
| - pending_flushes_.pop(); |
| - } else if (result != MOJO_RESULT_SHOULD_WAIT) { |
| - DispatchFatalError(); |
| - return; |
| - } |
| - StartWaiting(); |
| + if (data.size() > available_buffer_capacity_) { |
| + ShutDown(); |
| return; |
| } |
| - const void* data = NULL; |
| - uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| - result = mojo::BeginReadDataRaw( |
| - handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| - if (result != MOJO_RESULT_OK) { |
| - DispatchFatalError(); |
| + available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); |
| + pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); |
| + if (!buffer_in_use_ && !flush_pending_) |
| + RunReadyCallback(); |
| +} |
| + |
| +void DataSinkReceiver::OnConnectionError() { |
| + DispatchFatalError(); |
| +} |
| + |
| +void DataSinkReceiver::RunReadyCallback() { |
| + DCHECK(!shut_down_ && !flush_pending_); |
| + // If data arrives while a call to RunReadyCallback() is posted, we can be |
| + // called with buffer_in_use_ already set. |
| + if (buffer_in_use_) |
| return; |
| - } |
| - buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes); |
| + buffer_in_use_ = new Buffer(this, |
| + pending_data_buffers_.front()->data(), |
| + pending_data_buffers_.front()->remaining_bytes()); |
| ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
| } |
| @@ -175,7 +156,12 @@ void DataSinkReceiver::Done(uint32_t bytes_read) { |
| if (!DoneInternal(bytes_read)) |
| return; |
| client()->ReportBytesSent(bytes_read); |
| - StartWaiting(); |
| + if (!pending_data_buffers_.empty()) { |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&DataSinkReceiver::RunReadyCallback, |
| + weak_factory_.GetWeakPtr())); |
| + } |
| } |
| void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
| @@ -190,35 +176,32 @@ bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
| DCHECK(buffer_in_use_); |
| buffer_in_use_ = NULL; |
| - MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); |
| - if (result != MOJO_RESULT_OK) { |
| - DispatchFatalError(); |
| - return false; |
| - } |
| + available_buffer_capacity_ += bytes_read; |
| + pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
| + if (pending_data_buffers_.front()->remaining_bytes() == 0) |
| + pending_data_buffers_.pop(); |
| return true; |
| } |
| void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, |
| int32_t error) { |
| - // When we encounter an error, we must discard the data from any sends already |
| - // in the data pipe before we can resume dispatching data to the sink. We add |
| - // a pending flush here. The response containing the number of bytes to flush |
| - // is handled in SetNumBytesToFlush(). The actual flush is handled in |
| - // OnDoneWaiting(). |
| - pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush())); |
| + // When we encounter an error, we must discard the data from any send buffers |
| + // transmitted by the DataSinkClient before it receives this error. |
| + flush_pending_ = true; |
| client()->ReportBytesSentAndError( |
| bytes_read, |
| error, |
| - base::Bind(&DataSinkReceiver::SetNumBytesToFlush, |
| - weak_factory_.GetWeakPtr())); |
| + base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); |
| } |
| -void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { |
| - DCHECK(!pending_flushes_.empty()); |
| - DCHECK(!pending_flushes_.back()->received_flush()); |
| - pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); |
| - if (!waiter_) |
| - StartWaiting(); |
| +void DataSinkReceiver::DoFlush() { |
| + DCHECK(flush_pending_); |
| + flush_pending_ = false; |
| + while (!pending_data_buffers_.empty()) { |
| + available_buffer_capacity_ += |
| + pending_data_buffers_.front()->remaining_bytes(); |
| + pending_data_buffers_.pop(); |
| + } |
| } |
| void DataSinkReceiver::DispatchFatalError() { |
| @@ -263,44 +246,33 @@ uint32_t DataSinkReceiver::Buffer::GetSize() { |
| } |
| void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { |
| + scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| + receiver_ = nullptr; |
| if (cancelled_) |
| - receiver_->DoneWithError(bytes_read, cancellation_error_); |
| + receiver->DoneWithError(bytes_read, cancellation_error_); |
| else |
| - receiver_->Done(bytes_read); |
| - receiver_ = NULL; |
| + receiver->Done(bytes_read); |
| buffer_ = NULL; |
| buffer_size_ = 0; |
| } |
| void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
| int32_t error) { |
| - receiver_->DoneWithError(bytes_read, error); |
| - receiver_ = NULL; |
| + scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| + receiver_ = nullptr; |
| + receiver->DoneWithError(bytes_read, error); |
| buffer_ = NULL; |
| buffer_size_ = 0; |
| } |
| -DataSinkReceiver::PendingFlush::PendingFlush() |
| - : received_flush_(false), bytes_to_flush_(0) { |
| -} |
| - |
| -void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { |
| - DCHECK(!received_flush_); |
| - received_flush_ = true; |
| - bytes_to_flush_ = num_bytes; |
| +DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) |
| + : data_(data.Pass()), offset_(0) { |
| + DCHECK_LT(0u, data_.size()); |
| } |
| -MojoResult DataSinkReceiver::PendingFlush::Flush( |
| - mojo::DataPipeConsumerHandle handle) { |
| - DCHECK(received_flush_); |
| - uint32_t num_bytes = bytes_to_flush_; |
| - MojoResult result = |
| - mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD); |
| - if (result != MOJO_RESULT_OK) |
| - return result; |
| - DCHECK(num_bytes <= bytes_to_flush_); |
| - bytes_to_flush_ -= num_bytes; |
| - return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; |
| +void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
| + offset_ += bytes_read; |
| + DCHECK_LE(offset_, data_.size()); |
| } |
| } // namespace device |