| Index: device/serial/data_source_sender.cc
|
| diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..876a5e5557fffc93b79bbba928ed2aef2a115d34
|
| --- /dev/null
|
| +++ b/device/serial/data_source_sender.cc
|
| @@ -0,0 +1,249 @@
|
| +// Copyright 2014 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "device/serial/data_source_sender.h"
|
| +
|
| +#include <limits>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "device/serial/async_waiter.h"
|
| +
|
| +namespace device {
|
| +
|
| +// Represents a send that is not yet fulfilled.
|
| +class DataSourceSender::PendingSend {
|
| + public:
|
| + PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
|
| +
|
| + // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
|
| + // one of Done() and DoneWithError() will be called with the result.
|
| + void GetData(void* data, uint32_t num_bytes);
|
| +
|
| + private:
|
| + class Buffer;
|
| + // Reports a successful write of |bytes_written|.
|
| + void Done(uint32_t bytes_written);
|
| +
|
| + // Reports a partially successful or unsuccessful write of |bytes_written|
|
| + // with an error of |error|.
|
| + void DoneWithError(uint32_t bytes_written, int32_t error);
|
| +
|
| + // The DataSourceSender that owns this.
|
| + DataSourceSender* sender_;
|
| +
|
| + // The callback to call to get data.
|
| + ReadyCallback callback_;
|
| +
|
| + // Whether the buffer specified by GetData() has been passed to |callback_|,
|
| + // but has not yet called Done() or DoneWithError().
|
| + bool buffer_in_use_;
|
| +};
|
| +
|
| +// A Writable implementation that provides a view of a data pipe owned by a
|
| +// DataSourceSender.
|
| +class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
|
| + public:
|
| + Buffer(scoped_refptr<DataSourceSender> sender,
|
| + PendingSend* send,
|
| + char* buffer,
|
| + uint32_t buffer_size);
|
| + virtual ~Buffer();
|
| +
|
| + // WritableBuffer overrides.
|
| + virtual char* GetData() OVERRIDE;
|
| + virtual uint32_t GetSize() OVERRIDE;
|
| + virtual void Done(uint32_t bytes_written) OVERRIDE;
|
| + virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
|
| +
|
| + private:
|
| + // The DataSourceSender whose data pipe we are providing a view.
|
| + scoped_refptr<DataSourceSender> sender_;
|
| +
|
| + // The PendingSend to which this buffer has been created in response.
|
| + PendingSend* pending_send_;
|
| +
|
| + char* buffer_;
|
| + uint32_t buffer_size_;
|
| +};
|
| +
|
| +DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
|
| + const ErrorCallback& error_callback)
|
| + : ready_callback_(ready_callback),
|
| + error_callback_(error_callback),
|
| + bytes_sent_(0),
|
| + shut_down_(false) {
|
| + DCHECK(!ready_callback.is_null() && !error_callback.is_null());
|
| +}
|
| +
|
| +void DataSourceSender::ShutDown() {
|
| + shut_down_ = true;
|
| + waiter_.reset();
|
| + ready_callback_.Reset();
|
| + error_callback_.Reset();
|
| +}
|
| +
|
| +DataSourceSender::~DataSourceSender() {
|
| + DCHECK(shut_down_);
|
| +}
|
| +
|
| +void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
|
| + // This should never occur. |handle_| is only valid and |pending_send_| is
|
| + // only set after Init is called. Receiving an invalid |handle| from the
|
| + // client is also unrecoverable.
|
| + if (pending_send_ || handle_.is_valid() || !handle.is_valid() || shut_down_) {
|
| + DispatchFatalError();
|
| + return;
|
| + }
|
| + handle_ = handle.Pass();
|
| + pending_send_.reset(new PendingSend(this, ready_callback_));
|
| + StartWaiting();
|
| +}
|
| +
|
| +void DataSourceSender::Resume() {
|
| + if (pending_send_ || !handle_.is_valid()) {
|
| + DispatchFatalError();
|
| + return;
|
| + }
|
| +
|
| + pending_send_.reset(new PendingSend(this, ready_callback_));
|
| + StartWaiting();
|
| +}
|
| +
|
| +void DataSourceSender::OnConnectionError() {
|
| + DispatchFatalError();
|
| +}
|
| +
|
| +void DataSourceSender::StartWaiting() {
|
| + DCHECK(pending_send_ && !waiter_);
|
| + waiter_.reset(
|
| + new AsyncWaiter(handle_.get(),
|
| + MOJO_HANDLE_SIGNAL_WRITABLE,
|
| + base::Bind(&DataSourceSender::OnDoneWaiting, this)));
|
| +}
|
| +
|
| +void DataSourceSender::OnDoneWaiting(MojoResult result) {
|
| + DCHECK(pending_send_ && !shut_down_ && waiter_);
|
| + waiter_.reset();
|
| + if (result != MOJO_RESULT_OK) {
|
| + DispatchFatalError();
|
| + return;
|
| + }
|
| + void* data = NULL;
|
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
|
| + result = mojo::BeginWriteDataRaw(
|
| + handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
|
| + if (result != MOJO_RESULT_OK) {
|
| + DispatchFatalError();
|
| + return;
|
| + }
|
| + pending_send_->GetData(static_cast<char*>(data), num_bytes);
|
| +}
|
| +
|
| +void DataSourceSender::Done(uint32_t bytes_written) {
|
| + DoneInternal(bytes_written);
|
| + if (!shut_down_)
|
| + StartWaiting();
|
| +}
|
| +
|
| +void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
|
| + DoneInternal(bytes_written);
|
| + pending_send_.reset();
|
| + if (!shut_down_)
|
| + client()->OnError(bytes_sent_, error);
|
| + // We don't call StartWaiting here so we don't send any additional data until
|
| + // Resume() is called.
|
| +}
|
| +
|
| +void DataSourceSender::DoneInternal(uint32_t bytes_written) {
|
| + DCHECK(pending_send_);
|
| + if (shut_down_)
|
| + return;
|
| +
|
| + bytes_sent_ += bytes_written;
|
| + MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
|
| + if (result != MOJO_RESULT_OK) {
|
| + DispatchFatalError();
|
| + return;
|
| + }
|
| +}
|
| +
|
| +void DataSourceSender::DispatchFatalError() {
|
| + if (shut_down_)
|
| + return;
|
| +
|
| + error_callback_.Run();
|
| + ShutDown();
|
| +}
|
| +
|
| +DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
|
| + const ReadyCallback& callback)
|
| + : sender_(sender), callback_(callback), buffer_in_use_(false) {
|
| +}
|
| +
|
| +void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
|
| + DCHECK(!buffer_in_use_);
|
| + buffer_in_use_ = true;
|
| + callback_.Run(scoped_ptr<WritableBuffer>(
|
| + new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
|
| +}
|
| +
|
| +void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
|
| + DCHECK(buffer_in_use_);
|
| + buffer_in_use_ = false;
|
| + sender_->Done(bytes_written);
|
| +}
|
| +
|
| +void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
|
| + int32_t error) {
|
| + DCHECK(buffer_in_use_);
|
| + buffer_in_use_ = false;
|
| + sender_->DoneWithError(bytes_written, error);
|
| +}
|
| +
|
| +DataSourceSender::PendingSend::Buffer::Buffer(
|
| + scoped_refptr<DataSourceSender> sender,
|
| + PendingSend* send,
|
| + char* buffer,
|
| + uint32_t buffer_size)
|
| + : sender_(sender),
|
| + pending_send_(send),
|
| + buffer_(buffer),
|
| + buffer_size_(buffer_size) {
|
| +}
|
| +
|
| +DataSourceSender::PendingSend::Buffer::~Buffer() {
|
| + if (sender_)
|
| + pending_send_->Done(0);
|
| +}
|
| +
|
| +char* DataSourceSender::PendingSend::Buffer::GetData() {
|
| + return buffer_;
|
| +}
|
| +
|
| +uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
|
| + return buffer_size_;
|
| +}
|
| +
|
| +void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
|
| + DCHECK(sender_);
|
| + pending_send_->Done(bytes_written);
|
| + sender_ = NULL;
|
| + pending_send_ = NULL;
|
| + buffer_ = NULL;
|
| + buffer_size_ = 0;
|
| +}
|
| +
|
| +void DataSourceSender::PendingSend::Buffer::DoneWithError(
|
| + uint32_t bytes_written,
|
| + int32_t error) {
|
| + DCHECK(sender_);
|
| + pending_send_->DoneWithError(bytes_written, error);
|
| + sender_ = NULL;
|
| + pending_send_ = NULL;
|
| + buffer_ = NULL;
|
| + buffer_size_ = 0;
|
| +}
|
| +
|
| +} // namespace device
|
|
|