| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "device/serial/data_source_sender.h" | |
| 6 | |
| 7 #include <algorithm> | |
| 8 #include <limits> | |
| 9 #include <memory> | |
| 10 #include <utility> | |
| 11 | |
| 12 #include "base/bind.h" | |
| 13 #include "base/location.h" | |
| 14 #include "base/single_thread_task_runner.h" | |
| 15 #include "base/threading/thread_task_runner_handle.h" | |
| 16 | |
| 17 namespace device { | |
| 18 | |
| 19 // Represents a send that is not yet fulfilled. | |
| 20 class DataSourceSender::PendingSend { | |
| 21 public: | |
| 22 PendingSend(DataSourceSender* sender, const ReadyCallback& callback); | |
| 23 | |
| 24 // Asynchronously fills |data_| with up to |num_bytes| of data. Following | |
| 25 // this, one of Done() and DoneWithError() will be called with the result. | |
| 26 void GetData(uint32_t num_bytes); | |
| 27 | |
| 28 private: | |
| 29 class Buffer; | |
| 30 // Reports a successful write of |bytes_written|. | |
| 31 void Done(uint32_t bytes_written); | |
| 32 | |
| 33 // Reports a partially successful or unsuccessful write of |bytes_written| | |
| 34 // with an error of |error|. | |
| 35 void DoneWithError(uint32_t bytes_written, int32_t error); | |
| 36 | |
| 37 // The DataSourceSender that owns this. | |
| 38 DataSourceSender* sender_; | |
| 39 | |
| 40 // The callback to call to get data. | |
| 41 ReadyCallback callback_; | |
| 42 | |
| 43 // Whether the buffer specified by GetData() has been passed to |callback_|, | |
| 44 // but has not yet called Done() or DoneWithError(). | |
| 45 bool buffer_in_use_; | |
| 46 | |
| 47 // The data obtained using |callback_| to be dispatched to the client. | |
| 48 std::vector<char> data_; | |
| 49 }; | |
| 50 | |
| 51 // A Writable implementation that provides a view of a buffer owned by a | |
| 52 // DataSourceSender. | |
| 53 class DataSourceSender::PendingSend::Buffer : public WritableBuffer { | |
| 54 public: | |
| 55 Buffer(scoped_refptr<DataSourceSender> sender, | |
| 56 PendingSend* send, | |
| 57 char* buffer, | |
| 58 uint32_t buffer_size); | |
| 59 ~Buffer() override; | |
| 60 | |
| 61 // WritableBuffer overrides. | |
| 62 char* GetData() override; | |
| 63 uint32_t GetSize() override; | |
| 64 void Done(uint32_t bytes_written) override; | |
| 65 void DoneWithError(uint32_t bytes_written, int32_t error) override; | |
| 66 | |
| 67 private: | |
| 68 // The DataSourceSender of whose buffer we are providing a view. | |
| 69 scoped_refptr<DataSourceSender> sender_; | |
| 70 | |
| 71 // The PendingSend to which this buffer has been created in response. | |
| 72 PendingSend* pending_send_; | |
| 73 | |
| 74 char* buffer_; | |
| 75 uint32_t buffer_size_; | |
| 76 }; | |
| 77 | |
| 78 DataSourceSender::DataSourceSender( | |
| 79 mojo::InterfaceRequest<serial::DataSource> source, | |
| 80 mojo::InterfacePtr<serial::DataSourceClient> client, | |
| 81 const ReadyCallback& ready_callback, | |
| 82 const ErrorCallback& error_callback) | |
| 83 : binding_(this, std::move(source)), | |
| 84 client_(std::move(client)), | |
| 85 ready_callback_(ready_callback), | |
| 86 error_callback_(error_callback), | |
| 87 available_buffer_capacity_(0), | |
| 88 paused_(false), | |
| 89 shut_down_(false), | |
| 90 weak_factory_(this) { | |
| 91 DCHECK(!ready_callback.is_null() && !error_callback.is_null()); | |
| 92 binding_.set_connection_error_handler( | |
| 93 base::Bind(&DataSourceSender::OnConnectionError, base::Unretained(this))); | |
| 94 client_.set_connection_error_handler( | |
| 95 base::Bind(&DataSourceSender::OnConnectionError, base::Unretained(this))); | |
| 96 } | |
| 97 | |
| 98 void DataSourceSender::ShutDown() { | |
| 99 shut_down_ = true; | |
| 100 ready_callback_.Reset(); | |
| 101 error_callback_.Reset(); | |
| 102 } | |
| 103 | |
| 104 DataSourceSender::~DataSourceSender() { | |
| 105 } | |
| 106 | |
| 107 void DataSourceSender::Init(uint32_t buffer_size) { | |
| 108 available_buffer_capacity_ = buffer_size; | |
| 109 GetMoreData(); | |
| 110 } | |
| 111 | |
| 112 void DataSourceSender::Resume() { | |
| 113 if (pending_send_) { | |
| 114 DispatchFatalError(); | |
| 115 return; | |
| 116 } | |
| 117 | |
| 118 paused_ = false; | |
| 119 GetMoreData(); | |
| 120 } | |
| 121 | |
| 122 void DataSourceSender::ReportBytesReceived(uint32_t bytes_sent) { | |
| 123 available_buffer_capacity_ += bytes_sent; | |
| 124 if (!pending_send_ && !paused_) | |
| 125 GetMoreData(); | |
| 126 } | |
| 127 | |
| 128 void DataSourceSender::OnConnectionError() { | |
| 129 DispatchFatalError(); | |
| 130 } | |
| 131 | |
| 132 void DataSourceSender::GetMoreData() { | |
| 133 if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_) | |
| 134 return; | |
| 135 | |
| 136 pending_send_.reset(new PendingSend(this, ready_callback_)); | |
| 137 pending_send_->GetData(available_buffer_capacity_); | |
| 138 } | |
| 139 | |
| 140 void DataSourceSender::Done(const std::vector<char>& data) { | |
| 141 DoneInternal(data); | |
| 142 if (!shut_down_ && available_buffer_capacity_) { | |
| 143 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 144 FROM_HERE, | |
| 145 base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr())); | |
| 146 } | |
| 147 } | |
| 148 | |
| 149 void DataSourceSender::DoneWithError(const std::vector<char>& data, | |
| 150 int32_t error) { | |
| 151 DoneInternal(data); | |
| 152 if (!shut_down_) | |
| 153 client_->OnError(error); | |
| 154 paused_ = true; | |
| 155 // We don't call GetMoreData here so we don't send any additional data until | |
| 156 // Resume() is called. | |
| 157 } | |
| 158 | |
| 159 void DataSourceSender::DoneInternal(const std::vector<char>& data) { | |
| 160 DCHECK(pending_send_); | |
| 161 if (shut_down_) | |
| 162 return; | |
| 163 | |
| 164 available_buffer_capacity_ -= static_cast<uint32_t>(data.size()); | |
| 165 if (!data.empty()) { | |
| 166 mojo::Array<uint8_t> data_to_send(data.size()); | |
| 167 std::copy(data.begin(), data.end(), &data_to_send[0]); | |
| 168 client_->OnData(std::move(data_to_send)); | |
| 169 } | |
| 170 pending_send_.reset(); | |
| 171 } | |
| 172 | |
| 173 void DataSourceSender::DispatchFatalError() { | |
| 174 if (shut_down_) | |
| 175 return; | |
| 176 | |
| 177 error_callback_.Run(); | |
| 178 ShutDown(); | |
| 179 } | |
| 180 | |
| 181 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, | |
| 182 const ReadyCallback& callback) | |
| 183 : sender_(sender), callback_(callback), buffer_in_use_(false) { | |
| 184 } | |
| 185 | |
| 186 void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) { | |
| 187 DCHECK(num_bytes); | |
| 188 DCHECK(!buffer_in_use_); | |
| 189 buffer_in_use_ = true; | |
| 190 data_.resize(num_bytes); | |
| 191 callback_.Run(std::unique_ptr<WritableBuffer>( | |
| 192 new Buffer(sender_, this, &data_[0], num_bytes))); | |
| 193 } | |
| 194 | |
| 195 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { | |
| 196 DCHECK(buffer_in_use_); | |
| 197 DCHECK_LE(bytes_written, data_.size()); | |
| 198 buffer_in_use_ = false; | |
| 199 data_.resize(bytes_written); | |
| 200 sender_->Done(data_); | |
| 201 } | |
| 202 | |
| 203 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, | |
| 204 int32_t error) { | |
| 205 DCHECK(buffer_in_use_); | |
| 206 DCHECK_LE(bytes_written, data_.size()); | |
| 207 buffer_in_use_ = false; | |
| 208 data_.resize(bytes_written); | |
| 209 sender_->DoneWithError(data_, error); | |
| 210 } | |
| 211 | |
| 212 DataSourceSender::PendingSend::Buffer::Buffer( | |
| 213 scoped_refptr<DataSourceSender> sender, | |
| 214 PendingSend* send, | |
| 215 char* buffer, | |
| 216 uint32_t buffer_size) | |
| 217 : sender_(sender), | |
| 218 pending_send_(send), | |
| 219 buffer_(buffer), | |
| 220 buffer_size_(buffer_size) { | |
| 221 } | |
| 222 | |
| 223 DataSourceSender::PendingSend::Buffer::~Buffer() { | |
| 224 if (pending_send_) | |
| 225 pending_send_->Done(0); | |
| 226 } | |
| 227 | |
| 228 char* DataSourceSender::PendingSend::Buffer::GetData() { | |
| 229 return buffer_; | |
| 230 } | |
| 231 | |
| 232 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { | |
| 233 return buffer_size_; | |
| 234 } | |
| 235 | |
| 236 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { | |
| 237 DCHECK(sender_.get()); | |
| 238 PendingSend* send = pending_send_; | |
| 239 pending_send_ = nullptr; | |
| 240 send->Done(bytes_written); | |
| 241 sender_ = nullptr; | |
| 242 } | |
| 243 | |
| 244 void DataSourceSender::PendingSend::Buffer::DoneWithError( | |
| 245 uint32_t bytes_written, | |
| 246 int32_t error) { | |
| 247 DCHECK(sender_.get()); | |
| 248 PendingSend* send = pending_send_; | |
| 249 pending_send_ = nullptr; | |
| 250 send->DoneWithError(bytes_written, error); | |
| 251 sender_ = nullptr; | |
| 252 } | |
| 253 | |
| 254 } // namespace device | |
| OLD | NEW |