Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2661)

Unified Diff: device/serial/data_sender.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: device/serial/data_sender.cc
diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc
index 409f5f449d1003f45cee750039634bc2e53f9876..5bf36f98afabefd7b14f0f4d0bbff49714958b09 100644
--- a/device/serial/data_sender.cc
+++ b/device/serial/data_sender.cc
@@ -4,9 +4,10 @@
#include "device/serial/data_sender.h"
+#include <algorithm>
+
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
-#include "device/serial/async_waiter.h"
namespace device {
@@ -34,10 +35,8 @@ class DataSender::PendingSend {
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
- // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
- // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
- // or the error if one is encountered writing to |handle|.
- MojoResult SendData(mojo::DataPipeProducerHandle handle);
+ // Attempts to send any data not yet sent to |sink|.
+ bool SendData(serial::DataSink* sink, uint32_t* capacity);
private:
// Invoked to update |bytes_acked_| and |num_bytes|.
@@ -67,17 +66,11 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
int32_t fatal_error_value)
: sink_(sink.Pass()),
fatal_error_value_(fatal_error_value),
+ available_buffer_capacity_(buffer_size),
shut_down_(false) {
sink_.set_error_handler(this);
- MojoCreateDataPipeOptions options = {
- sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
- };
- options.struct_size = sizeof(options);
- mojo::ScopedDataPipeConsumerHandle remote_handle;
- MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
- DCHECK_EQ(MOJO_RESULT_OK, result);
- sink_->Init(remote_handle.Pass());
sink_.set_client(this);
+ sink_->Init(buffer_size);
}
DataSender::~DataSender() {
@@ -115,6 +108,7 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) {
if (shut_down_)
return;
+ available_buffer_capacity_ += bytes_sent;
while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
sends_awaiting_ack_.pop();
@@ -133,27 +127,29 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) {
}
if (pending_sends_.empty() && sends_awaiting_ack_.empty())
RunCancelCallback();
+ SendInternal();
}
void DataSender::ReportBytesSentAndError(
uint32_t bytes_sent,
int32_t error,
- const mojo::Callback<void(uint32_t)>& callback) {
+ const mojo::Callback<void()>& callback) {
if (shut_down_)
return;
- uint32_t bytes_to_flush = 0;
+ available_buffer_capacity_ += bytes_sent;
while (!sends_awaiting_ack_.empty()) {
- bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
- &bytes_sent, error);
+ available_buffer_capacity_ +=
+ sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
+ error);
sends_awaiting_ack_.pop();
}
while (!pending_sends_.empty()) {
- bytes_to_flush +=
+ available_buffer_capacity_ +=
pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
pending_sends_.pop();
}
- callback.Run(bytes_to_flush);
+ callback.Run();
RunCancelCallback();
}
@@ -162,33 +158,15 @@ void DataSender::OnConnectionError() {
}
void DataSender::SendInternal() {
- while (!pending_sends_.empty()) {
- MojoResult result = pending_sends_.front()->SendData(handle_.get());
- if (result == MOJO_RESULT_OK) {
+ while (!pending_sends_.empty() && available_buffer_capacity_) {
+ if (pending_sends_.front()->SendData(sink_.get(),
+ &available_buffer_capacity_)) {
sends_awaiting_ack_.push(pending_sends_.front());
pending_sends_.pop();
- } else if (result == MOJO_RESULT_SHOULD_WAIT) {
- waiter_.reset(new AsyncWaiter(
- handle_.get(),
- MOJO_HANDLE_SIGNAL_WRITABLE,
- base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
- return;
- } else {
- ShutDown();
- return;
}
}
}
-void DataSender::OnDoneWaiting(MojoResult result) {
- waiter_.reset();
- if (result != MOJO_RESULT_OK) {
- ShutDown();
- return;
- }
- SendInternal();
-}
-
void DataSender::RunCancelCallback() {
DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
if (pending_cancel_.is_null())
@@ -200,7 +178,6 @@ void DataSender::RunCancelCallback() {
}
void DataSender::ShutDown() {
- waiter_.reset();
shut_down_ = true;
while (!pending_sends_.empty()) {
pending_sends_.front()->DispatchFatalError();
@@ -253,18 +230,16 @@ void DataSender::PendingSend::DispatchFatalError() {
FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
}
-MojoResult DataSender::PendingSend::SendData(
- mojo::DataPipeProducerHandle handle) {
- uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
- MojoResult result = mojo::WriteDataRaw(handle,
- data_.data() + bytes_sent_,
- &bytes_to_send,
- MOJO_WRITE_DATA_FLAG_NONE);
- if (result != MOJO_RESULT_OK)
- return result;
-
- bytes_sent_ += bytes_to_send;
- return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
+bool DataSender::PendingSend::SendData(serial::DataSink* sink,
+ uint32_t* capacity) {
raymes 2014/10/17 01:55:42 I think calling this available_buffer_size to matc
Sam McNally 2014/10/20 05:12:58 Done.
+ uint32_t num_bytes_to_send =
+ std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), *capacity);
+ mojo::Array<uint8_t> bytes(num_bytes_to_send);
+ memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
+ bytes_sent_ += num_bytes_to_send;
+ *capacity -= num_bytes_to_send;
+ sink->AcceptData(bytes.Pass());
+ return bytes_sent_ == data_.size();
}
void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {

Powered by Google App Engine
This is Rietveld 408576698