| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "device/serial/data_source_sender.h" |
| 6 |
| 7 #include <limits> |
| 8 |
| 9 #include "base/bind.h" |
| 10 #include "base/message_loop/message_loop.h" |
| 11 #include "device/serial/async_waiter.h" |
| 12 |
| 13 namespace device { |
| 14 |
| 15 class DataSourceSender::PendingSend { |
| 16 public: |
| 17 PendingSend(DataSourceSender* sender, const ReadyCallback& callback); |
| 18 void GetData(void* data, uint32_t num_bytes); |
| 19 |
| 20 private: |
| 21 class Buffer; |
| 22 void Done(uint32_t bytes_written); |
| 23 void DoneWithError(uint32_t bytes_written, int32_t error); |
| 24 |
| 25 DataSourceSender* sender_; |
| 26 ReadyCallback callback_; |
| 27 bool buffer_in_use_; |
| 28 }; |
| 29 |
| 30 class DataSourceSender::PendingSend::Buffer : public WritableBuffer { |
| 31 public: |
| 32 Buffer(scoped_refptr<DataSourceSender> sender, |
| 33 PendingSend* send, |
| 34 char* buffer, |
| 35 uint32_t buffer_size); |
| 36 virtual ~Buffer(); |
| 37 virtual char* GetData() OVERRIDE; |
| 38 virtual uint32_t GetSize() OVERRIDE; |
| 39 virtual void Done(uint32_t bytes_written) OVERRIDE; |
| 40 virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE; |
| 41 |
| 42 private: |
| 43 scoped_refptr<DataSourceSender> sender_; |
| 44 PendingSend* pending_send_; |
| 45 char* buffer_; |
| 46 uint32_t buffer_size_; |
| 47 }; |
| 48 |
| 49 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, |
| 50 const ErrorCallback& error_callback) |
| 51 : ready_callback_(ready_callback), |
| 52 error_callback_(error_callback), |
| 53 bytes_sent_(0), |
| 54 shut_down_(false) { |
| 55 DCHECK(!ready_callback.is_null() && !error_callback.is_null()); |
| 56 } |
| 57 |
| 58 void DataSourceSender::ShutDown() { |
| 59 shut_down_ = true; |
| 60 waiter_.reset(); |
| 61 ready_callback_.Reset(); |
| 62 error_callback_.Reset(); |
| 63 } |
| 64 |
| 65 DataSourceSender::~DataSourceSender() { |
| 66 DCHECK(shut_down_); |
| 67 } |
| 68 |
| 69 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { |
| 70 // This should never occur. |handle_| is only valid and |pending_send_| is |
| 71 // only set after Init is called. Receiving an invalid |handle| from the |
| 72 // client is also unrecoverable. |
| 73 if (pending_send_ || handle_.is_valid() || !handle.is_valid() || shut_down_) { |
| 74 DispatchFatalError(); |
| 75 return; |
| 76 } |
| 77 handle_ = handle.Pass(); |
| 78 pending_send_.reset(new PendingSend(this, ready_callback_)); |
| 79 StartWaiting(); |
| 80 } |
| 81 |
| 82 void DataSourceSender::Resume() { |
| 83 if (pending_send_ || !handle_.is_valid()) { |
| 84 DispatchFatalError(); |
| 85 return; |
| 86 } |
| 87 |
| 88 pending_send_.reset(new PendingSend(this, ready_callback_)); |
| 89 StartWaiting(); |
| 90 } |
| 91 |
| 92 void DataSourceSender::OnConnectionError() { |
| 93 DispatchFatalError(); |
| 94 } |
| 95 |
| 96 void DataSourceSender::StartWaiting() { |
| 97 DCHECK(pending_send_ && !waiter_); |
| 98 waiter_.reset( |
| 99 new AsyncWaiter(handle_.get(), |
| 100 MOJO_HANDLE_SIGNAL_WRITABLE, |
| 101 base::Bind(&DataSourceSender::OnDoneWaiting, this))); |
| 102 } |
| 103 |
| 104 void DataSourceSender::OnDoneWaiting(MojoResult result) { |
| 105 DCHECK(pending_send_ && !shut_down_ && waiter_); |
| 106 waiter_.reset(); |
| 107 if (result != MOJO_RESULT_OK) { |
| 108 DispatchFatalError(); |
| 109 return; |
| 110 } |
| 111 void* data = NULL; |
| 112 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
| 113 result = mojo::BeginWriteDataRaw( |
| 114 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); |
| 115 if (result != MOJO_RESULT_OK) { |
| 116 DispatchFatalError(); |
| 117 return; |
| 118 } |
| 119 pending_send_->GetData(static_cast<char*>(data), num_bytes); |
| 120 } |
| 121 |
| 122 void DataSourceSender::Done(uint32_t bytes_written) { |
| 123 DoneInternal(bytes_written); |
| 124 if (!shut_down_) |
| 125 StartWaiting(); |
| 126 } |
| 127 |
| 128 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { |
| 129 DoneInternal(bytes_written); |
| 130 pending_send_.reset(); |
| 131 if (!shut_down_) |
| 132 client()->OnError(bytes_sent_, error); |
| 133 // We don't call StartWaiting here so we don't send any additional data until |
| 134 // Resume() is called. |
| 135 } |
| 136 |
| 137 void DataSourceSender::DoneInternal(uint32_t bytes_written) { |
| 138 DCHECK(pending_send_); |
| 139 if (shut_down_) |
| 140 return; |
| 141 |
| 142 bytes_sent_ += bytes_written; |
| 143 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); |
| 144 if (result != MOJO_RESULT_OK) { |
| 145 DispatchFatalError(); |
| 146 return; |
| 147 } |
| 148 } |
| 149 |
| 150 void DataSourceSender::DispatchFatalError() { |
| 151 if (shut_down_) |
| 152 return; |
| 153 |
| 154 error_callback_.Run(); |
| 155 ShutDown(); |
| 156 } |
| 157 |
| 158 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, |
| 159 const ReadyCallback& callback) |
| 160 : sender_(sender), callback_(callback), buffer_in_use_(false) { |
| 161 } |
| 162 |
| 163 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { |
| 164 DCHECK(!buffer_in_use_); |
| 165 buffer_in_use_ = true; |
| 166 callback_.Run(scoped_ptr<WritableBuffer>( |
| 167 new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); |
| 168 } |
| 169 |
| 170 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { |
| 171 DCHECK(buffer_in_use_); |
| 172 buffer_in_use_ = false; |
| 173 sender_->Done(bytes_written); |
| 174 } |
| 175 |
| 176 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, |
| 177 int32_t error) { |
| 178 DCHECK(buffer_in_use_); |
| 179 buffer_in_use_ = false; |
| 180 sender_->DoneWithError(bytes_written, error); |
| 181 } |
| 182 |
| 183 DataSourceSender::PendingSend::Buffer::Buffer( |
| 184 scoped_refptr<DataSourceSender> sender, |
| 185 PendingSend* send, |
| 186 char* buffer, |
| 187 uint32_t buffer_size) |
| 188 : sender_(sender), |
| 189 pending_send_(send), |
| 190 buffer_(buffer), |
| 191 buffer_size_(buffer_size) { |
| 192 } |
| 193 |
| 194 DataSourceSender::PendingSend::Buffer::~Buffer() { |
| 195 if (sender_) |
| 196 pending_send_->Done(0); |
| 197 } |
| 198 |
| 199 char* DataSourceSender::PendingSend::Buffer::GetData() { |
| 200 return buffer_; |
| 201 } |
| 202 |
| 203 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { |
| 204 return buffer_size_; |
| 205 } |
| 206 |
| 207 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { |
| 208 DCHECK(sender_); |
| 209 pending_send_->Done(bytes_written); |
| 210 sender_ = NULL; |
| 211 pending_send_ = NULL; |
| 212 buffer_ = NULL; |
| 213 buffer_size_ = 0; |
| 214 } |
| 215 |
| 216 void DataSourceSender::PendingSend::Buffer::DoneWithError( |
| 217 uint32_t bytes_written, |
| 218 int32_t error) { |
| 219 DCHECK(sender_); |
| 220 pending_send_->DoneWithError(bytes_written, error); |
| 221 sender_ = NULL; |
| 222 pending_send_ = NULL; |
| 223 buffer_ = NULL; |
| 224 buffer_size_ = 0; |
| 225 } |
| 226 |
| 227 } // namespace device |
| OLD | NEW |