Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(140)

Side by Side Diff: device/serial/data_source_sender.cc

Issue 437933002: Add data pipe wrappers to be used to implement serial receive. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@serial-buffer
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_source_sender.h"
6
7 #include <limits>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12
13 namespace device {
14
15 class DataSourceSender::PendingSend {
16 public:
17 PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
18 void GetData(void* data, uint32_t num_bytes);
19
20 private:
21 class Buffer;
22 void Done(uint32_t bytes_written);
23 void DoneWithError(uint32_t bytes_written, int32_t error);
24
25 DataSourceSender* sender_;
26 ReadyCallback callback_;
27 bool buffer_in_use_;
28 };
29
30 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
31 public:
32 Buffer(scoped_refptr<DataSourceSender> sender,
33 PendingSend* send,
34 char* buffer,
35 uint32_t buffer_size);
36 virtual ~Buffer();
37 virtual char* GetData() OVERRIDE;
38 virtual uint32_t GetSize() OVERRIDE;
39 virtual void Done(uint32_t bytes_written) OVERRIDE;
40 virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
41
42 private:
43 scoped_refptr<DataSourceSender> sender_;
44 PendingSend* pending_send_;
45 char* buffer_;
46 uint32_t buffer_size_;
47 };
48
49 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
50 const ErrorCallback& error_callback)
51 : ready_callback_(ready_callback),
52 error_callback_(error_callback),
53 bytes_sent_(0),
54 shut_down_(false) {
55 DCHECK(!ready_callback.is_null() && !error_callback.is_null());
56 }
57
58 void DataSourceSender::ShutDown() {
59 shut_down_ = true;
60 waiter_.reset();
61 ready_callback_.Reset();
62 error_callback_.Reset();
63 }
64
65 DataSourceSender::~DataSourceSender() {
66 DCHECK(shut_down_);
67 }
68
69 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
70 // This should never occur. |handle_| is only valid and |pending_send_| is
71 // only set after Init is called. Receiving an invalid |handle| from the
72 // client is also unrecoverable.
73 if (pending_send_ || handle_.is_valid() || !handle.is_valid() || shut_down_) {
74 DispatchFatalError();
75 return;
76 }
77 handle_ = handle.Pass();
78 pending_send_.reset(new PendingSend(this, ready_callback_));
79 StartWaiting();
80 }
81
82 void DataSourceSender::Resume() {
83 if (pending_send_ || !handle_.is_valid()) {
84 DispatchFatalError();
85 return;
86 }
87
88 pending_send_.reset(new PendingSend(this, ready_callback_));
89 StartWaiting();
90 }
91
92 void DataSourceSender::OnConnectionError() {
93 DispatchFatalError();
94 }
95
96 void DataSourceSender::StartWaiting() {
97 DCHECK(pending_send_ && !waiter_);
98 waiter_.reset(
99 new AsyncWaiter(handle_.get(),
100 MOJO_HANDLE_SIGNAL_WRITABLE,
101 base::Bind(&DataSourceSender::OnDoneWaiting, this)));
102 }
103
104 void DataSourceSender::OnDoneWaiting(MojoResult result) {
105 DCHECK(pending_send_ && !shut_down_ && waiter_);
106 waiter_.reset();
107 if (result != MOJO_RESULT_OK) {
108 DispatchFatalError();
109 return;
110 }
111 void* data = NULL;
112 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
113 result = mojo::BeginWriteDataRaw(
114 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
115 if (result != MOJO_RESULT_OK) {
116 DispatchFatalError();
117 return;
118 }
119 pending_send_->GetData(static_cast<char*>(data), num_bytes);
120 }
121
122 void DataSourceSender::Done(uint32_t bytes_written) {
123 DoneInternal(bytes_written);
124 if (!shut_down_)
125 StartWaiting();
126 }
127
128 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
129 DoneInternal(bytes_written);
130 pending_send_.reset();
131 if (!shut_down_)
132 client()->OnError(bytes_sent_, error);
133 // We don't call StartWaiting here so we don't send any additional data until
134 // Resume() is called.
135 }
136
137 void DataSourceSender::DoneInternal(uint32_t bytes_written) {
138 DCHECK(pending_send_);
139 if (shut_down_)
140 return;
141
142 bytes_sent_ += bytes_written;
143 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
144 if (result != MOJO_RESULT_OK) {
145 DispatchFatalError();
146 return;
147 }
148 }
149
150 void DataSourceSender::DispatchFatalError() {
151 if (shut_down_)
152 return;
153
154 error_callback_.Run();
155 ShutDown();
156 }
157
158 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
159 const ReadyCallback& callback)
160 : sender_(sender), callback_(callback), buffer_in_use_(false) {
161 }
162
163 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
164 DCHECK(!buffer_in_use_);
165 buffer_in_use_ = true;
166 callback_.Run(scoped_ptr<WritableBuffer>(
167 new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
168 }
169
170 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
171 DCHECK(buffer_in_use_);
172 buffer_in_use_ = false;
173 sender_->Done(bytes_written);
174 }
175
176 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
177 int32_t error) {
178 DCHECK(buffer_in_use_);
179 buffer_in_use_ = false;
180 sender_->DoneWithError(bytes_written, error);
181 }
182
183 DataSourceSender::PendingSend::Buffer::Buffer(
184 scoped_refptr<DataSourceSender> sender,
185 PendingSend* send,
186 char* buffer,
187 uint32_t buffer_size)
188 : sender_(sender),
189 pending_send_(send),
190 buffer_(buffer),
191 buffer_size_(buffer_size) {
192 }
193
194 DataSourceSender::PendingSend::Buffer::~Buffer() {
195 if (sender_)
196 pending_send_->Done(0);
197 }
198
199 char* DataSourceSender::PendingSend::Buffer::GetData() {
200 return buffer_;
201 }
202
203 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
204 return buffer_size_;
205 }
206
207 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
208 DCHECK(sender_);
209 pending_send_->Done(bytes_written);
210 sender_ = NULL;
211 pending_send_ = NULL;
212 buffer_ = NULL;
213 buffer_size_ = 0;
214 }
215
216 void DataSourceSender::PendingSend::Buffer::DoneWithError(
217 uint32_t bytes_written,
218 int32_t error) {
219 DCHECK(sender_);
220 pending_send_->DoneWithError(bytes_written, error);
221 sender_ = NULL;
222 pending_send_ = NULL;
223 buffer_ = NULL;
224 buffer_size_ = 0;
225 }
226
227 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698