| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "device/serial/data_receiver.h" | 5 #include "device/serial/data_receiver.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| 11 #include "device/serial/async_waiter.h" | |
| 12 | 11 |
| 13 namespace device { | 12 namespace device { |
| 14 | 13 |
| 15 // Represents a receive that is not yet fulfilled. | 14 // Represents a receive that is not yet fulfilled. |
| 16 class DataReceiver::PendingReceive { | 15 class DataReceiver::PendingReceive { |
| 17 public: | 16 public: |
| 18 PendingReceive(DataReceiver* receiver, | 17 PendingReceive(DataReceiver* receiver, |
| 19 const ReceiveDataCallback& callback, | 18 const ReceiveDataCallback& callback, |
| 20 const ReceiveErrorCallback& error_callback, | 19 const ReceiveErrorCallback& error_callback, |
| 21 int32_t fatal_error_value); | 20 int32_t fatal_error_value); |
| 22 | 21 |
| 23 // Dispatches |data| to |receive_callback_|. | 22 // Dispatches |data| to |receive_callback_|. Returns whether this |
| 24 void DispatchData(const void* data, uint32_t num_bytes); | 23 // PendingReceive is finished by this call. |
| 25 | 24 bool DispatchDataFrame(DataReceiver::DataFrame* data); |
| 26 // Reports |error| to |receive_error_callback_| if it is an appropriate time. | |
| 27 // Returns whether it dispatched |error|. | |
| 28 bool DispatchError(DataReceiver::PendingError* error, | |
| 29 uint32_t bytes_received); | |
| 30 | 25 |
| 31 // Reports |fatal_error_value_| to |receive_error_callback_|. | 26 // Reports |fatal_error_value_| to |receive_error_callback_|. |
| 32 void DispatchFatalError(); | 27 void DispatchFatalError(); |
| 33 | 28 |
| 29 bool buffer_in_use() { return buffer_in_use_; } |
| 30 |
| 34 private: | 31 private: |
| 35 class Buffer; | 32 class Buffer; |
| 36 | 33 |
| 37 // Invoked when the user is finished with the ReadOnlyBuffer provided to | 34 // Invoked when the user is finished with the ReadOnlyBuffer provided to |
| 38 // |receive_callback_|. | 35 // |receive_callback_|. |
| 39 void Done(uint32_t num_bytes); | 36 void Done(uint32_t num_bytes); |
| 40 | 37 |
| 41 // The DataReceiver that owns this. | 38 // The DataReceiver that owns this. |
| 42 DataReceiver* receiver_; | 39 DataReceiver* receiver_; |
| 43 | 40 |
| 44 // The callback to dispatch data. | 41 // The callback to dispatch data. |
| 45 ReceiveDataCallback receive_callback_; | 42 ReceiveDataCallback receive_callback_; |
| 46 | 43 |
| 47 // The callback to report errors. | 44 // The callback to report errors. |
| 48 ReceiveErrorCallback receive_error_callback_; | 45 ReceiveErrorCallback receive_error_callback_; |
| 49 | 46 |
| 50 // The error value to report when DispatchFatalError() is called. | 47 // The error value to report when DispatchFatalError() is called. |
| 51 const int32_t fatal_error_value_; | 48 const int32_t fatal_error_value_; |
| 52 | 49 |
| 53 // True if the user owns a buffer passed to |receive_callback_| as part of | 50 // True if the user owns a buffer passed to |receive_callback_| as part of |
| 54 // DispatchData(). | 51 // DispatchDataFrame(). |
| 55 bool buffer_in_use_; | 52 bool buffer_in_use_; |
| 56 }; | 53 }; |
| 57 | 54 |
| 58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by | 55 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
| 59 // a DataReceiver. | 56 // DataReceiver. |
| 60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { | 57 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { |
| 61 public: | 58 public: |
| 62 Buffer(scoped_refptr<DataReceiver> pipe, | 59 Buffer(scoped_refptr<DataReceiver> pipe, |
| 63 PendingReceive* receive, | 60 PendingReceive* receive, |
| 64 const char* buffer, | 61 const char* buffer, |
| 65 uint32_t buffer_size); | 62 uint32_t buffer_size); |
| 66 ~Buffer() override; | 63 ~Buffer() override; |
| 67 | 64 |
| 68 // ReadOnlyBuffer overrides. | 65 // ReadOnlyBuffer overrides. |
| 69 const char* GetData() override; | 66 const char* GetData() override; |
| 70 uint32_t GetSize() override; | 67 uint32_t GetSize() override; |
| 71 void Done(uint32_t bytes_consumed) override; | 68 void Done(uint32_t bytes_consumed) override; |
| 72 void DoneWithError(uint32_t bytes_consumed, int32_t error) override; | 69 void DoneWithError(uint32_t bytes_consumed, int32_t error) override; |
| 73 | 70 |
| 74 private: | 71 private: |
| 75 // The DataReceiver whose data pipe we are providing a view. | 72 // The DataReceiver of whose buffer we are providing a view. |
| 76 scoped_refptr<DataReceiver> receiver_; | 73 scoped_refptr<DataReceiver> receiver_; |
| 77 | 74 |
| 78 // The PendingReceive to which this buffer has been created in response. | 75 // The PendingReceive to which this buffer has been created in response. |
| 79 PendingReceive* pending_receive_; | 76 PendingReceive* pending_receive_; |
| 80 | 77 |
| 81 const char* buffer_; | 78 const char* buffer_; |
| 82 uint32_t buffer_size_; | 79 uint32_t buffer_size_; |
| 83 }; | 80 }; |
| 84 | 81 |
| 85 // Represents an error received from the DataSource. | 82 // A buffer of data or an error received from the DataSource. |
| 86 struct DataReceiver::PendingError { | 83 struct DataReceiver::DataFrame { |
| 87 PendingError(uint32_t offset, int32_t error) | 84 explicit DataFrame(mojo::Array<uint8_t> data) |
| 88 : offset(offset), error(error), dispatched(false) {} | 85 : is_error(false), |
| 86 data(data.Pass()), |
| 87 offset(0), |
| 88 error(0), |
| 89 dispatched(false) {} |
| 89 | 90 |
| 90 // The location within the data stream where the error occurred. | 91 explicit DataFrame(int32_t error) |
| 91 const uint32_t offset; | 92 : is_error(true), offset(0), error(error), dispatched(false) {} |
| 93 |
| 94 // Whether this DataFrame represents an error. |
| 95 bool is_error; |
| 96 |
| 97 // The data received from the DataSource. |
| 98 mojo::Array<uint8_t> data; |
| 99 |
| 100 // The offset within |data| at which the next read should begin. |
| 101 uint32_t offset; |
| 92 | 102 |
| 93 // The value of the error that occurred. | 103 // The value of the error that occurred. |
| 94 const int32_t error; | 104 const int32_t error; |
| 95 | 105 |
| 96 // Whether the error has been dispatched to the user. | 106 // Whether the error has been dispatched to the user. |
| 97 bool dispatched; | 107 bool dispatched; |
| 98 }; | 108 }; |
| 99 | 109 |
| 100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, | 110 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, |
| 101 uint32_t buffer_size, | 111 uint32_t buffer_size, |
| 102 int32_t fatal_error_value) | 112 int32_t fatal_error_value) |
| 103 : source_(source.Pass()), | 113 : source_(source.Pass()), |
| 104 fatal_error_value_(fatal_error_value), | 114 fatal_error_value_(fatal_error_value), |
| 105 bytes_received_(0), | |
| 106 shut_down_(false), | 115 shut_down_(false), |
| 107 weak_factory_(this) { | 116 weak_factory_(this) { |
| 108 MojoCreateDataPipeOptions options = { | |
| 109 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, | |
| 110 }; | |
| 111 mojo::ScopedDataPipeProducerHandle remote_handle; | |
| 112 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); | |
| 113 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 114 source_->Init(remote_handle.Pass()); | |
| 115 source_.set_client(this); | 117 source_.set_client(this); |
| 118 source_.set_error_handler(this); |
| 119 source_->Init(buffer_size); |
| 116 } | 120 } |
| 117 | 121 |
| 118 bool DataReceiver::Receive(const ReceiveDataCallback& callback, | 122 bool DataReceiver::Receive(const ReceiveDataCallback& callback, |
| 119 const ReceiveErrorCallback& error_callback) { | 123 const ReceiveErrorCallback& error_callback) { |
| 120 DCHECK(!callback.is_null() && !error_callback.is_null()); | 124 DCHECK(!callback.is_null() && !error_callback.is_null()); |
| 121 if (pending_receive_ || shut_down_) | 125 if (pending_receive_ || shut_down_) |
| 122 return false; | 126 return false; |
| 123 // When the DataSource encounters an error, it pauses transmission. When the | 127 // When the DataSource encounters an error, it pauses transmission. When the |
| 124 // user starts a new receive following notification of the error (via | 128 // user starts a new receive following notification of the error (via |
| 125 // |error_callback| of the previous Receive call) of the error we can tell the | 129 // |error_callback| of the previous Receive call) of the error we can tell the |
| 126 // DataSource to resume transmission of data. | 130 // DataSource to resume transmission of data. |
| 127 if (pending_error_ && pending_error_->dispatched) { | 131 if (!pending_data_frames_.empty() && pending_data_frames_.front()->is_error && |
| 132 pending_data_frames_.front()->dispatched) { |
| 128 source_->Resume(); | 133 source_->Resume(); |
| 129 pending_error_.reset(); | 134 pending_data_frames_.pop(); |
| 130 } | 135 } |
| 131 | 136 |
| 132 pending_receive_.reset( | 137 pending_receive_.reset( |
| 133 new PendingReceive(this, callback, error_callback, fatal_error_value_)); | 138 new PendingReceive(this, callback, error_callback, fatal_error_value_)); |
| 134 base::MessageLoop::current()->PostTask( | 139 ReceiveInternal(); |
| 135 FROM_HERE, | |
| 136 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); | |
| 137 return true; | 140 return true; |
| 138 } | 141 } |
| 139 | 142 |
| 140 DataReceiver::~DataReceiver() { | 143 DataReceiver::~DataReceiver() { |
| 141 ShutDown(); | 144 ShutDown(); |
| 142 } | 145 } |
| 143 | 146 |
| 144 void DataReceiver::OnError(uint32_t offset, int32_t error) { | 147 void DataReceiver::OnError(int32_t error) { |
| 145 if (shut_down_) | 148 if (shut_down_) |
| 146 return; | 149 return; |
| 147 | 150 |
| 148 if (pending_error_) { | 151 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(error))); |
| 149 // When OnError is called by the DataSource, transmission of data is | 152 if (pending_receive_) |
| 150 // suspended. Thus we shouldn't receive another call to OnError until we | 153 ReceiveInternal(); |
| 151 // have fully dealt with the error and called Resume to resume transmission | 154 } |
| 152 // (see Receive()). Under normal operation we should never get here, but if | 155 |
| 153 // we do (e.g. in the case of a hijacked service process) just shut down. | 156 void DataReceiver::OnData(mojo::Array<uint8_t> data) { |
| 154 ShutDown(); | 157 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); |
| 155 return; | 158 if (pending_receive_) |
| 156 } | 159 ReceiveInternal(); |
| 157 pending_error_.reset(new PendingError(offset, error)); | |
| 158 if (pending_receive_ && | |
| 159 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { | |
| 160 pending_receive_.reset(); | |
| 161 waiter_.reset(); | |
| 162 } | |
| 163 } | 160 } |
| 164 | 161 |
| 165 void DataReceiver::OnConnectionError() { | 162 void DataReceiver::OnConnectionError() { |
| 166 ShutDown(); | 163 ShutDown(); |
| 167 } | 164 } |
| 168 | 165 |
| 169 void DataReceiver::Done(uint32_t bytes_consumed) { | 166 void DataReceiver::Done(uint32_t bytes_consumed) { |
| 170 if (shut_down_) | 167 if (shut_down_) |
| 171 return; | 168 return; |
| 172 | 169 |
| 173 DCHECK(pending_receive_); | 170 DCHECK(pending_receive_); |
| 174 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); | 171 DataFrame& pending_data = *pending_data_frames_.front(); |
| 175 DCHECK_EQ(MOJO_RESULT_OK, result); | 172 pending_data.offset += bytes_consumed; |
| 173 DCHECK_LE(pending_data.offset, pending_data.data.size()); |
| 174 if (pending_data.offset == pending_data.data.size()) { |
| 175 source_->ReportBytesReceived( |
| 176 static_cast<uint32_t>(pending_data.data.size())); |
| 177 pending_data_frames_.pop(); |
| 178 } |
| 176 pending_receive_.reset(); | 179 pending_receive_.reset(); |
| 177 bytes_received_ += bytes_consumed; | |
| 178 } | |
| 179 | |
| 180 void DataReceiver::OnDoneWaiting(MojoResult result) { | |
| 181 DCHECK(pending_receive_ && !shut_down_ && waiter_); | |
| 182 waiter_.reset(); | |
| 183 if (result != MOJO_RESULT_OK) { | |
| 184 ShutDown(); | |
| 185 return; | |
| 186 } | |
| 187 ReceiveInternal(); | |
| 188 } | 180 } |
| 189 | 181 |
| 190 void DataReceiver::ReceiveInternal() { | 182 void DataReceiver::ReceiveInternal() { |
| 191 if (shut_down_) | 183 if (shut_down_) |
| 192 return; | 184 return; |
| 193 DCHECK(pending_receive_); | 185 DCHECK(pending_receive_); |
| 194 if (pending_error_ && | 186 if (pending_receive_->buffer_in_use()) |
| 195 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { | 187 return; |
| 188 |
| 189 if (!pending_data_frames_.empty() && |
| 190 pending_receive_->DispatchDataFrame(pending_data_frames_.front().get())) { |
| 196 pending_receive_.reset(); | 191 pending_receive_.reset(); |
| 197 waiter_.reset(); | |
| 198 return; | |
| 199 } | 192 } |
| 200 | |
| 201 const void* data; | |
| 202 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); | |
| 203 MojoResult result = mojo::BeginReadDataRaw( | |
| 204 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); | |
| 205 if (result == MOJO_RESULT_OK) { | |
| 206 if (!CheckErrorNotInReadRange(num_bytes)) { | |
| 207 ShutDown(); | |
| 208 return; | |
| 209 } | |
| 210 | |
| 211 pending_receive_->DispatchData(data, num_bytes); | |
| 212 return; | |
| 213 } | |
| 214 if (result == MOJO_RESULT_SHOULD_WAIT) { | |
| 215 waiter_.reset(new AsyncWaiter( | |
| 216 handle_.get(), | |
| 217 MOJO_HANDLE_SIGNAL_READABLE, | |
| 218 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); | |
| 219 return; | |
| 220 } | |
| 221 ShutDown(); | |
| 222 } | |
| 223 | |
| 224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) { | |
| 225 DCHECK(pending_receive_); | |
| 226 if (!pending_error_) | |
| 227 return true; | |
| 228 | |
| 229 DCHECK_NE(bytes_received_, pending_error_->offset); | |
| 230 DCHECK_NE(num_bytes, 0u); | |
| 231 uint32_t potential_bytes_received = bytes_received_ + num_bytes; | |
| 232 // bytes_received_ can overflow so we must consider two cases: | |
| 233 // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an | |
| 234 // equal number of times. In this case, |potential_bytes_received| must | |
| 235 // be in the range (|bytes_received|, |pending_error_->offset|]. Below | |
| 236 // this range can only occur if |bytes_received_| overflows before | |
| 237 // |pending_error_->offset|. Above can only occur if |bytes_received_| | |
| 238 // overtakes |pending_error_->offset|. | |
| 239 // 2. |pending_error_->offset| has overflowed once more than | |
| 240 // |bytes_received_|. In this case, |potential_bytes_received| must not | |
| 241 // be in the range (|pending_error_->offset|, |bytes_received_|]. | |
| 242 if ((bytes_received_ < pending_error_->offset && | |
| 243 (potential_bytes_received > pending_error_->offset || | |
| 244 potential_bytes_received <= bytes_received_)) || | |
| 245 (bytes_received_ > pending_error_->offset && | |
| 246 potential_bytes_received > pending_error_->offset && | |
| 247 potential_bytes_received <= bytes_received_)) { | |
| 248 return false; | |
| 249 } | |
| 250 return true; | |
| 251 } | 193 } |
| 252 | 194 |
| 253 void DataReceiver::ShutDown() { | 195 void DataReceiver::ShutDown() { |
| 254 shut_down_ = true; | 196 shut_down_ = true; |
| 255 if (pending_receive_) | 197 if (pending_receive_) |
| 256 pending_receive_->DispatchFatalError(); | 198 pending_receive_->DispatchFatalError(); |
| 257 pending_error_.reset(); | |
| 258 waiter_.reset(); | |
| 259 } | 199 } |
| 260 | 200 |
| 261 DataReceiver::PendingReceive::PendingReceive( | 201 DataReceiver::PendingReceive::PendingReceive( |
| 262 DataReceiver* receiver, | 202 DataReceiver* receiver, |
| 263 const ReceiveDataCallback& callback, | 203 const ReceiveDataCallback& callback, |
| 264 const ReceiveErrorCallback& error_callback, | 204 const ReceiveErrorCallback& error_callback, |
| 265 int32_t fatal_error_value) | 205 int32_t fatal_error_value) |
| 266 : receiver_(receiver), | 206 : receiver_(receiver), |
| 267 receive_callback_(callback), | 207 receive_callback_(callback), |
| 268 receive_error_callback_(error_callback), | 208 receive_error_callback_(error_callback), |
| 269 fatal_error_value_(fatal_error_value), | 209 fatal_error_value_(fatal_error_value), |
| 270 buffer_in_use_(false) { | 210 buffer_in_use_(false) { |
| 271 } | 211 } |
| 272 | 212 |
| 273 void DataReceiver::PendingReceive::DispatchData(const void* data, | 213 bool DataReceiver::PendingReceive::DispatchDataFrame( |
| 274 uint32_t num_bytes) { | 214 DataReceiver::DataFrame* data) { |
| 275 DCHECK(!buffer_in_use_); | 215 DCHECK(!buffer_in_use_); |
| 216 DCHECK(!data->dispatched); |
| 217 |
| 218 if (data->is_error) { |
| 219 data->dispatched = true; |
| 220 base::MessageLoop::current()->PostTask( |
| 221 FROM_HERE, base::Bind(receive_error_callback_, data->error)); |
| 222 return true; |
| 223 } |
| 276 buffer_in_use_ = true; | 224 buffer_in_use_ = true; |
| 277 receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( | 225 base::MessageLoop::current()->PostTask( |
| 278 new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); | 226 FROM_HERE, |
| 279 } | 227 base::Bind( |
| 280 | 228 receive_callback_, |
| 281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error, | 229 base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer( |
| 282 uint32_t bytes_received) { | 230 receiver_, |
| 283 DCHECK(!error->dispatched); | 231 this, |
| 284 if (buffer_in_use_ || bytes_received != error->offset) | 232 reinterpret_cast<char*>(&data->data[0]) + data->offset, |
| 285 return false; | 233 static_cast<uint32_t>(data->data.size() - data->offset)))))); |
| 286 | 234 return false; |
| 287 error->dispatched = true; | |
| 288 receive_error_callback_.Run(error->error); | |
| 289 return true; | |
| 290 } | 235 } |
| 291 | 236 |
| 292 void DataReceiver::PendingReceive::DispatchFatalError() { | 237 void DataReceiver::PendingReceive::DispatchFatalError() { |
| 293 receive_error_callback_.Run(fatal_error_value_); | 238 receive_error_callback_.Run(fatal_error_value_); |
| 294 } | 239 } |
| 295 | 240 |
| 296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { | 241 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { |
| 297 DCHECK(buffer_in_use_); | 242 DCHECK(buffer_in_use_); |
| 298 buffer_in_use_ = false; | 243 buffer_in_use_ = false; |
| 299 receiver_->Done(bytes_consumed); | 244 receiver_->Done(bytes_consumed); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 331 buffer_size_ = 0; | 276 buffer_size_ = 0; |
| 332 } | 277 } |
| 333 | 278 |
| 334 void DataReceiver::PendingReceive::Buffer::DoneWithError( | 279 void DataReceiver::PendingReceive::Buffer::DoneWithError( |
| 335 uint32_t bytes_consumed, | 280 uint32_t bytes_consumed, |
| 336 int32_t error) { | 281 int32_t error) { |
| 337 Done(bytes_consumed); | 282 Done(bytes_consumed); |
| 338 } | 283 } |
| 339 | 284 |
| 340 } // namespace device | 285 } // namespace device |
| OLD | NEW |