Chromium Code Reviews| Index: device/serial/data_pipe_receiver.cc |
| diff --git a/device/serial/data_pipe_receiver.cc b/device/serial/data_pipe_receiver.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9ea3685016d90c79122f17fe0517501271a56d19 |
| --- /dev/null |
| +++ b/device/serial/data_pipe_receiver.cc |
| @@ -0,0 +1,188 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "device/serial/data_pipe_receiver.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "device/serial/async_waiter.h" |
| + |
| +namespace device { |
| + |
| +class DataPipeReceiver::Buffer : public ReadOnlyBuffer { |
| + public: |
| + Buffer(scoped_refptr<DataPipeReceiver> pipe, |
| + const char* buffer, |
| + uint32_t buffer_size); |
| + virtual ~Buffer(); |
| + virtual const char* GetData() OVERRIDE; |
| + virtual uint32_t GetSize() OVERRIDE; |
| + virtual void Done(uint32_t bytes_consumed) OVERRIDE; |
| + virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE; |
| + |
| + private: |
| + scoped_refptr<DataPipeReceiver> pipe_; |
| + const char* buffer_; |
| + uint32_t buffer_size_; |
| +}; |
| + |
| +DataPipeReceiver::DataPipeReceiver( |
| + mojo::InterfacePtr<serial::DataPipeProducer> producer, |
| + uint32_t buffer_size, |
| + int32_t connection_error_value) |
| + : producer_(producer.Pass()), |
| + connection_error_value_(connection_error_value), |
| + bytes_since_last_error_(0), |
| + pending_error_(false), |
| + error_offset_(0), |
| + error_(0), |
| + state_(STATE_IDLE), |
| + weak_factory_(this) { |
| + MojoCreateDataPipeOptions options = { |
| + sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, |
| + }; |
| + options.struct_size = sizeof(options); |
|
raymes
2014/08/05 06:26:44
is this necessary?
Sam McNally
2014/08/05 07:26:33
Yes.
|
| + mojo::ScopedDataPipeProducerHandle remote_handle; |
| + MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + producer_->Init(remote_handle.Pass()); |
| + producer_.set_client(this); |
| +} |
| + |
| +bool DataPipeReceiver::Receive(const ReceiveDataCallback& callback, |
| + const ReceiveErrorCallback& error_callback) { |
|
raymes
2014/08/05 06:26:44
Similarly for these functions it would be good to
Sam McNally
2014/08/05 07:26:33
Done.
|
| + if (state_ == STATE_PAUSED) { |
| + producer_->Resume(); |
| + state_ = STATE_IDLE; |
| + } |
| + if (state_ != STATE_IDLE || callback.is_null() || error_callback.is_null()) { |
|
raymes
2014/08/05 06:26:44
Might as well just DCHECK null callbacks rather th
Sam McNally
2014/08/05 07:26:33
Done.
|
| + return false; |
| + } |
| + state_ = STATE_WAITING_FOR_DATA; |
| + receive_callback_ = callback; |
| + receive_error_callback_ = error_callback; |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&DataPipeReceiver::ReceiveInternal, |
| + weak_factory_.GetWeakPtr())); |
| + return true; |
| +} |
| + |
| +DataPipeReceiver::~DataPipeReceiver() { |
| + if (!receive_error_callback_.is_null()) |
| + DispatchError(connection_error_value_); |
| +} |
| + |
| +void DataPipeReceiver::Done(uint32_t bytes_consumed) { |
| + if (state_ == STATE_SHUT_DOWN) |
| + return; |
| + |
| + MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + bytes_since_last_error_ += bytes_consumed; |
| + state_ = STATE_IDLE; |
| +} |
| + |
| +void DataPipeReceiver::OnDoneWaiting(MojoResult result) { |
| + RetryReceive(); |
| +} |
| + |
| +void DataPipeReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) { |
| + pending_error_ = true; |
| + error_ = error; |
| + error_offset_ = bytes_since_last_error; |
| + RetryReceive(); |
| +} |
| + |
| +void DataPipeReceiver::OnConnectionError() { |
| + state_ = STATE_SHUT_DOWN; |
| + if (!receive_callback_.is_null()) |
| + DispatchError(connection_error_value_); |
| +} |
| + |
| +void DataPipeReceiver::RetryReceive() { |
| + if (!receive_callback_.is_null()) |
| + ReceiveInternal(); |
| +} |
| + |
| +void DataPipeReceiver::ReceiveInternal() { |
| + if (pending_error_ && bytes_since_last_error_ >= error_offset_) { |
| + pending_error_ = false; |
| + bytes_since_last_error_ -= error_offset_; |
| + error_offset_ = 0; |
| + state_ = STATE_PAUSED; |
| + DispatchError(error_); |
| + return; |
| + } |
| + const void* data; |
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| + MojoResult result = mojo::BeginReadDataRaw( |
| + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| + switch (result) { |
|
raymes
2014/08/05 06:26:44
consider just using if/else rather than switch sin
Sam McNally
2014/08/05 07:26:33
Done.
|
| + case MOJO_RESULT_OK: |
| + DispatchData(data, num_bytes); |
| + break; |
| + case MOJO_RESULT_SHOULD_WAIT: |
| + waiter_.reset(new AsyncWaiter(handle_.get(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&DataPipeReceiver::OnDoneWaiting, |
|
raymes
2014/08/05 06:26:44
Could you bind directly to RetryReceive?
Sam McNally
2014/08/05 07:26:33
OnDoneWaiting should be doing something with the w
|
| + weak_factory_.GetWeakPtr()))); |
| + break; |
| + default: |
| + state_ = STATE_SHUT_DOWN; |
| + DispatchError(connection_error_value_); |
| + break; |
| + } |
| +} |
| + |
| +void DataPipeReceiver::DispatchData(const void* data, uint32_t num_bytes) { |
| + state_ = STATE_WAITING_FOR_BUFFER; |
| + ReceiveDataCallback callback = receive_callback_; |
| + receive_callback_.Reset(); |
| + receive_error_callback_.Reset(); |
| + callback.Run(scoped_ptr<ReadOnlyBuffer>( |
| + new Buffer(this, static_cast<const char*>(data), num_bytes))); |
| +} |
| + |
| +void DataPipeReceiver::DispatchError(int32_t error) { |
| + if (state_ == STATE_WAITING_FOR_DATA) |
| + state_ = STATE_IDLE; |
| + ReceiveErrorCallback callback = receive_error_callback_; |
| + receive_callback_.Reset(); |
| + receive_error_callback_.Reset(); |
| + callback.Run(error); |
| +} |
| + |
| +DataPipeReceiver::Buffer::Buffer(scoped_refptr<DataPipeReceiver> pipe, |
| + const char* buffer, |
| + uint32_t buffer_size) |
| + : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) { |
| +} |
| + |
| +DataPipeReceiver::Buffer::~Buffer() { |
| + if (pipe_) |
| + pipe_->Done(0); |
| +} |
| + |
| +const char* DataPipeReceiver::Buffer::GetData() { |
| + return buffer_; |
| +} |
| + |
| +uint32_t DataPipeReceiver::Buffer::GetSize() { |
| + return buffer_size_; |
| +} |
| + |
| +void DataPipeReceiver::Buffer::Done(uint32_t bytes_consumed) { |
| + pipe_->Done(bytes_consumed); |
| + pipe_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +void DataPipeReceiver::Buffer::DoneWithError(uint32_t bytes_consumed, |
| + int32_t error) { |
| + Done(bytes_consumed); |
| +} |
| + |
| +} // namespace device |