Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(353)

Side by Side Diff: device/serial/data_receiver.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_receiver.h"
6
7 #include <limits>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12
13 namespace device {
14
15 class DataReceiver::Buffer : public ReadOnlyBuffer {
16 public:
17 Buffer(scoped_refptr<DataReceiver> pipe,
18 const char* buffer,
19 uint32_t buffer_size);
20 virtual ~Buffer();
21 virtual const char* GetData() OVERRIDE;
22 virtual uint32_t GetSize() OVERRIDE;
23 virtual void Done(uint32_t bytes_consumed) OVERRIDE;
24 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
25
26 private:
27 scoped_refptr<DataReceiver> pipe_;
28 const char* buffer_;
29 uint32_t buffer_size_;
30 };
31
32 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
33 uint32_t buffer_size,
34 int32_t connection_error_value)
35 : source_(source.Pass()),
36 connection_error_value_(connection_error_value),
37 bytes_since_last_error_(0),
38 pending_error_(false),
39 error_offset_(0),
40 error_(0),
41 state_(STATE_IDLE),
42 weak_factory_(this) {
43 MojoCreateDataPipeOptions options = {
44 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
45 };
46 options.struct_size = sizeof(options);
47 mojo::ScopedDataPipeProducerHandle remote_handle;
48 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
49 DCHECK_EQ(MOJO_RESULT_OK, result);
50 source_->Init(remote_handle.Pass());
51 source_.set_client(this);
52 }
53
54 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
55 const ReceiveErrorCallback& error_callback) {
56 DCHECK(!callback.is_null() && !error_callback.is_null());
57 if (state_ == STATE_PAUSED) {
58 source_->Resume();
59 state_ = STATE_IDLE;
60 }
61 if (state_ != STATE_IDLE)
62 return false;
63
64 state_ = STATE_WAITING_FOR_DATA;
65 receive_callback_ = callback;
66 receive_error_callback_ = error_callback;
67 base::MessageLoop::current()->PostTask(
68 FROM_HERE,
69 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
70 return true;
71 }
72
73 DataReceiver::~DataReceiver() {
74 if (!receive_error_callback_.is_null())
75 DispatchError(connection_error_value_);
76 }
77
78 void DataReceiver::Done(uint32_t bytes_consumed) {
79 DCHECK(state_ == STATE_WAITING_FOR_BUFFER || state_ == STATE_SHUT_DOWN);
80 if (state_ == STATE_SHUT_DOWN)
81 return;
82
83 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
84 DCHECK_EQ(MOJO_RESULT_OK, result);
85 bytes_since_last_error_ += bytes_consumed;
86 state_ = STATE_IDLE;
87 }
88
89 void DataReceiver::OnDoneWaiting(MojoResult result) {
raymes 2014/08/05 08:10:20 I think we can only be in a subset of states here,
Sam McNally 2014/08/05 08:33:09 Done.
90 if (result != MOJO_RESULT_OK) {
91 OnConnectionError();
raymes 2014/08/05 08:10:20 Consider inlining OnConnectionError here as I also
Sam McNally 2014/08/05 08:33:09 Done.
92 return;
93 }
94 RetryReceive();
95 }
96
97 void DataReceiver::OnError(uint32_t bytes_since_last_error, int32_t error) {
raymes 2014/08/05 08:10:20 Consider putting comments in all of the entrypoint
Sam McNally 2014/08/05 08:33:08 Done.
98 pending_error_ = true;
99 error_ = error;
100 error_offset_ = bytes_since_last_error;
101 RetryReceive();
102 }
103
104 void DataReceiver::OnConnectionError() {
105 state_ = STATE_SHUT_DOWN;
106 if (!receive_callback_.is_null())
raymes 2014/08/05 08:10:20 Maybe rather than checking whether callbacks are n
Sam McNally 2014/08/05 08:33:09 Done.
107 DispatchError(connection_error_value_);
108 }
109
110 void DataReceiver::RetryReceive() {
raymes 2014/08/05 08:10:20 Inline RetryReceive to reduce indirection
Sam McNally 2014/08/05 08:33:08 Done.
111 if (!receive_callback_.is_null())
112 ReceiveInternal();
113 }
114
115 void DataReceiver::ReceiveInternal() {
116 DCHECK(state_ == STATE_WAITING_FOR_DATA);
117 if (pending_error_ && bytes_since_last_error_ >= error_offset_) {
118 pending_error_ = false;
119 bytes_since_last_error_ -= error_offset_;
raymes 2014/08/05 08:10:20 As we discussed consider always just counting all
Sam McNally 2014/08/05 08:33:08 Done.
120 error_offset_ = 0;
121 state_ = STATE_PAUSED;
122 DispatchError(error_);
123 return;
124 }
125 const void* data;
126 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
127 MojoResult result = mojo::BeginReadDataRaw(
128 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
129 if (result == MOJO_RESULT_OK) {
130 DispatchData(data, num_bytes);
131 return;
132 }
133 if (result == MOJO_RESULT_SHOULD_WAIT) {
134 waiter_.reset(new AsyncWaiter(
135 handle_.get(),
136 MOJO_HANDLE_SIGNAL_READABLE,
137 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
138 return;
139 }
140 state_ = STATE_SHUT_DOWN;
141 DispatchError(connection_error_value_);
142 }
143
144 void DataReceiver::DispatchData(const void* data, uint32_t num_bytes) {
145 DCHECK(state_ == STATE_WAITING_FOR_DATA);
146 state_ = STATE_WAITING_FOR_BUFFER;
147 ReceiveDataCallback callback = receive_callback_;
148 receive_callback_.Reset();
149 receive_error_callback_.Reset();
150 callback.Run(scoped_ptr<ReadOnlyBuffer>(
151 new Buffer(this, static_cast<const char*>(data), num_bytes)));
152 }
153
154 void DataReceiver::DispatchError(int32_t error) {
155 DCHECK(state_ == STATE_WAITING_FOR_DATA || state_ == STATE_PAUSED ||
156 state_ == STATE_SHUT_DOWN);
157 if (state_ == STATE_WAITING_FOR_DATA)
158 state_ = STATE_IDLE;
159 ReceiveErrorCallback callback = receive_error_callback_;
160 receive_callback_.Reset();
161 receive_error_callback_.Reset();
162 callback.Run(error);
163 }
164
165 DataReceiver::Buffer::Buffer(scoped_refptr<DataReceiver> pipe,
166 const char* buffer,
167 uint32_t buffer_size)
168 : pipe_(pipe), buffer_(buffer), buffer_size_(buffer_size) {
169 }
170
171 DataReceiver::Buffer::~Buffer() {
172 if (pipe_)
173 pipe_->Done(0);
174 }
175
176 const char* DataReceiver::Buffer::GetData() {
177 return buffer_;
178 }
179
180 uint32_t DataReceiver::Buffer::GetSize() {
181 return buffer_size_;
182 }
183
184 void DataReceiver::Buffer::Done(uint32_t bytes_consumed) {
185 pipe_->Done(bytes_consumed);
186 pipe_ = NULL;
187 buffer_ = NULL;
188 buffer_size_ = 0;
189 }
190
191 void DataReceiver::Buffer::DoneWithError(uint32_t bytes_consumed,
192 int32_t error) {
193 Done(bytes_consumed);
194 }
195
196 } // namespace device
OLDNEW
« no previous file with comments | « device/serial/data_receiver.h ('k') | device/serial/data_source_sender.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698