Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "device/serial/data_pipe_receiver.h" | |
| 6 | |
| 7 #include "base/bind.h" | |
| 8 #include "base/message_loop/message_loop.h" | |
| 9 #include "device/serial/async_waiter.h" | |
| 10 | |
| 11 namespace device { | |
| 12 | |
| 13 class DataPipeReceiver::Buffer : public ReadOnlyBuffer { | |
| 14 public: | |
| 15 Buffer(scoped_refptr<DataPipeReceiver> pipe, | |
| 16 const char* buffer, | |
| 17 uint32_t buffer_size); | |
| 18 virtual ~Buffer(); | |
| 19 virtual const char* GetData() OVERRIDE; | |
| 20 virtual uint32_t GetSize() OVERRIDE; | |
| 21 virtual void Done(uint32_t bytes_consumed) OVERRIDE; | |
| 22 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE; | |
| 23 | |
| 24 private: | |
| 25 scoped_refptr<DataPipeReceiver> pipe_; | |
| 26 const char* buffer_; | |
| 27 uint32_t buffer_size_; | |
| 28 }; | |
| 29 | |
| 30 DataPipeReceiver::DataPipeReceiver( | |
| 31 mojo::InterfacePtr<serial::DataPipeProducer> producer, | |
| 32 uint32_t buffer_size, | |
| 33 int32_t connection_error_value) | |
| 34 : producer_(producer.Pass()), | |
| 35 connection_error_value_(connection_error_value), | |
| 36 bytes_since_last_error_(0), | |
| 37 pending_error_(false), | |
| 38 error_offset_(0), | |
| 39 error_(0), | |
| 40 state_(STATE_IDLE), | |
| 41 weak_factory_(this) { | |
| 42 MojoCreateDataPipeOptions options = { | |
| 43 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, | |
| 44 }; | |
| 45 options.struct_size = sizeof(options); | |
|
raymes
2014/08/05 06:26:44
is this necessary?
Sam McNally
2014/08/05 07:26:33
Yes.
| |
| 46 mojo::ScopedDataPipeProducerHandle remote_handle; | |
| 47 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); | |
| 48 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 49 producer_->Init(remote_handle.Pass()); | |
| 50 producer_.set_client(this); | |
| 51 } | |
| 52 | |
| 53 bool DataPipeReceiver::Receive(const ReceiveDataCallback& callback, | |
| 54 const ReceiveErrorCallback& error_callback) { | |
|
raymes
2014/08/05 06:26:44
Similarly for these functions it would be good to
Sam McNally
2014/08/05 07:26:33
Done.
| |
| 55 if (state_ == STATE_PAUSED) { | |
| 56 producer_->Resume(); | |
| 57 state_ = STATE_IDLE; | |
| 58 } | |
| 59 if (state_ != STATE_IDLE || callback.is_null() || error_callback.is_null()) { | |
|
raymes
2014/08/05 06:26:44
Might as well just DCHECK null callbacks rather th
Sam McNally
2014/08/05 07:26:33
Done.
| |
| 60 return false; | |
| 61 } | |
| 62 state_ = STATE_WAITING_FOR_DATA; | |
| 63 receive_callback_ = callback; | |
| 64 receive_error_callback_ = error_callback; | |
| 65 base::MessageLoop::current()->PostTask( | |
| 66 FROM_HERE, | |
| 67 base::Bind(&DataPipeReceiver::ReceiveInternal, | |
| 68 weak_factory_.GetWeakPtr())); | |
| 69 return true; | |
| 70 } | |
| 71 | |
| 72 DataPipeReceiver::~DataPipeReceiver() { | |
| 73 if (!receive_error_callback_.is_null()) | |
| 74 DispatchError(connection_error_value_); | |
| 75 } | |
| 76 | |
| 77 void DataPipeReceiver::Done(uint32_t bytes_consumed) { | |
| 78 if (state_ == STATE_SHUT_DOWN) | |
| 79 return; | |
| 80 | |
| 81 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); | |
| 82 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 83 bytes_since_last_error_ += bytes_consumed; | |
| 84 state_ = STATE_IDLE; | |
| 85 } | |
| 86 | |
| 87 void DataPipeReceiver::OnDoneWaiting(MojoResult result) { | |
| 88 RetryReceive(); | |
| 89 } | |
| 90 | |
| 91 void DataPipeReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) { | |
| 92 pending_error_ = true; | |
| 93 error_ = error; | |
| 94 error_offset_ = bytes_since_last_error; | |
| 95 RetryReceive(); | |
| 96 } | |
| 97 | |
| 98 void DataPipeReceiver::OnConnectionError() { | |
| 99 state_ = STATE_SHUT_DOWN; | |
| 100 if (!receive_callback_.is_null()) | |
| 101 DispatchError(connection_error_value_); | |
| 102 } | |
| 103 | |
| 104 void DataPipeReceiver::RetryReceive() { | |
| 105 if (!receive_callback_.is_null()) | |
| 106 ReceiveInternal(); | |
| 107 } | |
| 108 | |
| 109 void DataPipeReceiver::ReceiveInternal() { | |
| 110 if (pending_error_ && bytes_since_last_error_ >= error_offset_) { | |
| 111 pending_error_ = false; | |
| 112 bytes_since_last_error_ -= error_offset_; | |
| 113 error_offset_ = 0; | |
| 114 state_ = STATE_PAUSED; | |
| 115 DispatchError(error_); | |
| 116 return; | |
| 117 } | |
| 118 const void* data; | |
| 119 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); | |
| 120 MojoResult result = mojo::BeginReadDataRaw( | |
| 121 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); | |
| 122 switch (result) { | |
|
raymes
2014/08/05 06:26:44
consider just using if/else rather than switch sin
Sam McNally
2014/08/05 07:26:33
Done.
| |
| 123 case MOJO_RESULT_OK: | |
| 124 DispatchData(data, num_bytes); | |
| 125 break; | |
| 126 case MOJO_RESULT_SHOULD_WAIT: | |
| 127 waiter_.reset(new AsyncWaiter(handle_.get(), | |
| 128 MOJO_HANDLE_SIGNAL_READABLE, | |
| 129 base::Bind(&DataPipeReceiver::OnDoneWaiting, | |
|
raymes
2014/08/05 06:26:44
Could you bind directly to RetryReceive?
Sam McNally
2014/08/05 07:26:33
OnDoneWaiting should be doing something with the w
| |
| 130 weak_factory_.GetWeakPtr()))); | |
| 131 break; | |
| 132 default: | |
| 133 state_ = STATE_SHUT_DOWN; | |
| 134 DispatchError(connection_error_value_); | |
| 135 break; | |
| 136 } | |
| 137 } | |
| 138 | |
| 139 void DataPipeReceiver::DispatchData(const void* data, uint32_t num_bytes) { | |
| 140 state_ = STATE_WAITING_FOR_BUFFER; | |
| 141 ReceiveDataCallback callback = receive_callback_; | |
| 142 receive_callback_.Reset(); | |
| 143 receive_error_callback_.Reset(); | |
| 144 callback.Run(scoped_ptr<ReadOnlyBuffer>( | |
| 145 new Buffer(this, static_cast<const char*>(data), num_bytes))); | |
| 146 } | |
| 147 | |
| 148 void DataPipeReceiver::DispatchError(int32_t error) { | |
| 149 if (state_ == STATE_WAITING_FOR_DATA) | |
| 150 state_ = STATE_IDLE; | |
| 151 ReceiveErrorCallback callback = receive_error_callback_; | |
| 152 receive_callback_.Reset(); | |
| 153 receive_error_callback_.Reset(); | |
| 154 callback.Run(error); | |
| 155 } | |
| 156 | |
| 157 DataPipeReceiver::Buffer::Buffer(scoped_refptr<DataPipeReceiver> pipe, | |
| 158 const char* buffer, | |
| 159 uint32_t buffer_size) | |
| 160 : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) { | |
| 161 } | |
| 162 | |
| 163 DataPipeReceiver::Buffer::~Buffer() { | |
| 164 if (pipe_) | |
| 165 pipe_->Done(0); | |
| 166 } | |
| 167 | |
| 168 const char* DataPipeReceiver::Buffer::GetData() { | |
| 169 return buffer_; | |
| 170 } | |
| 171 | |
| 172 uint32_t DataPipeReceiver::Buffer::GetSize() { | |
| 173 return buffer_size_; | |
| 174 } | |
| 175 | |
| 176 void DataPipeReceiver::Buffer::Done(uint32_t bytes_consumed) { | |
| 177 pipe_->Done(bytes_consumed); | |
| 178 pipe_ = NULL; | |
| 179 buffer_ = NULL; | |
| 180 buffer_size_ = 0; | |
| 181 } | |
| 182 | |
| 183 void DataPipeReceiver::Buffer::DoneWithError(uint32_t bytes_consumed, | |
| 184 int32_t error) { | |
| 185 Done(bytes_consumed); | |
| 186 } | |
| 187 | |
| 188 } // namespace device | |
| OLD | NEW |