| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "device/serial/data_receiver.h" | |
| 6 | |
| 7 #include <limits> | |
| 8 #include <memory> | |
| 9 #include <utility> | |
| 10 | |
| 11 #include "base/bind.h" | |
| 12 #include "base/location.h" | |
| 13 #include "base/single_thread_task_runner.h" | |
| 14 #include "base/threading/thread_task_runner_handle.h" | |
| 15 | |
| 16 namespace device { | |
| 17 | |
| 18 // Represents a receive that is not yet fulfilled. | |
| 19 class DataReceiver::PendingReceive { | |
| 20 public: | |
| 21 PendingReceive(DataReceiver* receiver, | |
| 22 const ReceiveDataCallback& callback, | |
| 23 const ReceiveErrorCallback& error_callback, | |
| 24 int32_t fatal_error_value); | |
| 25 | |
| 26 // Dispatches |data| to |receive_callback_|. Returns whether this | |
| 27 // PendingReceive is finished by this call. | |
| 28 bool DispatchDataFrame(DataReceiver::DataFrame* data); | |
| 29 | |
| 30 // Reports |fatal_error_value_| to |receive_error_callback_|. | |
| 31 void DispatchFatalError(); | |
| 32 | |
| 33 bool buffer_in_use() { return buffer_in_use_; } | |
| 34 | |
| 35 private: | |
| 36 class Buffer; | |
| 37 | |
| 38 // Invoked when the user is finished with the ReadOnlyBuffer provided to | |
| 39 // |receive_callback_|. | |
| 40 void Done(uint32_t num_bytes); | |
| 41 | |
| 42 // The DataReceiver that owns this. | |
| 43 DataReceiver* receiver_; | |
| 44 | |
| 45 // The callback to dispatch data. | |
| 46 ReceiveDataCallback receive_callback_; | |
| 47 | |
| 48 // The callback to report errors. | |
| 49 ReceiveErrorCallback receive_error_callback_; | |
| 50 | |
| 51 // The error value to report when DispatchFatalError() is called. | |
| 52 const int32_t fatal_error_value_; | |
| 53 | |
| 54 // True if the user owns a buffer passed to |receive_callback_| as part of | |
| 55 // DispatchDataFrame(). | |
| 56 bool buffer_in_use_; | |
| 57 }; | |
| 58 | |
| 59 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a | |
| 60 // DataReceiver. | |
| 61 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { | |
| 62 public: | |
| 63 Buffer(scoped_refptr<DataReceiver> pipe, | |
| 64 PendingReceive* receive, | |
| 65 const char* buffer, | |
| 66 uint32_t buffer_size); | |
| 67 ~Buffer() override; | |
| 68 | |
| 69 // ReadOnlyBuffer overrides. | |
| 70 const char* GetData() override; | |
| 71 uint32_t GetSize() override; | |
| 72 void Done(uint32_t bytes_consumed) override; | |
| 73 void DoneWithError(uint32_t bytes_consumed, int32_t error) override; | |
| 74 | |
| 75 private: | |
| 76 // The DataReceiver of whose buffer we are providing a view. | |
| 77 scoped_refptr<DataReceiver> receiver_; | |
| 78 | |
| 79 // The PendingReceive to which this buffer has been created in response. | |
| 80 PendingReceive* pending_receive_; | |
| 81 | |
| 82 const char* buffer_; | |
| 83 uint32_t buffer_size_; | |
| 84 }; | |
| 85 | |
| 86 // A buffer of data or an error received from the DataSource. | |
| 87 struct DataReceiver::DataFrame { | |
| 88 explicit DataFrame(mojo::Array<uint8_t> data) | |
| 89 : is_error(false), | |
| 90 data(std::move(data)), | |
| 91 offset(0), | |
| 92 error(0), | |
| 93 dispatched(false) {} | |
| 94 | |
| 95 explicit DataFrame(int32_t error) | |
| 96 : is_error(true), offset(0), error(error), dispatched(false) {} | |
| 97 | |
| 98 // Whether this DataFrame represents an error. | |
| 99 bool is_error; | |
| 100 | |
| 101 // The data received from the DataSource. | |
| 102 mojo::Array<uint8_t> data; | |
| 103 | |
| 104 // The offset within |data| at which the next read should begin. | |
| 105 uint32_t offset; | |
| 106 | |
| 107 // The value of the error that occurred. | |
| 108 const int32_t error; | |
| 109 | |
| 110 // Whether the error has been dispatched to the user. | |
| 111 bool dispatched; | |
| 112 }; | |
| 113 | |
| 114 DataReceiver::DataReceiver( | |
| 115 mojo::InterfacePtr<serial::DataSource> source, | |
| 116 mojo::InterfaceRequest<serial::DataSourceClient> client, | |
| 117 uint32_t buffer_size, | |
| 118 int32_t fatal_error_value) | |
| 119 : source_(std::move(source)), | |
| 120 client_(this, std::move(client)), | |
| 121 fatal_error_value_(fatal_error_value), | |
| 122 shut_down_(false), | |
| 123 weak_factory_(this) { | |
| 124 source_.set_connection_error_handler( | |
| 125 base::Bind(&DataReceiver::OnConnectionError, base::Unretained(this))); | |
| 126 source_->Init(buffer_size); | |
| 127 client_.set_connection_error_handler( | |
| 128 base::Bind(&DataReceiver::OnConnectionError, base::Unretained(this))); | |
| 129 } | |
| 130 | |
| 131 bool DataReceiver::Receive(const ReceiveDataCallback& callback, | |
| 132 const ReceiveErrorCallback& error_callback) { | |
| 133 DCHECK(!callback.is_null() && !error_callback.is_null()); | |
| 134 if (pending_receive_ || shut_down_) | |
| 135 return false; | |
| 136 // When the DataSource encounters an error, it pauses transmission. When the | |
| 137 // user starts a new receive following notification of the error (via | |
| 138 // |error_callback| of the previous Receive call) of the error we can tell the | |
| 139 // DataSource to resume transmission of data. | |
| 140 if (!pending_data_frames_.empty() && pending_data_frames_.front()->is_error && | |
| 141 pending_data_frames_.front()->dispatched) { | |
| 142 source_->Resume(); | |
| 143 pending_data_frames_.pop(); | |
| 144 } | |
| 145 | |
| 146 pending_receive_.reset( | |
| 147 new PendingReceive(this, callback, error_callback, fatal_error_value_)); | |
| 148 ReceiveInternal(); | |
| 149 return true; | |
| 150 } | |
| 151 | |
| 152 DataReceiver::~DataReceiver() { | |
| 153 ShutDown(); | |
| 154 } | |
| 155 | |
| 156 void DataReceiver::OnError(int32_t error) { | |
| 157 if (shut_down_) | |
| 158 return; | |
| 159 | |
| 160 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(error))); | |
| 161 if (pending_receive_) | |
| 162 ReceiveInternal(); | |
| 163 } | |
| 164 | |
| 165 void DataReceiver::OnData(mojo::Array<uint8_t> data) { | |
| 166 pending_data_frames_.push( | |
| 167 linked_ptr<DataFrame>(new DataFrame(std::move(data)))); | |
| 168 if (pending_receive_) | |
| 169 ReceiveInternal(); | |
| 170 } | |
| 171 | |
| 172 void DataReceiver::OnConnectionError() { | |
| 173 ShutDown(); | |
| 174 } | |
| 175 | |
| 176 void DataReceiver::Done(uint32_t bytes_consumed) { | |
| 177 if (shut_down_) | |
| 178 return; | |
| 179 | |
| 180 DCHECK(pending_receive_); | |
| 181 DataFrame& pending_data = *pending_data_frames_.front(); | |
| 182 pending_data.offset += bytes_consumed; | |
| 183 DCHECK_LE(pending_data.offset, pending_data.data.size()); | |
| 184 if (pending_data.offset == pending_data.data.size()) { | |
| 185 source_->ReportBytesReceived( | |
| 186 static_cast<uint32_t>(pending_data.data.size())); | |
| 187 pending_data_frames_.pop(); | |
| 188 } | |
| 189 pending_receive_.reset(); | |
| 190 } | |
| 191 | |
| 192 void DataReceiver::ReceiveInternal() { | |
| 193 if (shut_down_) | |
| 194 return; | |
| 195 DCHECK(pending_receive_); | |
| 196 if (pending_receive_->buffer_in_use()) | |
| 197 return; | |
| 198 | |
| 199 if (!pending_data_frames_.empty() && | |
| 200 pending_receive_->DispatchDataFrame(pending_data_frames_.front().get())) { | |
| 201 pending_receive_.reset(); | |
| 202 } | |
| 203 } | |
| 204 | |
| 205 void DataReceiver::ShutDown() { | |
| 206 shut_down_ = true; | |
| 207 if (pending_receive_) | |
| 208 pending_receive_->DispatchFatalError(); | |
| 209 } | |
| 210 | |
| 211 DataReceiver::PendingReceive::PendingReceive( | |
| 212 DataReceiver* receiver, | |
| 213 const ReceiveDataCallback& callback, | |
| 214 const ReceiveErrorCallback& error_callback, | |
| 215 int32_t fatal_error_value) | |
| 216 : receiver_(receiver), | |
| 217 receive_callback_(callback), | |
| 218 receive_error_callback_(error_callback), | |
| 219 fatal_error_value_(fatal_error_value), | |
| 220 buffer_in_use_(false) { | |
| 221 } | |
| 222 | |
| 223 bool DataReceiver::PendingReceive::DispatchDataFrame( | |
| 224 DataReceiver::DataFrame* data) { | |
| 225 DCHECK(!buffer_in_use_); | |
| 226 DCHECK(!data->dispatched); | |
| 227 | |
| 228 if (data->is_error) { | |
| 229 data->dispatched = true; | |
| 230 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 231 FROM_HERE, base::Bind(receive_error_callback_, data->error)); | |
| 232 return true; | |
| 233 } | |
| 234 buffer_in_use_ = true; | |
| 235 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 236 FROM_HERE, | |
| 237 base::Bind( | |
| 238 receive_callback_, | |
| 239 base::Passed(std::unique_ptr<ReadOnlyBuffer>(new Buffer( | |
| 240 receiver_, this, | |
| 241 reinterpret_cast<char*>(&data->data[0]) + data->offset, | |
| 242 static_cast<uint32_t>(data->data.size() - data->offset)))))); | |
| 243 return false; | |
| 244 } | |
| 245 | |
| 246 void DataReceiver::PendingReceive::DispatchFatalError() { | |
| 247 receive_error_callback_.Run(fatal_error_value_); | |
| 248 } | |
| 249 | |
| 250 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { | |
| 251 DCHECK(buffer_in_use_); | |
| 252 buffer_in_use_ = false; | |
| 253 receiver_->Done(bytes_consumed); | |
| 254 } | |
| 255 | |
| 256 DataReceiver::PendingReceive::Buffer::Buffer( | |
| 257 scoped_refptr<DataReceiver> receiver, | |
| 258 PendingReceive* receive, | |
| 259 const char* buffer, | |
| 260 uint32_t buffer_size) | |
| 261 : receiver_(receiver), | |
| 262 pending_receive_(receive), | |
| 263 buffer_(buffer), | |
| 264 buffer_size_(buffer_size) { | |
| 265 } | |
| 266 | |
| 267 DataReceiver::PendingReceive::Buffer::~Buffer() { | |
| 268 if (pending_receive_) | |
| 269 pending_receive_->Done(0); | |
| 270 } | |
| 271 | |
| 272 const char* DataReceiver::PendingReceive::Buffer::GetData() { | |
| 273 return buffer_; | |
| 274 } | |
| 275 | |
| 276 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() { | |
| 277 return buffer_size_; | |
| 278 } | |
| 279 | |
| 280 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) { | |
| 281 pending_receive_->Done(bytes_consumed); | |
| 282 pending_receive_ = NULL; | |
| 283 receiver_ = NULL; | |
| 284 buffer_ = NULL; | |
| 285 buffer_size_ = 0; | |
| 286 } | |
| 287 | |
| 288 void DataReceiver::PendingReceive::Buffer::DoneWithError( | |
| 289 uint32_t bytes_consumed, | |
| 290 int32_t error) { | |
| 291 Done(bytes_consumed); | |
| 292 } | |
| 293 | |
| 294 } // namespace device | |
| OLD | NEW |