Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: device/serial/data_source_sender.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: split out bug fix Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_source_sender.h" 5 #include "device/serial/data_source_sender.h"
6 6
7 #include <algorithm>
7 #include <limits> 8 #include <limits>
8 9
9 #include "base/bind.h" 10 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12 12
13 namespace device { 13 namespace device {
14 14
15 // Represents a send that is not yet fulfilled. 15 // Represents a send that is not yet fulfilled.
16 class DataSourceSender::PendingSend { 16 class DataSourceSender::PendingSend {
17 public: 17 public:
18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback); 18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
19 19
20 // Asynchronously fills |data| with up to |num_bytes| of data. Following this, 20 // Asynchronously fills |data_| with up to |num_bytes| of data. Following
21 // one of Done() and DoneWithError() will be called with the result. 21 // this, one of Done() and DoneWithError() will be called with the result.
22 void GetData(void* data, uint32_t num_bytes); 22 void GetData(uint32_t num_bytes);
23 23
24 private: 24 private:
25 class Buffer; 25 class Buffer;
26 // Reports a successful write of |bytes_written|. 26 // Reports a successful write of |bytes_written|.
27 void Done(uint32_t bytes_written); 27 void Done(uint32_t bytes_written);
28 28
29 // Reports a partially successful or unsuccessful write of |bytes_written| 29 // Reports a partially successful or unsuccessful write of |bytes_written|
30 // with an error of |error|. 30 // with an error of |error|.
31 void DoneWithError(uint32_t bytes_written, int32_t error); 31 void DoneWithError(uint32_t bytes_written, int32_t error);
32 32
33 // The DataSourceSender that owns this. 33 // The DataSourceSender that owns this.
34 DataSourceSender* sender_; 34 DataSourceSender* sender_;
35 35
36 // The callback to call to get data. 36 // The callback to call to get data.
37 ReadyCallback callback_; 37 ReadyCallback callback_;
38 38
39 // Whether the buffer specified by GetData() has been passed to |callback_|, 39 // Whether the buffer specified by GetData() has been passed to |callback_|,
40 // but has not yet called Done() or DoneWithError(). 40 // but has not yet called Done() or DoneWithError().
41 bool buffer_in_use_; 41 bool buffer_in_use_;
42
43 // The data obtained using |callback_| to be dispatched to the client.
44 std::vector<char> data_;
42 }; 45 };
43 46
44 // A Writable implementation that provides a view of a data pipe owned by a 47 // A Writable implementation that provides a view of a buffer owned by a
45 // DataSourceSender. 48 // DataSourceSender.
46 class DataSourceSender::PendingSend::Buffer : public WritableBuffer { 49 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
47 public: 50 public:
48 Buffer(scoped_refptr<DataSourceSender> sender, 51 Buffer(scoped_refptr<DataSourceSender> sender,
49 PendingSend* send, 52 PendingSend* send,
50 char* buffer, 53 char* buffer,
51 uint32_t buffer_size); 54 uint32_t buffer_size);
52 ~Buffer() override; 55 ~Buffer() override;
53 56
54 // WritableBuffer overrides. 57 // WritableBuffer overrides.
55 char* GetData() override; 58 char* GetData() override;
56 uint32_t GetSize() override; 59 uint32_t GetSize() override;
57 void Done(uint32_t bytes_written) override; 60 void Done(uint32_t bytes_written) override;
58 void DoneWithError(uint32_t bytes_written, int32_t error) override; 61 void DoneWithError(uint32_t bytes_written, int32_t error) override;
59 62
60 private: 63 private:
61 // The DataSourceSender whose data pipe we are providing a view. 64 // The DataSourceSender of whose buffer we are providing a view.
62 scoped_refptr<DataSourceSender> sender_; 65 scoped_refptr<DataSourceSender> sender_;
63 66
64 // The PendingSend to which this buffer has been created in response. 67 // The PendingSend to which this buffer has been created in response.
65 PendingSend* pending_send_; 68 PendingSend* pending_send_;
66 69
67 char* buffer_; 70 char* buffer_;
68 uint32_t buffer_size_; 71 uint32_t buffer_size_;
69 }; 72 };
70 73
71 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, 74 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
72 const ErrorCallback& error_callback) 75 const ErrorCallback& error_callback)
73 : ready_callback_(ready_callback), 76 : ready_callback_(ready_callback),
74 error_callback_(error_callback), 77 error_callback_(error_callback),
75 bytes_sent_(0), 78 available_buffer_capacity_(0),
76 shut_down_(false) { 79 paused_(false),
80 shut_down_(false),
81 weak_factory_(this) {
77 DCHECK(!ready_callback.is_null() && !error_callback.is_null()); 82 DCHECK(!ready_callback.is_null() && !error_callback.is_null());
78 } 83 }
79 84
80 void DataSourceSender::ShutDown() { 85 void DataSourceSender::ShutDown() {
81 shut_down_ = true; 86 shut_down_ = true;
82 waiter_.reset();
83 ready_callback_.Reset(); 87 ready_callback_.Reset();
84 error_callback_.Reset(); 88 error_callback_.Reset();
85 } 89 }
86 90
87 DataSourceSender::~DataSourceSender() { 91 DataSourceSender::~DataSourceSender() {
88 DCHECK(shut_down_); 92 DCHECK(shut_down_);
89 } 93 }
90 94
91 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { 95 void DataSourceSender::Init(uint32_t buffer_size) {
92 // This should never occur. |handle_| is only valid and |pending_send_| is 96 available_buffer_capacity_ = buffer_size;
93 // only set after Init is called. 97 GetMoreData();
94 if (pending_send_ || handle_.is_valid() || shut_down_) {
95 DispatchFatalError();
96 return;
97 }
98 handle_ = handle.Pass();
99 pending_send_.reset(new PendingSend(this, ready_callback_));
100 StartWaiting();
101 } 98 }
102 99
103 void DataSourceSender::Resume() { 100 void DataSourceSender::Resume() {
104 if (pending_send_ || !handle_.is_valid()) { 101 if (pending_send_) {
105 DispatchFatalError(); 102 DispatchFatalError();
106 return; 103 return;
107 } 104 }
108 105
109 pending_send_.reset(new PendingSend(this, ready_callback_)); 106 paused_ = false;
110 StartWaiting(); 107 GetMoreData();
108 }
109
110 void DataSourceSender::ReportBytesReceived(uint32_t bytes_sent) {
111 available_buffer_capacity_ += bytes_sent;
112 if (!pending_send_ && !paused_)
113 GetMoreData();
111 } 114 }
112 115
113 void DataSourceSender::OnConnectionError() { 116 void DataSourceSender::OnConnectionError() {
114 DispatchFatalError(); 117 DispatchFatalError();
115 } 118 }
116 119
117 void DataSourceSender::StartWaiting() { 120 void DataSourceSender::GetMoreData() {
118 DCHECK(pending_send_ && !waiter_); 121 if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_)
119 waiter_.reset( 122 return;
120 new AsyncWaiter(handle_.get(), 123
121 MOJO_HANDLE_SIGNAL_WRITABLE, 124 pending_send_.reset(new PendingSend(this, ready_callback_));
122 base::Bind(&DataSourceSender::OnDoneWaiting, this))); 125 pending_send_->GetData(available_buffer_capacity_);
123 } 126 }
124 127
125 void DataSourceSender::OnDoneWaiting(MojoResult result) { 128 void DataSourceSender::Done(const std::vector<char>& data) {
126 DCHECK(pending_send_ && !shut_down_ && waiter_); 129 DoneInternal(data);
127 waiter_.reset(); 130 if (!shut_down_ && available_buffer_capacity_) {
128 if (result != MOJO_RESULT_OK) { 131 base::MessageLoop::current()->PostTask(
129 DispatchFatalError(); 132 FROM_HERE,
130 return; 133 base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
131 } 134 }
132 void* data = NULL;
133 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
134 result = mojo::BeginWriteDataRaw(
135 handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
136 if (result != MOJO_RESULT_OK) {
137 DispatchFatalError();
138 return;
139 }
140 pending_send_->GetData(static_cast<char*>(data), num_bytes);
141 } 135 }
142 136
143 void DataSourceSender::Done(uint32_t bytes_written) { 137 void DataSourceSender::DoneWithError(const std::vector<char>& data,
144 DoneInternal(bytes_written); 138 int32_t error) {
139 DoneInternal(data);
145 if (!shut_down_) 140 if (!shut_down_)
146 StartWaiting(); 141 client()->OnError(error);
147 } 142 paused_ = true;
148 143 // We don't call GetMoreData here so we don't send any additional data until
149 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
150 DoneInternal(bytes_written);
151 pending_send_.reset();
152 if (!shut_down_)
153 client()->OnError(bytes_sent_, error);
154 // We don't call StartWaiting here so we don't send any additional data until
155 // Resume() is called. 144 // Resume() is called.
156 } 145 }
157 146
158 void DataSourceSender::DoneInternal(uint32_t bytes_written) { 147 void DataSourceSender::DoneInternal(const std::vector<char>& data) {
159 DCHECK(pending_send_); 148 DCHECK(pending_send_);
160 if (shut_down_) 149 if (shut_down_)
161 return; 150 return;
162 151
163 bytes_sent_ += bytes_written; 152 available_buffer_capacity_ -= data.size();
164 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); 153 if (!data.empty()) {
165 if (result != MOJO_RESULT_OK) { 154 mojo::Array<uint8_t> data_to_send(data.size());
166 DispatchFatalError(); 155 std::copy(data.begin(), data.end(), &data_to_send[0]);
167 return; 156 client()->OnData(data_to_send.Pass());
168 } 157 }
158 pending_send_.reset();
169 } 159 }
170 160
171 void DataSourceSender::DispatchFatalError() { 161 void DataSourceSender::DispatchFatalError() {
172 if (shut_down_) 162 if (shut_down_)
173 return; 163 return;
174 164
175 error_callback_.Run(); 165 error_callback_.Run();
176 ShutDown(); 166 ShutDown();
177 } 167 }
178 168
179 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, 169 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
180 const ReadyCallback& callback) 170 const ReadyCallback& callback)
181 : sender_(sender), callback_(callback), buffer_in_use_(false) { 171 : sender_(sender), callback_(callback), buffer_in_use_(false) {
182 } 172 }
183 173
184 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { 174 void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
175 DCHECK(num_bytes);
185 DCHECK(!buffer_in_use_); 176 DCHECK(!buffer_in_use_);
186 buffer_in_use_ = true; 177 buffer_in_use_ = true;
178 data_.resize(num_bytes);
187 callback_.Run(scoped_ptr<WritableBuffer>( 179 callback_.Run(scoped_ptr<WritableBuffer>(
188 new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); 180 new Buffer(sender_, this, &data_[0], num_bytes)));
189 } 181 }
190 182
191 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { 183 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
192 DCHECK(buffer_in_use_); 184 DCHECK(buffer_in_use_);
185 DCHECK_LE(bytes_written, data_.size());
193 buffer_in_use_ = false; 186 buffer_in_use_ = false;
194 sender_->Done(bytes_written); 187 data_.resize(bytes_written);
188 sender_->Done(data_);
195 } 189 }
196 190
197 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, 191 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
198 int32_t error) { 192 int32_t error) {
199 DCHECK(buffer_in_use_); 193 DCHECK(buffer_in_use_);
194 DCHECK_LE(bytes_written, data_.size());
200 buffer_in_use_ = false; 195 buffer_in_use_ = false;
201 sender_->DoneWithError(bytes_written, error); 196 data_.resize(bytes_written);
197 sender_->DoneWithError(data_, error);
202 } 198 }
203 199
204 DataSourceSender::PendingSend::Buffer::Buffer( 200 DataSourceSender::PendingSend::Buffer::Buffer(
205 scoped_refptr<DataSourceSender> sender, 201 scoped_refptr<DataSourceSender> sender,
206 PendingSend* send, 202 PendingSend* send,
207 char* buffer, 203 char* buffer,
208 uint32_t buffer_size) 204 uint32_t buffer_size)
209 : sender_(sender), 205 : sender_(sender),
210 pending_send_(send), 206 pending_send_(send),
211 buffer_(buffer), 207 buffer_(buffer),
212 buffer_size_(buffer_size) { 208 buffer_size_(buffer_size) {
213 } 209 }
214 210
215 DataSourceSender::PendingSend::Buffer::~Buffer() { 211 DataSourceSender::PendingSend::Buffer::~Buffer() {
216 if (sender_.get()) 212 if (pending_send_)
217 pending_send_->Done(0); 213 pending_send_->Done(0);
218 } 214 }
219 215
220 char* DataSourceSender::PendingSend::Buffer::GetData() { 216 char* DataSourceSender::PendingSend::Buffer::GetData() {
221 return buffer_; 217 return buffer_;
222 } 218 }
223 219
224 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { 220 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
225 return buffer_size_; 221 return buffer_size_;
226 } 222 }
227 223
228 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { 224 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
229 DCHECK(sender_.get()); 225 DCHECK(sender_.get());
230 pending_send_->Done(bytes_written); 226 PendingSend* send = pending_send_;
231 sender_ = NULL; 227 pending_send_ = nullptr;
232 pending_send_ = NULL; 228 send->Done(bytes_written);
233 buffer_ = NULL; 229 sender_ = nullptr;
234 buffer_size_ = 0;
235 } 230 }
236 231
237 void DataSourceSender::PendingSend::Buffer::DoneWithError( 232 void DataSourceSender::PendingSend::Buffer::DoneWithError(
238 uint32_t bytes_written, 233 uint32_t bytes_written,
239 int32_t error) { 234 int32_t error) {
240 DCHECK(sender_.get()); 235 DCHECK(sender_.get());
241 pending_send_->DoneWithError(bytes_written, error); 236 PendingSend* send = pending_send_;
242 sender_ = NULL; 237 pending_send_ = nullptr;
243 pending_send_ = NULL; 238 send->DoneWithError(bytes_written, error);
244 buffer_ = NULL; 239 sender_ = nullptr;
245 buffer_size_ = 0;
246 } 240 }
247 241
248 } // namespace device 242 } // namespace device
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698