Index: device/serial/data_sender.cc |
diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc |
index 409f5f449d1003f45cee750039634bc2e53f9876..5bf36f98afabefd7b14f0f4d0bbff49714958b09 100644 |
--- a/device/serial/data_sender.cc |
+++ b/device/serial/data_sender.cc |
@@ -4,9 +4,10 @@ |
#include "device/serial/data_sender.h" |
+#include <algorithm> |
+ |
#include "base/bind.h" |
#include "base/message_loop/message_loop.h" |
-#include "device/serial/async_waiter.h" |
namespace device { |
@@ -34,10 +35,8 @@ class DataSender::PendingSend { |
// Reports |fatal_error_value_| to |receive_error_callback_|. |
void DispatchFatalError(); |
- // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK |
- // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent |
- // or the error if one is encountered writing to |handle|. |
- MojoResult SendData(mojo::DataPipeProducerHandle handle); |
+ // Attempts to send any data not yet sent to |sink|. |
+ bool SendData(serial::DataSink* sink, uint32_t* capacity); |
private: |
// Invoked to update |bytes_acked_| and |num_bytes|. |
@@ -67,17 +66,11 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
int32_t fatal_error_value) |
: sink_(sink.Pass()), |
fatal_error_value_(fatal_error_value), |
+ available_buffer_capacity_(buffer_size), |
shut_down_(false) { |
sink_.set_error_handler(this); |
- MojoCreateDataPipeOptions options = { |
- sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, |
- }; |
- options.struct_size = sizeof(options); |
- mojo::ScopedDataPipeConsumerHandle remote_handle; |
- MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle); |
- DCHECK_EQ(MOJO_RESULT_OK, result); |
- sink_->Init(remote_handle.Pass()); |
sink_.set_client(this); |
+ sink_->Init(buffer_size); |
} |
DataSender::~DataSender() { |
@@ -115,6 +108,7 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) { |
if (shut_down_) |
return; |
+ available_buffer_capacity_ += bytes_sent; |
while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && |
sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { |
sends_awaiting_ack_.pop(); |
@@ -133,27 +127,29 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) { |
} |
if (pending_sends_.empty() && sends_awaiting_ack_.empty()) |
RunCancelCallback(); |
+ SendInternal(); |
} |
void DataSender::ReportBytesSentAndError( |
uint32_t bytes_sent, |
int32_t error, |
- const mojo::Callback<void(uint32_t)>& callback) { |
+ const mojo::Callback<void()>& callback) { |
if (shut_down_) |
return; |
- uint32_t bytes_to_flush = 0; |
+ available_buffer_capacity_ += bytes_sent; |
while (!sends_awaiting_ack_.empty()) { |
- bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError( |
- &bytes_sent, error); |
+ available_buffer_capacity_ += |
+ sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, |
+ error); |
sends_awaiting_ack_.pop(); |
} |
while (!pending_sends_.empty()) { |
- bytes_to_flush += |
+ available_buffer_capacity_ += |
pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); |
pending_sends_.pop(); |
} |
- callback.Run(bytes_to_flush); |
+ callback.Run(); |
RunCancelCallback(); |
} |
@@ -162,33 +158,15 @@ void DataSender::OnConnectionError() { |
} |
void DataSender::SendInternal() { |
- while (!pending_sends_.empty()) { |
- MojoResult result = pending_sends_.front()->SendData(handle_.get()); |
- if (result == MOJO_RESULT_OK) { |
+ while (!pending_sends_.empty() && available_buffer_capacity_) { |
+ if (pending_sends_.front()->SendData(sink_.get(), |
+ &available_buffer_capacity_)) { |
sends_awaiting_ack_.push(pending_sends_.front()); |
pending_sends_.pop(); |
- } else if (result == MOJO_RESULT_SHOULD_WAIT) { |
- waiter_.reset(new AsyncWaiter( |
- handle_.get(), |
- MOJO_HANDLE_SIGNAL_WRITABLE, |
- base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this)))); |
- return; |
- } else { |
- ShutDown(); |
- return; |
} |
} |
} |
-void DataSender::OnDoneWaiting(MojoResult result) { |
- waiter_.reset(); |
- if (result != MOJO_RESULT_OK) { |
- ShutDown(); |
- return; |
- } |
- SendInternal(); |
-} |
- |
void DataSender::RunCancelCallback() { |
DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); |
if (pending_cancel_.is_null()) |
@@ -200,7 +178,6 @@ void DataSender::RunCancelCallback() { |
} |
void DataSender::ShutDown() { |
- waiter_.reset(); |
shut_down_ = true; |
while (!pending_sends_.empty()) { |
pending_sends_.front()->DispatchFatalError(); |
@@ -253,18 +230,16 @@ void DataSender::PendingSend::DispatchFatalError() { |
FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); |
} |
-MojoResult DataSender::PendingSend::SendData( |
- mojo::DataPipeProducerHandle handle) { |
- uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_; |
- MojoResult result = mojo::WriteDataRaw(handle, |
- data_.data() + bytes_sent_, |
- &bytes_to_send, |
- MOJO_WRITE_DATA_FLAG_NONE); |
- if (result != MOJO_RESULT_OK) |
- return result; |
- |
- bytes_sent_ += bytes_to_send; |
- return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; |
+bool DataSender::PendingSend::SendData(serial::DataSink* sink, |
+ uint32_t* capacity) { |
raymes
2014/10/17 01:55:42
I think calling this available_buffer_size to matc
Sam McNally
2014/10/20 05:12:58
Done.
|
+ uint32_t num_bytes_to_send = |
+ std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), *capacity); |
+ mojo::Array<uint8_t> bytes(num_bytes_to_send); |
+ memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); |
+ bytes_sent_ += num_bytes_to_send; |
+ *capacity -= num_bytes_to_send; |
+ sink->AcceptData(bytes.Pass()); |
+ return bytes_sent_ == data_.size(); |
} |
void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { |