Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(739)

Unified Diff: device/serial/data_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: device/serial/data_receiver.cc
diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc
new file mode 100644
index 0000000000000000000000000000000000000000..d1a3f030442c84d4a49644015a6ddcb70a035519
--- /dev/null
+++ b/device/serial/data_receiver.cc
@@ -0,0 +1,196 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_receiver.h"
+
+#include <limits>
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+class DataReceiver::Buffer : public ReadOnlyBuffer {
+ public:
+ Buffer(scoped_refptr<DataReceiver> pipe,
+ const char* buffer,
+ uint32_t buffer_size);
+ virtual ~Buffer();
+ virtual const char* GetData() OVERRIDE;
+ virtual uint32_t GetSize() OVERRIDE;
+ virtual void Done(uint32_t bytes_consumed) OVERRIDE;
+ virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
+
+ private:
+ scoped_refptr<DataReceiver> pipe_;
+ const char* buffer_;
+ uint32_t buffer_size_;
+};
+
+DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
+ uint32_t buffer_size,
+ int32_t connection_error_value)
+ : source_(source.Pass()),
+ connection_error_value_(connection_error_value),
+ bytes_since_last_error_(0),
+ pending_error_(false),
+ error_offset_(0),
+ error_(0),
+ state_(STATE_IDLE),
+ weak_factory_(this) {
+ MojoCreateDataPipeOptions options = {
+ sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
+ };
+ options.struct_size = sizeof(options);
+ mojo::ScopedDataPipeProducerHandle remote_handle;
+ MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ source_->Init(remote_handle.Pass());
+ source_.set_client(this);
+}
+
+bool DataReceiver::Receive(const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback) {
+ DCHECK(!callback.is_null() && !error_callback.is_null());
+ if (state_ == STATE_PAUSED) {
+ source_->Resume();
+ state_ = STATE_IDLE;
+ }
+ if (state_ != STATE_IDLE)
+ return false;
+
+ state_ = STATE_WAITING_FOR_DATA;
+ receive_callback_ = callback;
+ receive_error_callback_ = error_callback;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
+ return true;
+}
+
+DataReceiver::~DataReceiver() {
+ if (!receive_error_callback_.is_null())
+ DispatchError(connection_error_value_);
+}
+
+void DataReceiver::Done(uint32_t bytes_consumed) {
+ DCHECK(state_ == STATE_WAITING_FOR_BUFFER || state_ == STATE_SHUT_DOWN);
+ if (state_ == STATE_SHUT_DOWN)
+ return;
+
+ MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ bytes_since_last_error_ += bytes_consumed;
+ state_ = STATE_IDLE;
+}
+
+void DataReceiver::OnDoneWaiting(MojoResult result) {
raymes 2014/08/05 08:10:20 I think we can only be in a subset of states here,
Sam McNally 2014/08/05 08:33:09 Done.
+ if (result != MOJO_RESULT_OK) {
+ OnConnectionError();
raymes 2014/08/05 08:10:20 Consider inlining OnConnectionError here as I also
Sam McNally 2014/08/05 08:33:09 Done.
+ return;
+ }
+ RetryReceive();
+}
+
+void DataReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) {
raymes 2014/08/05 08:10:20 Consider putting comments in all of the entrypoint
Sam McNally 2014/08/05 08:33:08 Done.
+ pending_error_ = true;
+ error_ = error;
+ error_offset_ = bytes_since_last_error;
+ RetryReceive();
+}
+
+void DataReceiver::OnConnectionError() {
+ state_ = STATE_SHUT_DOWN;
+ if (!receive_callback_.is_null())
raymes 2014/08/05 08:10:20 Maybe rather than checking whether callbacks are n
Sam McNally 2014/08/05 08:33:09 Done.
+ DispatchError(connection_error_value_);
+}
+
+void DataReceiver::RetryReceive() {
raymes 2014/08/05 08:10:20 Inline RetryReceive to reduce indirection
Sam McNally 2014/08/05 08:33:08 Done.
+ if (!receive_callback_.is_null())
+ ReceiveInternal();
+}
+
+void DataReceiver::ReceiveInternal() {
+ DCHECK(state_ == STATE_WAITING_FOR_DATA);
+ if (pending_error_ && bytes_since_last_error_ >= error_offset_) {
+ pending_error_ = false;
+ bytes_since_last_error_ -= error_offset_;
raymes 2014/08/05 08:10:20 As we discussed consider always just counting all
Sam McNally 2014/08/05 08:33:08 Done.
+ error_offset_ = 0;
+ state_ = STATE_PAUSED;
+ DispatchError(error_);
+ return;
+ }
+ const void* data;
+ uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
+ MojoResult result = mojo::BeginReadDataRaw(
+ handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
+ if (result == MOJO_RESULT_OK) {
+ DispatchData(data, num_bytes);
+ return;
+ }
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ waiter_.reset(new AsyncWaiter(
+ handle_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
+ return;
+ }
+ state_ = STATE_SHUT_DOWN;
+ DispatchError(connection_error_value_);
+}
+
+void DataReceiver::DispatchData(const void* data, uint32_t num_bytes) {
+ DCHECK(state_ == STATE_WAITING_FOR_DATA);
+ state_ = STATE_WAITING_FOR_BUFFER;
+ ReceiveDataCallback callback = receive_callback_;
+ receive_callback_.Reset();
+ receive_error_callback_.Reset();
+ callback.Run(scoped_ptr<ReadOnlyBuffer>(
+ new Buffer(this, static_cast<const char*>(data), num_bytes)));
+}
+
+void DataReceiver::DispatchError(int32_t error) {
+ DCHECK(state_ == STATE_WAITING_FOR_DATA || state_ == STATE_PAUSED ||
+ state_ == STATE_SHUT_DOWN);
+ if (state_ == STATE_WAITING_FOR_DATA)
+ state_ = STATE_IDLE;
+ ReceiveErrorCallback callback = receive_error_callback_;
+ receive_callback_.Reset();
+ receive_error_callback_.Reset();
+ callback.Run(error);
+}
+
+DataReceiver::Buffer::Buffer(scoped_refptr<DataReceiver> pipe,
+ const char* buffer,
+ uint32_t buffer_size)
+ : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) {
+}
+
+DataReceiver::Buffer::~Buffer() {
+ if (pipe_)
+ pipe_->Done(0);
+}
+
+const char* DataReceiver::Buffer::GetData() {
+ return buffer_;
+}
+
+uint32_t DataReceiver::Buffer::GetSize() {
+ return buffer_size_;
+}
+
+void DataReceiver::Buffer::Done(uint32_t bytes_consumed) {
+ pipe_->Done(bytes_consumed);
+ pipe_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+void DataReceiver::Buffer::DoneWithError(uint32_t bytes_consumed,
+ int32_t error) {
+ Done(bytes_consumed);
+}
+
+} // namespace device
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698