Index: device/serial/data_receiver.cc |
diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc |
index d7a47fa1bc8d640792bf5071e05d94e3b62082fa..fdc23caaff385c144b1f05ed7b25a5dabf4b0a4f 100644 |
--- a/device/serial/data_receiver.cc |
+++ b/device/serial/data_receiver.cc |
@@ -8,7 +8,6 @@ |
#include "base/bind.h" |
#include "base/message_loop/message_loop.h" |
-#include "device/serial/async_waiter.h" |
namespace device { |
@@ -20,17 +19,15 @@ class DataReceiver::PendingReceive { |
const ReceiveErrorCallback& error_callback, |
int32_t fatal_error_value); |
- // Dispatches |data| to |receive_callback_|. |
- void DispatchData(const void* data, uint32_t num_bytes); |
- |
- // Reports |error| to |receive_error_callback_| if it is an appropriate time. |
- // Returns whether it dispatched |error|. |
- bool DispatchError(DataReceiver::PendingError* error, |
- uint32_t bytes_received); |
+ // Dispatches |data| to |receive_callback_|. Returns whether this |
+ // PendingReceive is finished by this call. |
+ bool DispatchDataFrame(DataReceiver::DataFrame* data); |
// Reports |fatal_error_value_| to |receive_error_callback_|. |
void DispatchFatalError(); |
+ bool buffer_in_use() { return buffer_in_use_; } |
+ |
private: |
class Buffer; |
@@ -51,12 +48,12 @@ class DataReceiver::PendingReceive { |
const int32_t fatal_error_value_; |
// True if the user owns a buffer passed to |receive_callback_| as part of |
- // DispatchData(). |
+ // DispatchDataFrame(). |
bool buffer_in_use_; |
}; |
-// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by |
-// a DataReceiver. |
+// A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
+// DataReceiver. |
class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
public: |
Buffer(scoped_refptr<DataReceiver> pipe, |
@@ -72,7 +69,7 @@ class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
void DoneWithError(uint32_t bytes_consumed, int32_t error) override; |
private: |
- // The DataReceiver whose data pipe we are providing a view. |
+ // The DataReceiver of whose buffer we are providing a view. |
scoped_refptr<DataReceiver> receiver_; |
// The PendingReceive to which this buffer has been created in response. |
@@ -82,13 +79,26 @@ class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
uint32_t buffer_size_; |
}; |
-// Represents an error received from the DataSource. |
-struct DataReceiver::PendingError { |
- PendingError(uint32_t offset, int32_t error) |
- : offset(offset), error(error), dispatched(false) {} |
+// A buffer of data or an error received from the DataSource. |
+struct DataReceiver::DataFrame { |
+ explicit DataFrame(mojo::Array<uint8_t> data) |
+ : is_error(false), |
+ data(data.Pass()), |
+ offset(0), |
+ error(0), |
+ dispatched(false) {} |
+ |
+ explicit DataFrame(int32_t error) |
+ : is_error(true), offset(0), error(error), dispatched(false) {} |
- // The location within the data stream where the error occurred. |
- const uint32_t offset; |
+ // Whether this DataFrame represents an error. |
+ bool is_error; |
+ |
+ // The data received from the DataSource. |
+ mojo::Array<uint8_t> data; |
+ |
+ // The offset within |data| at which the next read should begin. |
+ uint32_t offset; |
// The value of the error that occurred. |
const int32_t error; |
@@ -102,17 +112,11 @@ DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, |
int32_t fatal_error_value) |
: source_(source.Pass()), |
fatal_error_value_(fatal_error_value), |
- bytes_received_(0), |
shut_down_(false), |
weak_factory_(this) { |
- MojoCreateDataPipeOptions options = { |
- sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, |
- }; |
- mojo::ScopedDataPipeProducerHandle remote_handle; |
- MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); |
- DCHECK_EQ(MOJO_RESULT_OK, result); |
- source_->Init(remote_handle.Pass()); |
source_.set_client(this); |
+ source_.set_error_handler(this); |
+ source_->Init(buffer_size); |
} |
bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
@@ -124,16 +128,15 @@ bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
// user starts a new receive following notification of the error (via |
// |error_callback| of the previous Receive call) of the error we can tell the |
// DataSource to resume transmission of data. |
- if (pending_error_ && pending_error_->dispatched) { |
+ if (!pending_data_frames_.empty() && pending_data_frames_.front()->is_error && |
+ pending_data_frames_.front()->dispatched) { |
source_->Resume(); |
- pending_error_.reset(); |
+ pending_data_frames_.pop(); |
} |
pending_receive_.reset( |
new PendingReceive(this, callback, error_callback, fatal_error_value_)); |
- base::MessageLoop::current()->PostTask( |
- FROM_HERE, |
- base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); |
+ ReceiveInternal(); |
return true; |
} |
@@ -141,25 +144,19 @@ DataReceiver::~DataReceiver() { |
ShutDown(); |
} |
-void DataReceiver::OnError(uint32_t offset, int32_t error) { |
+void DataReceiver::OnError(int32_t error) { |
if (shut_down_) |
return; |
- if (pending_error_) { |
- // When OnError is called by the DataSource, transmission of data is |
- // suspended. Thus we shouldn't receive another call to OnError until we |
- // have fully dealt with the error and called Resume to resume transmission |
- // (see Receive()). Under normal operation we should never get here, but if |
- // we do (e.g. in the case of a hijacked service process) just shut down. |
- ShutDown(); |
- return; |
- } |
- pending_error_.reset(new PendingError(offset, error)); |
- if (pending_receive_ && |
- pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { |
- pending_receive_.reset(); |
- waiter_.reset(); |
- } |
+ pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(error))); |
+ if (pending_receive_) |
+ ReceiveInternal(); |
+} |
+ |
+void DataReceiver::OnData(mojo::Array<uint8_t> data) { |
+ pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); |
+ if (pending_receive_) |
+ ReceiveInternal(); |
} |
void DataReceiver::OnConnectionError() { |
@@ -171,91 +168,34 @@ void DataReceiver::Done(uint32_t bytes_consumed) { |
return; |
DCHECK(pending_receive_); |
- MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); |
- DCHECK_EQ(MOJO_RESULT_OK, result); |
- pending_receive_.reset(); |
- bytes_received_ += bytes_consumed; |
-} |
- |
-void DataReceiver::OnDoneWaiting(MojoResult result) { |
- DCHECK(pending_receive_ && !shut_down_ && waiter_); |
- waiter_.reset(); |
- if (result != MOJO_RESULT_OK) { |
- ShutDown(); |
- return; |
+ DataFrame& pending_data = *pending_data_frames_.front(); |
+ pending_data.offset += bytes_consumed; |
+ DCHECK_LE(pending_data.offset, pending_data.data.size()); |
+ if (pending_data.offset == pending_data.data.size()) { |
+ source_->ReportBytesReceived( |
+ static_cast<uint32_t>(pending_data.data.size())); |
+ pending_data_frames_.pop(); |
} |
- ReceiveInternal(); |
+ pending_receive_.reset(); |
} |
void DataReceiver::ReceiveInternal() { |
if (shut_down_) |
return; |
DCHECK(pending_receive_); |
- if (pending_error_ && |
- pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { |
- pending_receive_.reset(); |
- waiter_.reset(); |
+ if (pending_receive_->buffer_in_use()) |
return; |
- } |
- const void* data; |
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
- MojoResult result = mojo::BeginReadDataRaw( |
- handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
- if (result == MOJO_RESULT_OK) { |
- if (!CheckErrorNotInReadRange(num_bytes)) { |
- ShutDown(); |
- return; |
- } |
- |
- pending_receive_->DispatchData(data, num_bytes); |
- return; |
- } |
- if (result == MOJO_RESULT_SHOULD_WAIT) { |
- waiter_.reset(new AsyncWaiter( |
- handle_.get(), |
- MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); |
- return; |
+ if (!pending_data_frames_.empty() && |
+ pending_receive_->DispatchDataFrame(pending_data_frames_.front().get())) { |
+ pending_receive_.reset(); |
} |
- ShutDown(); |
-} |
- |
-bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) { |
- DCHECK(pending_receive_); |
- if (!pending_error_) |
- return true; |
- |
- DCHECK_NE(bytes_received_, pending_error_->offset); |
- DCHECK_NE(num_bytes, 0u); |
- uint32_t potential_bytes_received = bytes_received_ + num_bytes; |
- // bytes_received_ can overflow so we must consider two cases: |
- // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an |
- // equal number of times. In this case, |potential_bytes_received| must |
- // be in the range (|bytes_received|, |pending_error_->offset|]. Below |
- // this range can only occur if |bytes_received_| overflows before |
- // |pending_error_->offset|. Above can only occur if |bytes_received_| |
- // overtakes |pending_error_->offset|. |
- // 2. |pending_error_->offset| has overflowed once more than |
- // |bytes_received_|. In this case, |potential_bytes_received| must not |
- // be in the range (|pending_error_->offset|, |bytes_received_|]. |
- if ((bytes_received_ < pending_error_->offset && |
- (potential_bytes_received > pending_error_->offset || |
- potential_bytes_received <= bytes_received_)) || |
- (bytes_received_ > pending_error_->offset && |
- potential_bytes_received > pending_error_->offset && |
- potential_bytes_received <= bytes_received_)) { |
- return false; |
- } |
- return true; |
} |
void DataReceiver::ShutDown() { |
shut_down_ = true; |
if (pending_receive_) |
pending_receive_->DispatchFatalError(); |
- pending_error_.reset(); |
- waiter_.reset(); |
} |
DataReceiver::PendingReceive::PendingReceive( |
@@ -270,23 +210,28 @@ DataReceiver::PendingReceive::PendingReceive( |
buffer_in_use_(false) { |
} |
-void DataReceiver::PendingReceive::DispatchData(const void* data, |
- uint32_t num_bytes) { |
+bool DataReceiver::PendingReceive::DispatchDataFrame( |
+ DataReceiver::DataFrame* data) { |
DCHECK(!buffer_in_use_); |
- buffer_in_use_ = true; |
- receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( |
- new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); |
-} |
+ DCHECK(!data->dispatched); |
-bool DataReceiver::PendingReceive::DispatchError(PendingError* error, |
- uint32_t bytes_received) { |
- DCHECK(!error->dispatched); |
- if (buffer_in_use_ || bytes_received != error->offset) |
- return false; |
- |
- error->dispatched = true; |
- receive_error_callback_.Run(error->error); |
- return true; |
+ if (data->is_error) { |
+ data->dispatched = true; |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, base::Bind(receive_error_callback_, data->error)); |
+ return true; |
+ } |
+ buffer_in_use_ = true; |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind( |
+ receive_callback_, |
+ base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer( |
+ receiver_, |
+ this, |
+ reinterpret_cast<char*>(&data->data[0]) + data->offset, |
+ static_cast<uint32_t>(data->data.size() - data->offset)))))); |
+ return false; |
} |
void DataReceiver::PendingReceive::DispatchFatalError() { |