Chromium Code Reviews| Index: device/serial/data_receiver.cc |
| diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..ddb3b5b787be4f80e8989ebb2a38df1f2e9fc887 |
| --- /dev/null |
| +++ b/device/serial/data_receiver.cc |
| @@ -0,0 +1,278 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "device/serial/data_receiver.h" |
| + |
| +#include <limits> |
| + |
| +#include "base/bind.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "device/serial/async_waiter.h" |
| + |
| +namespace device { |
| + |
| +class DataReceiver::PendingReceive { |
|
raymes
2014/08/06 05:46:43
Please add some comments for these helper classes
Sam McNally
2014/08/06 08:28:14
Done.
|
| + public: |
| + PendingReceive(DataReceiver* receiver, |
| + const ReceiveDataCallback& callback, |
| + const ReceiveErrorCallback& error_callback, |
| + int32_t fatal_error_value); |
| + void DispatchData(const void* data, uint32_t num_bytes); |
| + bool DispatchError(DataReceiver::PendingError* error, |
| + uint32_t bytes_received); |
| + void DispatchFatalError(); |
| + |
| + private: |
| + class Buffer; |
|
raymes
2014/08/06 05:46:43
since this class is only used in the .cc file, you
Sam McNally
2014/08/06 08:28:14
It's nested so it can see DataReceiver::PendingRec
|
| + void Done(uint32_t num_bytes); |
| + |
| + DataReceiver* receiver_; |
| + ReceiveDataCallback receive_callback_; |
| + ReceiveErrorCallback receive_error_callback_; |
| + const int32_t fatal_error_value_; |
| + bool buffer_in_use_; |
| +}; |
| + |
| +class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
| + public: |
| + Buffer(scoped_refptr<DataReceiver> pipe, |
| + PendingReceive* receive, |
| + const char* buffer, |
| + uint32_t buffer_size); |
| + virtual ~Buffer(); |
| + virtual const char* GetData() OVERRIDE; |
| + virtual uint32_t GetSize() OVERRIDE; |
| + virtual void Done(uint32_t bytes_consumed) OVERRIDE; |
| + virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE; |
| + |
| + private: |
| + scoped_refptr<DataReceiver> receiver_; |
| + PendingReceive* receive_; |
| + const char* buffer_; |
| + uint32_t buffer_size_; |
| +}; |
| + |
| +struct DataReceiver::PendingError { |
| + PendingError(uint32_t offset, int32_t error) |
| + : offset(offset), error(error), dispatched(false) {} |
| + |
| + const uint32_t offset; |
| + const int32_t error; |
| + bool dispatched; |
| +}; |
| + |
| +DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, |
| + uint32_t buffer_size, |
| + int32_t fatal_error_value) |
| + : source_(source.Pass()), |
| + fatal_error_value_(fatal_error_value), |
| + bytes_received_(0), |
| + shut_down_(false), |
| + weak_factory_(this) { |
| + MojoCreateDataPipeOptions options = { |
| + sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, |
| + }; |
| + mojo::ScopedDataPipeProducerHandle remote_handle; |
| + MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + source_->Init(remote_handle.Pass()); |
| + source_.set_client(this); |
| +} |
| + |
| +bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
| + const ReceiveErrorCallback& error_callback) { |
| + DCHECK(!callback.is_null() && !error_callback.is_null()); |
| + if (receive_ || shut_down_) |
|
raymes
2014/08/06 05:46:43
probably pending_receive_ is a more descriptive na
Sam McNally
2014/08/06 08:28:14
Done.
|
| + return false; |
| + if (error_ && error_->dispatched) { |
|
raymes
2014/08/06 05:46:43
similarly, pending_error_
Sam McNally
2014/08/06 08:28:14
Done.
|
| + source_->Resume(); |
|
raymes
2014/08/06 05:46:43
A comment here describing what's going on would be
Sam McNally
2014/08/06 08:28:14
Done.
|
| + error_.reset(); |
| + } |
| + |
| + receive_.reset( |
| + new PendingReceive(this, callback, error_callback, fatal_error_value_)); |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); |
| + return true; |
| +} |
| + |
| +DataReceiver::~DataReceiver() { |
| + ShutDown(); |
| +} |
| + |
| +void DataReceiver::OnError(uint32_t offset, int32_t error) { |
| + if (shut_down_) |
| + return; |
| + |
| + if (error_) { |
| + OnConnectionError(); |
|
raymes
2014/08/06 05:46:43
Change this to call ShutDown instead of OnConnecti
Sam McNally
2014/08/06 08:28:14
Done.
|
| + return; |
| + } |
| + error_.reset(new PendingError(offset, error)); |
| + if (receive_ && receive_->DispatchError(error_.get(), bytes_received_)) { |
| + receive_.reset(); |
| + waiter_.reset(); |
| + } |
| +} |
| + |
| +void DataReceiver::OnConnectionError() { |
| + ShutDown(); |
| +} |
| + |
| +void DataReceiver::Done(uint32_t bytes_consumed) { |
| + if (shut_down_) |
| + return; |
| + |
| + DCHECK(receive_); |
| + MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); |
| + DCHECK_EQ(MOJO_RESULT_OK, result); |
| + receive_.reset(); |
| + bytes_received_ += bytes_consumed; |
| +} |
| + |
| +void DataReceiver::OnDoneWaiting(MojoResult result) { |
| + DCHECK(receive_ && !shut_down_); |
|
raymes
2014/08/06 05:46:43
we can DCHECK waiter_ and cal waiter_.reset() here
Sam McNally
2014/08/06 08:28:14
Done.
|
| + if (result != MOJO_RESULT_OK) { |
| + ShutDown(); |
| + return; |
| + } |
| + ReceiveInternal(); |
| +} |
| + |
| +void DataReceiver::ReceiveInternal() { |
| + if (shut_down_) |
| + return; |
| + DCHECK(receive_); |
| + if (error_ && receive_->DispatchError(error_.get(), bytes_received_)) { |
| + receive_.reset(); |
| + waiter_.reset(); |
| + return; |
| + } |
| + |
| + const void* data; |
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| + MojoResult result = mojo::BeginReadDataRaw( |
| + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| + if (result == MOJO_RESULT_OK) { |
| + if (error_ && !CheckBytesReceived(num_bytes)) { |
|
raymes
2014/08/06 05:46:43
Maybe rename this function to something like Check
Sam McNally
2014/08/06 08:28:14
Done.
|
| + ShutDown(); |
| + return; |
| + } |
| + |
| + receive_->DispatchData(data, num_bytes); |
| + return; |
| + } |
| + if (result == MOJO_RESULT_SHOULD_WAIT) { |
| + waiter_.reset(new AsyncWaiter( |
| + handle_.get(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); |
| + return; |
| + } |
| + ShutDown(); |
| +} |
| + |
| +bool DataReceiver::CheckBytesReceived(uint32_t num_bytes) { |
| + DCHECK(receive_); |
| + DCHECK_NE(bytes_received_, error_->offset); |
| + uint32_t potential_bytes_received = bytes_received_ + num_bytes; |
| + if ((bytes_received_ < error_->offset && |
| + (potential_bytes_received > error_->offset || |
| + potential_bytes_received < bytes_received_)) || |
| + (bytes_received_ > error_->offset && |
| + potential_bytes_received > error_->offset && |
| + potential_bytes_received < bytes_received_)) { |
| + return false; |
| + } |
|
raymes
2014/08/06 05:46:43
Mention the fact that this is complicated because
Sam McNally
2014/08/06 08:28:14
Done.
|
| + return true; |
| +} |
| + |
| +void DataReceiver::ShutDown() { |
| + shut_down_ = true; |
| + if (receive_) |
| + receive_->DispatchFatalError(); |
| + error_.reset(); |
| + waiter_.reset(); |
| +} |
| + |
| +DataReceiver::PendingReceive::PendingReceive( |
| + DataReceiver* receiver, |
| + const ReceiveDataCallback& callback, |
| + const ReceiveErrorCallback& error_callback, |
| + int32_t fatal_error_value) |
| + : receiver_(receiver), |
| + receive_callback_(callback), |
| + receive_error_callback_(error_callback), |
| + fatal_error_value_(fatal_error_value), |
| + buffer_in_use_(false) { |
| +} |
| + |
| +void DataReceiver::PendingReceive::DispatchData(const void* data, |
| + uint32_t num_bytes) { |
| + DCHECK(!buffer_in_use_); |
| + buffer_in_use_ = true; |
| + receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( |
| + new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); |
| +} |
| + |
| +bool DataReceiver::PendingReceive::DispatchError(PendingError* error, |
| + uint32_t bytes_received) { |
| + if (buffer_in_use_ || bytes_received != error->offset || error->dispatched) |
|
raymes
2014/08/06 05:46:43
Probably if error->dispatched is already true, we
Sam McNally
2014/08/06 08:28:14
I don't think this should be called if error->disp
|
| + return false; |
| + |
| + error->dispatched = true; |
| + receive_error_callback_.Run(error->error); |
| + return true; |
| +} |
| + |
| +void DataReceiver::PendingReceive::DispatchFatalError() { |
| + receive_error_callback_.Run(fatal_error_value_); |
| +} |
| + |
| +void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { |
| + DCHECK(buffer_in_use_); |
| + buffer_in_use_ = false; |
| + receiver_->Done(bytes_consumed); |
| +} |
| + |
| +DataReceiver::PendingReceive::Buffer::Buffer( |
| + scoped_refptr<DataReceiver> receiver, |
| + PendingReceive* receive, |
| + const char* buffer, |
| + uint32_t buffer_size) |
| + : receiver_(receiver), |
| + receive_(receive), |
| + buffer_(buffer), |
| + buffer_size_(buffer_size) { |
| +} |
| + |
| +DataReceiver::PendingReceive::Buffer::~Buffer() { |
| + if (receive_) |
| + receive_->Done(0); |
| +} |
| + |
| +const char* DataReceiver::PendingReceive::Buffer::GetData() { |
| + return buffer_; |
| +} |
| + |
| +uint32_t DataReceiver::PendingReceive::Buffer::GetSize() { |
| + return buffer_size_; |
| +} |
| + |
| +void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) { |
| + receive_->Done(bytes_consumed); |
| + receive_ = NULL; |
| + receiver_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +void DataReceiver::PendingReceive::Buffer::DoneWithError( |
| + uint32_t bytes_consumed, |
| + int32_t error) { |
| + Done(bytes_consumed); |
| +} |
| + |
| +} // namespace device |