Chromium Code Reviews| Index: device/serial/data_source_sender.cc |
| diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..64ef1dc4b157b5f1310ecbebc0c94130d02b2939 |
| --- /dev/null |
| +++ b/device/serial/data_source_sender.cc |
| @@ -0,0 +1,212 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "device/serial/data_source_sender.h" |
| + |
| +#include <limits> |
| + |
| +#include "base/bind.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "device/serial/async_waiter.h" |
| + |
| +namespace device { |
| + |
| +class DataSourceSender::PendingSend { |
| + public: |
| + PendingSend(DataSourceSender* sender, const ReadyCallback& callback); |
| + void GetData(void* data, uint32_t num_bytes); |
| + |
| + private: |
| + class Buffer; |
| + void Done(uint32_t bytes_produced); |
| + void DoneWithError(uint32_t bytes_produced, int32_t error); |
| + |
| + DataSourceSender* sender_; |
| + ReadyCallback callback_; |
| + bool buffer_in_use_; |
| +}; |
| + |
| +class DataSourceSender::PendingSend::Buffer : public WritableBuffer { |
| + public: |
| + Buffer(scoped_refptr<DataSourceSender> sender, |
| + PendingSend* send, |
| + char* buffer, |
| + uint32_t buffer_size); |
| + virtual ~Buffer(); |
| + virtual char* GetData() OVERRIDE; |
| + virtual uint32_t GetSize() OVERRIDE; |
| + virtual void Done(uint32_t bytes_produced) OVERRIDE; |
| + virtual void DoneWithError(uint32_t bytes_produced, int32_t error) OVERRIDE; |
| + |
| + private: |
| + scoped_refptr<DataSourceSender> sender_; |
| + PendingSend* send_; |
| + char* buffer_; |
| + uint32_t buffer_size_; |
| +}; |
| + |
| +DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, |
| + const ErrorCallback& error_callback) |
| + : ready_callback_(ready_callback), |
| + error_callback_(error_callback), |
| + bytes_sent_(0), |
| + shut_down_(false) { |
| + DCHECK(!ready_callback.is_null() && !error_callback.is_null()); |
| +} |
| + |
| +void DataSourceSender::ShutDown() { |
| + shut_down_ = true; |
| + waiter_.reset(); |
|
raymes
2014/08/06 06:41:53
I guess we could reset send_ here too?
Also consid
Sam McNally
2014/08/06 08:28:14
We can't reset pending_send_ in case it has buffer
|
| +} |
| + |
| +DataSourceSender::~DataSourceSender() { |
| + DCHECK(shut_down_); |
| +} |
| + |
| +void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { |
| + if (send_ || handle_.is_valid()) |
|
raymes
2014/08/06 06:41:53
send_ -> pending_send_
I guess this case should n
Sam McNally
2014/08/06 08:28:14
Done.
|
| + return; |
| + |
| + handle_ = handle.Pass(); |
| + send_.reset(new PendingSend(this, ready_callback_)); |
| + StartWaiting(); |
| +} |
| + |
| +void DataSourceSender::Resume() { |
| + if (send_ || !handle_.is_valid()) |
|
raymes
2014/08/06 06:41:53
-Can the handle_ ever go from being valid to inval
Sam McNally
2014/08/06 08:28:14
It could get called before Init().
|
| + return; |
| + |
| + send_.reset(new PendingSend(this, ready_callback_)); |
| + StartWaiting(); |
| +} |
| + |
| +void DataSourceSender::OnConnectionError() { |
| + DispatchFatalError(); |
| +} |
| + |
| +void DataSourceSender::StartWaiting() { |
| + DCHECK(send_); |
|
raymes
2014/08/06 06:41:53
Also !waiter_ should be true here?
Sam McNally
2014/08/06 08:28:14
Done.
|
| + waiter_.reset( |
| + new AsyncWaiter(handle_.get(), |
| + MOJO_HANDLE_SIGNAL_WRITABLE, |
| + base::Bind(&DataSourceSender::OnDoneWaiting, this))); |
| +} |
| + |
| +void DataSourceSender::OnDoneWaiting(MojoResult result) { |
| + DCHECK(send_ && !shut_down_); |
| + if (!HandleMojoResult(result)) |
| + return; |
|
raymes
2014/08/06 06:41:53
I think you can get rid of the HandleMojoResult fu
Sam McNally
2014/08/06 08:28:14
Done.
|
| + void* data = NULL; |
| + uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| + result = mojo::BeginWriteDataRaw( |
| + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| + if (!HandleMojoResult(result)) |
| + return; |
| + send_->GetData(static_cast<char*>(data), num_bytes); |
| +} |
| + |
| +void DataSourceSender::Done(uint32_t bytes_produced) { |
| + DoneInternal(bytes_produced); |
| + if (!shut_down_) |
| + StartWaiting(); |
| +} |
| + |
| +void DataSourceSender::DoneWithError(uint32_t bytes_produced, int32_t error) { |
|
raymes
2014/08/06 06:41:53
Make a note in here that we don't call StartWaitin
Sam McNally
2014/08/06 08:28:15
Done.
|
| + DoneInternal(bytes_produced); |
| + send_.reset(); |
| + if (!shut_down_) |
| + client()->OnError(bytes_sent_, error); |
| +} |
| + |
| +void DataSourceSender::DoneInternal(uint32_t bytes_produced) { |
|
raymes
2014/08/06 06:41:53
Should we bail if we are shut_down_ on entry to th
Sam McNally
2014/08/06 08:28:15
Done.
|
| + DCHECK(send_); |
| + bytes_sent_ += bytes_produced; |
| + MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_produced); |
| + if (!HandleMojoResult(result)) |
| + return; |
| +} |
| + |
| +bool DataSourceSender::HandleMojoResult(MojoResult result) { |
| + DCHECK(send_); |
| + if (result == MOJO_RESULT_OK) |
| + return true; |
| + DispatchFatalError(); |
| + return false; |
| +} |
| + |
| +void DataSourceSender::DispatchFatalError() { |
| + if (shut_down_) |
| + return; |
| + |
| + ShutDown(); |
| + error_callback_.Run(); |
| +} |
| + |
| +DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, |
| + const ReadyCallback& callback) |
| + : sender_(sender), callback_(callback), buffer_in_use_(false) { |
| +} |
| + |
| +void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { |
| + DCHECK(!buffer_in_use_); |
| + buffer_in_use_ = true; |
| + callback_.Run(scoped_ptr<WritableBuffer>( |
| + new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); |
| +} |
| + |
| +void DataSourceSender::PendingSend::Done(uint32_t bytes_produced) { |
| + DCHECK(buffer_in_use_); |
| + buffer_in_use_ = false; |
| + sender_->Done(bytes_produced); |
| +} |
| + |
| +void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_produced, |
| + int32_t error) { |
| + DCHECK(buffer_in_use_); |
| + buffer_in_use_ = false; |
| + sender_->DoneWithError(bytes_produced, error); |
| +} |
| + |
| +DataSourceSender::PendingSend::Buffer::Buffer( |
| + scoped_refptr<DataSourceSender> sender, |
| + PendingSend* send, |
| + char* buffer, |
| + uint32_t buffer_size) |
| + : sender_(sender), send_(send), buffer_(buffer), buffer_size_(buffer_size) { |
| +} |
| + |
| +DataSourceSender::PendingSend::Buffer::~Buffer() { |
| + if (sender_) |
| + send_->Done(0); |
| +} |
| + |
| +char* DataSourceSender::PendingSend::Buffer::GetData() { |
| + return buffer_; |
| +} |
| + |
| +uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { |
| + return buffer_size_; |
| +} |
| + |
| +void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_produced) { |
| + DCHECK(sender_); |
| + send_->Done(bytes_produced); |
| + sender_ = NULL; |
| + send_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +void DataSourceSender::PendingSend::Buffer::DoneWithError( |
| + uint32_t bytes_produced, |
| + int32_t error) { |
| + DCHECK(sender_); |
| + send_->DoneWithError(bytes_produced, error); |
| + sender_ = NULL; |
| + send_ = NULL; |
| + buffer_ = NULL; |
| + buffer_size_ = 0; |
| +} |
| + |
| +} // namespace device |