Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8904)

Unified Diff: device/serial/data_receiver.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: address comments Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: device/serial/data_receiver.cc
diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc
index b6e44a977ec64bdc977ffc8d9f4014889d9e6743..f151ea610d7e52a910f1a6eee81079ddfc6b5e9d 100644
--- a/device/serial/data_receiver.cc
+++ b/device/serial/data_receiver.cc
@@ -8,7 +8,6 @@
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
-#include "device/serial/async_waiter.h"
namespace device {
@@ -21,16 +20,13 @@ class DataReceiver::PendingReceive {
int32_t fatal_error_value);
// Dispatches |data| to |receive_callback_|.
- void DispatchData(const void* data, uint32_t num_bytes);
-
- // Reports |error| to |receive_error_callback_| if it is an appropriate time.
- // Returns whether it dispatched |error|.
- bool DispatchError(DataReceiver::PendingError* error,
- uint32_t bytes_received);
+ bool DispatchDataFrame(DataReceiver::DataFrame* data);
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
+ bool buffer_in_use() { return buffer_in_use_; }
+
private:
class Buffer;
@@ -51,12 +47,12 @@ class DataReceiver::PendingReceive {
const int32_t fatal_error_value_;
// True if the user owns a buffer passed to |receive_callback_| as part of
- // DispatchData().
+ // DispatchDataFrame().
bool buffer_in_use_;
};
-// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
-// a DataReceiver.
+// A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
+// DataReceiver.
class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
public:
Buffer(scoped_refptr<DataReceiver> pipe,
@@ -72,7 +68,7 @@ class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) override;
private:
- // The DataReceiver whose data pipe we are providing a view.
+ // The DataReceiver of whose buffer we are providing a view.
scoped_refptr<DataReceiver> receiver_;
// The PendingReceive to which this buffer has been created in response.
@@ -82,13 +78,26 @@ class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
uint32_t buffer_size_;
};
-// Represents an error received from the DataSource.
-struct DataReceiver::PendingError {
- PendingError(uint32_t offset, int32_t error)
- : offset(offset), error(error), dispatched(false) {}
+// A buffer of data or an error received from the DataSource.
+struct DataReceiver::DataFrame {
+ explicit DataFrame(mojo::Array<uint8_t> data)
+ : is_error(false),
+ data(data.Pass()),
+ offset(0),
+ error(0),
+ dispatched(false) {}
+
+ explicit DataFrame(int32_t error)
+ : is_error(true), offset(0), error(error), dispatched(false) {}
- // The location within the data stream where the error occurred.
- const uint32_t offset;
+ // Whether this DataFrame represents an error.
+ bool is_error;
+
+ // The data received from the DataSource.
+ mojo::Array<uint8_t> data;
+
+ // The offset within |data| at which the next read should begin.
+ uint32_t offset;
// The value of the error that occurred.
const int32_t error;
@@ -102,17 +111,11 @@ DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
int32_t fatal_error_value)
: source_(source.Pass()),
fatal_error_value_(fatal_error_value),
- bytes_received_(0),
shut_down_(false),
weak_factory_(this) {
- MojoCreateDataPipeOptions options = {
- sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
- };
- mojo::ScopedDataPipeProducerHandle remote_handle;
- MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
- DCHECK_EQ(MOJO_RESULT_OK, result);
- source_->Init(remote_handle.Pass());
source_.set_client(this);
+ source_.set_error_handler(this);
+ source_->Init(buffer_size);
}
bool DataReceiver::Receive(const ReceiveDataCallback& callback,
@@ -124,16 +127,16 @@ bool DataReceiver::Receive(const ReceiveDataCallback& callback,
// user starts a new receive following notification of the error (via
// |error_callback| of the previous Receive call) of the error we can tell the
// DataSource to resume transmission of data.
- if (pending_error_ && pending_error_->dispatched) {
+ if (!pending_data_buffers_.empty() &&
+ pending_data_buffers_.front()->is_error &&
+ pending_data_buffers_.front()->dispatched) {
source_->Resume();
- pending_error_.reset();
+ pending_data_buffers_.pop();
}
pending_receive_.reset(
new PendingReceive(this, callback, error_callback, fatal_error_value_));
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
+ ReceiveInternal();
return true;
}
@@ -141,25 +144,19 @@ DataReceiver::~DataReceiver() {
ShutDown();
}
-void DataReceiver::OnError(uint32_t offset, int32_t error) {
+void DataReceiver::OnError(int32_t error) {
if (shut_down_)
return;
- if (pending_error_) {
- // When OnError is called by the DataSource, transmission of data is
- // suspended. Thus we shouldn't receive another call to OnError until we
- // have fully dealt with the error and called Resume to resume transmission
- // (see Receive()). Under normal operation we should never get here, but if
- // we do (e.g. in the case of a hijacked service process) just shut down.
- ShutDown();
- return;
- }
- pending_error_.reset(new PendingError(offset, error));
- if (pending_receive_ &&
- pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
- pending_receive_.reset();
- waiter_.reset();
- }
+ pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(error)));
+ if (pending_receive_)
+ ReceiveInternal();
+}
+
+void DataReceiver::OnData(mojo::Array<uint8_t> data) {
+ pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
+ if (pending_receive_)
+ ReceiveInternal();
}
void DataReceiver::OnConnectionError() {
@@ -171,91 +168,34 @@ void DataReceiver::Done(uint32_t bytes_consumed) {
return;
DCHECK(pending_receive_);
- MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
- DCHECK_EQ(MOJO_RESULT_OK, result);
- pending_receive_.reset();
- bytes_received_ += bytes_consumed;
-}
-
-void DataReceiver::OnDoneWaiting(MojoResult result) {
- DCHECK(pending_receive_ && !shut_down_ && waiter_);
- waiter_.reset();
- if (result != MOJO_RESULT_OK) {
- ShutDown();
- return;
+ DataFrame& pending_data = *pending_data_buffers_.front();
+ pending_data.offset += bytes_consumed;
+ DCHECK_LE(pending_data.offset, pending_data.data.size());
+ if (pending_data.offset == pending_data.data.size()) {
+ source_->ReportBytesSent(pending_data.data.size());
+ pending_data_buffers_.pop();
}
- ReceiveInternal();
+ pending_receive_.reset();
}
void DataReceiver::ReceiveInternal() {
if (shut_down_)
return;
DCHECK(pending_receive_);
- if (pending_error_ &&
- pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
- pending_receive_.reset();
- waiter_.reset();
+ if (pending_receive_->buffer_in_use())
return;
- }
- const void* data;
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
- MojoResult result = mojo::BeginReadDataRaw(
- handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
- if (result == MOJO_RESULT_OK) {
- if (!CheckErrorNotInReadRange(num_bytes)) {
- ShutDown();
- return;
- }
-
- pending_receive_->DispatchData(data, num_bytes);
- return;
- }
- if (result == MOJO_RESULT_SHOULD_WAIT) {
- waiter_.reset(new AsyncWaiter(
- handle_.get(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
- return;
+ if (!pending_data_buffers_.empty() &&
+ pending_receive_->DispatchDataFrame(
+ pending_data_buffers_.front().get())) {
+ pending_receive_.reset();
}
- ShutDown();
-}
-
-bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
- DCHECK(pending_receive_);
- if (!pending_error_)
- return true;
-
- DCHECK_NE(bytes_received_, pending_error_->offset);
- DCHECK_NE(num_bytes, 0u);
- uint32_t potential_bytes_received = bytes_received_ + num_bytes;
- // bytes_received_ can overflow so we must consider two cases:
- // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
- // equal number of times. In this case, |potential_bytes_received| must
- // be in the range (|bytes_received|, |pending_error_->offset|]. Below
- // this range can only occur if |bytes_received_| overflows before
- // |pending_error_->offset|. Above can only occur if |bytes_received_|
- // overtakes |pending_error_->offset|.
- // 2. |pending_error_->offset| has overflowed once more than
- // |bytes_received_|. In this case, |potential_bytes_received| must not
- // be in the range (|pending_error_->offset|, |bytes_received_|].
- if ((bytes_received_ < pending_error_->offset &&
- (potential_bytes_received > pending_error_->offset ||
- potential_bytes_received <= bytes_received_)) ||
- (bytes_received_ > pending_error_->offset &&
- potential_bytes_received > pending_error_->offset &&
- potential_bytes_received <= bytes_received_)) {
- return false;
- }
- return true;
}
void DataReceiver::ShutDown() {
shut_down_ = true;
if (pending_receive_)
pending_receive_->DispatchFatalError();
- pending_error_.reset();
- waiter_.reset();
}
DataReceiver::PendingReceive::PendingReceive(
@@ -270,23 +210,28 @@ DataReceiver::PendingReceive::PendingReceive(
buffer_in_use_(false) {
}
-void DataReceiver::PendingReceive::DispatchData(const void* data,
- uint32_t num_bytes) {
+bool DataReceiver::PendingReceive::DispatchDataFrame(
+ DataReceiver::DataFrame* data) {
DCHECK(!buffer_in_use_);
- buffer_in_use_ = true;
- receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
- new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
-}
+ DCHECK(!data->dispatched);
-bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
- uint32_t bytes_received) {
- DCHECK(!error->dispatched);
- if (buffer_in_use_ || bytes_received != error->offset)
- return false;
-
- error->dispatched = true;
- receive_error_callback_.Run(error->error);
- return true;
+ if (data->is_error) {
+ data->dispatched = true;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(receive_error_callback_, data->error));
+ return true;
+ }
+ buffer_in_use_ = true;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(
+ receive_callback_,
+ base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer(
+ receiver_,
+ this,
+ reinterpret_cast<char*>(&data->data[0]) + data->offset,
+ static_cast<uint32_t>(data->data.size() - data->offset))))));
+ return false;
}
void DataReceiver::PendingReceive::DispatchFatalError() {

Powered by Google App Engine
This is Rietveld 408576698