Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(674)

Side by Side Diff: device/serial/data_sender.cc

Issue 889283002: Remove Client= from device/serial/data_stream.mojom. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: This time without racing message pipes Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « device/serial/data_sender.h ('k') | device/serial/data_sink_receiver.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_sender.h" 5 #include "device/serial/data_sender.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
11 11
12 namespace device { 12 namespace device {
13 13
14 // Represents a send that is not yet fulfilled. 14 // Represents a send that is not yet fulfilled.
15 class DataSender::PendingSend { 15 class DataSender::PendingSend {
16 public: 16 public:
17 PendingSend(const base::StringPiece& data, 17 PendingSend(const base::StringPiece& data,
18 const DataSentCallback& callback, 18 const DataSentCallback& callback,
19 const SendErrorCallback& error_callback, 19 const SendErrorCallback& error_callback,
20 int32_t fatal_error_value); 20 DataSender* sender);
21 21
22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the 22 // Reports |fatal_error_value_| to |receive_error_callback_|.
23 // number of bytes that were part of this send from |num_bytes|. Returns 23 void DispatchFatalError();
24 // whether this send has been completed. If this send has been completed, this
25 // calls |callback_|.
26 bool ReportBytesSent(uint32_t* num_bytes);
27 24
25 // Attempts to send any data not yet sent to |sink|.
26 void SendData();
27
28 private:
28 // Invoked to report that |num_bytes| of data have been sent and then an 29 // Invoked to report that |num_bytes| of data have been sent and then an
29 // error, |error| was encountered. Subtracts the number of bytes that were 30 // error, |error| was encountered. Subtracts the number of bytes that were
30 // part of this send from |num_bytes|. If this send was not completed before 31 // part of this send from |num_bytes|. If this send was not completed before
31 // the error, this calls |error_callback_| to report the error. Otherwise, 32 // the error, this calls |error_callback_| to report the error. Otherwise,
32 // this calls |callback_|. Returns the number of bytes sent but not acked. 33 // this calls |callback_|. Returns the number of bytes sent but not acked.
33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); 34 void OnDataSent(uint32_t num_bytes, int32_t error);
34
35 // Reports |fatal_error_value_| to |receive_error_callback_|.
36 void DispatchFatalError();
37
38 // Attempts to send any data not yet sent to |sink|.
39 bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
40
41 private:
42 // Invoked to update |bytes_acked_| and |num_bytes|.
43 void ReportBytesSentInternal(uint32_t* num_bytes);
44 35
45 // The data to send. 36 // The data to send.
46 const base::StringPiece data_; 37 const base::StringPiece data_;
47 38
48 // The callback to report success. 39 // The callback to report success.
49 const DataSentCallback callback_; 40 const DataSentCallback callback_;
50 41
51 // The callback to report errors. 42 // The callback to report errors.
52 const SendErrorCallback error_callback_; 43 const SendErrorCallback error_callback_;
53 44
54 // The error value to report when DispatchFatalError() is called. 45 // The DataSender that owns this PendingSend.
55 const int32_t fatal_error_value_; 46 DataSender* sender_;
56
57 // The number of bytes sent to the DataSink.
58 uint32_t bytes_sent_;
59
60 // The number of bytes acked.
61 uint32_t bytes_acked_;
62 }; 47 };
63 48
64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, 49 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
65 uint32_t buffer_size, 50 uint32_t buffer_size,
66 int32_t fatal_error_value) 51 int32_t fatal_error_value)
67 : sink_(sink.Pass()), 52 : sink_(sink.Pass()),
68 fatal_error_value_(fatal_error_value), 53 fatal_error_value_(fatal_error_value),
69 available_buffer_capacity_(buffer_size),
70 shut_down_(false) { 54 shut_down_(false) {
71 sink_.set_error_handler(this); 55 sink_.set_error_handler(this);
72 sink_.set_client(this);
73 sink_->Init(buffer_size);
74 } 56 }
75 57
76 DataSender::~DataSender() { 58 DataSender::~DataSender() {
77 ShutDown(); 59 ShutDown();
78 } 60 }
79 61
80 bool DataSender::Send(const base::StringPiece& data, 62 bool DataSender::Send(const base::StringPiece& data,
81 const DataSentCallback& callback, 63 const DataSentCallback& callback,
82 const SendErrorCallback& error_callback) { 64 const SendErrorCallback& error_callback) {
83 DCHECK(!callback.is_null() && !error_callback.is_null()); 65 DCHECK(!callback.is_null() && !error_callback.is_null());
84 if (!pending_cancel_.is_null() || shut_down_) 66 if (!pending_cancel_.is_null() || shut_down_)
85 return false; 67 return false;
86 68
87 pending_sends_.push(linked_ptr<PendingSend>( 69 linked_ptr<PendingSend> pending_send(
88 new PendingSend(data, callback, error_callback, fatal_error_value_))); 70 new PendingSend(data, callback, error_callback, this));
89 SendInternal(); 71 pending_send->SendData();
72 sends_awaiting_ack_.push(pending_send);
90 return true; 73 return true;
91 } 74 }
92 75
93 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { 76 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
94 DCHECK(!callback.is_null()); 77 DCHECK(!callback.is_null());
95 if (!pending_cancel_.is_null() || shut_down_) 78 if (!pending_cancel_.is_null() || shut_down_)
96 return false; 79 return false;
97 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) { 80 if (sends_awaiting_ack_.empty()) {
98 base::MessageLoop::current()->PostTask(FROM_HERE, callback); 81 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
99 return true; 82 return true;
100 } 83 }
101 84
102 pending_cancel_ = callback; 85 pending_cancel_ = callback;
103 sink_->Cancel(error); 86 sink_->Cancel(error);
104 return true; 87 return true;
105 } 88 }
106 89
107 void DataSender::ReportBytesSent(uint32_t bytes_sent) { 90 void DataSender::SendComplete() {
108 if (shut_down_) 91 if (shut_down_)
109 return; 92 return;
110 93
111 available_buffer_capacity_ += bytes_sent; 94 DCHECK(!sends_awaiting_ack_.empty());
112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && 95 sends_awaiting_ack_.pop();
113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { 96 if (sends_awaiting_ack_.empty())
114 sends_awaiting_ack_.pop();
115 }
116 if (bytes_sent > 0 && !pending_sends_.empty()) {
117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
118 DCHECK(!finished);
119 if (finished) {
120 ShutDown();
121 return;
122 }
123 }
124 if (bytes_sent != 0) {
125 ShutDown();
126 return;
127 }
128 if (pending_sends_.empty() && sends_awaiting_ack_.empty())
129 RunCancelCallback(); 97 RunCancelCallback();
130 SendInternal();
131 } 98 }
132 99
133 void DataSender::ReportBytesSentAndError( 100 void DataSender::SendFailed(int32_t error) {
134 uint32_t bytes_sent,
135 int32_t error,
136 const mojo::Callback<void()>& callback) {
137 if (shut_down_) 101 if (shut_down_)
138 return; 102 return;
139 103
140 available_buffer_capacity_ += bytes_sent; 104 DCHECK(!sends_awaiting_ack_.empty());
141 while (!sends_awaiting_ack_.empty()) { 105 sends_awaiting_ack_.pop();
142 available_buffer_capacity_ += 106 if (!sends_awaiting_ack_.empty())
143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, 107 return;
144 error); 108 sink_->ClearError();
145 sends_awaiting_ack_.pop();
146 }
147 while (!pending_sends_.empty()) {
148 available_buffer_capacity_ +=
149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
150 pending_sends_.pop();
151 }
152 callback.Run();
153 RunCancelCallback(); 109 RunCancelCallback();
154 } 110 }
155 111
156 void DataSender::OnConnectionError() { 112 void DataSender::OnConnectionError() {
157 ShutDown(); 113 ShutDown();
158 } 114 }
159 115
160 void DataSender::SendInternal() {
161 while (!pending_sends_.empty() && available_buffer_capacity_) {
162 if (pending_sends_.front()->SendData(sink_.get(),
163 &available_buffer_capacity_)) {
164 sends_awaiting_ack_.push(pending_sends_.front());
165 pending_sends_.pop();
166 }
167 }
168 }
169
170 void DataSender::RunCancelCallback() { 116 void DataSender::RunCancelCallback() {
171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); 117 DCHECK(sends_awaiting_ack_.empty());
172 if (pending_cancel_.is_null()) 118 if (pending_cancel_.is_null())
173 return; 119 return;
174 120
175 base::MessageLoop::current()->PostTask(FROM_HERE, 121 base::MessageLoop::current()->PostTask(FROM_HERE,
176 base::Bind(pending_cancel_)); 122 base::Bind(pending_cancel_));
177 pending_cancel_.Reset(); 123 pending_cancel_.Reset();
178 } 124 }
179 125
180 void DataSender::ShutDown() { 126 void DataSender::ShutDown() {
181 shut_down_ = true; 127 shut_down_ = true;
182 while (!pending_sends_.empty()) {
183 pending_sends_.front()->DispatchFatalError();
184 pending_sends_.pop();
185 }
186 while (!sends_awaiting_ack_.empty()) { 128 while (!sends_awaiting_ack_.empty()) {
187 sends_awaiting_ack_.front()->DispatchFatalError(); 129 sends_awaiting_ack_.front()->DispatchFatalError();
188 sends_awaiting_ack_.pop(); 130 sends_awaiting_ack_.pop();
189 } 131 }
190 RunCancelCallback(); 132 RunCancelCallback();
191 } 133 }
192 134
193 DataSender::PendingSend::PendingSend(const base::StringPiece& data, 135 DataSender::PendingSend::PendingSend(const base::StringPiece& data,
194 const DataSentCallback& callback, 136 const DataSentCallback& callback,
195 const SendErrorCallback& error_callback, 137 const SendErrorCallback& error_callback,
196 int32_t fatal_error_value) 138 DataSender* sender)
197 : data_(data), 139 : data_(data),
198 callback_(callback), 140 callback_(callback),
199 error_callback_(error_callback), 141 error_callback_(error_callback),
200 fatal_error_value_(fatal_error_value), 142 sender_(sender) {
201 bytes_sent_(0),
202 bytes_acked_(0) {
203 } 143 }
204 144
205 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) { 145 void DataSender::PendingSend::OnDataSent(uint32_t num_bytes, int32_t error) {
206 ReportBytesSentInternal(num_bytes); 146 if (error) {
207 if (bytes_acked_ < data_.size()) 147 base::MessageLoop::current()->PostTask(
208 return false; 148 FROM_HERE, base::Bind(error_callback_, num_bytes, error));
209 149 sender_->SendFailed(error);
210 base::MessageLoop::current()->PostTask(FROM_HERE, 150 } else {
211 base::Bind(callback_, bytes_acked_)); 151 DCHECK(num_bytes == data_.size());
212 return true;
213 }
214
215 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
216 int32_t error) {
217 ReportBytesSentInternal(num_bytes);
218 if (*num_bytes > 0) {
219 base::MessageLoop::current()->PostTask(FROM_HERE, 152 base::MessageLoop::current()->PostTask(FROM_HERE,
220 base::Bind(callback_, bytes_acked_)); 153 base::Bind(callback_, num_bytes));
221 return 0; 154 sender_->SendComplete();
222 } 155 }
223 base::MessageLoop::current()->PostTask(
224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
225 return bytes_sent_ - bytes_acked_;
226 } 156 }
227 157
228 void DataSender::PendingSend::DispatchFatalError() { 158 void DataSender::PendingSend::DispatchFatalError() {
229 base::MessageLoop::current()->PostTask( 159 base::MessageLoop::current()->PostTask(
230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); 160 FROM_HERE, base::Bind(error_callback_, 0, sender_->fatal_error_value_));
231 } 161 }
232 162
233 bool DataSender::PendingSend::SendData(serial::DataSink* sink, 163 void DataSender::PendingSend::SendData() {
234 uint32_t* available_buffer_size) { 164 uint32_t num_bytes_to_send = static_cast<uint32_t>(data_.size());
235 uint32_t num_bytes_to_send =
236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
237 *available_buffer_size);
238 mojo::Array<uint8_t> bytes(num_bytes_to_send); 165 mojo::Array<uint8_t> bytes(num_bytes_to_send);
239 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); 166 memcpy(&bytes[0], data_.data(), num_bytes_to_send);
240 bytes_sent_ += num_bytes_to_send; 167 sender_->sink_->OnData(
241 *available_buffer_size -= num_bytes_to_send; 168 bytes.Pass(),
242 sink->OnData(bytes.Pass()); 169 base::Bind(&DataSender::PendingSend::OnDataSent, base::Unretained(this)));
243 return bytes_sent_ == data_.size();
244 }
245
246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
247 bytes_acked_ += *num_bytes;
248 if (bytes_acked_ > bytes_sent_) {
249 *num_bytes = bytes_acked_ - bytes_sent_;
250 bytes_acked_ = bytes_sent_;
251 } else {
252 *num_bytes = 0;
253 }
254 } 170 }
255 171
256 } // namespace device 172 } // namespace device
OLDNEW
« no previous file with comments | « device/serial/data_sender.h ('k') | device/serial/data_sink_receiver.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698