Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3589)

Unified Diff: device/serial/data_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: device/serial/data_receiver.cc
diff --git a/device/serial/data_receiver.cc b/device/serial/data_receiver.cc
new file mode 100644
index 0000000000000000000000000000000000000000..ddb3b5b787be4f80e8989ebb2a38df1f2e9fc887
--- /dev/null
+++ b/device/serial/data_receiver.cc
@@ -0,0 +1,278 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_receiver.h"
+
+#include <limits>
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+class DataReceiver::PendingReceive {
raymes 2014/08/06 05:46:43 Please add some comments for these helper classes
Sam McNally 2014/08/06 08:28:14 Done.
+ public:
+ PendingReceive(DataReceiver* receiver,
+ const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback,
+ int32_t fatal_error_value);
+ void DispatchData(const void* data, uint32_t num_bytes);
+ bool DispatchError(DataReceiver::PendingError* error,
+ uint32_t bytes_received);
+ void DispatchFatalError();
+
+ private:
+ class Buffer;
raymes 2014/08/06 05:46:43 since this class is only used in the .cc file, you
Sam McNally 2014/08/06 08:28:14 It's nested so it can see DataReceiver::PendingRec
+ void Done(uint32_t num_bytes);
+
+ DataReceiver* receiver_;
+ ReceiveDataCallback receive_callback_;
+ ReceiveErrorCallback receive_error_callback_;
+ const int32_t fatal_error_value_;
+ bool buffer_in_use_;
+};
+
+class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
+ public:
+ Buffer(scoped_refptr<DataReceiver> pipe,
+ PendingReceive* receive,
+ const char* buffer,
+ uint32_t buffer_size);
+ virtual ~Buffer();
+ virtual const char* GetData() OVERRIDE;
+ virtual uint32_t GetSize() OVERRIDE;
+ virtual void Done(uint32_t bytes_consumed) OVERRIDE;
+ virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
+
+ private:
+ scoped_refptr<DataReceiver> receiver_;
+ PendingReceive* receive_;
+ const char* buffer_;
+ uint32_t buffer_size_;
+};
+
+struct DataReceiver::PendingError {
+ PendingError(uint32_t offset, int32_t error)
+ : offset(offset), error(error), dispatched(false) {}
+
+ const uint32_t offset;
+ const int32_t error;
+ bool dispatched;
+};
+
+DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
+ uint32_t buffer_size,
+ int32_t fatal_error_value)
+ : source_(source.Pass()),
+ fatal_error_value_(fatal_error_value),
+ bytes_received_(0),
+ shut_down_(false),
+ weak_factory_(this) {
+ MojoCreateDataPipeOptions options = {
+ sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
+ };
+ mojo::ScopedDataPipeProducerHandle remote_handle;
+ MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ source_->Init(remote_handle.Pass());
+ source_.set_client(this);
+}
+
+bool DataReceiver::Receive(const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback) {
+ DCHECK(!callback.is_null() && !error_callback.is_null());
+ if (receive_ || shut_down_)
raymes 2014/08/06 05:46:43 probably pending_receive_ is a more descriptive na
Sam McNally 2014/08/06 08:28:14 Done.
+ return false;
+ if (error_ && error_->dispatched) {
raymes 2014/08/06 05:46:43 similarly, pending_error_
Sam McNally 2014/08/06 08:28:14 Done.
+ source_->Resume();
raymes 2014/08/06 05:46:43 A comment here describing what's going on would be
Sam McNally 2014/08/06 08:28:14 Done.
+ error_.reset();
+ }
+
+ receive_.reset(
+ new PendingReceive(this, callback, error_callback, fatal_error_value_));
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
+ return true;
+}
+
+DataReceiver::~DataReceiver() {
+ ShutDown();
+}
+
+void DataReceiver::OnError(uint32_t offset, int32_t error) {
+ if (shut_down_)
+ return;
+
+ if (error_) {
+ OnConnectionError();
raymes 2014/08/06 05:46:43 Change this to call ShutDown instead of OnConnecti
Sam McNally 2014/08/06 08:28:14 Done.
+ return;
+ }
+ error_.reset(new PendingError(offset, error));
+ if (receive_ && receive_->DispatchError(error_.get(), bytes_received_)) {
+ receive_.reset();
+ waiter_.reset();
+ }
+}
+
+void DataReceiver::OnConnectionError() {
+ ShutDown();
+}
+
+void DataReceiver::Done(uint32_t bytes_consumed) {
+ if (shut_down_)
+ return;
+
+ DCHECK(receive_);
+ MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ receive_.reset();
+ bytes_received_ += bytes_consumed;
+}
+
+void DataReceiver::OnDoneWaiting(MojoResult result) {
+ DCHECK(receive_ && !shut_down_);
raymes 2014/08/06 05:46:43 we can DCHECK waiter_ and cal waiter_.reset() here
Sam McNally 2014/08/06 08:28:14 Done.
+ if (result != MOJO_RESULT_OK) {
+ ShutDown();
+ return;
+ }
+ ReceiveInternal();
+}
+
+void DataReceiver::ReceiveInternal() {
+ if (shut_down_)
+ return;
+ DCHECK(receive_);
+ if (error_ && receive_->DispatchError(error_.get(), bytes_received_)) {
+ receive_.reset();
+ waiter_.reset();
+ return;
+ }
+
+ const void* data;
+ uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
+ MojoResult result = mojo::BeginReadDataRaw(
+ handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
+ if (result == MOJO_RESULT_OK) {
+ if (error_ && !CheckBytesReceived(num_bytes)) {
raymes 2014/08/06 05:46:43 Maybe rename this function to something like Check
Sam McNally 2014/08/06 08:28:14 Done.
+ ShutDown();
+ return;
+ }
+
+ receive_->DispatchData(data, num_bytes);
+ return;
+ }
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ waiter_.reset(new AsyncWaiter(
+ handle_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
+ return;
+ }
+ ShutDown();
+}
+
+bool DataReceiver::CheckBytesReceived(uint32_t num_bytes) {
+ DCHECK(receive_);
+ DCHECK_NE(bytes_received_, error_->offset);
+ uint32_t potential_bytes_received = bytes_received_ + num_bytes;
+ if ((bytes_received_ < error_->offset &&
+ (potential_bytes_received > error_->offset ||
+ potential_bytes_received < bytes_received_)) ||
+ (bytes_received_ > error_->offset &&
+ potential_bytes_received > error_->offset &&
+ potential_bytes_received < bytes_received_)) {
+ return false;
+ }
raymes 2014/08/06 05:46:43 Mention the fact that this is complicated because
Sam McNally 2014/08/06 08:28:14 Done.
+ return true;
+}
+
+void DataReceiver::ShutDown() {
+ shut_down_ = true;
+ if (receive_)
+ receive_->DispatchFatalError();
+ error_.reset();
+ waiter_.reset();
+}
+
+DataReceiver::PendingReceive::PendingReceive(
+ DataReceiver* receiver,
+ const ReceiveDataCallback& callback,
+ const ReceiveErrorCallback& error_callback,
+ int32_t fatal_error_value)
+ : receiver_(receiver),
+ receive_callback_(callback),
+ receive_error_callback_(error_callback),
+ fatal_error_value_(fatal_error_value),
+ buffer_in_use_(false) {
+}
+
+void DataReceiver::PendingReceive::DispatchData(const void* data,
+ uint32_t num_bytes) {
+ DCHECK(!buffer_in_use_);
+ buffer_in_use_ = true;
+ receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
+ new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
+}
+
+bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
+ uint32_t bytes_received) {
+ if (buffer_in_use_ || bytes_received != error->offset || error->dispatched)
raymes 2014/08/06 05:46:43 Probably if error->dispatched is already true, we
Sam McNally 2014/08/06 08:28:14 I don't think this should be called if error->disp
+ return false;
+
+ error->dispatched = true;
+ receive_error_callback_.Run(error->error);
+ return true;
+}
+
+void DataReceiver::PendingReceive::DispatchFatalError() {
+ receive_error_callback_.Run(fatal_error_value_);
+}
+
+void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
+ DCHECK(buffer_in_use_);
+ buffer_in_use_ = false;
+ receiver_->Done(bytes_consumed);
+}
+
+DataReceiver::PendingReceive::Buffer::Buffer(
+ scoped_refptr<DataReceiver> receiver,
+ PendingReceive* receive,
+ const char* buffer,
+ uint32_t buffer_size)
+ : receiver_(receiver),
+ receive_(receive),
+ buffer_(buffer),
+ buffer_size_(buffer_size) {
+}
+
+DataReceiver::PendingReceive::Buffer::~Buffer() {
+ if (receive_)
+ receive_->Done(0);
+}
+
+const char* DataReceiver::PendingReceive::Buffer::GetData() {
+ return buffer_;
+}
+
+uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
+ return buffer_size_;
+}
+
+void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
+ receive_->Done(bytes_consumed);
+ receive_ = NULL;
+ receiver_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+void DataReceiver::PendingReceive::Buffer::DoneWithError(
+ uint32_t bytes_consumed,
+ int32_t error) {
+ Done(bytes_consumed);
+}
+
+} // namespace device

Powered by Google App Engine
This is Rietveld 408576698