Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(491)

Unified Diff: device/serial/data_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: device/serial/data_receiver.cc
diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc
new file mode 100644
index 0000000000000000000000000000000000000000..521143797216bd8acb2f039d90c0a2570bbe8ff5
--- /dev/null
+++ b/device/serial/data_receiver.cc
@@ -0,0 +1,340 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_receiver.h"
+
+#include <limits>
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+// Represents a receive that is not yet fulfilled.
+class DataReceiver::PendingReceive {
+ public:
+ PendingReceive(DataReceiver* receiver,
+ const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback,
+ int32_t fatal_error_value);
+
+ // Dispatches |data| to |receive_callback_|.
+ void DispatchData(const void* data, uint32_t num_bytes);
+
+ // Reports |error| to |receive_error_callback_| if it is an appropriate time.
+ // Returns whether it dispatched |error|.
+ bool DispatchError(DataReceiver::PendingError* error,
+ uint32_t bytes_received);
+
+ // Reports |fatal_error_value_| to |receive_error_callback_|.
+ void DispatchFatalError();
+
+ private:
+ class Buffer;
+
+ // Invoked when the user is finished with the ReadOnlyBuffer provided to
+ // |receive_callback_|.
+ void Done(uint32_t num_bytes);
+
+ // The DataReceiver that owns this.
+ DataReceiver* receiver_;
+
+ // The callback to dispatch data.
+ ReceiveDataCallback receive_callback_;
+
+ // The callback to report errors.
+ ReceiveErrorCallback receive_error_callback_;
+
+ // The error value to report when DispatchFatalError() is called.
+ const int32_t fatal_error_value_;
+
+ // True if the user owns a buffer passed to |receive_callback_| as part of
+ // DispatchData().
+ bool buffer_in_use_;
+};
+
+// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
+// a DataReceiver.
+class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
+ public:
+ Buffer(scoped_refptr<DataReceiver> pipe,
+ PendingReceive* receive,
+ const char* buffer,
+ uint32_t buffer_size);
+ virtual ~Buffer();
+
+ // ReadOnlyBuffer overrides.
+ virtual const char* GetData() OVERRIDE;
+ virtual uint32_t GetSize() OVERRIDE;
+ virtual void Done(uint32_t bytes_consumed) OVERRIDE;
+ virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
+
+ private:
+ // The DataReceiver whose data pipe we are providing a view.
+ scoped_refptr<DataReceiver> receiver_;
+
+ // The PendingReceive to which this buffer has been created in response.
+ PendingReceive* pending_receive_;
+
+ const char* buffer_;
+ uint32_t buffer_size_;
+};
+
+// Represents an error received from the DataSource.
+struct DataReceiver::PendingError {
+ PendingError(uint32_t offset, int32_t error)
+ : offset(offset), error(error), dispatched(false) {}
+
+ // The location within the data stream where the error occurred.
+ const uint32_t offset;
+
+ // The value of the error that occurred.
+ const int32_t error;
+
+ // Whether the error has been dispatched to the user.
+ bool dispatched;
+};
+
+DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
+ uint32_t buffer_size,
+ int32_t fatal_error_value)
+ : source_(source.Pass()),
+ fatal_error_value_(fatal_error_value),
+ bytes_received_(0),
+ shut_down_(false),
+ weak_factory_(this) {
+ MojoCreateDataPipeOptions options = {
+ sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
+ };
+ mojo::ScopedDataPipeProducerHandle remote_handle;
+ MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ source_->Init(remote_handle.Pass());
+ source_.set_client(this);
+}
+
+bool DataReceiver::Receive(const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback) {
+ DCHECK(!callback.is_null() && !error_callback.is_null());
+ if (pending_receive_ || shut_down_)
+ return false;
+ // When the DataSource encounters an error, it pauses transmission. When the
+ // user starts a new receive following notification of the error (via
+ // |error_callback| of the previous Receive call) of the error we can tell the
+ // DataSource to resume transmission of data.
+ if (pending_error_ && pending_error_->dispatched) {
+ source_->Resume();
+ pending_error_.reset();
+ }
+
+ pending_receive_.reset(
+ new PendingReceive(this, callback, error_callback, fatal_error_value_));
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
+ return true;
+}
+
+DataReceiver::~DataReceiver() {
+ ShutDown();
+}
+
+void DataReceiver::OnError(uint32_t offset, int32_t error) {
+ if (shut_down_)
+ return;
+
+ if (pending_error_) {
+ // When OnError is called by the DataSource, transmission of data is
+ // suspended. Thus we shouldn't receive another call to OnError until we
+ // have fully dealt with the error and called Resume to resume transmission
+ // (see Receive()). Under normal operation we should never get here, but if
+ // we do (e.g. in the case of a hijacked service process) just shut down.
+ ShutDown();
+ return;
+ }
+ pending_error_.reset(new PendingError(offset, error));
+ if (pending_receive_ &&
+ pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
+ pending_receive_.reset();
+ waiter_.reset();
+ }
+}
+
+void DataReceiver::OnConnectionError() {
+ ShutDown();
+}
+
+void DataReceiver::Done(uint32_t bytes_consumed) {
+ if (shut_down_)
+ return;
+
+ DCHECK(pending_receive_);
+ MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ pending_receive_.reset();
+ bytes_received_ += bytes_consumed;
+}
+
+void DataReceiver::OnDoneWaiting(MojoResult result) {
+ DCHECK(pending_receive_ && !shut_down_ && waiter_);
+ waiter_.reset();
+ if (result != MOJO_RESULT_OK) {
+ ShutDown();
+ return;
+ }
+ ReceiveInternal();
+}
+
+void DataReceiver::ReceiveInternal() {
+ if (shut_down_)
+ return;
+ DCHECK(pending_receive_);
+ if (pending_error_ &&
+ pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
+ pending_receive_.reset();
+ waiter_.reset();
+ return;
+ }
+
+ const void* data;
+ uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
+ MojoResult result = mojo::BeginReadDataRaw(
+ handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
+ if (result == MOJO_RESULT_OK) {
+ if (!CheckErrorNotInReadRange(num_bytes)) {
+ ShutDown();
+ return;
+ }
+
+ pending_receive_->DispatchData(data, num_bytes);
+ return;
+ }
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ waiter_.reset(new AsyncWaiter(
+ handle_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
+ return;
+ }
+ ShutDown();
+}
+
+bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
+ DCHECK(pending_receive_);
+ if (!pending_error_)
+ return true;
+
+ DCHECK_NE(bytes_received_, pending_error_->offset);
+ DCHECK_NE(num_bytes, 0u);
+ uint32_t potential_bytes_received = bytes_received_ + num_bytes;
+ // bytes_received_ can overflow so we must consider two cases:
+ // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
+ // equal number of times. In this case, |potential_bytes_received| must
+ // be in the range (|bytes_received|, |pending_error_->offset|]. Below
+ // this range can only occur if |bytes_received_| overflows before
+ // |pending_error_->offset|. Above can only occur if |bytes_received_|
+ // overtakes |pending_error_->offset|.
+ // 2. |pending_error_->offset| has overflowed once more than
+ // |bytes_received_|. In this case, |potential_bytes_received| must not
+ // be in the range (|pending_error_->offset|, |bytes_received_|].
+ if ((bytes_received_ < pending_error_->offset &&
+ (potential_bytes_received > pending_error_->offset ||
+ potential_bytes_received <= bytes_received_)) ||
+ (bytes_received_ > pending_error_->offset &&
+ potential_bytes_received > pending_error_->offset &&
+ potential_bytes_received <= bytes_received_)) {
+ return false;
+ }
+ return true;
+}
+
+void DataReceiver::ShutDown() {
+ shut_down_ = true;
+ if (pending_receive_)
+ pending_receive_->DispatchFatalError();
+ pending_error_.reset();
+ waiter_.reset();
+}
+
+DataReceiver::PendingReceive::PendingReceive(
+ DataReceiver* receiver,
+ const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback,
+ int32_t fatal_error_value)
+ : receiver_(receiver),
+ receive_callback_(callback),
+ receive_error_callback_(error_callback),
+ fatal_error_value_(fatal_error_value),
+ buffer_in_use_(false) {
+}
+
+void DataReceiver::PendingReceive::DispatchData(const void* data,
+ uint32_t num_bytes) {
+ DCHECK(!buffer_in_use_);
+ buffer_in_use_ = true;
+ receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
+ new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
+}
+
+bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
+ uint32_t bytes_received) {
+ DCHECK(!error->dispatched);
+ if (buffer_in_use_ || bytes_received != error->offset)
+ return false;
+
+ error->dispatched = true;
+ receive_error_callback_.Run(error->error);
+ return true;
+}
+
+void DataReceiver::PendingReceive::DispatchFatalError() {
+ receive_error_callback_.Run(fatal_error_value_);
+}
+
+void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
+ DCHECK(buffer_in_use_);
+ buffer_in_use_ = false;
+ receiver_->Done(bytes_consumed);
+}
+
+DataReceiver::PendingReceive::Buffer::Buffer(
+ scoped_refptr<DataReceiver> receiver,
+ PendingReceive* receive,
+ const char* buffer,
+ uint32_t buffer_size)
+ : receiver_(receiver),
+ pending_receive_(receive),
+ buffer_(buffer),
+ buffer_size_(buffer_size) {
+}
+
+DataReceiver::PendingReceive::Buffer::~Buffer() {
+ if (pending_receive_)
+ pending_receive_->Done(0);
+}
+
+const char* DataReceiver::PendingReceive::Buffer::GetData() {
+ return buffer_;
+}
+
+uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
+ return buffer_size_;
+}
+
+void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
+ pending_receive_->Done(bytes_consumed);
+ pending_receive_ = NULL;
+ receiver_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+void DataReceiver::PendingReceive::Buffer::DoneWithError(
+ uint32_t bytes_consumed,
+ int32_t error) {
+ Done(bytes_consumed);
+}
+
+} // namespace device
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698