| Index: device/serial/data_receiver.cc
|
| diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..521143797216bd8acb2f039d90c0a2570bbe8ff5
|
| --- /dev/null
|
| +++ b/device/serial/data_receiver.cc
|
| @@ -0,0 +1,340 @@
|
| +// Copyright 2014 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "device/serial/data_receiver.h"
|
| +
|
| +#include <limits>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "device/serial/async_waiter.h"
|
| +
|
| +namespace device {
|
| +
|
| +// Represents a receive that is not yet fulfilled.
|
| +class DataReceiver::PendingReceive {
|
| + public:
|
| + PendingReceive(DataReceiver* receiver,
|
| + const ReceiveDataCallback& callback,
|
| + const ReceiveErrorCallback& error_callback,
|
| + int32_t fatal_error_value);
|
| +
|
| + // Dispatches |data| to |receive_callback_|.
|
| + void DispatchData(const void* data, uint32_t num_bytes);
|
| +
|
| + // Reports |error| to |receive_error_callback_| if it is an appropriate time.
|
| + // Returns whether it dispatched |error|.
|
| + bool DispatchError(DataReceiver::PendingError* error,
|
| + uint32_t bytes_received);
|
| +
|
| + // Reports |fatal_error_value_| to |receive_error_callback_|.
|
| + void DispatchFatalError();
|
| +
|
| + private:
|
| + class Buffer;
|
| +
|
| + // Invoked when the user is finished with the ReadOnlyBuffer provided to
|
| + // |receive_callback_|.
|
| + void Done(uint32_t num_bytes);
|
| +
|
| + // The DataReceiver that owns this.
|
| + DataReceiver* receiver_;
|
| +
|
| + // The callback to dispatch data.
|
| + ReceiveDataCallback receive_callback_;
|
| +
|
| + // The callback to report errors.
|
| + ReceiveErrorCallback receive_error_callback_;
|
| +
|
| + // The error value to report when DispatchFatalError() is called.
|
| + const int32_t fatal_error_value_;
|
| +
|
| + // True if the user owns a buffer passed to |receive_callback_| as part of
|
| + // DispatchData().
|
| + bool buffer_in_use_;
|
| +};
|
| +
|
| +// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
|
| +// a DataReceiver.
|
| +class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
|
| + public:
|
| + Buffer(scoped_refptr<DataReceiver> pipe,
|
| + PendingReceive* receive,
|
| + const char* buffer,
|
| + uint32_t buffer_size);
|
| + virtual ~Buffer();
|
| +
|
| + // ReadOnlyBuffer overrides.
|
| + virtual const char* GetData() OVERRIDE;
|
| + virtual uint32_t GetSize() OVERRIDE;
|
| + virtual void Done(uint32_t bytes_consumed) OVERRIDE;
|
| + virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
|
| +
|
| + private:
|
| + // The DataReceiver whose data pipe we are providing a view.
|
| + scoped_refptr<DataReceiver> receiver_;
|
| +
|
| + // The PendingReceive to which this buffer has been created in response.
|
| + PendingReceive* pending_receive_;
|
| +
|
| + const char* buffer_;
|
| + uint32_t buffer_size_;
|
| +};
|
| +
|
| +// Represents an error received from the DataSource.
|
| +struct DataReceiver::PendingError {
|
| + PendingError(uint32_t offset, int32_t error)
|
| + : offset(offset), error(error), dispatched(false) {}
|
| +
|
| + // The location within the data stream where the error occurred.
|
| + const uint32_t offset;
|
| +
|
| + // The value of the error that occurred.
|
| + const int32_t error;
|
| +
|
| + // Whether the error has been dispatched to the user.
|
| + bool dispatched;
|
| +};
|
| +
|
| +DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
|
| + uint32_t buffer_size,
|
| + int32_t fatal_error_value)
|
| + : source_(source.Pass()),
|
| + fatal_error_value_(fatal_error_value),
|
| + bytes_received_(0),
|
| + shut_down_(false),
|
| + weak_factory_(this) {
|
| + MojoCreateDataPipeOptions options = {
|
| + sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
|
| + };
|
| + mojo::ScopedDataPipeProducerHandle remote_handle;
|
| + MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
|
| + DCHECK_EQ(MOJO_RESULT_OK, result);
|
| + source_->Init(remote_handle.Pass());
|
| + source_.set_client(this);
|
| +}
|
| +
|
| +bool DataReceiver::Receive(const ReceiveDataCallback& callback,
|
| + const ReceiveErrorCallback& error_callback) {
|
| + DCHECK(!callback.is_null() && !error_callback.is_null());
|
| + if (pending_receive_ || shut_down_)
|
| + return false;
|
| + // When the DataSource encounters an error, it pauses transmission. When the
|
| + // user starts a new receive following notification of the error (via
|
| + // |error_callback| of the previous Receive call) of the error we can tell the
|
| + // DataSource to resume transmission of data.
|
| + if (pending_error_ && pending_error_->dispatched) {
|
| + source_->Resume();
|
| + pending_error_.reset();
|
| + }
|
| +
|
| + pending_receive_.reset(
|
| + new PendingReceive(this, callback, error_callback, fatal_error_value_));
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
|
| + return true;
|
| +}
|
| +
|
| +DataReceiver::~DataReceiver() {
|
| + ShutDown();
|
| +}
|
| +
|
| +void DataReceiver::OnError(uint32_t offset, int32_t error) {
|
| + if (shut_down_)
|
| + return;
|
| +
|
| + if (pending_error_) {
|
| + // When OnError is called by the DataSource, transmission of data is
|
| + // suspended. Thus we shouldn't receive another call to OnError until we
|
| + // have fully dealt with the error and called Resume to resume transmission
|
| + // (see Receive()). Under normal operation we should never get here, but if
|
| + // we do (e.g. in the case of a hijacked service process) just shut down.
|
| + ShutDown();
|
| + return;
|
| + }
|
| + pending_error_.reset(new PendingError(offset, error));
|
| + if (pending_receive_ &&
|
| + pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
|
| + pending_receive_.reset();
|
| + waiter_.reset();
|
| + }
|
| +}
|
| +
|
| +void DataReceiver::OnConnectionError() {
|
| + ShutDown();
|
| +}
|
| +
|
| +void DataReceiver::Done(uint32_t bytes_consumed) {
|
| + if (shut_down_)
|
| + return;
|
| +
|
| + DCHECK(pending_receive_);
|
| + MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
|
| + DCHECK_EQ(MOJO_RESULT_OK, result);
|
| + pending_receive_.reset();
|
| + bytes_received_ += bytes_consumed;
|
| +}
|
| +
|
| +void DataReceiver::OnDoneWaiting(MojoResult result) {
|
| + DCHECK(pending_receive_ && !shut_down_ && waiter_);
|
| + waiter_.reset();
|
| + if (result != MOJO_RESULT_OK) {
|
| + ShutDown();
|
| + return;
|
| + }
|
| + ReceiveInternal();
|
| +}
|
| +
|
| +void DataReceiver::ReceiveInternal() {
|
| + if (shut_down_)
|
| + return;
|
| + DCHECK(pending_receive_);
|
| + if (pending_error_ &&
|
| + pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
|
| + pending_receive_.reset();
|
| + waiter_.reset();
|
| + return;
|
| + }
|
| +
|
| + const void* data;
|
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
|
| + MojoResult result = mojo::BeginReadDataRaw(
|
| + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
|
| + if (result == MOJO_RESULT_OK) {
|
| + if (!CheckErrorNotInReadRange(num_bytes)) {
|
| + ShutDown();
|
| + return;
|
| + }
|
| +
|
| + pending_receive_->DispatchData(data, num_bytes);
|
| + return;
|
| + }
|
| + if (result == MOJO_RESULT_SHOULD_WAIT) {
|
| + waiter_.reset(new AsyncWaiter(
|
| + handle_.get(),
|
| + MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
|
| + return;
|
| + }
|
| + ShutDown();
|
| +}
|
| +
|
| +bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
|
| + DCHECK(pending_receive_);
|
| + if (!pending_error_)
|
| + return true;
|
| +
|
| + DCHECK_NE(bytes_received_, pending_error_->offset);
|
| + DCHECK_NE(num_bytes, 0u);
|
| + uint32_t potential_bytes_received = bytes_received_ + num_bytes;
|
| + // bytes_received_ can overflow so we must consider two cases:
|
| + // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
|
| + // equal number of times. In this case, |potential_bytes_received| must
|
| + // be in the range (|bytes_received|, |pending_error_->offset|]. Below
|
| + // this range can only occur if |bytes_received_| overflows before
|
| + // |pending_error_->offset|. Above can only occur if |bytes_received_|
|
| + // overtakes |pending_error_->offset|.
|
| + // 2. |pending_error_->offset| has overflowed once more than
|
| + // |bytes_received_|. In this case, |potential_bytes_received| must not
|
| + // be in the range (|pending_error_->offset|, |bytes_received_|].
|
| + if ((bytes_received_ < pending_error_->offset &&
|
| + (potential_bytes_received > pending_error_->offset ||
|
| + potential_bytes_received <= bytes_received_)) ||
|
| + (bytes_received_ > pending_error_->offset &&
|
| + potential_bytes_received > pending_error_->offset &&
|
| + potential_bytes_received <= bytes_received_)) {
|
| + return false;
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +void DataReceiver::ShutDown() {
|
| + shut_down_ = true;
|
| + if (pending_receive_)
|
| + pending_receive_->DispatchFatalError();
|
| + pending_error_.reset();
|
| + waiter_.reset();
|
| +}
|
| +
|
| +DataReceiver::PendingReceive::PendingReceive(
|
| + DataReceiver* receiver,
|
| + const ReceiveDataCallback& callback,
|
| + const ReceiveErrorCallback& error_callback,
|
| + int32_t fatal_error_value)
|
| + : receiver_(receiver),
|
| + receive_callback_(callback),
|
| + receive_error_callback_(error_callback),
|
| + fatal_error_value_(fatal_error_value),
|
| + buffer_in_use_(false) {
|
| +}
|
| +
|
| +void DataReceiver::PendingReceive::DispatchData(const void* data,
|
| + uint32_t num_bytes) {
|
| + DCHECK(!buffer_in_use_);
|
| + buffer_in_use_ = true;
|
| + receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
|
| + new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
|
| +}
|
| +
|
| +bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
|
| + uint32_t bytes_received) {
|
| + DCHECK(!error->dispatched);
|
| + if (buffer_in_use_ || bytes_received != error->offset)
|
| + return false;
|
| +
|
| + error->dispatched = true;
|
| + receive_error_callback_.Run(error->error);
|
| + return true;
|
| +}
|
| +
|
| +void DataReceiver::PendingReceive::DispatchFatalError() {
|
| + receive_error_callback_.Run(fatal_error_value_);
|
| +}
|
| +
|
| +void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
|
| + DCHECK(buffer_in_use_);
|
| + buffer_in_use_ = false;
|
| + receiver_->Done(bytes_consumed);
|
| +}
|
| +
|
| +DataReceiver::PendingReceive::Buffer::Buffer(
|
| + scoped_refptr<DataReceiver> receiver,
|
| + PendingReceive* receive,
|
| + const char* buffer,
|
| + uint32_t buffer_size)
|
| + : receiver_(receiver),
|
| + pending_receive_(receive),
|
| + buffer_(buffer),
|
| + buffer_size_(buffer_size) {
|
| +}
|
| +
|
| +DataReceiver::PendingReceive::Buffer::~Buffer() {
|
| + if (pending_receive_)
|
| + pending_receive_->Done(0);
|
| +}
|
| +
|
| +const char* DataReceiver::PendingReceive::Buffer::GetData() {
|
| + return buffer_;
|
| +}
|
| +
|
| +uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
|
| + return buffer_size_;
|
| +}
|
| +
|
| +void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
|
| + pending_receive_->Done(bytes_consumed);
|
| + pending_receive_ = NULL;
|
| + receiver_ = NULL;
|
| + buffer_ = NULL;
|
| + buffer_size_ = 0;
|
| +}
|
| +
|
| +void DataReceiver::PendingReceive::Buffer::DoneWithError(
|
| + uint32_t bytes_consumed,
|
| + int32_t error) {
|
| + Done(bytes_consumed);
|
| +}
|
| +
|
| +} // namespace device
|
|
|