| Index: device/serial/data_sink_receiver.cc
|
| diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc
|
| index 170f8b2c5b052e80d1c9be8a945a2d3492da3326..ff66c1cb9c1729eae9e92d358f216d4fd0d10151 100644
|
| --- a/device/serial/data_sink_receiver.cc
|
| +++ b/device/serial/data_sink_receiver.cc
|
| @@ -45,9 +45,10 @@ class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
|
| // A frame of data received from the client.
|
| class DataSinkReceiver::DataFrame {
|
| public:
|
| - explicit DataFrame(mojo::Array<uint8_t> data);
|
| + explicit DataFrame(mojo::Array<uint8_t> data,
|
| + const mojo::Callback<void(uint32_t, int32_t)>& callback);
|
|
|
| - // Returns the number of uncomsumed bytes remaining of this data frame.
|
| + // Returns the number of unconsumed bytes remaining of this data frame.
|
| uint32_t GetRemainingBytes();
|
|
|
| // Returns a pointer to the remaining data to be consumed.
|
| @@ -56,23 +57,29 @@ class DataSinkReceiver::DataFrame {
|
| // Reports that |bytes_read| bytes have been consumed.
|
| void OnDataConsumed(uint32_t bytes_read);
|
|
|
| + // Reports that an error occurred.
|
| + void ReportError(uint32_t bytes_read, int32_t error);
|
| +
|
| private:
|
| mojo::Array<uint8_t> data_;
|
| uint32_t offset_;
|
| + const mojo::Callback<void(uint32_t, int32_t)> callback_;
|
| };
|
|
|
| -DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
|
| - const CancelCallback& cancel_callback,
|
| - const ErrorCallback& error_callback)
|
| - : ready_callback_(ready_callback),
|
| +DataSinkReceiver::DataSinkReceiver(
|
| + mojo::InterfaceRequest<serial::DataSink> request,
|
| + const ReadyCallback& ready_callback,
|
| + const CancelCallback& cancel_callback,
|
| + const ErrorCallback& error_callback)
|
| + : binding_(this, request.Pass()),
|
| + ready_callback_(ready_callback),
|
| cancel_callback_(cancel_callback),
|
| error_callback_(error_callback),
|
| - flush_pending_(false),
|
| + current_error_(0),
|
| buffer_in_use_(NULL),
|
| - initialized_(false),
|
| - available_buffer_capacity_(0),
|
| shut_down_(false),
|
| weak_factory_(this) {
|
| + binding_.set_error_handler(this);
|
| }
|
|
|
| void DataSinkReceiver::ShutDown() {
|
| @@ -82,21 +89,12 @@ void DataSinkReceiver::ShutDown() {
|
| DataSinkReceiver::~DataSinkReceiver() {
|
| }
|
|
|
| -void DataSinkReceiver::Init(uint32_t buffer_size) {
|
| - if (initialized_) {
|
| - ShutDown();
|
| - return;
|
| - }
|
| - initialized_ = true;
|
| - available_buffer_capacity_ = buffer_size;
|
| -}
|
| -
|
| void DataSinkReceiver::Cancel(int32_t error) {
|
| // If we have sent a ReportBytesSentAndError but have not received the
|
| // response, that ReportBytesSentAndError message will appear to the
|
| // DataSinkClient to be caused by this Cancel message. In that case, we ignore
|
| // the cancel.
|
| - if (flush_pending_)
|
| + if (current_error_)
|
| return;
|
|
|
| // If there is a buffer is in use, mark the buffer as cancelled and notify the
|
| @@ -113,21 +111,19 @@ void DataSinkReceiver::Cancel(int32_t error) {
|
| cancel_callback_.Run(error);
|
| return;
|
| }
|
| - ReportBytesSentAndError(0, error);
|
| + ReportError(0, error);
|
| }
|
|
|
| -void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) {
|
| - if (!initialized_) {
|
| - ShutDown();
|
| +void DataSinkReceiver::OnData(
|
| + mojo::Array<uint8_t> data,
|
| + const mojo::Callback<void(uint32_t, int32_t)>& callback) {
|
| + if (current_error_) {
|
| + callback.Run(0, current_error_);
|
| return;
|
| }
|
| - if (data.size() > available_buffer_capacity_) {
|
| - ShutDown();
|
| - return;
|
| - }
|
| - available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
|
| - pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
|
| - if (!buffer_in_use_ && !flush_pending_)
|
| + pending_data_buffers_.push(
|
| + linked_ptr<DataFrame>(new DataFrame(data.Pass(), callback)));
|
| + if (!buffer_in_use_)
|
| RunReadyCallback();
|
| }
|
|
|
| @@ -136,7 +132,7 @@ void DataSinkReceiver::OnConnectionError() {
|
| }
|
|
|
| void DataSinkReceiver::RunReadyCallback() {
|
| - DCHECK(!shut_down_ && !flush_pending_);
|
| + DCHECK(!shut_down_ && !current_error_);
|
| // If data arrives while a call to RunReadyCallback() is posted, we can be
|
| // called with buffer_in_use_ already set.
|
| if (buffer_in_use_)
|
| @@ -151,7 +147,9 @@ void DataSinkReceiver::RunReadyCallback() {
|
| void DataSinkReceiver::Done(uint32_t bytes_read) {
|
| if (!DoneInternal(bytes_read))
|
| return;
|
| - client()->ReportBytesSent(bytes_read);
|
| + pending_data_buffers_.front()->OnDataConsumed(bytes_read);
|
| + if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
|
| + pending_data_buffers_.pop();
|
| if (!pending_data_buffers_.empty()) {
|
| base::MessageLoop::current()->PostTask(
|
| FROM_HERE,
|
| @@ -163,7 +161,7 @@ void DataSinkReceiver::Done(uint32_t bytes_read) {
|
| void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
|
| if (!DoneInternal(bytes_read))
|
| return;
|
| - ReportBytesSentAndError(bytes_read, error);
|
| + ReportError(bytes_read, error);
|
| }
|
|
|
| bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
|
| @@ -172,34 +170,25 @@ bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
|
|
|
| DCHECK(buffer_in_use_);
|
| buffer_in_use_ = NULL;
|
| - available_buffer_capacity_ += bytes_read;
|
| - pending_data_buffers_.front()->OnDataConsumed(bytes_read);
|
| - if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
|
| - pending_data_buffers_.pop();
|
| return true;
|
| }
|
|
|
| -void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
|
| - int32_t error) {
|
| +void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) {
|
| // When we encounter an error, we must discard the data from any send buffers
|
| - // transmitted by the DataSinkClient before it receives this error.
|
| - flush_pending_ = true;
|
| - client()->ReportBytesSentAndError(
|
| - bytes_read,
|
| - error,
|
| - base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
|
| -}
|
| -
|
| -void DataSinkReceiver::DoFlush() {
|
| - DCHECK(flush_pending_);
|
| - flush_pending_ = false;
|
| + // transmitted by the DataSink client before it receives this error.
|
| + DCHECK(error);
|
| + current_error_ = error;
|
| while (!pending_data_buffers_.empty()) {
|
| - available_buffer_capacity_ +=
|
| - pending_data_buffers_.front()->GetRemainingBytes();
|
| + pending_data_buffers_.front()->ReportError(bytes_read, error);
|
| pending_data_buffers_.pop();
|
| + bytes_read = 0;
|
| }
|
| }
|
|
|
| +void DataSinkReceiver::ClearError() {
|
| + current_error_ = 0;
|
| +}
|
| +
|
| void DataSinkReceiver::DispatchFatalError() {
|
| if (shut_down_)
|
| return;
|
| @@ -261,8 +250,10 @@ void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
|
| buffer_size_ = 0;
|
| }
|
|
|
| -DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data)
|
| - : data_(data.Pass()), offset_(0) {
|
| +DataSinkReceiver::DataFrame::DataFrame(
|
| + mojo::Array<uint8_t> data,
|
| + const mojo::Callback<void(uint32_t, int32_t)>& callback)
|
| + : data_(data.Pass()), offset_(0), callback_(callback) {
|
| DCHECK_LT(0u, data_.size());
|
| }
|
|
|
| @@ -280,6 +271,14 @@ const char* DataSinkReceiver::DataFrame::GetData() {
|
| void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
|
| offset_ += bytes_read;
|
| DCHECK_LE(offset_, data_.size());
|
| + if (offset_ == data_.size())
|
| + callback_.Run(offset_, 0);
|
| +}
|
| +void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read,
|
| + int32_t error) {
|
| + offset_ += bytes_read;
|
| + DCHECK_LE(offset_, data_.size());
|
| + callback_.Run(offset_, error);
|
| }
|
|
|
| } // namespace device
|
|
|