Index: device/serial/data_sink_receiver.cc |
diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc |
index 767f036ba4eb60c351a5c1f79e6c6d863ffb3615..ae2651fdf9bcfa71da4b9a6a6c24ff2ee02e9625 100644 |
--- a/device/serial/data_sink_receiver.cc |
+++ b/device/serial/data_sink_receiver.cc |
@@ -7,36 +7,10 @@ |
#include <limits> |
#include "base/bind.h" |
-#include "device/serial/async_waiter.h" |
+#include "base/message_loop/message_loop.h" |
namespace device { |
-// Represents a flush of data that has not been completed. |
-class DataSinkReceiver::PendingFlush { |
- public: |
- PendingFlush(); |
- |
- // Initializes this PendingFlush with |num_bytes|, the number of bytes to |
- // flush. |
- void SetNumBytesToFlush(uint32_t num_bytes); |
- |
- // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns |
- // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than |
- // |bytes_to_flush_| bytes were flushed or the error if one is encountered |
- // discarding data from |handle|. |
- MojoResult Flush(mojo::DataPipeConsumerHandle handle); |
- |
- // Whether this PendingFlush has received the number of bytes to flush. |
- bool received_flush() { return received_flush_; } |
- |
- private: |
- // Whether this PendingFlush has received the number of bytes to flush. |
- bool received_flush_; |
- |
- // The remaining number of bytes to flush. |
- uint32_t bytes_to_flush_; |
-}; |
- |
// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by |
// a DataSinkReceiver. |
class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
@@ -74,28 +48,30 @@ DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, |
: ready_callback_(ready_callback), |
cancel_callback_(cancel_callback), |
error_callback_(error_callback), |
+ flush_pending_(false), |
buffer_in_use_(NULL), |
+ waiting_for_data_(true), |
+ initialized_(false), |
+ available_buffer_capacity_(0), |
+ pending_data_offset_(0), |
shut_down_(false), |
weak_factory_(this) { |
} |
void DataSinkReceiver::ShutDown() { |
shut_down_ = true; |
- if (waiter_) |
- waiter_.reset(); |
} |
DataSinkReceiver::~DataSinkReceiver() { |
} |
-void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { |
- if (handle_.is_valid()) { |
- DispatchFatalError(); |
+void DataSinkReceiver::Init(uint32_t buffer_size) { |
+ if (initialized_) { |
+ ShutDown(); |
return; |
} |
- |
- handle_ = handle.Pass(); |
- StartWaiting(); |
+ initialized_ = true; |
+ available_buffer_capacity_ = buffer_size; |
} |
void DataSinkReceiver::Cancel(int32_t error) { |
@@ -103,7 +79,7 @@ void DataSinkReceiver::Cancel(int32_t error) { |
// response, that ReportBytesSentAndError message will appear to the |
// DataSinkClient to be caused by this Cancel message. In that case, we ignore |
// the cancel. |
- if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) |
+ if (flush_pending_) |
return; |
// If there is a buffer is in use, mark the buffer as cancelled and notify the |
@@ -120,54 +96,39 @@ void DataSinkReceiver::Cancel(int32_t error) { |
cancel_callback_.Run(error); |
return; |
} |
- // If there is no buffer in use, immediately report the error and cancel the |
- // waiting for the data pipe if one exists. This transitions straight into the |
- // state after the sink has returned an error. |
- waiter_.reset(); |
ReportBytesSentAndError(0, error); |
} |
-void DataSinkReceiver::OnConnectionError() { |
- DispatchFatalError(); |
+void DataSinkReceiver::AcceptData(mojo::Array<uint8_t> data) { |
+ if (flush_pending_) |
+ return; |
raymes
2014/10/17 03:10:12
Should this ever happen in normal operation? We sh
Sam McNally
2014/10/20 05:12:59
It can occur if the error notification and some da
|
+ if (data.size() > available_buffer_capacity_) { |
+ ShutDown(); |
+ return; |
+ } |
+ available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); |
+ pending_data_buffers_.push( |
+ linked_ptr<mojo::Array<uint8>>(new mojo::Array<uint8>(data.Pass()))); |
+ if (waiting_for_data_ && !buffer_in_use_) |
raymes
2014/10/17 03:10:12
Hmm I can't work out what waiting_for_data_ is for
Sam McNally
2014/10/20 05:12:59
Removed it.
|
+ CheckForData(); |
} |
-void DataSinkReceiver::StartWaiting() { |
- DCHECK(!waiter_ && !shut_down_); |
- waiter_.reset( |
- new AsyncWaiter(handle_.get(), |
- MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&DataSinkReceiver::OnDoneWaiting, this))); |
+void DataSinkReceiver::OnConnectionError() { |
+ DispatchFatalError(); |
} |
-void DataSinkReceiver::OnDoneWaiting(MojoResult result) { |
- DCHECK(waiter_ && !shut_down_); |
- waiter_.reset(); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
- } |
- // If there are any queued flushes (from ReportBytesSentAndError()), let them |
- // flush data from the data pipe. |
- if (!pending_flushes_.empty()) { |
- MojoResult result = pending_flushes_.front()->Flush(handle_.get()); |
- if (result == MOJO_RESULT_OK) { |
- pending_flushes_.pop(); |
- } else if (result != MOJO_RESULT_SHOULD_WAIT) { |
- DispatchFatalError(); |
- return; |
- } |
- StartWaiting(); |
+void DataSinkReceiver::CheckForData() { |
raymes
2014/10/17 03:10:12
I feel like there might be a more descriptive name
Sam McNally
2014/10/20 05:12:59
Done.
|
+ DCHECK(!shut_down_ && !buffer_in_use_ && !flush_pending_); |
+ if (pending_data_buffers_.empty()) { |
+ waiting_for_data_ = true; |
return; |
} |
- const void* data = NULL; |
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
- result = mojo::BeginReadDataRaw( |
- handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
- } |
- buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes); |
+ DCHECK_LT(pending_data_offset_, pending_data_buffers_.front()->size()); |
+ buffer_in_use_ = new Buffer( |
+ this, |
+ reinterpret_cast<const char*>(&(*pending_data_buffers_.front())[0]), |
+ static_cast<uint32_t>(pending_data_buffers_.front()->size() - |
+ pending_data_offset_)); |
ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
} |
@@ -175,7 +136,10 @@ void DataSinkReceiver::Done(uint32_t bytes_read) { |
if (!DoneInternal(bytes_read)) |
return; |
client()->ReportBytesSent(bytes_read); |
- StartWaiting(); |
+ waiting_for_data_ = false; |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&DataSinkReceiver::CheckForData, weak_factory_.GetWeakPtr())); |
} |
void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
@@ -190,35 +154,36 @@ bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
DCHECK(buffer_in_use_); |
buffer_in_use_ = NULL; |
- MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return false; |
+ available_buffer_capacity_ += bytes_read; |
+ pending_data_offset_ += bytes_read; |
raymes
2014/10/17 03:10:12
We could use a small struct for pending data buffe
Sam McNally
2014/10/20 05:12:59
Done.
|
+ if (pending_data_offset_ == pending_data_buffers_.front()->size()) { |
+ pending_data_buffers_.pop(); |
+ pending_data_offset_ = 0; |
} |
return true; |
} |
void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, |
int32_t error) { |
- // When we encounter an error, we must discard the data from any sends already |
- // in the data pipe before we can resume dispatching data to the sink. We add |
- // a pending flush here. The response containing the number of bytes to flush |
- // is handled in SetNumBytesToFlush(). The actual flush is handled in |
- // OnDoneWaiting(). |
- pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush())); |
+ // When we encounter an error, we must discard the data from any sends |
raymes
2014/10/17 03:10:12
sends->send ?
Sam McNally
2014/10/20 05:12:59
Done.
|
+ // buffers transmitted by the DataSinkClient before it receives this error. |
+ flush_pending_ = true; |
client()->ReportBytesSentAndError( |
bytes_read, |
error, |
- base::Bind(&DataSinkReceiver::SetNumBytesToFlush, |
- weak_factory_.GetWeakPtr())); |
+ base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); |
} |
-void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { |
- DCHECK(!pending_flushes_.empty()); |
- DCHECK(!pending_flushes_.back()->received_flush()); |
- pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); |
- if (!waiter_) |
- StartWaiting(); |
+void DataSinkReceiver::DoFlush() { |
+ DCHECK(flush_pending_); |
+ flush_pending_ = false; |
+ while (!pending_data_buffers_.empty()) { |
+ available_buffer_capacity_ += static_cast<uint32_t>( |
+ pending_data_buffers_.front()->size() - pending_data_offset_); |
+ pending_data_buffers_.pop(); |
+ pending_data_offset_ = 0; |
+ } |
+ waiting_for_data_ = true; |
} |
void DataSinkReceiver::DispatchFatalError() { |
@@ -263,44 +228,23 @@ uint32_t DataSinkReceiver::Buffer::GetSize() { |
} |
void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { |
+ scoped_refptr<DataSinkReceiver> receiver = receiver_; |
+ receiver_ = nullptr; |
if (cancelled_) |
- receiver_->DoneWithError(bytes_read, cancellation_error_); |
+ receiver->DoneWithError(bytes_read, cancellation_error_); |
else |
- receiver_->Done(bytes_read); |
- receiver_ = NULL; |
+ receiver->Done(bytes_read); |
buffer_ = NULL; |
buffer_size_ = 0; |
} |
void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
int32_t error) { |
- receiver_->DoneWithError(bytes_read, error); |
- receiver_ = NULL; |
+ scoped_refptr<DataSinkReceiver> receiver = receiver_; |
+ receiver_ = nullptr; |
+ receiver->DoneWithError(bytes_read, error); |
buffer_ = NULL; |
buffer_size_ = 0; |
} |
-DataSinkReceiver::PendingFlush::PendingFlush() |
- : received_flush_(false), bytes_to_flush_(0) { |
-} |
- |
-void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { |
- DCHECK(!received_flush_); |
- received_flush_ = true; |
- bytes_to_flush_ = num_bytes; |
-} |
- |
-MojoResult DataSinkReceiver::PendingFlush::Flush( |
- mojo::DataPipeConsumerHandle handle) { |
- DCHECK(received_flush_); |
- uint32_t num_bytes = bytes_to_flush_; |
- MojoResult result = |
- mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD); |
- if (result != MOJO_RESULT_OK) |
- return result; |
- DCHECK(num_bytes <= bytes_to_flush_); |
- bytes_to_flush_ -= num_bytes; |
- return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; |
-} |
- |
} // namespace device |