Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(135)

Unified Diff: device/serial/data_sender.cc

Issue 889283002: Remove Client= from device/serial/data_stream.mojom. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: This time without racing message pipes Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « device/serial/data_sender.h ('k') | device/serial/data_sink_receiver.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: device/serial/data_sender.cc
diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc
index 18b1a43ef3357543a32a99bd16d95843262e0a35..dbd17760161104010d90dd336ea33940c40e496c 100644
--- a/device/serial/data_sender.cc
+++ b/device/serial/data_sender.cc
@@ -17,30 +17,21 @@ class DataSender::PendingSend {
PendingSend(const base::StringPiece& data,
const DataSentCallback& callback,
const SendErrorCallback& error_callback,
- int32_t fatal_error_value);
-
- // Invoked to report that |num_bytes| of data have been sent. Subtracts the
- // number of bytes that were part of this send from |num_bytes|. Returns
- // whether this send has been completed. If this send has been completed, this
- // calls |callback_|.
- bool ReportBytesSent(uint32_t* num_bytes);
-
- // Invoked to report that |num_bytes| of data have been sent and then an
- // error, |error| was encountered. Subtracts the number of bytes that were
- // part of this send from |num_bytes|. If this send was not completed before
- // the error, this calls |error_callback_| to report the error. Otherwise,
- // this calls |callback_|. Returns the number of bytes sent but not acked.
- uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
+ DataSender* sender);
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
// Attempts to send any data not yet sent to |sink|.
- bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
+ void SendData();
private:
- // Invoked to update |bytes_acked_| and |num_bytes|.
- void ReportBytesSentInternal(uint32_t* num_bytes);
+ // Invoked to report that |num_bytes| of data have been sent and then an
+ // error, |error| was encountered. Subtracts the number of bytes that were
+ // part of this send from |num_bytes|. If this send was not completed before
+ // the error, this calls |error_callback_| to report the error. Otherwise,
+ // this calls |callback_|. Returns the number of bytes sent but not acked.
+ void OnDataSent(uint32_t num_bytes, int32_t error);
// The data to send.
const base::StringPiece data_;
@@ -51,14 +42,8 @@ class DataSender::PendingSend {
// The callback to report errors.
const SendErrorCallback error_callback_;
- // The error value to report when DispatchFatalError() is called.
- const int32_t fatal_error_value_;
-
- // The number of bytes sent to the DataSink.
- uint32_t bytes_sent_;
-
- // The number of bytes acked.
- uint32_t bytes_acked_;
+ // The DataSender that owns this PendingSend.
+ DataSender* sender_;
};
DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
@@ -66,11 +51,8 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
int32_t fatal_error_value)
: sink_(sink.Pass()),
fatal_error_value_(fatal_error_value),
- available_buffer_capacity_(buffer_size),
shut_down_(false) {
sink_.set_error_handler(this);
- sink_.set_client(this);
- sink_->Init(buffer_size);
}
DataSender::~DataSender() {
@@ -84,9 +66,10 @@ bool DataSender::Send(const base::StringPiece& data,
if (!pending_cancel_.is_null() || shut_down_)
return false;
- pending_sends_.push(linked_ptr<PendingSend>(
- new PendingSend(data, callback, error_callback, fatal_error_value_)));
- SendInternal();
+ linked_ptr<PendingSend> pending_send(
+ new PendingSend(data, callback, error_callback, this));
+ pending_send->SendData();
+ sends_awaiting_ack_.push(pending_send);
return true;
}
@@ -94,7 +77,7 @@ bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
DCHECK(!callback.is_null());
if (!pending_cancel_.is_null() || shut_down_)
return false;
- if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
+ if (sends_awaiting_ack_.empty()) {
base::MessageLoop::current()->PostTask(FROM_HERE, callback);
return true;
}
@@ -104,52 +87,25 @@ bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
return true;
}
-void DataSender::ReportBytesSent(uint32_t bytes_sent) {
+void DataSender::SendComplete() {
if (shut_down_)
return;
- available_buffer_capacity_ += bytes_sent;
- while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
- sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
- sends_awaiting_ack_.pop();
- }
- if (bytes_sent > 0 && !pending_sends_.empty()) {
- bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
- DCHECK(!finished);
- if (finished) {
- ShutDown();
- return;
- }
- }
- if (bytes_sent != 0) {
- ShutDown();
- return;
- }
- if (pending_sends_.empty() && sends_awaiting_ack_.empty())
+ DCHECK(!sends_awaiting_ack_.empty());
+ sends_awaiting_ack_.pop();
+ if (sends_awaiting_ack_.empty())
RunCancelCallback();
- SendInternal();
}
-void DataSender::ReportBytesSentAndError(
- uint32_t bytes_sent,
- int32_t error,
- const mojo::Callback<void()>& callback) {
+void DataSender::SendFailed(int32_t error) {
if (shut_down_)
return;
- available_buffer_capacity_ += bytes_sent;
- while (!sends_awaiting_ack_.empty()) {
- available_buffer_capacity_ +=
- sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
- error);
- sends_awaiting_ack_.pop();
- }
- while (!pending_sends_.empty()) {
- available_buffer_capacity_ +=
- pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
- pending_sends_.pop();
- }
- callback.Run();
+ DCHECK(!sends_awaiting_ack_.empty());
+ sends_awaiting_ack_.pop();
+ if (!sends_awaiting_ack_.empty())
+ return;
+ sink_->ClearError();
RunCancelCallback();
}
@@ -157,18 +113,8 @@ void DataSender::OnConnectionError() {
ShutDown();
}
-void DataSender::SendInternal() {
- while (!pending_sends_.empty() && available_buffer_capacity_) {
- if (pending_sends_.front()->SendData(sink_.get(),
- &available_buffer_capacity_)) {
- sends_awaiting_ack_.push(pending_sends_.front());
- pending_sends_.pop();
- }
- }
-}
-
void DataSender::RunCancelCallback() {
- DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
+ DCHECK(sends_awaiting_ack_.empty());
if (pending_cancel_.is_null())
return;
@@ -179,10 +125,6 @@ void DataSender::RunCancelCallback() {
void DataSender::ShutDown() {
shut_down_ = true;
- while (!pending_sends_.empty()) {
- pending_sends_.front()->DispatchFatalError();
- pending_sends_.pop();
- }
while (!sends_awaiting_ack_.empty()) {
sends_awaiting_ack_.front()->DispatchFatalError();
sends_awaiting_ack_.pop();
@@ -193,64 +135,38 @@ void DataSender::ShutDown() {
DataSender::PendingSend::PendingSend(const base::StringPiece& data,
const DataSentCallback& callback,
const SendErrorCallback& error_callback,
- int32_t fatal_error_value)
+ DataSender* sender)
: data_(data),
callback_(callback),
error_callback_(error_callback),
- fatal_error_value_(fatal_error_value),
- bytes_sent_(0),
- bytes_acked_(0) {
-}
-
-bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
- ReportBytesSentInternal(num_bytes);
- if (bytes_acked_ < data_.size())
- return false;
-
- base::MessageLoop::current()->PostTask(FROM_HERE,
- base::Bind(callback_, bytes_acked_));
- return true;
+ sender_(sender) {
}
-uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
- int32_t error) {
- ReportBytesSentInternal(num_bytes);
- if (*num_bytes > 0) {
+void DataSender::PendingSend::OnDataSent(uint32_t num_bytes, int32_t error) {
+ if (error) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(error_callback_, num_bytes, error));
+ sender_->SendFailed(error);
+ } else {
+ DCHECK(num_bytes == data_.size());
base::MessageLoop::current()->PostTask(FROM_HERE,
- base::Bind(callback_, bytes_acked_));
- return 0;
+ base::Bind(callback_, num_bytes));
+ sender_->SendComplete();
}
- base::MessageLoop::current()->PostTask(
- FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
- return bytes_sent_ - bytes_acked_;
}
void DataSender::PendingSend::DispatchFatalError() {
base::MessageLoop::current()->PostTask(
- FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
+ FROM_HERE, base::Bind(error_callback_, 0, sender_->fatal_error_value_));
}
-bool DataSender::PendingSend::SendData(serial::DataSink* sink,
- uint32_t* available_buffer_size) {
- uint32_t num_bytes_to_send =
- std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
- *available_buffer_size);
+void DataSender::PendingSend::SendData() {
+ uint32_t num_bytes_to_send = static_cast<uint32_t>(data_.size());
mojo::Array<uint8_t> bytes(num_bytes_to_send);
- memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
- bytes_sent_ += num_bytes_to_send;
- *available_buffer_size -= num_bytes_to_send;
- sink->OnData(bytes.Pass());
- return bytes_sent_ == data_.size();
-}
-
-void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
- bytes_acked_ += *num_bytes;
- if (bytes_acked_ > bytes_sent_) {
- *num_bytes = bytes_acked_ - bytes_sent_;
- bytes_acked_ = bytes_sent_;
- } else {
- *num_bytes = 0;
- }
+ memcpy(&bytes[0], data_.data(), num_bytes_to_send);
+ sender_->sink_->OnData(
+ bytes.Pass(),
+ base::Bind(&DataSender::PendingSend::OnDataSent, base::Unretained(this)));
}
} // namespace device
« no previous file with comments | « device/serial/data_sender.h ('k') | device/serial/data_sink_receiver.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698