Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Unified Diff: device/serial/data_pipe_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: device/serial/data_pipe_receiver.cc
diff --git a/device/serial/data_pipe_receiver.cc b/device/serial/data_pipe_receiver.cc
new file mode 100644
index 0000000000000000000000000000000000000000..9ea3685016d90c79122f17fe0517501271a56d19
--- /dev/null
+++ b/device/serial/data_pipe_receiver.cc
@@ -0,0 +1,188 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_pipe_receiver.h"
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+class DataPipeReceiver::Buffer : public ReadOnlyBuffer {
+ public:
+ Buffer(scoped_refptr<DataPipeReceiver> pipe,
+ const char* buffer,
+ uint32_t buffer_size);
+ virtual ~Buffer();
+ virtual const char* GetData() OVERRIDE;
+ virtual uint32_t GetSize() OVERRIDE;
+ virtual void Done(uint32_t bytes_consumed) OVERRIDE;
+ virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
+
+ private:
+ scoped_refptr<DataPipeReceiver> pipe_;
+ const char* buffer_;
+ uint32_t buffer_size_;
+};
+
+DataPipeReceiver::DataPipeReceiver(
+ mojo::InterfacePtr<serial::DataPipeProducer> producer,
+ uint32_t buffer_size,
+ int32_t connection_error_value)
+ : producer_(producer.Pass()),
+ connection_error_value_(connection_error_value),
+ bytes_since_last_error_(0),
+ pending_error_(false),
+ error_offset_(0),
+ error_(0),
+ state_(STATE_IDLE),
+ weak_factory_(this) {
+ MojoCreateDataPipeOptions options = {
+ sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
+ };
+ options.struct_size = sizeof(options);
raymes 2014/08/05 06:26:44 is this necessary?
Sam McNally 2014/08/05 07:26:33 Yes.
+ mojo::ScopedDataPipeProducerHandle remote_handle;
+ MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ producer_->Init(remote_handle.Pass());
+ producer_.set_client(this);
+}
+
+bool DataPipeReceiver::Receive(const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback) {
raymes 2014/08/05 06:26:44 Similarly for these functions it would be good to
Sam McNally 2014/08/05 07:26:33 Done.
+ if (state_ == STATE_PAUSED) {
+ producer_->Resume();
+ state_ = STATE_IDLE;
+ }
+ if (state_ != STATE_IDLE || callback.is_null() || error_callback.is_null()) {
raymes 2014/08/05 06:26:44 Might as well just DCHECK null callbacks rather th
Sam McNally 2014/08/05 07:26:33 Done.
+ return false;
+ }
+ state_ = STATE_WAITING_FOR_DATA;
+ receive_callback_ = callback;
+ receive_error_callback_ = error_callback;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&DataPipeReceiver::ReceiveInternal,
+ weak_factory_.GetWeakPtr()));
+ return true;
+}
+
+DataPipeReceiver::~DataPipeReceiver() {
+ if (!receive_error_callback_.is_null())
+ DispatchError(connection_error_value_);
+}
+
+void DataPipeReceiver::Done(uint32_t bytes_consumed) {
+ if (state_ == STATE_SHUT_DOWN)
+ return;
+
+ MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ bytes_since_last_error_ += bytes_consumed;
+ state_ = STATE_IDLE;
+}
+
+void DataPipeReceiver::OnDoneWaiting(MojoResult result) {
+ RetryReceive();
+}
+
+void DataPipeReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) {
+ pending_error_ = true;
+ error_ = error;
+ error_offset_ = bytes_since_last_error;
+ RetryReceive();
+}
+
+void DataPipeReceiver::OnConnectionError() {
+ state_ = STATE_SHUT_DOWN;
+ if (!receive_callback_.is_null())
+ DispatchError(connection_error_value_);
+}
+
+void DataPipeReceiver::RetryReceive() {
+ if (!receive_callback_.is_null())
+ ReceiveInternal();
+}
+
+void DataPipeReceiver::ReceiveInternal() {
+ if (pending_error_ && bytes_since_last_error_ >= error_offset_) {
+ pending_error_ = false;
+ bytes_since_last_error_ -= error_offset_;
+ error_offset_ = 0;
+ state_ = STATE_PAUSED;
+ DispatchError(error_);
+ return;
+ }
+ const void* data;
+ uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
+ MojoResult result = mojo::BeginReadDataRaw(
+ handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
+ switch (result) {
raymes 2014/08/05 06:26:44 consider just using if/else rather than switch sin
Sam McNally 2014/08/05 07:26:33 Done.
+ case MOJO_RESULT_OK:
+ DispatchData(data, num_bytes);
+ break;
+ case MOJO_RESULT_SHOULD_WAIT:
+ waiter_.reset(new AsyncWaiter(handle_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&DataPipeReceiver::OnDoneWaiting,
raymes 2014/08/05 06:26:44 Could you bind directly to RetryReceive?
Sam McNally 2014/08/05 07:26:33 OnDoneWaiting should be doing something with the w
+ weak_factory_.GetWeakPtr())));
+ break;
+ default:
+ state_ = STATE_SHUT_DOWN;
+ DispatchError(connection_error_value_);
+ break;
+ }
+}
+
+void DataPipeReceiver::DispatchData(const void* data, uint32_t num_bytes) {
+ state_ = STATE_WAITING_FOR_BUFFER;
+ ReceiveDataCallback callback = receive_callback_;
+ receive_callback_.Reset();
+ receive_error_callback_.Reset();
+ callback.Run(scoped_ptr<ReadOnlyBuffer>(
+ new Buffer(this, static_cast<const char*>(data), num_bytes)));
+}
+
+void DataPipeReceiver::DispatchError(int32_t error) {
+ if (state_ == STATE_WAITING_FOR_DATA)
+ state_ = STATE_IDLE;
+ ReceiveErrorCallback callback = receive_error_callback_;
+ receive_callback_.Reset();
+ receive_error_callback_.Reset();
+ callback.Run(error);
+}
+
+DataPipeReceiver::Buffer::Buffer(scoped_refptr<DataPipeReceiver> pipe,
+ const char* buffer,
+ uint32_t buffer_size)
+ : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) {
+}
+
+DataPipeReceiver::Buffer::~Buffer() {
+ if (pipe_)
+ pipe_->Done(0);
+}
+
+const char* DataPipeReceiver::Buffer::GetData() {
+ return buffer_;
+}
+
+uint32_t DataPipeReceiver::Buffer::GetSize() {
+ return buffer_size_;
+}
+
+void DataPipeReceiver::Buffer::Done(uint32_t bytes_consumed) {
+ pipe_->Done(bytes_consumed);
+ pipe_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+void DataPipeReceiver::Buffer::DoneWithError(uint32_t bytes_consumed,
+ int32_t error) {
+ Done(bytes_consumed);
+}
+
+} // namespace device

Powered by Google App Engine
This is Rietveld 408576698