Chromium Code Reviews| Index: device/serial/data_receiver.cc |
| diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc |
| index b6e44a977ec64bdc977ffc8d9f4014889d9e6743..239b1651b178abcf63dcf5099aa886d9fab46383 100644 |
| --- a/device/serial/data_receiver.cc |
| +++ b/device/serial/data_receiver.cc |
| @@ -8,7 +8,6 @@ |
| #include "base/bind.h" |
| #include "base/message_loop/message_loop.h" |
| -#include "device/serial/async_waiter.h" |
| namespace device { |
| @@ -21,16 +20,17 @@ class DataReceiver::PendingReceive { |
| int32_t fatal_error_value); |
| // Dispatches |data| to |receive_callback_|. |
| - void DispatchData(const void* data, uint32_t num_bytes); |
| + void DispatchData(DataReceiver::PendingData* data); |
| // Reports |error| to |receive_error_callback_| if it is an appropriate time. |
| // Returns whether it dispatched |error|. |
| - bool DispatchError(DataReceiver::PendingError* error, |
| - uint32_t bytes_received); |
| + bool DispatchError(DataReceiver::PendingError* error); |
| // Reports |fatal_error_value_| to |receive_error_callback_|. |
| void DispatchFatalError(); |
| + bool dispatched() { return buffer_in_use_; } |
|
raymes
2014/10/17 01:55:42
why not just call this function "buffer_in_use()"
Sam McNally
2014/10/20 05:12:58
Done.
|
| + |
| private: |
| class Buffer; |
| @@ -82,13 +82,24 @@ class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
| uint32_t buffer_size_; |
| }; |
| +// A buffer of data received from the DataSource. |
| +struct DataReceiver::PendingData { |
| + explicit PendingData(mojo::Array<uint8_t> data) |
| + : data(data.Pass()), offset(0) {} |
| + |
| + mojo::Array<uint8_t> data; |
| + |
| + // The offset within |data| at which the next read should begin. |
| + uint32_t offset; |
| +}; |
| + |
| // Represents an error received from the DataSource. |
| struct DataReceiver::PendingError { |
|
raymes
2014/10/17 01:55:42
Could we combine PendingData and PendingError? It
Sam McNally
2014/10/20 05:12:58
Done.
|
| - PendingError(uint32_t offset, int32_t error) |
| - : offset(offset), error(error), dispatched(false) {} |
| + PendingError(int32_t error, size_t queue_position) |
| + : queue_position(queue_position), error(error), dispatched(false) {} |
| - // The location within the data stream where the error occurred. |
| - const uint32_t offset; |
| + // The number of pending data buffers to be dispatched before this error. |
| + size_t queue_position; |
| // The value of the error that occurred. |
| const int32_t error; |
| @@ -102,17 +113,11 @@ DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, |
| int32_t fatal_error_value) |
| : source_(source.Pass()), |
| fatal_error_value_(fatal_error_value), |
| - bytes_received_(0), |
| shut_down_(false), |
| weak_factory_(this) { |
| - MojoCreateDataPipeOptions options = { |
| - sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, |
| - }; |
| - mojo::ScopedDataPipeProducerHandle remote_handle; |
| - MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); |
| - DCHECK_EQ(MOJO_RESULT_OK, result); |
| - source_->Init(remote_handle.Pass()); |
| source_.set_client(this); |
| + source_.set_error_handler(this); |
| + source_->Init(buffer_size); |
| } |
| bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
| @@ -131,9 +136,7 @@ bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
| pending_receive_.reset( |
| new PendingReceive(this, callback, error_callback, fatal_error_value_)); |
| - base::MessageLoop::current()->PostTask( |
| - FROM_HERE, |
| - base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); |
| + ReceiveInternal(); |
| return true; |
| } |
| @@ -141,7 +144,7 @@ DataReceiver::~DataReceiver() { |
| ShutDown(); |
| } |
| -void DataReceiver::OnError(uint32_t offset, int32_t error) { |
| +void DataReceiver::OnError(int32_t error) { |
| if (shut_down_) |
| return; |
| @@ -154,12 +157,16 @@ void DataReceiver::OnError(uint32_t offset, int32_t error) { |
| ShutDown(); |
| return; |
| } |
| - pending_error_.reset(new PendingError(offset, error)); |
| - if (pending_receive_ && |
| - pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { |
| - pending_receive_.reset(); |
| - waiter_.reset(); |
| - } |
| + pending_error_.reset(new PendingError(error, pending_data_buffers_.size())); |
| + if (pending_receive_) |
| + ReceiveInternal(); |
| +} |
| + |
| +void DataReceiver::OnData(mojo::Array<uint8_t> data) { |
| + pending_data_buffers_.push( |
| + linked_ptr<PendingData>(new PendingData(data.Pass()))); |
| + if (pending_receive_) |
| + ReceiveInternal(); |
| } |
| void DataReceiver::OnConnectionError() { |
| @@ -171,83 +178,29 @@ void DataReceiver::Done(uint32_t bytes_consumed) { |
| return; |
| DCHECK(pending_receive_); |
| - MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); |
| - DCHECK_EQ(MOJO_RESULT_OK, result); |
| - pending_receive_.reset(); |
| - bytes_received_ += bytes_consumed; |
| -} |
| - |
| -void DataReceiver::OnDoneWaiting(MojoResult result) { |
| - DCHECK(pending_receive_ && !shut_down_ && waiter_); |
| - waiter_.reset(); |
| - if (result != MOJO_RESULT_OK) { |
| - ShutDown(); |
| - return; |
| + PendingData& pending_data = *pending_data_buffers_.front(); |
| + pending_data.offset += bytes_consumed; |
| + if (pending_data.offset == pending_data.data.size()) { |
|
raymes
2014/10/17 01:55:42
Maybe we should at least DCHECK here that pending_
Sam McNally
2014/10/20 05:12:58
Done.
|
| + if (pending_error_) |
| + pending_error_->queue_position--; |
| + pending_data_buffers_.pop(); |
| } |
| - ReceiveInternal(); |
| + pending_receive_.reset(); |
| } |
| void DataReceiver::ReceiveInternal() { |
| if (shut_down_) |
| return; |
| DCHECK(pending_receive_); |
| - if (pending_error_ && |
| - pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { |
| - pending_receive_.reset(); |
| - waiter_.reset(); |
| + if (pending_receive_->dispatched()) |
| return; |
| - } |
| - const void* data; |
| - uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| - MojoResult result = mojo::BeginReadDataRaw( |
| - handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| - if (result == MOJO_RESULT_OK) { |
| - if (!CheckErrorNotInReadRange(num_bytes)) { |
| - ShutDown(); |
| - return; |
| - } |
| - |
| - pending_receive_->DispatchData(data, num_bytes); |
| - return; |
| - } |
| - if (result == MOJO_RESULT_SHOULD_WAIT) { |
| - waiter_.reset(new AsyncWaiter( |
| - handle_.get(), |
| - MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); |
| + if (pending_error_ && pending_receive_->DispatchError(pending_error_.get())) { |
| + pending_receive_.reset(); |
| return; |
| } |
| - ShutDown(); |
| -} |
| - |
| -bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) { |
| - DCHECK(pending_receive_); |
| - if (!pending_error_) |
| - return true; |
| - |
| - DCHECK_NE(bytes_received_, pending_error_->offset); |
| - DCHECK_NE(num_bytes, 0u); |
| - uint32_t potential_bytes_received = bytes_received_ + num_bytes; |
| - // bytes_received_ can overflow so we must consider two cases: |
| - // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an |
| - // equal number of times. In this case, |potential_bytes_received| must |
| - // be in the range (|bytes_received|, |pending_error_->offset|]. Below |
| - // this range can only occur if |bytes_received_| overflows before |
| - // |pending_error_->offset|. Above can only occur if |bytes_received_| |
| - // overtakes |pending_error_->offset|. |
| - // 2. |pending_error_->offset| has overflowed once more than |
| - // |bytes_received_|. In this case, |potential_bytes_received| must not |
| - // be in the range (|pending_error_->offset|, |bytes_received_|]. |
| - if ((bytes_received_ < pending_error_->offset && |
| - (potential_bytes_received > pending_error_->offset || |
| - potential_bytes_received <= bytes_received_)) || |
| - (bytes_received_ > pending_error_->offset && |
| - potential_bytes_received > pending_error_->offset && |
| - potential_bytes_received <= bytes_received_)) { |
| - return false; |
| - } |
| - return true; |
| + if (!pending_data_buffers_.empty()) |
| + pending_receive_->DispatchData(pending_data_buffers_.front().get()); |
| } |
| void DataReceiver::ShutDown() { |
| @@ -255,7 +208,6 @@ void DataReceiver::ShutDown() { |
| if (pending_receive_) |
| pending_receive_->DispatchFatalError(); |
| pending_error_.reset(); |
| - waiter_.reset(); |
| } |
| DataReceiver::PendingReceive::PendingReceive( |
| @@ -270,22 +222,29 @@ DataReceiver::PendingReceive::PendingReceive( |
| buffer_in_use_(false) { |
| } |
| -void DataReceiver::PendingReceive::DispatchData(const void* data, |
| - uint32_t num_bytes) { |
| +void DataReceiver::PendingReceive::DispatchData( |
| + DataReceiver::PendingData* data) { |
| DCHECK(!buffer_in_use_); |
| buffer_in_use_ = true; |
| - receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( |
| - new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind( |
| + receive_callback_, |
| + base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer( |
| + receiver_, |
| + this, |
| + reinterpret_cast<char*>(&data->data[0]) + data->offset, |
| + static_cast<uint32_t>(data->data.size() - data->offset)))))); |
| } |
| -bool DataReceiver::PendingReceive::DispatchError(PendingError* error, |
| - uint32_t bytes_received) { |
| +bool DataReceiver::PendingReceive::DispatchError(PendingError* error) { |
| DCHECK(!error->dispatched); |
| - if (buffer_in_use_ || bytes_received != error->offset) |
| + if (buffer_in_use_ || error->queue_position != 0) |
| return false; |
| error->dispatched = true; |
| - receive_error_callback_.Run(error->error); |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, base::Bind(receive_error_callback_, error->error)); |
| return true; |
| } |