Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(309)

Side by Side Diff: device/serial/data_source_sender.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_source_sender.h" 5 #include "device/serial/data_source_sender.h"
6 6
7 #include <limits> 7 #include <limits>
8 #include <vector>
8 9
9 #include "base/bind.h" 10 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12 12
13 namespace device { 13 namespace device {
14 14
15 // Represents a send that is not yet fulfilled. 15 // Represents a send that is not yet fulfilled.
16 class DataSourceSender::PendingSend { 16 class DataSourceSender::PendingSend {
17 public: 17 public:
18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback); 18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
19 19
20 // Asynchronously fills |data| with up to |num_bytes| of data. Following this, 20 // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
raymes 2014/10/17 04:04:45 |data| is gone!
Sam McNally 2014/10/20 05:12:59 Done.
21 // one of Done() and DoneWithError() will be called with the result. 21 // one of Done() and DoneWithError() will be called with the result.
22 void GetData(void* data, uint32_t num_bytes); 22 void GetData(uint32_t num_bytes);
23
24 void DispatchData(serial::DataSourceClient* client);
raymes 2014/10/17 04:04:45 Please add a comment here.
Sam McNally 2014/10/20 05:12:59 Done.
23 25
24 private: 26 private:
25 class Buffer; 27 class Buffer;
26 // Reports a successful write of |bytes_written|. 28 // Reports a successful write of |bytes_written|.
27 void Done(uint32_t bytes_written); 29 void Done(uint32_t bytes_written);
28 30
29 // Reports a partially successful or unsuccessful write of |bytes_written| 31 // Reports a partially successful or unsuccessful write of |bytes_written|
30 // with an error of |error|. 32 // with an error of |error|.
31 void DoneWithError(uint32_t bytes_written, int32_t error); 33 void DoneWithError(uint32_t bytes_written, int32_t error);
32 34
33 // The DataSourceSender that owns this. 35 // The DataSourceSender that owns this.
34 DataSourceSender* sender_; 36 DataSourceSender* sender_;
35 37
36 // The callback to call to get data. 38 // The callback to call to get data.
37 ReadyCallback callback_; 39 ReadyCallback callback_;
38 40
39 // Whether the buffer specified by GetData() has been passed to |callback_|, 41 // Whether the buffer specified by GetData() has been passed to |callback_|,
40 // but has not yet called Done() or DoneWithError(). 42 // but has not yet called Done() or DoneWithError().
41 bool buffer_in_use_; 43 bool buffer_in_use_;
44
45 std::vector<char> data_;
42 }; 46 };
43 47
44 // A Writable implementation that provides a view of a data pipe owned by a 48 // A Writable implementation that provides a view of a data pipe owned by a
45 // DataSourceSender. 49 // DataSourceSender.
46 class DataSourceSender::PendingSend::Buffer : public WritableBuffer { 50 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
47 public: 51 public:
48 Buffer(scoped_refptr<DataSourceSender> sender, 52 Buffer(scoped_refptr<DataSourceSender> sender,
49 PendingSend* send, 53 PendingSend* send,
50 char* buffer, 54 char* buffer,
51 uint32_t buffer_size); 55 uint32_t buffer_size);
(...skipping 13 matching lines...) Expand all
65 PendingSend* pending_send_; 69 PendingSend* pending_send_;
66 70
67 char* buffer_; 71 char* buffer_;
68 uint32_t buffer_size_; 72 uint32_t buffer_size_;
69 }; 73 };
70 74
71 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, 75 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
72 const ErrorCallback& error_callback) 76 const ErrorCallback& error_callback)
73 : ready_callback_(ready_callback), 77 : ready_callback_(ready_callback),
74 error_callback_(error_callback), 78 error_callback_(error_callback),
75 bytes_sent_(0), 79 available_buffer_capacity_(0),
76 shut_down_(false) { 80 shut_down_(false),
81 weak_factory_(this) {
77 DCHECK(!ready_callback.is_null() && !error_callback.is_null()); 82 DCHECK(!ready_callback.is_null() && !error_callback.is_null());
78 } 83 }
79 84
80 void DataSourceSender::ShutDown() { 85 void DataSourceSender::ShutDown() {
81 shut_down_ = true; 86 shut_down_ = true;
82 waiter_.reset();
83 ready_callback_.Reset(); 87 ready_callback_.Reset();
84 error_callback_.Reset(); 88 error_callback_.Reset();
85 } 89 }
86 90
87 DataSourceSender::~DataSourceSender() { 91 DataSourceSender::~DataSourceSender() {
88 DCHECK(shut_down_); 92 DCHECK(shut_down_);
89 } 93 }
90 94
91 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { 95 void DataSourceSender::Init(uint32_t buffer_size) {
92 // This should never occur. |handle_| is only valid and |pending_send_| is 96 available_buffer_capacity_ = buffer_size;
93 // only set after Init is called. 97 GetMoreData();
94 if (pending_send_ || handle_.is_valid() || shut_down_) {
95 DispatchFatalError();
96 return;
97 }
98 handle_ = handle.Pass();
99 pending_send_.reset(new PendingSend(this, ready_callback_));
100 StartWaiting();
101 } 98 }
102 99
103 void DataSourceSender::Resume() { 100 void DataSourceSender::Resume() {
104 if (pending_send_ || !handle_.is_valid()) { 101 if (pending_send_) {
105 DispatchFatalError(); 102 DispatchFatalError();
106 return; 103 return;
107 } 104 }
108 105
109 pending_send_.reset(new PendingSend(this, ready_callback_)); 106 GetMoreData();
110 StartWaiting(); 107 }
108
109 void DataSourceSender::AckData(uint32_t bytes_dispatched) {
110 available_buffer_capacity_ += bytes_dispatched;
111 } 111 }
112 112
113 void DataSourceSender::OnConnectionError() { 113 void DataSourceSender::OnConnectionError() {
114 DispatchFatalError(); 114 DispatchFatalError();
115 } 115 }
116 116
117 void DataSourceSender::StartWaiting() { 117 void DataSourceSender::GetMoreData() {
118 DCHECK(pending_send_ && !waiter_); 118 DCHECK(!pending_send_ && !shut_down_);
119 waiter_.reset( 119 pending_send_.reset(new PendingSend(this, ready_callback_));
120 new AsyncWaiter(handle_.get(), 120 pending_send_->GetData(available_buffer_capacity_);
raymes 2014/10/17 04:04:45 What if the available buffer capacity is 0? Don't
Sam McNally 2014/10/20 05:12:59 Done.
121 MOJO_HANDLE_SIGNAL_WRITABLE,
122 base::Bind(&DataSourceSender::OnDoneWaiting, this)));
123 }
124
125 void DataSourceSender::OnDoneWaiting(MojoResult result) {
126 DCHECK(pending_send_ && !shut_down_ && waiter_);
127 waiter_.reset();
128 if (result != MOJO_RESULT_OK) {
129 DispatchFatalError();
130 return;
131 }
132 void* data = NULL;
133 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
134 result = mojo::BeginWriteDataRaw(
135 handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
136 if (result != MOJO_RESULT_OK) {
137 DispatchFatalError();
138 return;
139 }
140 pending_send_->GetData(static_cast<char*>(data), num_bytes);
141 } 121 }
142 122
143 void DataSourceSender::Done(uint32_t bytes_written) { 123 void DataSourceSender::Done(uint32_t bytes_written) {
144 DoneInternal(bytes_written); 124 DoneInternal(bytes_written);
145 if (!shut_down_) 125 if (!shut_down_) {
146 StartWaiting(); 126 base::MessageLoop::current()->PostTask(
127 FROM_HERE,
128 base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
129 }
147 } 130 }
148 131
149 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { 132 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
150 DoneInternal(bytes_written); 133 DoneInternal(bytes_written);
151 pending_send_.reset(); 134 pending_send_.reset();
152 if (!shut_down_) 135 if (!shut_down_)
153 client()->OnError(bytes_sent_, error); 136 client()->OnError(error);
154 // We don't call StartWaiting here so we don't send any additional data until 137 // We don't call GetMoreData here so we don't send any additional data until
155 // Resume() is called. 138 // Resume() is called.
156 } 139 }
157 140
158 void DataSourceSender::DoneInternal(uint32_t bytes_written) { 141 void DataSourceSender::DoneInternal(uint32_t bytes_written) {
159 DCHECK(pending_send_); 142 DCHECK(pending_send_);
160 if (shut_down_) 143 if (shut_down_)
161 return; 144 return;
162 145
163 bytes_sent_ += bytes_written; 146 available_buffer_capacity_ -= bytes_written;
164 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); 147 pending_send_->DispatchData(client());
raymes 2014/10/17 04:04:45 Rather than passing the client to the pending send
Sam McNally 2014/10/20 05:12:59 Done.
165 if (result != MOJO_RESULT_OK) { 148 pending_send_.reset();
166 DispatchFatalError();
167 return;
168 }
169 } 149 }
170 150
171 void DataSourceSender::DispatchFatalError() { 151 void DataSourceSender::DispatchFatalError() {
172 if (shut_down_) 152 if (shut_down_)
173 return; 153 return;
174 154
175 error_callback_.Run(); 155 error_callback_.Run();
176 ShutDown(); 156 ShutDown();
177 } 157 }
178 158
179 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, 159 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
180 const ReadyCallback& callback) 160 const ReadyCallback& callback)
181 : sender_(sender), callback_(callback), buffer_in_use_(false) { 161 : sender_(sender), callback_(callback), buffer_in_use_(false) {
182 } 162 }
183 163
184 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { 164 void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
185 DCHECK(!buffer_in_use_); 165 DCHECK(!buffer_in_use_);
186 buffer_in_use_ = true; 166 buffer_in_use_ = true;
167 data_.resize(num_bytes);
187 callback_.Run(scoped_ptr<WritableBuffer>( 168 callback_.Run(scoped_ptr<WritableBuffer>(
188 new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); 169 new Buffer(sender_, this, &data_[0], num_bytes)));
raymes 2014/10/17 04:04:45 We should be careful of a 0 buffer size here.
Sam McNally 2014/10/20 05:12:59 Done.
170 }
171
172 void DataSourceSender::PendingSend::DispatchData(
173 serial::DataSourceClient* client) {
174 if (data_.empty())
175 return;
176
177 mojo::Array<uint8_t> data(data_.size());
178 memcpy(&data[0], reinterpret_cast<uint8_t*>(&data_[0]), data.size());
179 client->OnData(data.Pass());
189 } 180 }
190 181
191 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { 182 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
192 DCHECK(buffer_in_use_); 183 DCHECK(buffer_in_use_);
184 DCHECK_LE(bytes_written, data_.size());
193 buffer_in_use_ = false; 185 buffer_in_use_ = false;
186 data_.resize(bytes_written);
194 sender_->Done(bytes_written); 187 sender_->Done(bytes_written);
195 } 188 }
196 189
197 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, 190 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
198 int32_t error) { 191 int32_t error) {
199 DCHECK(buffer_in_use_); 192 DCHECK(buffer_in_use_);
193 DCHECK_LE(bytes_written, data_.size());
194 data_.resize(bytes_written);
200 buffer_in_use_ = false; 195 buffer_in_use_ = false;
raymes 2014/10/17 04:04:45 nit: ordering of these 3 lines should be the same
Sam McNally 2014/10/20 05:12:59 Done.
201 sender_->DoneWithError(bytes_written, error); 196 sender_->DoneWithError(bytes_written, error);
202 } 197 }
203 198
204 DataSourceSender::PendingSend::Buffer::Buffer( 199 DataSourceSender::PendingSend::Buffer::Buffer(
205 scoped_refptr<DataSourceSender> sender, 200 scoped_refptr<DataSourceSender> sender,
206 PendingSend* send, 201 PendingSend* send,
207 char* buffer, 202 char* buffer,
208 uint32_t buffer_size) 203 uint32_t buffer_size)
209 : sender_(sender), 204 : sender_(sender),
210 pending_send_(send), 205 pending_send_(send),
211 buffer_(buffer), 206 buffer_(buffer),
212 buffer_size_(buffer_size) { 207 buffer_size_(buffer_size) {
213 } 208 }
214 209
215 DataSourceSender::PendingSend::Buffer::~Buffer() { 210 DataSourceSender::PendingSend::Buffer::~Buffer() {
216 if (sender_.get()) 211 if (sender_.get())
217 pending_send_->Done(0); 212 pending_send_->Done(0);
218 } 213 }
219 214
220 char* DataSourceSender::PendingSend::Buffer::GetData() { 215 char* DataSourceSender::PendingSend::Buffer::GetData() {
221 return buffer_; 216 return buffer_;
222 } 217 }
223 218
224 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { 219 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
225 return buffer_size_; 220 return buffer_size_;
226 } 221 }
227 222
228 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { 223 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
229 DCHECK(sender_.get()); 224 DCHECK(sender_.get());
230 pending_send_->Done(bytes_written); 225 PendingSend* send = pending_send_;
231 sender_ = NULL; 226 pending_send_ = nullptr;
232 pending_send_ = NULL; 227 sender_ = nullptr;
228 send->Done(bytes_written);
233 buffer_ = NULL; 229 buffer_ = NULL;
234 buffer_size_ = 0; 230 buffer_size_ = 0;
235 } 231 }
236 232
237 void DataSourceSender::PendingSend::Buffer::DoneWithError( 233 void DataSourceSender::PendingSend::Buffer::DoneWithError(
238 uint32_t bytes_written, 234 uint32_t bytes_written,
239 int32_t error) { 235 int32_t error) {
240 DCHECK(sender_.get()); 236 DCHECK(sender_.get());
241 pending_send_->DoneWithError(bytes_written, error); 237 PendingSend* send = pending_send_;
242 sender_ = NULL; 238 pending_send_ = nullptr;
243 pending_send_ = NULL; 239 sender_ = nullptr;
240 send->DoneWithError(bytes_written, error);
244 buffer_ = NULL; 241 buffer_ = NULL;
245 buffer_size_ = 0; 242 buffer_size_ = 0;
246 } 243 }
247 244
248 } // namespace device 245 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698