Chromium Code Reviews| Index: device/serial/data_pipe_producer.cc |
| diff --git a/device/serial/data_pipe_producer.cc b/device/serial/data_pipe_producer.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..ca0affb4ad47e7ed9d5d1eefb04f8d6abf068e9c |
| --- /dev/null |
| +++ b/device/serial/data_pipe_producer.cc |
| @@ -0,0 +1,153 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "device/serial/data_pipe_producer.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "device/serial/async_waiter.h" |
| + |
| +namespace device { |
| + |
| +class DataPipeProducer::Buffer : public WritableBuffer { |
| + public: |
| + Buffer(scoped_refptr<DataPipeProducer> pipe, |
| + char* buffer, |
| + uint32_t buffer_size); |
| + virtual ~Buffer(); |
| + virtual char* GetData() OVERRIDE; |
| + virtual uint32_t GetSize() OVERRIDE; |
| + virtual void Done(uint32_t bytes_produced) OVERRIDE; |
| + virtual void DoneWithError(uint32_t bytes_produced, int32_t error) OVERRIDE; |
| + |
| + private: |
| + scoped_refptr<DataPipeProducer> pipe_; |
| + char* buffer_; |
| + uint32_t buffer_size_; |
| +}; |
| + |
| +DataPipeProducer::DataPipeProducer(const ReadyCallback& ready_callback, |
| + const ErrorCallback& error_callback) |
| + : ready_callback_(ready_callback), |
| + error_callback_(error_callback), |
| + state_(STATE_UNINITIALIZED), |
| + bytes_since_last_error_(0) { |
| +} |
| + |
| +void DataPipeProducer::Shutdown() { |
| + state_ = STATE_SHUT_DOWN; |
| + waiter_.reset(); |
| +} |
| + |
| +DataPipeProducer::~DataPipeProducer() { |
| +} |
| + |
| +void DataPipeProducer::Init(mojo::ScopedDataPipeProducerHandle handle) { |
| + if (state_ != STATE_UNINITIALIZED) |
| + return; |
|
raymes
2014/08/05 06:26:44
It might be good to DCHECK at the start of every f
Sam McNally
2014/08/05 07:26:33
Init and Resume are in response to client calls so
|
| + |
| + handle_ = handle.Pass(); |
| + StartWaiting(); |
| +} |
| + |
| +void DataPipeProducer::Resume() { |
| + if (state_ != STATE_PAUSED) |
| + return; |
| + |
| + StartWaiting(); |
| +} |
| + |
| +void DataPipeProducer::OnConnectionError() { |
| + HandleMojoResult(MOJO_RESULT_CANCELLED); |
| +} |
| + |
| +void DataPipeProducer::StartWaiting() { |
| + state_ = STATE_WAITING_FOR_SPACE; |
| + waiter_.reset( |
| + new AsyncWaiter(handle_.get(), |
| + MOJO_HANDLE_SIGNAL_WRITABLE, |
| + base::Bind(&DataPipeProducer::OnDoneWaiting, this))); |
| +} |
| + |
| +void DataPipeProducer::OnDoneWaiting(MojoResult result) { |
| + if (state_ != STATE_WAITING_FOR_SPACE || !HandleMojoResult(result)) |
| + return; |
| + void* data = NULL; |
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| + result = mojo::BeginWriteDataRaw( |
| + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| + if (!HandleMojoResult(result)) |
| + return; |
| + state_ = STATE_WAITING_FOR_BUFFER; |
| + ready_callback_.Run(scoped_ptr<WritableBuffer>( |
| + new Buffer(this, static_cast<char*>(data), num_bytes))); |
| +} |
| + |
| +void DataPipeProducer::Done(uint32_t bytes_produced) { |
| + bytes_since_last_error_ += bytes_produced; |
| + MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_produced); |
| + if (!HandleMojoResult(result)) |
| + return; |
| + if (state_ == STATE_WAITING_FOR_BUFFER) |
| + StartWaiting(); |
| +} |
| + |
| +void DataPipeProducer::DoneWithError(uint32_t bytes_produced, int32_t error) { |
| + if (state_ == STATE_WAITING_FOR_BUFFER) |
| + state_ = STATE_PAUSED; |
| + Done(bytes_produced); |
| + client()->OnError(bytes_since_last_error_, error); |
| + bytes_since_last_error_ = 0; |
| +} |
| + |
| +bool DataPipeProducer::HandleMojoResult(MojoResult result) { |
| + if (result == MOJO_RESULT_OK) |
| + return true; |
| + |
| + if (state_ == STATE_SHUT_DOWN) |
| + return false; |
| + |
| + Shutdown(); |
| + if (!error_callback_.is_null()) |
| + error_callback_.Run(); |
| + return false; |
| +} |
| + |
| +DataPipeProducer::Buffer::Buffer(scoped_refptr<DataPipeProducer> pipe, |
| + char* buffer, |
| + uint32_t buffer_size) |
| + : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) { |
| +} |
| + |
| +DataPipeProducer::Buffer::~Buffer() { |
| + if (pipe_) |
| + pipe_->Done(0); |
| +} |
| + |
| +char* DataPipeProducer::Buffer::GetData() { |
| + return buffer_; |
| +} |
| + |
| +uint32_t DataPipeProducer::Buffer::GetSize() { |
| + return buffer_size_; |
| +} |
| + |
| +void DataPipeProducer::Buffer::Done(uint32_t bytes_produced) { |
| + DCHECK(pipe_); |
| + pipe_->Done(bytes_produced); |
| + pipe_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +void DataPipeProducer::Buffer::DoneWithError(uint32_t bytes_produced, |
| + int32_t error) { |
| + DCHECK(pipe_); |
| + pipe_->DoneWithError(bytes_produced, error); |
| + pipe_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +} // namespace device |