Index: device/serial/data_sink_receiver.cc |
diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc |
index 170f8b2c5b052e80d1c9be8a945a2d3492da3326..ff66c1cb9c1729eae9e92d358f216d4fd0d10151 100644 |
--- a/device/serial/data_sink_receiver.cc |
+++ b/device/serial/data_sink_receiver.cc |
@@ -45,9 +45,10 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
// A frame of data received from the client. |
class DataSinkReceiver::DataFrame { |
public: |
- explicit DataFrame(mojo::Array<uint8_t> data); |
+ explicit DataFrame(mojo::Array<uint8_t> data, |
+ const mojo::Callback<void(uint32_t, int32_t)>& callback); |
- // Returns the number of uncomsumed bytes remaining of this data frame. |
+ // Returns the number of unconsumed bytes remaining of this data frame. |
uint32_t GetRemainingBytes(); |
// Returns a pointer to the remaining data to be consumed. |
@@ -56,23 +57,29 @@ class DataSinkReceiver::DataFrame { |
// Reports that |bytes_read| bytes have been consumed. |
void OnDataConsumed(uint32_t bytes_read); |
+ // Reports that an error occurred. |
+ void ReportError(uint32_t bytes_read, int32_t error); |
+ |
private: |
mojo::Array<uint8_t> data_; |
uint32_t offset_; |
+ const mojo::Callback<void(uint32_t, int32_t)> callback_; |
}; |
-DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, |
- const CancelCallback& cancel_callback, |
- const ErrorCallback& error_callback) |
- : ready_callback_(ready_callback), |
+DataSinkReceiver::DataSinkReceiver( |
+ mojo::InterfaceRequest<serial::DataSink> request, |
+ const ReadyCallback& ready_callback, |
+ const CancelCallback& cancel_callback, |
+ const ErrorCallback& error_callback) |
+ : binding_(this, request.Pass()), |
+ ready_callback_(ready_callback), |
cancel_callback_(cancel_callback), |
error_callback_(error_callback), |
- flush_pending_(false), |
+ current_error_(0), |
buffer_in_use_(NULL), |
- initialized_(false), |
- available_buffer_capacity_(0), |
shut_down_(false), |
weak_factory_(this) { |
+ binding_.set_error_handler(this); |
} |
void DataSinkReceiver::ShutDown() { |
@@ -82,21 +89,12 @@ void DataSinkReceiver::ShutDown() { |
DataSinkReceiver::~DataSinkReceiver() { |
} |
-void DataSinkReceiver::Init(uint32_t buffer_size) { |
- if (initialized_) { |
- ShutDown(); |
- return; |
- } |
- initialized_ = true; |
- available_buffer_capacity_ = buffer_size; |
-} |
- |
void DataSinkReceiver::Cancel(int32_t error) { |
// If we have sent a ReportBytesSentAndError but have not received the |
// response, that ReportBytesSentAndError message will appear to the |
// DataSinkClient to be caused by this Cancel message. In that case, we ignore |
// the cancel. |
- if (flush_pending_) |
+ if (current_error_) |
return; |
// If there is a buffer is in use, mark the buffer as cancelled and notify the |
@@ -113,21 +111,19 @@ void DataSinkReceiver::Cancel(int32_t error) { |
cancel_callback_.Run(error); |
return; |
} |
- ReportBytesSentAndError(0, error); |
+ ReportError(0, error); |
} |
-void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { |
- if (!initialized_) { |
- ShutDown(); |
+void DataSinkReceiver::OnData( |
+ mojo::Array<uint8_t> data, |
+ const mojo::Callback<void(uint32_t, int32_t)>& callback) { |
+ if (current_error_) { |
+ callback.Run(0, current_error_); |
return; |
} |
- if (data.size() > available_buffer_capacity_) { |
- ShutDown(); |
- return; |
- } |
- available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); |
- pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); |
- if (!buffer_in_use_ && !flush_pending_) |
+ pending_data_buffers_.push( |
+ linked_ptr<DataFrame>(new DataFrame(data.Pass(), callback))); |
+ if (!buffer_in_use_) |
RunReadyCallback(); |
} |
@@ -136,7 +132,7 @@ void DataSinkReceiver::OnConnectionError() { |
} |
void DataSinkReceiver::RunReadyCallback() { |
- DCHECK(!shut_down_ && !flush_pending_); |
+ DCHECK(!shut_down_ && !current_error_); |
// If data arrives while a call to RunReadyCallback() is posted, we can be |
// called with buffer_in_use_ already set. |
if (buffer_in_use_) |
@@ -151,7 +147,9 @@ void DataSinkReceiver::RunReadyCallback() { |
void DataSinkReceiver::Done(uint32_t bytes_read) { |
if (!DoneInternal(bytes_read)) |
return; |
- client()->ReportBytesSent(bytes_read); |
+ pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
+ if (pending_data_buffers_.front()->GetRemainingBytes() == 0) |
+ pending_data_buffers_.pop(); |
if (!pending_data_buffers_.empty()) { |
base::MessageLoop::current()->PostTask( |
FROM_HERE, |
@@ -163,7 +161,7 @@ void DataSinkReceiver::Done(uint32_t bytes_read) { |
void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
if (!DoneInternal(bytes_read)) |
return; |
- ReportBytesSentAndError(bytes_read, error); |
+ ReportError(bytes_read, error); |
} |
bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
@@ -172,34 +170,25 @@ bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
DCHECK(buffer_in_use_); |
buffer_in_use_ = NULL; |
- available_buffer_capacity_ += bytes_read; |
- pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
- if (pending_data_buffers_.front()->GetRemainingBytes() == 0) |
- pending_data_buffers_.pop(); |
return true; |
} |
-void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, |
- int32_t error) { |
+void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) { |
// When we encounter an error, we must discard the data from any send buffers |
- // transmitted by the DataSinkClient before it receives this error. |
- flush_pending_ = true; |
- client()->ReportBytesSentAndError( |
- bytes_read, |
- error, |
- base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); |
-} |
- |
-void DataSinkReceiver::DoFlush() { |
- DCHECK(flush_pending_); |
- flush_pending_ = false; |
+ // transmitted by the DataSink client before it receives this error. |
+ DCHECK(error); |
+ current_error_ = error; |
while (!pending_data_buffers_.empty()) { |
- available_buffer_capacity_ += |
- pending_data_buffers_.front()->GetRemainingBytes(); |
+ pending_data_buffers_.front()->ReportError(bytes_read, error); |
pending_data_buffers_.pop(); |
+ bytes_read = 0; |
} |
} |
+void DataSinkReceiver::ClearError() { |
+ current_error_ = 0; |
+} |
+ |
void DataSinkReceiver::DispatchFatalError() { |
if (shut_down_) |
return; |
@@ -261,8 +250,10 @@ void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
buffer_size_ = 0; |
} |
-DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) |
- : data_(data.Pass()), offset_(0) { |
+DataSinkReceiver::DataFrame::DataFrame( |
+ mojo::Array<uint8_t> data, |
+ const mojo::Callback<void(uint32_t, int32_t)>& callback) |
+ : data_(data.Pass()), offset_(0), callback_(callback) { |
DCHECK_LT(0u, data_.size()); |
} |
@@ -280,6 +271,14 @@ const char* DataSinkReceiver::DataFrame::GetData() { |
void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
offset_ += bytes_read; |
DCHECK_LE(offset_, data_.size()); |
+ if (offset_ == data_.size()) |
+ callback_.Run(offset_, 0); |
+} |
+void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read, |
+ int32_t error) { |
+ offset_ += bytes_read; |
+ DCHECK_LE(offset_, data_.size()); |
+ callback_.Run(offset_, error); |
} |
} // namespace device |