| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "device/serial/data_sink_receiver.h" | 5 #include "device/serial/data_sink_receiver.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "device/serial/async_waiter.h" | 10 #include "base/message_loop/message_loop.h" |
| 11 | 11 |
| 12 namespace device { | 12 namespace device { |
| 13 | 13 |
| 14 // Represents a flush of data that has not been completed. | 14 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
| 15 class DataSinkReceiver::PendingFlush { | 15 // DataSinkReceiver. |
| 16 public: | |
| 17 PendingFlush(); | |
| 18 | |
| 19 // Initializes this PendingFlush with |num_bytes|, the number of bytes to | |
| 20 // flush. | |
| 21 void SetNumBytesToFlush(uint32_t num_bytes); | |
| 22 | |
| 23 // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns | |
| 24 // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than | |
| 25 // |bytes_to_flush_| bytes were flushed or the error if one is encountered | |
| 26 // discarding data from |handle|. | |
| 27 MojoResult Flush(mojo::DataPipeConsumerHandle handle); | |
| 28 | |
| 29 // Whether this PendingFlush has received the number of bytes to flush. | |
| 30 bool received_flush() { return received_flush_; } | |
| 31 | |
| 32 private: | |
| 33 // Whether this PendingFlush has received the number of bytes to flush. | |
| 34 bool received_flush_; | |
| 35 | |
| 36 // The remaining number of bytes to flush. | |
| 37 uint32_t bytes_to_flush_; | |
| 38 }; | |
| 39 | |
| 40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by | |
| 41 // a DataSinkReceiver. | |
| 42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { | 16 class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
| 43 public: | 17 public: |
| 44 Buffer(scoped_refptr<DataSinkReceiver> receiver, | 18 Buffer(scoped_refptr<DataSinkReceiver> receiver, |
| 45 const char* buffer, | 19 const char* buffer, |
| 46 uint32_t buffer_size); | 20 uint32_t buffer_size); |
| 47 ~Buffer() override; | 21 ~Buffer() override; |
| 48 | 22 |
| 49 void Cancel(int32_t error); | 23 void Cancel(int32_t error); |
| 50 | 24 |
| 51 // ReadOnlyBuffer overrides. | 25 // ReadOnlyBuffer overrides. |
| 52 const char* GetData() override; | 26 const char* GetData() override; |
| 53 uint32_t GetSize() override; | 27 uint32_t GetSize() override; |
| 54 void Done(uint32_t bytes_read) override; | 28 void Done(uint32_t bytes_read) override; |
| 55 void DoneWithError(uint32_t bytes_read, int32_t error) override; | 29 void DoneWithError(uint32_t bytes_read, int32_t error) override; |
| 56 | 30 |
| 57 private: | 31 private: |
| 58 // The DataSinkReceiver whose data pipe we are providing a view. | 32 // The DataSinkReceiver of whose buffer we are providing a view. |
| 59 scoped_refptr<DataSinkReceiver> receiver_; | 33 scoped_refptr<DataSinkReceiver> receiver_; |
| 60 | 34 |
| 61 const char* buffer_; | 35 const char* buffer_; |
| 62 uint32_t buffer_size_; | 36 uint32_t buffer_size_; |
| 63 | 37 |
| 64 // Whether this receive has been cancelled. | 38 // Whether this receive has been cancelled. |
| 65 bool cancelled_; | 39 bool cancelled_; |
| 66 | 40 |
| 67 // If |cancelled_|, contains the cancellation error to report. | 41 // If |cancelled_|, contains the cancellation error to report. |
| 68 int32_t cancellation_error_; | 42 int32_t cancellation_error_; |
| 69 }; | 43 }; |
| 70 | 44 |
| 45 // A frame of data received from the client. |
| 46 class DataSinkReceiver::DataFrame { |
| 47 public: |
| 48 explicit DataFrame(mojo::Array<uint8_t> data); |
| 49 |
| 50 // Returns the number of uncomsumed bytes remaining of this data frame. |
| 51 uint32_t GetRemainingBytes(); |
| 52 |
| 53 // Returns a pointer to the remaining data to be consumed. |
| 54 const char* GetData(); |
| 55 |
| 56 // Reports that |bytes_read| bytes have been consumed. |
| 57 void OnDataConsumed(uint32_t bytes_read); |
| 58 |
| 59 private: |
| 60 mojo::Array<uint8_t> data_; |
| 61 uint32_t offset_; |
| 62 }; |
| 63 |
| 71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, | 64 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, |
| 72 const CancelCallback& cancel_callback, | 65 const CancelCallback& cancel_callback, |
| 73 const ErrorCallback& error_callback) | 66 const ErrorCallback& error_callback) |
| 74 : ready_callback_(ready_callback), | 67 : ready_callback_(ready_callback), |
| 75 cancel_callback_(cancel_callback), | 68 cancel_callback_(cancel_callback), |
| 76 error_callback_(error_callback), | 69 error_callback_(error_callback), |
| 70 flush_pending_(false), |
| 77 buffer_in_use_(NULL), | 71 buffer_in_use_(NULL), |
| 72 initialized_(false), |
| 73 available_buffer_capacity_(0), |
| 78 shut_down_(false), | 74 shut_down_(false), |
| 79 weak_factory_(this) { | 75 weak_factory_(this) { |
| 80 } | 76 } |
| 81 | 77 |
| 82 void DataSinkReceiver::ShutDown() { | 78 void DataSinkReceiver::ShutDown() { |
| 83 shut_down_ = true; | 79 shut_down_ = true; |
| 84 if (waiter_) | |
| 85 waiter_.reset(); | |
| 86 } | 80 } |
| 87 | 81 |
| 88 DataSinkReceiver::~DataSinkReceiver() { | 82 DataSinkReceiver::~DataSinkReceiver() { |
| 89 } | 83 } |
| 90 | 84 |
| 91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { | 85 void DataSinkReceiver::Init(uint32_t buffer_size) { |
| 92 if (handle_.is_valid()) { | 86 if (initialized_) { |
| 93 DispatchFatalError(); | 87 ShutDown(); |
| 94 return; | 88 return; |
| 95 } | 89 } |
| 96 | 90 initialized_ = true; |
| 97 handle_ = handle.Pass(); | 91 available_buffer_capacity_ = buffer_size; |
| 98 StartWaiting(); | |
| 99 } | 92 } |
| 100 | 93 |
| 101 void DataSinkReceiver::Cancel(int32_t error) { | 94 void DataSinkReceiver::Cancel(int32_t error) { |
| 102 // If we have sent a ReportBytesSentAndError but have not received the | 95 // If we have sent a ReportBytesSentAndError but have not received the |
| 103 // response, that ReportBytesSentAndError message will appear to the | 96 // response, that ReportBytesSentAndError message will appear to the |
| 104 // DataSinkClient to be caused by this Cancel message. In that case, we ignore | 97 // DataSinkClient to be caused by this Cancel message. In that case, we ignore |
| 105 // the cancel. | 98 // the cancel. |
| 106 if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) | 99 if (flush_pending_) |
| 107 return; | 100 return; |
| 108 | 101 |
| 109 // If there is a buffer is in use, mark the buffer as cancelled and notify the | 102 // If there is a buffer is in use, mark the buffer as cancelled and notify the |
| 110 // client by calling |cancel_callback_|. The sink implementation may or may | 103 // client by calling |cancel_callback_|. The sink implementation may or may |
| 111 // not take the cancellation into account when deciding what error (if any) to | 104 // not take the cancellation into account when deciding what error (if any) to |
| 112 // return. If the sink returns an error, we ignore the cancellation error. | 105 // return. If the sink returns an error, we ignore the cancellation error. |
| 113 // Otherwise, if the sink does not report an error, we override that with the | 106 // Otherwise, if the sink does not report an error, we override that with the |
| 114 // cancellation error. Once a cancellation has been received, the next report | 107 // cancellation error. Once a cancellation has been received, the next report |
| 115 // sent to the client will always contain an error; the error returned by the | 108 // sent to the client will always contain an error; the error returned by the |
| 116 // sink or the cancellation error if the sink does not return an error. | 109 // sink or the cancellation error if the sink does not return an error. |
| 117 if (buffer_in_use_) { | 110 if (buffer_in_use_) { |
| 118 buffer_in_use_->Cancel(error); | 111 buffer_in_use_->Cancel(error); |
| 119 if (!cancel_callback_.is_null()) | 112 if (!cancel_callback_.is_null()) |
| 120 cancel_callback_.Run(error); | 113 cancel_callback_.Run(error); |
| 121 return; | 114 return; |
| 122 } | 115 } |
| 123 // If there is no buffer in use, immediately report the error and cancel the | |
| 124 // waiting for the data pipe if one exists. This transitions straight into the | |
| 125 // state after the sink has returned an error. | |
| 126 waiter_.reset(); | |
| 127 ReportBytesSentAndError(0, error); | 116 ReportBytesSentAndError(0, error); |
| 128 } | 117 } |
| 129 | 118 |
| 119 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { |
| 120 if (!initialized_) { |
| 121 ShutDown(); |
| 122 return; |
| 123 } |
| 124 if (data.size() > available_buffer_capacity_) { |
| 125 ShutDown(); |
| 126 return; |
| 127 } |
| 128 available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); |
| 129 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass()))); |
| 130 if (!buffer_in_use_ && !flush_pending_) |
| 131 RunReadyCallback(); |
| 132 } |
| 133 |
| 130 void DataSinkReceiver::OnConnectionError() { | 134 void DataSinkReceiver::OnConnectionError() { |
| 131 DispatchFatalError(); | 135 DispatchFatalError(); |
| 132 } | 136 } |
| 133 | 137 |
| 134 void DataSinkReceiver::StartWaiting() { | 138 void DataSinkReceiver::RunReadyCallback() { |
| 135 DCHECK(!waiter_ && !shut_down_); | 139 DCHECK(!shut_down_ && !flush_pending_); |
| 136 waiter_.reset( | 140 // If data arrives while a call to RunReadyCallback() is posted, we can be |
| 137 new AsyncWaiter(handle_.get(), | 141 // called with buffer_in_use_ already set. |
| 138 MOJO_HANDLE_SIGNAL_READABLE, | 142 if (buffer_in_use_) |
| 139 base::Bind(&DataSinkReceiver::OnDoneWaiting, this))); | |
| 140 } | |
| 141 | |
| 142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) { | |
| 143 DCHECK(waiter_ && !shut_down_); | |
| 144 waiter_.reset(); | |
| 145 if (result != MOJO_RESULT_OK) { | |
| 146 DispatchFatalError(); | |
| 147 return; | 143 return; |
| 148 } | 144 buffer_in_use_ = |
| 149 // If there are any queued flushes (from ReportBytesSentAndError()), let them | 145 new Buffer(this, |
| 150 // flush data from the data pipe. | 146 pending_data_buffers_.front()->GetData(), |
| 151 if (!pending_flushes_.empty()) { | 147 pending_data_buffers_.front()->GetRemainingBytes()); |
| 152 MojoResult result = pending_flushes_.front()->Flush(handle_.get()); | |
| 153 if (result == MOJO_RESULT_OK) { | |
| 154 pending_flushes_.pop(); | |
| 155 } else if (result != MOJO_RESULT_SHOULD_WAIT) { | |
| 156 DispatchFatalError(); | |
| 157 return; | |
| 158 } | |
| 159 StartWaiting(); | |
| 160 return; | |
| 161 } | |
| 162 const void* data = NULL; | |
| 163 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); | |
| 164 result = mojo::BeginReadDataRaw( | |
| 165 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); | |
| 166 if (result != MOJO_RESULT_OK) { | |
| 167 DispatchFatalError(); | |
| 168 return; | |
| 169 } | |
| 170 buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes); | |
| 171 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); | 148 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
| 172 } | 149 } |
| 173 | 150 |
| 174 void DataSinkReceiver::Done(uint32_t bytes_read) { | 151 void DataSinkReceiver::Done(uint32_t bytes_read) { |
| 175 if (!DoneInternal(bytes_read)) | 152 if (!DoneInternal(bytes_read)) |
| 176 return; | 153 return; |
| 177 client()->ReportBytesSent(bytes_read); | 154 client()->ReportBytesSent(bytes_read); |
| 178 StartWaiting(); | 155 if (!pending_data_buffers_.empty()) { |
| 156 base::MessageLoop::current()->PostTask( |
| 157 FROM_HERE, |
| 158 base::Bind(&DataSinkReceiver::RunReadyCallback, |
| 159 weak_factory_.GetWeakPtr())); |
| 160 } |
| 179 } | 161 } |
| 180 | 162 |
| 181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { | 163 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
| 182 if (!DoneInternal(bytes_read)) | 164 if (!DoneInternal(bytes_read)) |
| 183 return; | 165 return; |
| 184 ReportBytesSentAndError(bytes_read, error); | 166 ReportBytesSentAndError(bytes_read, error); |
| 185 } | 167 } |
| 186 | 168 |
| 187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { | 169 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
| 188 if (shut_down_) | 170 if (shut_down_) |
| 189 return false; | 171 return false; |
| 190 | 172 |
| 191 DCHECK(buffer_in_use_); | 173 DCHECK(buffer_in_use_); |
| 192 buffer_in_use_ = NULL; | 174 buffer_in_use_ = NULL; |
| 193 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); | 175 available_buffer_capacity_ += bytes_read; |
| 194 if (result != MOJO_RESULT_OK) { | 176 pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
| 195 DispatchFatalError(); | 177 if (pending_data_buffers_.front()->GetRemainingBytes() == 0) |
| 196 return false; | 178 pending_data_buffers_.pop(); |
| 197 } | |
| 198 return true; | 179 return true; |
| 199 } | 180 } |
| 200 | 181 |
| 201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, | 182 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, |
| 202 int32_t error) { | 183 int32_t error) { |
| 203 // When we encounter an error, we must discard the data from any sends already | 184 // When we encounter an error, we must discard the data from any send buffers |
| 204 // in the data pipe before we can resume dispatching data to the sink. We add | 185 // transmitted by the DataSinkClient before it receives this error. |
| 205 // a pending flush here. The response containing the number of bytes to flush | 186 flush_pending_ = true; |
| 206 // is handled in SetNumBytesToFlush(). The actual flush is handled in | |
| 207 // OnDoneWaiting(). | |
| 208 pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush())); | |
| 209 client()->ReportBytesSentAndError( | 187 client()->ReportBytesSentAndError( |
| 210 bytes_read, | 188 bytes_read, |
| 211 error, | 189 error, |
| 212 base::Bind(&DataSinkReceiver::SetNumBytesToFlush, | 190 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); |
| 213 weak_factory_.GetWeakPtr())); | |
| 214 } | 191 } |
| 215 | 192 |
| 216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { | 193 void DataSinkReceiver::DoFlush() { |
| 217 DCHECK(!pending_flushes_.empty()); | 194 DCHECK(flush_pending_); |
| 218 DCHECK(!pending_flushes_.back()->received_flush()); | 195 flush_pending_ = false; |
| 219 pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); | 196 while (!pending_data_buffers_.empty()) { |
| 220 if (!waiter_) | 197 available_buffer_capacity_ += |
| 221 StartWaiting(); | 198 pending_data_buffers_.front()->GetRemainingBytes(); |
| 199 pending_data_buffers_.pop(); |
| 200 } |
| 222 } | 201 } |
| 223 | 202 |
| 224 void DataSinkReceiver::DispatchFatalError() { | 203 void DataSinkReceiver::DispatchFatalError() { |
| 225 if (shut_down_) | 204 if (shut_down_) |
| 226 return; | 205 return; |
| 227 | 206 |
| 228 ShutDown(); | 207 ShutDown(); |
| 229 if (!error_callback_.is_null()) | 208 if (!error_callback_.is_null()) |
| 230 error_callback_.Run(); | 209 error_callback_.Run(); |
| 231 } | 210 } |
| (...skipping 24 matching lines...) Expand all Loading... |
| 256 | 235 |
| 257 const char* DataSinkReceiver::Buffer::GetData() { | 236 const char* DataSinkReceiver::Buffer::GetData() { |
| 258 return buffer_; | 237 return buffer_; |
| 259 } | 238 } |
| 260 | 239 |
| 261 uint32_t DataSinkReceiver::Buffer::GetSize() { | 240 uint32_t DataSinkReceiver::Buffer::GetSize() { |
| 262 return buffer_size_; | 241 return buffer_size_; |
| 263 } | 242 } |
| 264 | 243 |
| 265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { | 244 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { |
| 245 scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| 246 receiver_ = nullptr; |
| 266 if (cancelled_) | 247 if (cancelled_) |
| 267 receiver_->DoneWithError(bytes_read, cancellation_error_); | 248 receiver->DoneWithError(bytes_read, cancellation_error_); |
| 268 else | 249 else |
| 269 receiver_->Done(bytes_read); | 250 receiver->Done(bytes_read); |
| 270 receiver_ = NULL; | |
| 271 buffer_ = NULL; | 251 buffer_ = NULL; |
| 272 buffer_size_ = 0; | 252 buffer_size_ = 0; |
| 273 } | 253 } |
| 274 | 254 |
| 275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, | 255 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
| 276 int32_t error) { | 256 int32_t error) { |
| 277 receiver_->DoneWithError(bytes_read, error); | 257 scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| 278 receiver_ = NULL; | 258 receiver_ = nullptr; |
| 259 receiver->DoneWithError(bytes_read, error); |
| 279 buffer_ = NULL; | 260 buffer_ = NULL; |
| 280 buffer_size_ = 0; | 261 buffer_size_ = 0; |
| 281 } | 262 } |
| 282 | 263 |
| 283 DataSinkReceiver::PendingFlush::PendingFlush() | 264 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) |
| 284 : received_flush_(false), bytes_to_flush_(0) { | 265 : data_(data.Pass()), offset_(0) { |
| 266 DCHECK_LT(0u, data_.size()); |
| 285 } | 267 } |
| 286 | 268 |
| 287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { | 269 // Returns the number of uncomsumed bytes remaining of this data frame. |
| 288 DCHECK(!received_flush_); | 270 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { |
| 289 received_flush_ = true; | 271 return static_cast<uint32_t>(data_.size() - offset_); |
| 290 bytes_to_flush_ = num_bytes; | |
| 291 } | 272 } |
| 292 | 273 |
| 293 MojoResult DataSinkReceiver::PendingFlush::Flush( | 274 // Returns a pointer to the remaining data to be consumed. |
| 294 mojo::DataPipeConsumerHandle handle) { | 275 const char* DataSinkReceiver::DataFrame::GetData() { |
| 295 DCHECK(received_flush_); | 276 DCHECK_LT(offset_, data_.size()); |
| 296 uint32_t num_bytes = bytes_to_flush_; | 277 return reinterpret_cast<const char*>(&data_[0]) + offset_; |
| 297 MojoResult result = | 278 } |
| 298 mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD); | 279 |
| 299 if (result != MOJO_RESULT_OK) | 280 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
| 300 return result; | 281 offset_ += bytes_read; |
| 301 DCHECK(num_bytes <= bytes_to_flush_); | 282 DCHECK_LE(offset_, data_.size()); |
| 302 bytes_to_flush_ -= num_bytes; | |
| 303 return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; | |
| 304 } | 283 } |
| 305 | 284 |
| 306 } // namespace device | 285 } // namespace device |
| OLD | NEW |