Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1135)

Unified Diff: device/serial/data_source_sender.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « device/serial/data_source_sender.h ('k') | device/serial/data_source_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: device/serial/data_source_sender.cc
diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc
new file mode 100644
index 0000000000000000000000000000000000000000..876a5e5557fffc93b79bbba928ed2aef2a115d34
--- /dev/null
+++ b/device/serial/data_source_sender.cc
@@ -0,0 +1,249 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_source_sender.h"
+
+#include <limits>
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+// Represents a send that is not yet fulfilled.
+class DataSourceSender::PendingSend {
+ public:
+ PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
+
+ // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
+ // one of Done() and DoneWithError() will be called with the result.
+ void GetData(void* data, uint32_t num_bytes);
+
+ private:
+ class Buffer;
+ // Reports a successful write of |bytes_written|.
+ void Done(uint32_t bytes_written);
+
+ // Reports a partially successful or unsuccessful write of |bytes_written|
+ // with an error of |error|.
+ void DoneWithError(uint32_t bytes_written, int32_t error);
+
+ // The DataSourceSender that owns this.
+ DataSourceSender* sender_;
+
+ // The callback to call to get data.
+ ReadyCallback callback_;
+
+ // Whether the buffer specified by GetData() has been passed to |callback_|,
+ // but has not yet called Done() or DoneWithError().
+ bool buffer_in_use_;
+};
+
+// A Writable implementation that provides a view of a data pipe owned by a
+// DataSourceSender.
+class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
+ public:
+ Buffer(scoped_refptr<DataSourceSender> sender,
+ PendingSend* send,
+ char* buffer,
+ uint32_t buffer_size);
+ virtual ~Buffer();
+
+ // WritableBuffer overrides.
+ virtual char* GetData() OVERRIDE;
+ virtual uint32_t GetSize() OVERRIDE;
+ virtual void Done(uint32_t bytes_written) OVERRIDE;
+ virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
+
+ private:
+ // The DataSourceSender whose data pipe we are providing a view.
+ scoped_refptr<DataSourceSender> sender_;
+
+ // The PendingSend to which this buffer has been created in response.
+ PendingSend* pending_send_;
+
+ char* buffer_;
+ uint32_t buffer_size_;
+};
+
+DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
+ const ErrorCallback& error_callback)
+ : ready_callback_(ready_callback),
+ error_callback_(error_callback),
+ bytes_sent_(0),
+ shut_down_(false) {
+ DCHECK(!ready_callback.is_null() && !error_callback.is_null());
+}
+
+void DataSourceSender::ShutDown() {
+ shut_down_ = true;
+ waiter_.reset();
+ ready_callback_.Reset();
+ error_callback_.Reset();
+}
+
+DataSourceSender::~DataSourceSender() {
+ DCHECK(shut_down_);
+}
+
+void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
+ // This should never occur. |handle_| is only valid and |pending_send_| is
+ // only set after Init is called. Receiving an invalid |handle| from the
+ // client is also unrecoverable.
+ if (pending_send_ || handle_.is_valid() || !handle.is_valid() || shut_down_) {
+ DispatchFatalError();
+ return;
+ }
+ handle_ = handle.Pass();
+ pending_send_.reset(new PendingSend(this, ready_callback_));
+ StartWaiting();
+}
+
+void DataSourceSender::Resume() {
+ if (pending_send_ || !handle_.is_valid()) {
+ DispatchFatalError();
+ return;
+ }
+
+ pending_send_.reset(new PendingSend(this, ready_callback_));
+ StartWaiting();
+}
+
+void DataSourceSender::OnConnectionError() {
+ DispatchFatalError();
+}
+
+void DataSourceSender::StartWaiting() {
+ DCHECK(pending_send_ && !waiter_);
+ waiter_.reset(
+ new AsyncWaiter(handle_.get(),
+ MOJO_HANDLE_SIGNAL_WRITABLE,
+ base::Bind(&DataSourceSender::OnDoneWaiting, this)));
+}
+
+void DataSourceSender::OnDoneWaiting(MojoResult result) {
+ DCHECK(pending_send_ && !shut_down_ && waiter_);
+ waiter_.reset();
+ if (result != MOJO_RESULT_OK) {
+ DispatchFatalError();
+ return;
+ }
+ void* data = NULL;
+ uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
+ result = mojo::BeginWriteDataRaw(
+ handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
+ if (result != MOJO_RESULT_OK) {
+ DispatchFatalError();
+ return;
+ }
+ pending_send_->GetData(static_cast<char*>(data), num_bytes);
+}
+
+void DataSourceSender::Done(uint32_t bytes_written) {
+ DoneInternal(bytes_written);
+ if (!shut_down_)
+ StartWaiting();
+}
+
+void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
+ DoneInternal(bytes_written);
+ pending_send_.reset();
+ if (!shut_down_)
+ client()->OnError(bytes_sent_, error);
+ // We don't call StartWaiting here so we don't send any additional data until
+ // Resume() is called.
+}
+
+void DataSourceSender::DoneInternal(uint32_t bytes_written) {
+ DCHECK(pending_send_);
+ if (shut_down_)
+ return;
+
+ bytes_sent_ += bytes_written;
+ MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
+ if (result != MOJO_RESULT_OK) {
+ DispatchFatalError();
+ return;
+ }
+}
+
+void DataSourceSender::DispatchFatalError() {
+ if (shut_down_)
+ return;
+
+ error_callback_.Run();
+ ShutDown();
+}
+
+DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
+ const ReadyCallback& callback)
+ : sender_(sender), callback_(callback), buffer_in_use_(false) {
+}
+
+void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
+ DCHECK(!buffer_in_use_);
+ buffer_in_use_ = true;
+ callback_.Run(scoped_ptr<WritableBuffer>(
+ new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
+}
+
+void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
+ DCHECK(buffer_in_use_);
+ buffer_in_use_ = false;
+ sender_->Done(bytes_written);
+}
+
+void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
+ int32_t error) {
+ DCHECK(buffer_in_use_);
+ buffer_in_use_ = false;
+ sender_->DoneWithError(bytes_written, error);
+}
+
+DataSourceSender::PendingSend::Buffer::Buffer(
+ scoped_refptr<DataSourceSender> sender,
+ PendingSend* send,
+ char* buffer,
+ uint32_t buffer_size)
+ : sender_(sender),
+ pending_send_(send),
+ buffer_(buffer),
+ buffer_size_(buffer_size) {
+}
+
+DataSourceSender::PendingSend::Buffer::~Buffer() {
+ if (sender_)
+ pending_send_->Done(0);
+}
+
+char* DataSourceSender::PendingSend::Buffer::GetData() {
+ return buffer_;
+}
+
+uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
+ return buffer_size_;
+}
+
+void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
+ DCHECK(sender_);
+ pending_send_->Done(bytes_written);
+ sender_ = NULL;
+ pending_send_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+void DataSourceSender::PendingSend::Buffer::DoneWithError(
+ uint32_t bytes_written,
+ int32_t error) {
+ DCHECK(sender_);
+ pending_send_->DoneWithError(bytes_written, error);
+ sender_ = NULL;
+ pending_send_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+} // namespace device
« no previous file with comments | « device/serial/data_source_sender.h ('k') | device/serial/data_source_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698