Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2110)

Unified Diff: device/serial/data_sink_receiver.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: split out bug fix Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: device/serial/data_sink_receiver.cc
diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc
index 3d52e48587932b58713f121ff9fa6a90da718d4b..170f8b2c5b052e80d1c9be8a945a2d3492da3326 100644
--- a/device/serial/data_sink_receiver.cc
+++ b/device/serial/data_sink_receiver.cc
@@ -7,38 +7,12 @@
#include <limits>
#include "base/bind.h"
-#include "device/serial/async_waiter.h"
+#include "base/message_loop/message_loop.h"
namespace device {
-// Represents a flush of data that has not been completed.
-class DataSinkReceiver::PendingFlush {
- public:
- PendingFlush();
-
- // Initializes this PendingFlush with |num_bytes|, the number of bytes to
- // flush.
- void SetNumBytesToFlush(uint32_t num_bytes);
-
- // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
- // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
- // |bytes_to_flush_| bytes were flushed or the error if one is encountered
- // discarding data from |handle|.
- MojoResult Flush(mojo::DataPipeConsumerHandle handle);
-
- // Whether this PendingFlush has received the number of bytes to flush.
- bool received_flush() { return received_flush_; }
-
- private:
- // Whether this PendingFlush has received the number of bytes to flush.
- bool received_flush_;
-
- // The remaining number of bytes to flush.
- uint32_t bytes_to_flush_;
-};
-
-// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
-// a DataSinkReceiver.
+// A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
+// DataSinkReceiver.
class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
public:
Buffer(scoped_refptr<DataSinkReceiver> receiver,
@@ -55,7 +29,7 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
void DoneWithError(uint32_t bytes_read, int32_t error) override;
private:
- // The DataSinkReceiver whose data pipe we are providing a view.
+ // The DataSinkReceiver of whose buffer we are providing a view.
scoped_refptr<DataSinkReceiver> receiver_;
const char* buffer_;
@@ -68,34 +42,53 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
int32_t cancellation_error_;
};
+// A frame of data received from the client.
+class DataSinkReceiver::DataFrame {
+ public:
+ explicit DataFrame(mojo::Array<uint8_t> data);
+
+ // Returns the number of uncomsumed bytes remaining of this data frame.
+ uint32_t GetRemainingBytes();
+
+ // Returns a pointer to the remaining data to be consumed.
+ const char* GetData();
+
+ // Reports that |bytes_read| bytes have been consumed.
+ void OnDataConsumed(uint32_t bytes_read);
+
+ private:
+ mojo::Array<uint8_t> data_;
+ uint32_t offset_;
+};
+
DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
const CancelCallback& cancel_callback,
const ErrorCallback& error_callback)
: ready_callback_(ready_callback),
cancel_callback_(cancel_callback),
error_callback_(error_callback),
+ flush_pending_(false),
buffer_in_use_(NULL),
+ initialized_(false),
+ available_buffer_capacity_(0),
shut_down_(false),
weak_factory_(this) {
}
void DataSinkReceiver::ShutDown() {
shut_down_ = true;
- if (waiter_)
- waiter_.reset();
}
DataSinkReceiver::~DataSinkReceiver() {
}
-void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) {
- if (handle_.is_valid()) {
- DispatchFatalError();
+void DataSinkReceiver::Init(uint32_t buffer_size) {
+ if (initialized_) {
+ ShutDown();
return;
}
-
- handle_ = handle.Pass();
- StartWaiting();
+ initialized_ = true;
+ available_buffer_capacity_ = buffer_size;
}
void DataSinkReceiver::Cancel(int32_t error) {
@@ -103,7 +96,7 @@ void DataSinkReceiver::Cancel(int32_t error) {
// response, that ReportBytesSentAndError message will appear to the
// DataSinkClient to be caused by this Cancel message. In that case, we ignore
// the cancel.
- if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush())
+ if (flush_pending_)
return;
// If there is a buffer is in use, mark the buffer as cancelled and notify the
@@ -120,54 +113,38 @@ void DataSinkReceiver::Cancel(int32_t error) {
cancel_callback_.Run(error);
return;
}
- // If there is no buffer in use, immediately report the error and cancel the
- // waiting for the data pipe if one exists. This transitions straight into the
- // state after the sink has returned an error.
- waiter_.reset();
ReportBytesSentAndError(0, error);
}
-void DataSinkReceiver::OnConnectionError() {
- DispatchFatalError();
-}
-
-void DataSinkReceiver::StartWaiting() {
- DCHECK(!waiter_ && !shut_down_);
- waiter_.reset(
- new AsyncWaiter(handle_.get(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
-}
-
-void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
- DCHECK(waiter_ && !shut_down_);
- waiter_.reset();
- if (result != MOJO_RESULT_OK) {
- DispatchFatalError();
+void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) {
+ if (!initialized_) {
+ ShutDown();
return;
}
- // If there are any queued flushes (from ReportBytesSentAndError()), let them
- // flush data from the data pipe.
- if (!pending_flushes_.empty()) {
- MojoResult result = pending_flushes_.front()->Flush(handle_.get());
- if (result == MOJO_RESULT_OK) {
- pending_flushes_.pop();
- } else if (result != MOJO_RESULT_SHOULD_WAIT) {
- DispatchFatalError();
- return;
- }
- StartWaiting();
+ if (data.size() > available_buffer_capacity_) {
+ ShutDown();
return;
}
- const void* data = NULL;
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
- result = mojo::BeginReadDataRaw(
- handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
- if (result != MOJO_RESULT_OK) {
- DispatchFatalError();
+ available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
+ pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
+ if (!buffer_in_use_ && !flush_pending_)
+ RunReadyCallback();
+}
+
+void DataSinkReceiver::OnConnectionError() {
+ DispatchFatalError();
+}
+
+void DataSinkReceiver::RunReadyCallback() {
+ DCHECK(!shut_down_ && !flush_pending_);
+ // If data arrives while a call to RunReadyCallback() is posted, we can be
+ // called with buffer_in_use_ already set.
+ if (buffer_in_use_)
return;
- }
- buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
+ buffer_in_use_ =
+ new Buffer(this,
+ pending_data_buffers_.front()->GetData(),
+ pending_data_buffers_.front()->GetRemainingBytes());
ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
}
@@ -175,7 +152,12 @@ void DataSinkReceiver::Done(uint32_t bytes_read) {
if (!DoneInternal(bytes_read))
return;
client()->ReportBytesSent(bytes_read);
- StartWaiting();
+ if (!pending_data_buffers_.empty()) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&DataSinkReceiver::RunReadyCallback,
+ weak_factory_.GetWeakPtr()));
+ }
}
void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
@@ -190,35 +172,32 @@ bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
DCHECK(buffer_in_use_);
buffer_in_use_ = NULL;
- MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read);
- if (result != MOJO_RESULT_OK) {
- DispatchFatalError();
- return false;
- }
+ available_buffer_capacity_ += bytes_read;
+ pending_data_buffers_.front()->OnDataConsumed(bytes_read);
+ if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
+ pending_data_buffers_.pop();
return true;
}
void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
int32_t error) {
- // When we encounter an error, we must discard the data from any sends already
- // in the data pipe before we can resume dispatching data to the sink. We add
- // a pending flush here. The response containing the number of bytes to flush
- // is handled in SetNumBytesToFlush(). The actual flush is handled in
- // OnDoneWaiting().
- pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
+ // When we encounter an error, we must discard the data from any send buffers
+ // transmitted by the DataSinkClient before it receives this error.
+ flush_pending_ = true;
client()->ReportBytesSentAndError(
bytes_read,
error,
- base::Bind(&DataSinkReceiver::SetNumBytesToFlush,
- weak_factory_.GetWeakPtr()));
+ base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
}
-void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) {
- DCHECK(!pending_flushes_.empty());
- DCHECK(!pending_flushes_.back()->received_flush());
- pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush);
- if (!waiter_)
- StartWaiting();
+void DataSinkReceiver::DoFlush() {
+ DCHECK(flush_pending_);
+ flush_pending_ = false;
+ while (!pending_data_buffers_.empty()) {
+ available_buffer_capacity_ +=
+ pending_data_buffers_.front()->GetRemainingBytes();
+ pending_data_buffers_.pop();
+ }
}
void DataSinkReceiver::DispatchFatalError() {
@@ -263,44 +242,44 @@ uint32_t DataSinkReceiver::Buffer::GetSize() {
}
void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
+ scoped_refptr<DataSinkReceiver> receiver = receiver_;
+ receiver_ = nullptr;
if (cancelled_)
- receiver_->DoneWithError(bytes_read, cancellation_error_);
+ receiver->DoneWithError(bytes_read, cancellation_error_);
else
- receiver_->Done(bytes_read);
- receiver_ = NULL;
+ receiver->Done(bytes_read);
buffer_ = NULL;
buffer_size_ = 0;
}
void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
int32_t error) {
- receiver_->DoneWithError(bytes_read, error);
- receiver_ = NULL;
+ scoped_refptr<DataSinkReceiver> receiver = receiver_;
+ receiver_ = nullptr;
+ receiver->DoneWithError(bytes_read, error);
buffer_ = NULL;
buffer_size_ = 0;
}
-DataSinkReceiver::PendingFlush::PendingFlush()
- : received_flush_(false), bytes_to_flush_(0) {
+DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data)
+ : data_(data.Pass()), offset_(0) {
+ DCHECK_LT(0u, data_.size());
+}
+
+// Returns the number of uncomsumed bytes remaining of this data frame.
+uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() {
+ return static_cast<uint32_t>(data_.size() - offset_);
}
-void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
- DCHECK(!received_flush_);
- received_flush_ = true;
- bytes_to_flush_ = num_bytes;
+// Returns a pointer to the remaining data to be consumed.
+const char* DataSinkReceiver::DataFrame::GetData() {
+ DCHECK_LT(offset_, data_.size());
+ return reinterpret_cast<const char*>(&data_[0]) + offset_;
}
-MojoResult DataSinkReceiver::PendingFlush::Flush(
- mojo::DataPipeConsumerHandle handle) {
- DCHECK(received_flush_);
- uint32_t num_bytes = bytes_to_flush_;
- MojoResult result =
- mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
- if (result != MOJO_RESULT_OK)
- return result;
- DCHECK(num_bytes <= bytes_to_flush_);
- bytes_to_flush_ -= num_bytes;
- return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
+void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
+ offset_ += bytes_read;
+ DCHECK_LE(offset_, data_.size());
}
} // namespace device

Powered by Google App Engine
This is Rietveld 408576698