| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "device/serial/data_sink_receiver.h" | 5 #include "device/serial/data_sink_receiver.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| (...skipping 27 matching lines...) Expand all Loading... |
| 38 // Whether this receive has been cancelled. | 38 // Whether this receive has been cancelled. |
| 39 bool cancelled_; | 39 bool cancelled_; |
| 40 | 40 |
| 41 // If |cancelled_|, contains the cancellation error to report. | 41 // If |cancelled_|, contains the cancellation error to report. |
| 42 int32_t cancellation_error_; | 42 int32_t cancellation_error_; |
| 43 }; | 43 }; |
| 44 | 44 |
| 45 // A frame of data received from the client. | 45 // A frame of data received from the client. |
| 46 class DataSinkReceiver::DataFrame { | 46 class DataSinkReceiver::DataFrame { |
| 47 public: | 47 public: |
| 48 explicit DataFrame(mojo::Array<uint8_t> data); | 48 explicit DataFrame(mojo::Array<uint8_t> data, |
| 49 const mojo::Callback<void(uint32_t, int32_t)>& callback); |
| 49 | 50 |
| 50 // Returns the number of uncomsumed bytes remaining of this data frame. | 51 // Returns the number of unconsumed bytes remaining of this data frame. |
| 51 uint32_t GetRemainingBytes(); | 52 uint32_t GetRemainingBytes(); |
| 52 | 53 |
| 53 // Returns a pointer to the remaining data to be consumed. | 54 // Returns a pointer to the remaining data to be consumed. |
| 54 const char* GetData(); | 55 const char* GetData(); |
| 55 | 56 |
| 56 // Reports that |bytes_read| bytes have been consumed. | 57 // Reports that |bytes_read| bytes have been consumed. |
| 57 void OnDataConsumed(uint32_t bytes_read); | 58 void OnDataConsumed(uint32_t bytes_read); |
| 58 | 59 |
| 60 // Reports that an error occurred. |
| 61 void ReportError(uint32_t bytes_read, int32_t error); |
| 62 |
| 59 private: | 63 private: |
| 60 mojo::Array<uint8_t> data_; | 64 mojo::Array<uint8_t> data_; |
| 61 uint32_t offset_; | 65 uint32_t offset_; |
| 66 const mojo::Callback<void(uint32_t, int32_t)> callback_; |
| 62 }; | 67 }; |
| 63 | 68 |
| 64 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, | 69 DataSinkReceiver::DataSinkReceiver( |
| 65 const CancelCallback& cancel_callback, | 70 mojo::InterfaceRequest<serial::DataSink> request, |
| 66 const ErrorCallback& error_callback) | 71 const ReadyCallback& ready_callback, |
| 67 : ready_callback_(ready_callback), | 72 const CancelCallback& cancel_callback, |
| 73 const ErrorCallback& error_callback) |
| 74 : binding_(this, request.Pass()), |
| 75 ready_callback_(ready_callback), |
| 68 cancel_callback_(cancel_callback), | 76 cancel_callback_(cancel_callback), |
| 69 error_callback_(error_callback), | 77 error_callback_(error_callback), |
| 70 flush_pending_(false), | 78 current_error_(0), |
| 71 buffer_in_use_(NULL), | 79 buffer_in_use_(NULL), |
| 72 initialized_(false), | |
| 73 available_buffer_capacity_(0), | |
| 74 shut_down_(false), | 80 shut_down_(false), |
| 75 weak_factory_(this) { | 81 weak_factory_(this) { |
| 82 binding_.set_error_handler(this); |
| 76 } | 83 } |
| 77 | 84 |
| 78 void DataSinkReceiver::ShutDown() { | 85 void DataSinkReceiver::ShutDown() { |
| 79 shut_down_ = true; | 86 shut_down_ = true; |
| 80 } | 87 } |
| 81 | 88 |
| 82 DataSinkReceiver::~DataSinkReceiver() { | 89 DataSinkReceiver::~DataSinkReceiver() { |
| 83 } | 90 } |
| 84 | 91 |
| 85 void DataSinkReceiver::Init(uint32_t buffer_size) { | |
| 86 if (initialized_) { | |
| 87 ShutDown(); | |
| 88 return; | |
| 89 } | |
| 90 initialized_ = true; | |
| 91 available_buffer_capacity_ = buffer_size; | |
| 92 } | |
| 93 | |
| 94 void DataSinkReceiver::Cancel(int32_t error) { | 92 void DataSinkReceiver::Cancel(int32_t error) { |
| 95 // If we have sent a ReportBytesSentAndError but have not received the | 93 // If we have sent a ReportBytesSentAndError but have not received the |
| 96 // response, that ReportBytesSentAndError message will appear to the | 94 // response, that ReportBytesSentAndError message will appear to the |
| 97 // DataSinkClient to be caused by this Cancel message. In that case, we ignore | 95 // DataSinkClient to be caused by this Cancel message. In that case, we ignore |
| 98 // the cancel. | 96 // the cancel. |
| 99 if (flush_pending_) | 97 if (current_error_) |
| 100 return; | 98 return; |
| 101 | 99 |
| 102 // If there is a buffer is in use, mark the buffer as cancelled and notify the | 100 // If there is a buffer is in use, mark the buffer as cancelled and notify the |
| 103 // client by calling |cancel_callback_|. The sink implementation may or may | 101 // client by calling |cancel_callback_|. The sink implementation may or may |
| 104 // not take the cancellation into account when deciding what error (if any) to | 102 // not take the cancellation into account when deciding what error (if any) to |
| 105 // return. If the sink returns an error, we ignore the cancellation error. | 103 // return. If the sink returns an error, we ignore the cancellation error. |
| 106 // Otherwise, if the sink does not report an error, we override that with the | 104 // Otherwise, if the sink does not report an error, we override that with the |
| 107 // cancellation error. Once a cancellation has been received, the next report | 105 // cancellation error. Once a cancellation has been received, the next report |
| 108 // sent to the client will always contain an error; the error returned by the | 106 // sent to the client will always contain an error; the error returned by the |
| 109 // sink or the cancellation error if the sink does not return an error. | 107 // sink or the cancellation error if the sink does not return an error. |
| 110 if (buffer_in_use_) { | 108 if (buffer_in_use_) { |
| 111 buffer_in_use_->Cancel(error); | 109 buffer_in_use_->Cancel(error); |
| 112 if (!cancel_callback_.is_null()) | 110 if (!cancel_callback_.is_null()) |
| 113 cancel_callback_.Run(error); | 111 cancel_callback_.Run(error); |
| 114 return; | 112 return; |
| 115 } | 113 } |
| 116 ReportBytesSentAndError(0, error); | 114 ReportError(0, error); |
| 117 } | 115 } |
| 118 | 116 |
| 119 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { | 117 void DataSinkReceiver::OnData( |
| 120 if (!initialized_) { | 118 mojo::Array<uint8_t> data, |
| 121 ShutDown(); | 119 const mojo::Callback<void(uint32_t, int32_t)>& callback) { |
| 120 if (current_error_) { |
| 121 callback.Run(0, current_error_); |
| 122 return; | 122 return; |
| 123 } | 123 } |
| 124 if (data.size() > available_buffer_capacity_) { | 124 pending_data_buffers_.push( |
| 125 ShutDown(); | 125 linked_ptr<DataFrame>(new DataFrame(data.Pass(), callback))); |
| 126 return; | 126 if (!buffer_in_use_) |
| 127 } | |
| 128 available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); | |
| 129 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); | |
| 130 if (!buffer_in_use_ && !flush_pending_) | |
| 131 RunReadyCallback(); | 127 RunReadyCallback(); |
| 132 } | 128 } |
| 133 | 129 |
| 134 void DataSinkReceiver::OnConnectionError() { | 130 void DataSinkReceiver::OnConnectionError() { |
| 135 DispatchFatalError(); | 131 DispatchFatalError(); |
| 136 } | 132 } |
| 137 | 133 |
| 138 void DataSinkReceiver::RunReadyCallback() { | 134 void DataSinkReceiver::RunReadyCallback() { |
| 139 DCHECK(!shut_down_ && !flush_pending_); | 135 DCHECK(!shut_down_ && !current_error_); |
| 140 // If data arrives while a call to RunReadyCallback() is posted, we can be | 136 // If data arrives while a call to RunReadyCallback() is posted, we can be |
| 141 // called with buffer_in_use_ already set. | 137 // called with buffer_in_use_ already set. |
| 142 if (buffer_in_use_) | 138 if (buffer_in_use_) |
| 143 return; | 139 return; |
| 144 buffer_in_use_ = | 140 buffer_in_use_ = |
| 145 new Buffer(this, | 141 new Buffer(this, |
| 146 pending_data_buffers_.front()->GetData(), | 142 pending_data_buffers_.front()->GetData(), |
| 147 pending_data_buffers_.front()->GetRemainingBytes()); | 143 pending_data_buffers_.front()->GetRemainingBytes()); |
| 148 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); | 144 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
| 149 } | 145 } |
| 150 | 146 |
| 151 void DataSinkReceiver::Done(uint32_t bytes_read) { | 147 void DataSinkReceiver::Done(uint32_t bytes_read) { |
| 152 if (!DoneInternal(bytes_read)) | 148 if (!DoneInternal(bytes_read)) |
| 153 return; | 149 return; |
| 154 client()->ReportBytesSent(bytes_read); | 150 pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
| 151 if (pending_data_buffers_.front()->GetRemainingBytes() == 0) |
| 152 pending_data_buffers_.pop(); |
| 155 if (!pending_data_buffers_.empty()) { | 153 if (!pending_data_buffers_.empty()) { |
| 156 base::MessageLoop::current()->PostTask( | 154 base::MessageLoop::current()->PostTask( |
| 157 FROM_HERE, | 155 FROM_HERE, |
| 158 base::Bind(&DataSinkReceiver::RunReadyCallback, | 156 base::Bind(&DataSinkReceiver::RunReadyCallback, |
| 159 weak_factory_.GetWeakPtr())); | 157 weak_factory_.GetWeakPtr())); |
| 160 } | 158 } |
| 161 } | 159 } |
| 162 | 160 |
| 163 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { | 161 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
| 164 if (!DoneInternal(bytes_read)) | 162 if (!DoneInternal(bytes_read)) |
| 165 return; | 163 return; |
| 166 ReportBytesSentAndError(bytes_read, error); | 164 ReportError(bytes_read, error); |
| 167 } | 165 } |
| 168 | 166 |
| 169 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { | 167 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
| 170 if (shut_down_) | 168 if (shut_down_) |
| 171 return false; | 169 return false; |
| 172 | 170 |
| 173 DCHECK(buffer_in_use_); | 171 DCHECK(buffer_in_use_); |
| 174 buffer_in_use_ = NULL; | 172 buffer_in_use_ = NULL; |
| 175 available_buffer_capacity_ += bytes_read; | |
| 176 pending_data_buffers_.front()->OnDataConsumed(bytes_read); | |
| 177 if (pending_data_buffers_.front()->GetRemainingBytes() == 0) | |
| 178 pending_data_buffers_.pop(); | |
| 179 return true; | 173 return true; |
| 180 } | 174 } |
| 181 | 175 |
| 182 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, | 176 void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) { |
| 183 int32_t error) { | |
| 184 // When we encounter an error, we must discard the data from any send buffers | 177 // When we encounter an error, we must discard the data from any send buffers |
| 185 // transmitted by the DataSinkClient before it receives this error. | 178 // transmitted by the DataSink client before it receives this error. |
| 186 flush_pending_ = true; | 179 DCHECK(error); |
| 187 client()->ReportBytesSentAndError( | 180 current_error_ = error; |
| 188 bytes_read, | 181 while (!pending_data_buffers_.empty()) { |
| 189 error, | 182 pending_data_buffers_.front()->ReportError(bytes_read, error); |
| 190 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); | 183 pending_data_buffers_.pop(); |
| 184 bytes_read = 0; |
| 185 } |
| 191 } | 186 } |
| 192 | 187 |
| 193 void DataSinkReceiver::DoFlush() { | 188 void DataSinkReceiver::ClearError() { |
| 194 DCHECK(flush_pending_); | 189 current_error_ = 0; |
| 195 flush_pending_ = false; | |
| 196 while (!pending_data_buffers_.empty()) { | |
| 197 available_buffer_capacity_ += | |
| 198 pending_data_buffers_.front()->GetRemainingBytes(); | |
| 199 pending_data_buffers_.pop(); | |
| 200 } | |
| 201 } | 190 } |
| 202 | 191 |
| 203 void DataSinkReceiver::DispatchFatalError() { | 192 void DataSinkReceiver::DispatchFatalError() { |
| 204 if (shut_down_) | 193 if (shut_down_) |
| 205 return; | 194 return; |
| 206 | 195 |
| 207 ShutDown(); | 196 ShutDown(); |
| 208 if (!error_callback_.is_null()) | 197 if (!error_callback_.is_null()) |
| 209 error_callback_.Run(); | 198 error_callback_.Run(); |
| 210 } | 199 } |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 254 | 243 |
| 255 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, | 244 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
| 256 int32_t error) { | 245 int32_t error) { |
| 257 scoped_refptr<DataSinkReceiver> receiver = receiver_; | 246 scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| 258 receiver_ = nullptr; | 247 receiver_ = nullptr; |
| 259 receiver->DoneWithError(bytes_read, error); | 248 receiver->DoneWithError(bytes_read, error); |
| 260 buffer_ = NULL; | 249 buffer_ = NULL; |
| 261 buffer_size_ = 0; | 250 buffer_size_ = 0; |
| 262 } | 251 } |
| 263 | 252 |
| 264 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) | 253 DataSinkReceiver::DataFrame::DataFrame( |
| 265 : data_(data.Pass()), offset_(0) { | 254 mojo::Array<uint8_t> data, |
| 255 const mojo::Callback<void(uint32_t, int32_t)>& callback) |
| 256 : data_(data.Pass()), offset_(0), callback_(callback) { |
| 266 DCHECK_LT(0u, data_.size()); | 257 DCHECK_LT(0u, data_.size()); |
| 267 } | 258 } |
| 268 | 259 |
| 269 // Returns the number of uncomsumed bytes remaining of this data frame. | 260 // Returns the number of uncomsumed bytes remaining of this data frame. |
| 270 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { | 261 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { |
| 271 return static_cast<uint32_t>(data_.size() - offset_); | 262 return static_cast<uint32_t>(data_.size() - offset_); |
| 272 } | 263 } |
| 273 | 264 |
| 274 // Returns a pointer to the remaining data to be consumed. | 265 // Returns a pointer to the remaining data to be consumed. |
| 275 const char* DataSinkReceiver::DataFrame::GetData() { | 266 const char* DataSinkReceiver::DataFrame::GetData() { |
| 276 DCHECK_LT(offset_, data_.size()); | 267 DCHECK_LT(offset_, data_.size()); |
| 277 return reinterpret_cast<const char*>(&data_[0]) + offset_; | 268 return reinterpret_cast<const char*>(&data_[0]) + offset_; |
| 278 } | 269 } |
| 279 | 270 |
| 280 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { | 271 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
| 281 offset_ += bytes_read; | 272 offset_ += bytes_read; |
| 282 DCHECK_LE(offset_, data_.size()); | 273 DCHECK_LE(offset_, data_.size()); |
| 274 if (offset_ == data_.size()) |
| 275 callback_.Run(offset_, 0); |
| 276 } |
| 277 void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read, |
| 278 int32_t error) { |
| 279 offset_ += bytes_read; |
| 280 DCHECK_LE(offset_, data_.size()); |
| 281 callback_.Run(offset_, error); |
| 283 } | 282 } |
| 284 | 283 |
| 285 } // namespace device | 284 } // namespace device |
| OLD | NEW |