Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(409)

Side by Side Diff: device/serial/data_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_receiver.h"
6
7 #include <limits>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12
13 namespace device {
14
15 class DataReceiver::Buffer : public ReadOnlyBuffer {
16 public:
17 Buffer(scoped_refptr<DataReceiver> pipe,
18 const char* buffer,
19 uint32_t buffer_size);
20 virtual ~Buffer();
21 virtual const char* GetData() OVERRIDE;
22 virtual uint32_t GetSize() OVERRIDE;
23 virtual void Done(uint32_t bytes_consumed) OVERRIDE;
24 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
25
26 private:
27 scoped_refptr<DataReceiver> pipe_;
28 const char* buffer_;
29 uint32_t buffer_size_;
30 };
31
32 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
33 uint32_t buffer_size,
34 int32_t connection_error_value)
35 : source_(source.Pass()),
36 connection_error_value_(connection_error_value),
37 bytes_received_(0),
38 pending_error_(false),
39 error_offset_(0),
40 error_(0),
41 state_(STATE_IDLE),
42 weak_factory_(this) {
43 MojoCreateDataPipeOptions options = {
44 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
45 };
46 mojo::ScopedDataPipeProducerHandle remote_handle;
47 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
48 DCHECK_EQ(MOJO_RESULT_OK, result);
49 source_->Init(remote_handle.Pass());
50 source_.set_client(this);
51 }
52
53 // This is part of the public interface so can be called while we are in any
54 // state.
55 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
56 const ReceiveErrorCallback& error_callback) {
57 DCHECK(!callback.is_null() && !error_callback.is_null());
58 if (state_ == STATE_PAUSED) {
59 source_->Resume();
60 state_ = STATE_IDLE;
61 }
62 if (state_ != STATE_IDLE)
63 return false;
64
65 state_ = STATE_WAITING_FOR_DATA;
66 receive_callback_ = callback;
67 receive_error_callback_ = error_callback;
68 base::MessageLoop::current()->PostTask(
69 FROM_HERE,
70 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
71 return true;
72 }
73
74 DataReceiver::~DataReceiver() {
75 DCHECK(state_ == STATE_IDLE || state_ == STATE_PAUSED ||
76 state_ == STATE_WAITING_FOR_DATA || state_ == STATE_SHUT_DOWN);
77 if (state_ == STATE_WAITING_FOR_DATA) {
78 state_ = STATE_SHUT_DOWN;
79 DispatchError(connection_error_value_);
80 }
81 }
82
83 void DataReceiver::Done(uint32_t bytes_consumed) {
84 DCHECK(state_ == STATE_WAITING_FOR_BUFFER || state_ == STATE_SHUT_DOWN);
85 if (state_ == STATE_SHUT_DOWN)
86 return;
87
88 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
89 DCHECK_EQ(MOJO_RESULT_OK, result);
90 bytes_received_ += bytes_consumed;
raymes 2014/08/06 00:13:36 We should make sure that this never exceeds error_
Sam McNally 2014/08/06 03:51:30 Done.
91 state_ = STATE_IDLE;
92 }
93
94 void DataReceiver::OnDoneWaiting(MojoResult result) {
95 DCHECK_EQ(state_, STATE_WAITING_FOR_DATA);
raymes 2014/08/06 00:13:36 could we be shutdown here? I'm not sure?
Sam McNally 2014/08/06 03:51:30 No, when waiting is cancelled when shutting down.
96 if (result != MOJO_RESULT_OK) {
97 state_ = STATE_SHUT_DOWN;
98 DispatchError(connection_error_value_);
99 return;
100 }
101 ReceiveInternal();
102 }
103
104 // This is part of the DataSourceClient interface so can be called while we are
105 // in any state.
106 void DataReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) {
raymes 2014/08/06 00:13:36 We should know that we aren't in the paused state
Sam McNally 2014/08/06 03:51:30 Done.
107 pending_error_ = true;
108 error_ = error;
109 error_offset_ = bytes_since_last_error;
110 if (state_ == STATE_WAITING_FOR_DATA)
111 ReceiveInternal();
112 }
113
114 // This is invoked in the case of a connection error so can be called while we
115 // are in any state.
116 void DataReceiver::OnConnectionError() {
117 State old_state = state_;
118 state_ = STATE_SHUT_DOWN;
119 if (old_state == STATE_WAITING_FOR_DATA)
120 DispatchError(connection_error_value_);
121 waiter_.reset();
122 }
123
124 void DataReceiver::ReceiveInternal() {
125 DCHECK_EQ(state_, STATE_WAITING_FOR_DATA);
126 if (pending_error_ && bytes_received_ == error_offset_) {
127 pending_error_ = false;
128 error_offset_ = 0;
129 state_ = STATE_PAUSED;
130 DispatchError(error_);
131 return;
132 }
133 const void* data;
134 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
135 MojoResult result = mojo::BeginReadDataRaw(
136 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
137 if (result == MOJO_RESULT_OK) {
138 DispatchData(data, num_bytes);
139 return;
140 }
141 if (result == MOJO_RESULT_SHOULD_WAIT) {
142 waiter_.reset(new AsyncWaiter(
143 handle_.get(),
144 MOJO_HANDLE_SIGNAL_READABLE,
145 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
146 return;
147 }
148 waiter_.reset();
149 state_ = STATE_SHUT_DOWN;
150 DispatchError(connection_error_value_);
151 }
152
153 void DataReceiver::DispatchData(const void* data, uint32_t num_bytes) {
154 DCHECK(state_ == STATE_WAITING_FOR_DATA);
155 state_ = STATE_WAITING_FOR_BUFFER;
156 ReceiveDataCallback callback = receive_callback_;
157 receive_callback_.Reset();
158 receive_error_callback_.Reset();
159 callback.Run(scoped_ptr<ReadOnlyBuffer>(
160 new Buffer(this, static_cast<const char*>(data), num_bytes)));
161 }
162
163 void DataReceiver::DispatchError(int32_t error) {
164 DCHECK(state_ == STATE_PAUSED || state_ == STATE_SHUT_DOWN);
165 waiter_.reset();
166 ReceiveErrorCallback callback = receive_error_callback_;
167 receive_callback_.Reset();
168 receive_error_callback_.Reset();
169 callback.Run(error);
170 }
171
172 DataReceiver::Buffer::Buffer(scoped_refptr<DataReceiver> pipe,
173 const char* buffer,
174 uint32_t buffer_size)
175 : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) {
176 }
177
178 DataReceiver::Buffer::~Buffer() {
179 if (pipe_)
180 pipe_->Done(0);
181 }
182
183 const char* DataReceiver::Buffer::GetData() {
184 return buffer_;
185 }
186
187 uint32_t DataReceiver::Buffer::GetSize() {
188 return buffer_size_;
189 }
190
191 void DataReceiver::Buffer::Done(uint32_t bytes_consumed) {
192 pipe_->Done(bytes_consumed);
193 pipe_ = NULL;
194 buffer_ = NULL;
195 buffer_size_ = 0;
196 }
197
198 void DataReceiver::Buffer::DoneWithError(uint32_t bytes_consumed,
199 int32_t error) {
200 Done(bytes_consumed);
201 }
202
203 } // namespace device
OLDNEW
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698