Index: device/serial/data_sender.cc |
diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc |
index 18b1a43ef3357543a32a99bd16d95843262e0a35..dbd17760161104010d90dd336ea33940c40e496c 100644 |
--- a/device/serial/data_sender.cc |
+++ b/device/serial/data_sender.cc |
@@ -17,30 +17,21 @@ class DataSender::PendingSend { |
PendingSend(const base::StringPiece& data, |
const DataSentCallback& callback, |
const SendErrorCallback& error_callback, |
- int32_t fatal_error_value); |
- |
- // Invoked to report that |num_bytes| of data have been sent. Subtracts the |
- // number of bytes that were part of this send from |num_bytes|. Returns |
- // whether this send has been completed. If this send has been completed, this |
- // calls |callback_|. |
- bool ReportBytesSent(uint32_t* num_bytes); |
- |
- // Invoked to report that |num_bytes| of data have been sent and then an |
- // error, |error| was encountered. Subtracts the number of bytes that were |
- // part of this send from |num_bytes|. If this send was not completed before |
- // the error, this calls |error_callback_| to report the error. Otherwise, |
- // this calls |callback_|. Returns the number of bytes sent but not acked. |
- uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); |
+ DataSender* sender); |
// Reports |fatal_error_value_| to |receive_error_callback_|. |
void DispatchFatalError(); |
// Attempts to send any data not yet sent to |sink|. |
- bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size); |
+ void SendData(); |
private: |
- // Invoked to update |bytes_acked_| and |num_bytes|. |
- void ReportBytesSentInternal(uint32_t* num_bytes); |
+ // Invoked to report that |num_bytes| of data have been sent and then an |
+ // error, |error| was encountered. Subtracts the number of bytes that were |
+ // part of this send from |num_bytes|. If this send was not completed before |
+ // the error, this calls |error_callback_| to report the error. Otherwise, |
+ // this calls |callback_|. Returns the number of bytes sent but not acked. |
+ void OnDataSent(uint32_t num_bytes, int32_t error); |
// The data to send. |
const base::StringPiece data_; |
@@ -51,14 +42,8 @@ class DataSender::PendingSend { |
// The callback to report errors. |
const SendErrorCallback error_callback_; |
- // The error value to report when DispatchFatalError() is called. |
- const int32_t fatal_error_value_; |
- |
- // The number of bytes sent to the DataSink. |
- uint32_t bytes_sent_; |
- |
- // The number of bytes acked. |
- uint32_t bytes_acked_; |
+ // The DataSender that owns this PendingSend. |
+ DataSender* sender_; |
}; |
DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
@@ -66,11 +51,8 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
int32_t fatal_error_value) |
: sink_(sink.Pass()), |
fatal_error_value_(fatal_error_value), |
- available_buffer_capacity_(buffer_size), |
shut_down_(false) { |
sink_.set_error_handler(this); |
- sink_.set_client(this); |
- sink_->Init(buffer_size); |
} |
DataSender::~DataSender() { |
@@ -84,9 +66,10 @@ bool DataSender::Send(const base::StringPiece& data, |
if (!pending_cancel_.is_null() || shut_down_) |
return false; |
- pending_sends_.push(linked_ptr<PendingSend>( |
- new PendingSend(data, callback, error_callback, fatal_error_value_))); |
- SendInternal(); |
+ linked_ptr<PendingSend> pending_send( |
+ new PendingSend(data, callback, error_callback, this)); |
+ pending_send->SendData(); |
+ sends_awaiting_ack_.push(pending_send); |
return true; |
} |
@@ -94,7 +77,7 @@ bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { |
DCHECK(!callback.is_null()); |
if (!pending_cancel_.is_null() || shut_down_) |
return false; |
- if (pending_sends_.empty() && sends_awaiting_ack_.empty()) { |
+ if (sends_awaiting_ack_.empty()) { |
base::MessageLoop::current()->PostTask(FROM_HERE, callback); |
return true; |
} |
@@ -104,52 +87,25 @@ bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { |
return true; |
} |
-void DataSender::ReportBytesSent(uint32_t bytes_sent) { |
+void DataSender::SendComplete() { |
if (shut_down_) |
return; |
- available_buffer_capacity_ += bytes_sent; |
- while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && |
- sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { |
- sends_awaiting_ack_.pop(); |
- } |
- if (bytes_sent > 0 && !pending_sends_.empty()) { |
- bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); |
- DCHECK(!finished); |
- if (finished) { |
- ShutDown(); |
- return; |
- } |
- } |
- if (bytes_sent != 0) { |
- ShutDown(); |
- return; |
- } |
- if (pending_sends_.empty() && sends_awaiting_ack_.empty()) |
+ DCHECK(!sends_awaiting_ack_.empty()); |
+ sends_awaiting_ack_.pop(); |
+ if (sends_awaiting_ack_.empty()) |
RunCancelCallback(); |
- SendInternal(); |
} |
-void DataSender::ReportBytesSentAndError( |
- uint32_t bytes_sent, |
- int32_t error, |
- const mojo::Callback<void()>& callback) { |
+void DataSender::SendFailed(int32_t error) { |
if (shut_down_) |
return; |
- available_buffer_capacity_ += bytes_sent; |
- while (!sends_awaiting_ack_.empty()) { |
- available_buffer_capacity_ += |
- sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, |
- error); |
- sends_awaiting_ack_.pop(); |
- } |
- while (!pending_sends_.empty()) { |
- available_buffer_capacity_ += |
- pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); |
- pending_sends_.pop(); |
- } |
- callback.Run(); |
+ DCHECK(!sends_awaiting_ack_.empty()); |
+ sends_awaiting_ack_.pop(); |
+ if (!sends_awaiting_ack_.empty()) |
+ return; |
+ sink_->ClearError(); |
RunCancelCallback(); |
} |
@@ -157,18 +113,8 @@ void DataSender::OnConnectionError() { |
ShutDown(); |
} |
-void DataSender::SendInternal() { |
- while (!pending_sends_.empty() && available_buffer_capacity_) { |
- if (pending_sends_.front()->SendData(sink_.get(), |
- &available_buffer_capacity_)) { |
- sends_awaiting_ack_.push(pending_sends_.front()); |
- pending_sends_.pop(); |
- } |
- } |
-} |
- |
void DataSender::RunCancelCallback() { |
- DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); |
+ DCHECK(sends_awaiting_ack_.empty()); |
if (pending_cancel_.is_null()) |
return; |
@@ -179,10 +125,6 @@ void DataSender::RunCancelCallback() { |
void DataSender::ShutDown() { |
shut_down_ = true; |
- while (!pending_sends_.empty()) { |
- pending_sends_.front()->DispatchFatalError(); |
- pending_sends_.pop(); |
- } |
while (!sends_awaiting_ack_.empty()) { |
sends_awaiting_ack_.front()->DispatchFatalError(); |
sends_awaiting_ack_.pop(); |
@@ -193,64 +135,38 @@ void DataSender::ShutDown() { |
DataSender::PendingSend::PendingSend(const base::StringPiece& data, |
const DataSentCallback& callback, |
const SendErrorCallback& error_callback, |
- int32_t fatal_error_value) |
+ DataSender* sender) |
: data_(data), |
callback_(callback), |
error_callback_(error_callback), |
- fatal_error_value_(fatal_error_value), |
- bytes_sent_(0), |
- bytes_acked_(0) { |
-} |
- |
-bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) { |
- ReportBytesSentInternal(num_bytes); |
- if (bytes_acked_ < data_.size()) |
- return false; |
- |
- base::MessageLoop::current()->PostTask(FROM_HERE, |
- base::Bind(callback_, bytes_acked_)); |
- return true; |
+ sender_(sender) { |
} |
-uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes, |
- int32_t error) { |
- ReportBytesSentInternal(num_bytes); |
- if (*num_bytes > 0) { |
+void DataSender::PendingSend::OnDataSent(uint32_t num_bytes, int32_t error) { |
+ if (error) { |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, base::Bind(error_callback_, num_bytes, error)); |
+ sender_->SendFailed(error); |
+ } else { |
+ DCHECK(num_bytes == data_.size()); |
base::MessageLoop::current()->PostTask(FROM_HERE, |
- base::Bind(callback_, bytes_acked_)); |
- return 0; |
+ base::Bind(callback_, num_bytes)); |
+ sender_->SendComplete(); |
} |
- base::MessageLoop::current()->PostTask( |
- FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); |
- return bytes_sent_ - bytes_acked_; |
} |
void DataSender::PendingSend::DispatchFatalError() { |
base::MessageLoop::current()->PostTask( |
- FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); |
+ FROM_HERE, base::Bind(error_callback_, 0, sender_->fatal_error_value_)); |
} |
-bool DataSender::PendingSend::SendData(serial::DataSink* sink, |
- uint32_t* available_buffer_size) { |
- uint32_t num_bytes_to_send = |
- std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), |
- *available_buffer_size); |
+void DataSender::PendingSend::SendData() { |
+ uint32_t num_bytes_to_send = static_cast<uint32_t>(data_.size()); |
mojo::Array<uint8_t> bytes(num_bytes_to_send); |
- memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); |
- bytes_sent_ += num_bytes_to_send; |
- *available_buffer_size -= num_bytes_to_send; |
- sink->OnData(bytes.Pass()); |
- return bytes_sent_ == data_.size(); |
-} |
- |
-void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { |
- bytes_acked_ += *num_bytes; |
- if (bytes_acked_ > bytes_sent_) { |
- *num_bytes = bytes_acked_ - bytes_sent_; |
- bytes_acked_ = bytes_sent_; |
- } else { |
- *num_bytes = 0; |
- } |
+ memcpy(&bytes[0], data_.data(), num_bytes_to_send); |
+ sender_->sink_->OnData( |
+ bytes.Pass(), |
+ base::Bind(&DataSender::PendingSend::OnDataSent, base::Unretained(this))); |
} |
} // namespace device |