| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "device/serial/data_sink_receiver.h" | |
| 6 | |
| 7 #include <limits> | |
| 8 #include <memory> | |
| 9 #include <utility> | |
| 10 | |
| 11 #include "base/bind.h" | |
| 12 #include "base/location.h" | |
| 13 #include "base/single_thread_task_runner.h" | |
| 14 #include "base/threading/thread_task_runner_handle.h" | |
| 15 | |
| 16 namespace device { | |
| 17 | |
| 18 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a | |
| 19 // DataSinkReceiver. | |
| 20 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { | |
| 21 public: | |
| 22 Buffer(scoped_refptr<DataSinkReceiver> receiver, | |
| 23 const char* buffer, | |
| 24 uint32_t buffer_size); | |
| 25 ~Buffer() override; | |
| 26 | |
| 27 void Cancel(int32_t error); | |
| 28 | |
| 29 // ReadOnlyBuffer overrides. | |
| 30 const char* GetData() override; | |
| 31 uint32_t GetSize() override; | |
| 32 void Done(uint32_t bytes_read) override; | |
| 33 void DoneWithError(uint32_t bytes_read, int32_t error) override; | |
| 34 | |
| 35 private: | |
| 36 // The DataSinkReceiver of whose buffer we are providing a view. | |
| 37 scoped_refptr<DataSinkReceiver> receiver_; | |
| 38 | |
| 39 const char* buffer_; | |
| 40 uint32_t buffer_size_; | |
| 41 | |
| 42 // Whether this receive has been cancelled. | |
| 43 bool cancelled_; | |
| 44 | |
| 45 // If |cancelled_|, contains the cancellation error to report. | |
| 46 int32_t cancellation_error_; | |
| 47 }; | |
| 48 | |
| 49 // A frame of data received from the client. | |
| 50 class DataSinkReceiver::DataFrame { | |
| 51 public: | |
| 52 explicit DataFrame(mojo::Array<uint8_t> data, | |
| 53 const serial::DataSink::OnDataCallback& callback); | |
| 54 | |
| 55 // Returns the number of unconsumed bytes remaining of this data frame. | |
| 56 uint32_t GetRemainingBytes(); | |
| 57 | |
| 58 // Returns a pointer to the remaining data to be consumed. | |
| 59 const char* GetData(); | |
| 60 | |
| 61 // Reports that |bytes_read| bytes have been consumed. | |
| 62 void OnDataConsumed(uint32_t bytes_read); | |
| 63 | |
| 64 // Reports that an error occurred. | |
| 65 void ReportError(uint32_t bytes_read, int32_t error); | |
| 66 | |
| 67 private: | |
| 68 mojo::Array<uint8_t> data_; | |
| 69 uint32_t offset_; | |
| 70 const serial::DataSink::OnDataCallback callback_; | |
| 71 }; | |
| 72 | |
| 73 DataSinkReceiver::DataSinkReceiver( | |
| 74 mojo::InterfaceRequest<serial::DataSink> request, | |
| 75 const ReadyCallback& ready_callback, | |
| 76 const CancelCallback& cancel_callback, | |
| 77 const ErrorCallback& error_callback) | |
| 78 : binding_(this, std::move(request)), | |
| 79 ready_callback_(ready_callback), | |
| 80 cancel_callback_(cancel_callback), | |
| 81 error_callback_(error_callback), | |
| 82 current_error_(0), | |
| 83 buffer_in_use_(NULL), | |
| 84 shut_down_(false), | |
| 85 weak_factory_(this) { | |
| 86 binding_.set_connection_error_handler( | |
| 87 base::Bind(&DataSinkReceiver::OnConnectionError, base::Unretained(this))); | |
| 88 } | |
| 89 | |
| 90 void DataSinkReceiver::ShutDown() { | |
| 91 shut_down_ = true; | |
| 92 } | |
| 93 | |
| 94 DataSinkReceiver::~DataSinkReceiver() { | |
| 95 } | |
| 96 | |
| 97 void DataSinkReceiver::Cancel(int32_t error) { | |
| 98 // If we have sent a ReportBytesSentAndError but have not received the | |
| 99 // response, that ReportBytesSentAndError message will appear to the | |
| 100 // DataSinkClient to be caused by this Cancel message. In that case, we ignore | |
| 101 // the cancel. | |
| 102 if (current_error_) | |
| 103 return; | |
| 104 | |
| 105 // If there is a buffer is in use, mark the buffer as cancelled and notify the | |
| 106 // client by calling |cancel_callback_|. The sink implementation may or may | |
| 107 // not take the cancellation into account when deciding what error (if any) to | |
| 108 // return. If the sink returns an error, we ignore the cancellation error. | |
| 109 // Otherwise, if the sink does not report an error, we override that with the | |
| 110 // cancellation error. Once a cancellation has been received, the next report | |
| 111 // sent to the client will always contain an error; the error returned by the | |
| 112 // sink or the cancellation error if the sink does not return an error. | |
| 113 if (buffer_in_use_) { | |
| 114 buffer_in_use_->Cancel(error); | |
| 115 if (!cancel_callback_.is_null()) | |
| 116 cancel_callback_.Run(error); | |
| 117 return; | |
| 118 } | |
| 119 ReportError(0, error); | |
| 120 } | |
| 121 | |
| 122 void DataSinkReceiver::OnData( | |
| 123 mojo::Array<uint8_t> data, | |
| 124 const serial::DataSink::OnDataCallback& callback) { | |
| 125 if (current_error_) { | |
| 126 callback.Run(0, current_error_); | |
| 127 return; | |
| 128 } | |
| 129 pending_data_buffers_.push( | |
| 130 linked_ptr<DataFrame>(new DataFrame(std::move(data), callback))); | |
| 131 if (!buffer_in_use_) | |
| 132 RunReadyCallback(); | |
| 133 } | |
| 134 | |
| 135 void DataSinkReceiver::OnConnectionError() { | |
| 136 DispatchFatalError(); | |
| 137 } | |
| 138 | |
| 139 void DataSinkReceiver::RunReadyCallback() { | |
| 140 DCHECK(!shut_down_ && !current_error_); | |
| 141 // If data arrives while a call to RunReadyCallback() is posted, we can be | |
| 142 // called with buffer_in_use_ already set. | |
| 143 if (buffer_in_use_) | |
| 144 return; | |
| 145 buffer_in_use_ = | |
| 146 new Buffer(this, | |
| 147 pending_data_buffers_.front()->GetData(), | |
| 148 pending_data_buffers_.front()->GetRemainingBytes()); | |
| 149 ready_callback_.Run(std::unique_ptr<ReadOnlyBuffer>(buffer_in_use_)); | |
| 150 } | |
| 151 | |
| 152 void DataSinkReceiver::Done(uint32_t bytes_read) { | |
| 153 if (!DoneInternal(bytes_read)) | |
| 154 return; | |
| 155 pending_data_buffers_.front()->OnDataConsumed(bytes_read); | |
| 156 if (pending_data_buffers_.front()->GetRemainingBytes() == 0) | |
| 157 pending_data_buffers_.pop(); | |
| 158 if (!pending_data_buffers_.empty()) { | |
| 159 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 160 FROM_HERE, base::Bind(&DataSinkReceiver::RunReadyCallback, | |
| 161 weak_factory_.GetWeakPtr())); | |
| 162 } | |
| 163 } | |
| 164 | |
| 165 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { | |
| 166 if (!DoneInternal(bytes_read)) | |
| 167 return; | |
| 168 ReportError(bytes_read, error); | |
| 169 } | |
| 170 | |
| 171 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { | |
| 172 if (shut_down_) | |
| 173 return false; | |
| 174 | |
| 175 DCHECK(buffer_in_use_); | |
| 176 buffer_in_use_ = NULL; | |
| 177 return true; | |
| 178 } | |
| 179 | |
| 180 void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) { | |
| 181 // When we encounter an error, we must discard the data from any send buffers | |
| 182 // transmitted by the DataSink client before it receives this error. | |
| 183 DCHECK(error); | |
| 184 current_error_ = error; | |
| 185 while (!pending_data_buffers_.empty()) { | |
| 186 pending_data_buffers_.front()->ReportError(bytes_read, error); | |
| 187 pending_data_buffers_.pop(); | |
| 188 bytes_read = 0; | |
| 189 } | |
| 190 } | |
| 191 | |
| 192 void DataSinkReceiver::ClearError() { | |
| 193 current_error_ = 0; | |
| 194 } | |
| 195 | |
| 196 void DataSinkReceiver::DispatchFatalError() { | |
| 197 if (shut_down_) | |
| 198 return; | |
| 199 | |
| 200 ShutDown(); | |
| 201 if (!error_callback_.is_null()) | |
| 202 error_callback_.Run(); | |
| 203 } | |
| 204 | |
| 205 DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver, | |
| 206 const char* buffer, | |
| 207 uint32_t buffer_size) | |
| 208 : receiver_(receiver), | |
| 209 buffer_(buffer), | |
| 210 buffer_size_(buffer_size), | |
| 211 cancelled_(false), | |
| 212 cancellation_error_(0) { | |
| 213 } | |
| 214 | |
| 215 DataSinkReceiver::Buffer::~Buffer() { | |
| 216 if (!receiver_.get()) | |
| 217 return; | |
| 218 if (cancelled_) | |
| 219 receiver_->DoneWithError(0, cancellation_error_); | |
| 220 else | |
| 221 receiver_->Done(0); | |
| 222 } | |
| 223 | |
| 224 void DataSinkReceiver::Buffer::Cancel(int32_t error) { | |
| 225 cancelled_ = true; | |
| 226 cancellation_error_ = error; | |
| 227 } | |
| 228 | |
| 229 const char* DataSinkReceiver::Buffer::GetData() { | |
| 230 return buffer_; | |
| 231 } | |
| 232 | |
| 233 uint32_t DataSinkReceiver::Buffer::GetSize() { | |
| 234 return buffer_size_; | |
| 235 } | |
| 236 | |
| 237 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { | |
| 238 scoped_refptr<DataSinkReceiver> receiver = receiver_; | |
| 239 receiver_ = nullptr; | |
| 240 if (cancelled_) | |
| 241 receiver->DoneWithError(bytes_read, cancellation_error_); | |
| 242 else | |
| 243 receiver->Done(bytes_read); | |
| 244 buffer_ = NULL; | |
| 245 buffer_size_ = 0; | |
| 246 } | |
| 247 | |
| 248 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, | |
| 249 int32_t error) { | |
| 250 scoped_refptr<DataSinkReceiver> receiver = receiver_; | |
| 251 receiver_ = nullptr; | |
| 252 receiver->DoneWithError(bytes_read, error); | |
| 253 buffer_ = NULL; | |
| 254 buffer_size_ = 0; | |
| 255 } | |
| 256 | |
| 257 DataSinkReceiver::DataFrame::DataFrame( | |
| 258 mojo::Array<uint8_t> data, | |
| 259 const serial::DataSink::OnDataCallback& callback) | |
| 260 : data_(std::move(data)), offset_(0), callback_(callback) { | |
| 261 DCHECK_LT(0u, data_.size()); | |
| 262 } | |
| 263 | |
| 264 // Returns the number of uncomsumed bytes remaining of this data frame. | |
| 265 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { | |
| 266 return static_cast<uint32_t>(data_.size() - offset_); | |
| 267 } | |
| 268 | |
| 269 // Returns a pointer to the remaining data to be consumed. | |
| 270 const char* DataSinkReceiver::DataFrame::GetData() { | |
| 271 DCHECK_LT(offset_, data_.size()); | |
| 272 return reinterpret_cast<const char*>(&data_[0]) + offset_; | |
| 273 } | |
| 274 | |
| 275 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { | |
| 276 offset_ += bytes_read; | |
| 277 DCHECK_LE(offset_, data_.size()); | |
| 278 if (offset_ == data_.size()) | |
| 279 callback_.Run(offset_, 0); | |
| 280 } | |
| 281 void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read, | |
| 282 int32_t error) { | |
| 283 offset_ += bytes_read; | |
| 284 DCHECK_LE(offset_, data_.size()); | |
| 285 callback_.Run(offset_, error); | |
| 286 } | |
| 287 | |
| 288 } // namespace device | |
| OLD | NEW |