Chromium Code Reviews| Index: device/serial/data_receiver.cc |
| diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..d1a3f030442c84d4a49644015a6ddcb70a035519 |
| --- /dev/null |
| +++ b/device/serial/data_receiver.cc |
| @@ -0,0 +1,196 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "device/serial/data_receiver.h" |
| + |
| +#include <limits> |
| + |
| +#include "base/bind.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "device/serial/async_waiter.h" |
| + |
| +namespace device { |
| + |
| +class DataReceiver::Buffer : public ReadOnlyBuffer { |
| + public: |
| + Buffer(scoped_refptr<DataReceiver> pipe, |
| + const char* buffer, |
| + uint32_t buffer_size); |
| + virtual ~Buffer(); |
| + virtual const char* GetData() OVERRIDE; |
| + virtual uint32_t GetSize() OVERRIDE; |
| + virtual void Done(uint32_t bytes_consumed) OVERRIDE; |
| + virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE; |
| + |
| + private: |
| + scoped_refptr<DataReceiver> pipe_; |
| + const char* buffer_; |
| + uint32_t buffer_size_; |
| +}; |
| + |
| +DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, |
| + uint32_t buffer_size, |
| + int32_t connection_error_value) |
| + : source_(source.Pass()), |
| + connection_error_value_(connection_error_value), |
| + bytes_since_last_error_(0), |
| + pending_error_(false), |
| + error_offset_(0), |
| + error_(0), |
| + state_(STATE_IDLE), |
| + weak_factory_(this) { |
| + MojoCreateDataPipeOptions options = { |
| + sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, |
| + }; |
| + options.struct_size = sizeof(options); |
| + mojo::ScopedDataPipeProducerHandle remote_handle; |
| + MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + source_->Init(remote_handle.Pass()); |
| + source_.set_client(this); |
| +} |
| + |
| +bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
| + const ReceiveErrorCallback& error_callback) { |
| + DCHECK(!callback.is_null() && !error_callback.is_null()); |
| + if (state_ == STATE_PAUSED) { |
| + source_->Resume(); |
| + state_ = STATE_IDLE; |
| + } |
| + if (state_ != STATE_IDLE) |
| + return false; |
| + |
| + state_ = STATE_WAITING_FOR_DATA; |
| + receive_callback_ = callback; |
| + receive_error_callback_ = error_callback; |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); |
| + return true; |
| +} |
| + |
| +DataReceiver::~DataReceiver() { |
| + if (!receive_error_callback_.is_null()) |
| + DispatchError(connection_error_value_); |
| +} |
| + |
| +void DataReceiver::Done(uint32_t bytes_consumed) { |
| + DCHECK(state_ == STATE_WAITING_FOR_BUFFER || state_ == STATE_SHUT_DOWN); |
| + if (state_ == STATE_SHUT_DOWN) |
| + return; |
| + |
| + MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + bytes_since_last_error_ += bytes_consumed; |
| + state_ = STATE_IDLE; |
| +} |
| + |
| +void DataReceiver::OnDoneWaiting(MojoResult result) { |
|
raymes
2014/08/05 08:10:20
I think we can only be in a subset of states here,
Sam McNally
2014/08/05 08:33:09
Done.
|
| + if (result != MOJO_RESULT_OK) { |
| + OnConnectionError(); |
|
raymes
2014/08/05 08:10:20
Consider inlining OnConnectionError here as I also
Sam McNally
2014/08/05 08:33:09
Done.
|
| + return; |
| + } |
| + RetryReceive(); |
| +} |
| + |
| +void DataReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) { |
|
raymes
2014/08/05 08:10:20
Consider putting comments in all of the entrypoint
Sam McNally
2014/08/05 08:33:08
Done.
|
| + pending_error_ = true; |
| + error_ = error; |
| + error_offset_ = bytes_since_last_error; |
| + RetryReceive(); |
| +} |
| + |
| +void DataReceiver::OnConnectionError() { |
| + state_ = STATE_SHUT_DOWN; |
| + if (!receive_callback_.is_null()) |
|
raymes
2014/08/05 08:10:20
Maybe rather than checking whether callbacks are n
Sam McNally
2014/08/05 08:33:09
Done.
|
| + DispatchError(connection_error_value_); |
| +} |
| + |
| +void DataReceiver::RetryReceive() { |
|
raymes
2014/08/05 08:10:20
Inline RetryReceive to reduce indirection
Sam McNally
2014/08/05 08:33:08
Done.
|
| + if (!receive_callback_.is_null()) |
| + ReceiveInternal(); |
| +} |
| + |
| +void DataReceiver::ReceiveInternal() { |
| + DCHECK(state_ == STATE_WAITING_FOR_DATA); |
| + if (pending_error_ && bytes_since_last_error_ >= error_offset_) { |
| + pending_error_ = false; |
| + bytes_since_last_error_ -= error_offset_; |
|
raymes
2014/08/05 08:10:20
As we discussed consider always just counting all
Sam McNally
2014/08/05 08:33:08
Done.
|
| + error_offset_ = 0; |
| + state_ = STATE_PAUSED; |
| + DispatchError(error_); |
| + return; |
| + } |
| + const void* data; |
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| + MojoResult result = mojo::BeginReadDataRaw( |
| + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| + if (result == MOJO_RESULT_OK) { |
| + DispatchData(data, num_bytes); |
| + return; |
| + } |
| + if (result == MOJO_RESULT_SHOULD_WAIT) { |
| + waiter_.reset(new AsyncWaiter( |
| + handle_.get(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); |
| + return; |
| + } |
| + state_ = STATE_SHUT_DOWN; |
| + DispatchError(connection_error_value_); |
| +} |
| + |
| +void DataReceiver::DispatchData(const void* data, uint32_t num_bytes) { |
| + DCHECK(state_ == STATE_WAITING_FOR_DATA); |
| + state_ = STATE_WAITING_FOR_BUFFER; |
| + ReceiveDataCallback callback = receive_callback_; |
| + receive_callback_.Reset(); |
| + receive_error_callback_.Reset(); |
| + callback.Run(scoped_ptr<ReadOnlyBuffer>( |
| + new Buffer(this, static_cast<const char*>(data), num_bytes))); |
| +} |
| + |
| +void DataReceiver::DispatchError(int32_t error) { |
| + DCHECK(state_ == STATE_WAITING_FOR_DATA || state_ == STATE_PAUSED || |
| + state_ == STATE_SHUT_DOWN); |
| + if (state_ == STATE_WAITING_FOR_DATA) |
| + state_ = STATE_IDLE; |
| + ReceiveErrorCallback callback = receive_error_callback_; |
| + receive_callback_.Reset(); |
| + receive_error_callback_.Reset(); |
| + callback.Run(error); |
| +} |
| + |
| +DataReceiver::Buffer::Buffer(scoped_refptr<DataReceiver> pipe, |
| + const char* buffer, |
| + uint32_t buffer_size) |
| + : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) { |
| +} |
| + |
| +DataReceiver::Buffer::~Buffer() { |
| + if (pipe_) |
| + pipe_->Done(0); |
| +} |
| + |
| +const char* DataReceiver::Buffer::GetData() { |
| + return buffer_; |
| +} |
| + |
| +uint32_t DataReceiver::Buffer::GetSize() { |
| + return buffer_size_; |
| +} |
| + |
| +void DataReceiver::Buffer::Done(uint32_t bytes_consumed) { |
| + pipe_->Done(bytes_consumed); |
| + pipe_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +void DataReceiver::Buffer::DoneWithError(uint32_t bytes_consumed, |
| + int32_t error) { |
| + Done(bytes_consumed); |
| +} |
| + |
| +} // namespace device |