OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "device/serial/data_sender.h" | 5 #include "device/serial/data_sender.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
11 | 11 |
12 namespace device { | 12 namespace device { |
13 | 13 |
14 // Represents a send that is not yet fulfilled. | 14 // Represents a send that is not yet fulfilled. |
15 class DataSender::PendingSend { | 15 class DataSender::PendingSend { |
16 public: | 16 public: |
17 PendingSend(const base::StringPiece& data, | 17 PendingSend(const base::StringPiece& data, |
18 const DataSentCallback& callback, | 18 const DataSentCallback& callback, |
19 const SendErrorCallback& error_callback, | 19 const SendErrorCallback& error_callback, |
20 int32_t fatal_error_value); | 20 DataSender* sender); |
21 | 21 |
22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the | 22 // Reports |fatal_error_value_| to |receive_error_callback_|. |
23 // number of bytes that were part of this send from |num_bytes|. Returns | 23 void DispatchFatalError(); |
24 // whether this send has been completed. If this send has been completed, this | |
25 // calls |callback_|. | |
26 bool ReportBytesSent(uint32_t* num_bytes); | |
27 | 24 |
| 25 // Attempts to send any data not yet sent to |sink|. |
| 26 void SendData(); |
| 27 |
| 28 private: |
28 // Invoked to report that |num_bytes| of data have been sent and then an | 29 // Invoked to report that |num_bytes| of data have been sent and then an |
29 // error, |error| was encountered. Subtracts the number of bytes that were | 30 // error, |error| was encountered. Subtracts the number of bytes that were |
30 // part of this send from |num_bytes|. If this send was not completed before | 31 // part of this send from |num_bytes|. If this send was not completed before |
31 // the error, this calls |error_callback_| to report the error. Otherwise, | 32 // the error, this calls |error_callback_| to report the error. Otherwise, |
32 // this calls |callback_|. Returns the number of bytes sent but not acked. | 33 // this calls |callback_|. Returns the number of bytes sent but not acked. |
33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); | 34 void OnDataSent(uint32_t num_bytes, int32_t error); |
34 | |
35 // Reports |fatal_error_value_| to |receive_error_callback_|. | |
36 void DispatchFatalError(); | |
37 | |
38 // Attempts to send any data not yet sent to |sink|. | |
39 bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size); | |
40 | |
41 private: | |
42 // Invoked to update |bytes_acked_| and |num_bytes|. | |
43 void ReportBytesSentInternal(uint32_t* num_bytes); | |
44 | 35 |
45 // The data to send. | 36 // The data to send. |
46 const base::StringPiece data_; | 37 const base::StringPiece data_; |
47 | 38 |
48 // The callback to report success. | 39 // The callback to report success. |
49 const DataSentCallback callback_; | 40 const DataSentCallback callback_; |
50 | 41 |
51 // The callback to report errors. | 42 // The callback to report errors. |
52 const SendErrorCallback error_callback_; | 43 const SendErrorCallback error_callback_; |
53 | 44 |
54 // The error value to report when DispatchFatalError() is called. | 45 // The DataSender that owns this PendingSend. |
55 const int32_t fatal_error_value_; | 46 DataSender* sender_; |
56 | |
57 // The number of bytes sent to the DataSink. | |
58 uint32_t bytes_sent_; | |
59 | |
60 // The number of bytes acked. | |
61 uint32_t bytes_acked_; | |
62 }; | 47 }; |
63 | 48 |
64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, | 49 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, |
65 uint32_t buffer_size, | 50 uint32_t buffer_size, |
66 int32_t fatal_error_value) | 51 int32_t fatal_error_value) |
67 : sink_(sink.Pass()), | 52 : sink_(sink.Pass()), |
68 fatal_error_value_(fatal_error_value), | 53 fatal_error_value_(fatal_error_value), |
69 available_buffer_capacity_(buffer_size), | |
70 shut_down_(false) { | 54 shut_down_(false) { |
71 sink_.set_error_handler(this); | 55 sink_.set_error_handler(this); |
72 sink_.set_client(this); | |
73 sink_->Init(buffer_size); | |
74 } | 56 } |
75 | 57 |
76 DataSender::~DataSender() { | 58 DataSender::~DataSender() { |
77 ShutDown(); | 59 ShutDown(); |
78 } | 60 } |
79 | 61 |
80 bool DataSender::Send(const base::StringPiece& data, | 62 bool DataSender::Send(const base::StringPiece& data, |
81 const DataSentCallback& callback, | 63 const DataSentCallback& callback, |
82 const SendErrorCallback& error_callback) { | 64 const SendErrorCallback& error_callback) { |
83 DCHECK(!callback.is_null() && !error_callback.is_null()); | 65 DCHECK(!callback.is_null() && !error_callback.is_null()); |
84 if (!pending_cancel_.is_null() || shut_down_) | 66 if (!pending_cancel_.is_null() || shut_down_) |
85 return false; | 67 return false; |
86 | 68 |
87 pending_sends_.push(linked_ptr<PendingSend>( | 69 linked_ptr<PendingSend> pending_send( |
88 new PendingSend(data, callback, error_callback, fatal_error_value_))); | 70 new PendingSend(data, callback, error_callback, this)); |
89 SendInternal(); | 71 pending_send->SendData(); |
| 72 sends_awaiting_ack_.push(pending_send); |
90 return true; | 73 return true; |
91 } | 74 } |
92 | 75 |
93 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { | 76 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { |
94 DCHECK(!callback.is_null()); | 77 DCHECK(!callback.is_null()); |
95 if (!pending_cancel_.is_null() || shut_down_) | 78 if (!pending_cancel_.is_null() || shut_down_) |
96 return false; | 79 return false; |
97 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) { | 80 if (sends_awaiting_ack_.empty()) { |
98 base::MessageLoop::current()->PostTask(FROM_HERE, callback); | 81 base::MessageLoop::current()->PostTask(FROM_HERE, callback); |
99 return true; | 82 return true; |
100 } | 83 } |
101 | 84 |
102 pending_cancel_ = callback; | 85 pending_cancel_ = callback; |
103 sink_->Cancel(error); | 86 sink_->Cancel(error); |
104 return true; | 87 return true; |
105 } | 88 } |
106 | 89 |
107 void DataSender::ReportBytesSent(uint32_t bytes_sent) { | 90 void DataSender::SendComplete() { |
108 if (shut_down_) | 91 if (shut_down_) |
109 return; | 92 return; |
110 | 93 |
111 available_buffer_capacity_ += bytes_sent; | 94 DCHECK(!sends_awaiting_ack_.empty()); |
112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && | 95 sends_awaiting_ack_.pop(); |
113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { | 96 if (sends_awaiting_ack_.empty()) |
114 sends_awaiting_ack_.pop(); | |
115 } | |
116 if (bytes_sent > 0 && !pending_sends_.empty()) { | |
117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); | |
118 DCHECK(!finished); | |
119 if (finished) { | |
120 ShutDown(); | |
121 return; | |
122 } | |
123 } | |
124 if (bytes_sent != 0) { | |
125 ShutDown(); | |
126 return; | |
127 } | |
128 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) | |
129 RunCancelCallback(); | 97 RunCancelCallback(); |
130 SendInternal(); | |
131 } | 98 } |
132 | 99 |
133 void DataSender::ReportBytesSentAndError( | 100 void DataSender::SendFailed(int32_t error) { |
134 uint32_t bytes_sent, | |
135 int32_t error, | |
136 const mojo::Callback<void()>& callback) { | |
137 if (shut_down_) | 101 if (shut_down_) |
138 return; | 102 return; |
139 | 103 |
140 available_buffer_capacity_ += bytes_sent; | 104 DCHECK(!sends_awaiting_ack_.empty()); |
141 while (!sends_awaiting_ack_.empty()) { | 105 sends_awaiting_ack_.pop(); |
142 available_buffer_capacity_ += | 106 if (!sends_awaiting_ack_.empty()) |
143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent, | 107 return; |
144 error); | 108 sink_->ClearError(); |
145 sends_awaiting_ack_.pop(); | |
146 } | |
147 while (!pending_sends_.empty()) { | |
148 available_buffer_capacity_ += | |
149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); | |
150 pending_sends_.pop(); | |
151 } | |
152 callback.Run(); | |
153 RunCancelCallback(); | 109 RunCancelCallback(); |
154 } | 110 } |
155 | 111 |
156 void DataSender::OnConnectionError() { | 112 void DataSender::OnConnectionError() { |
157 ShutDown(); | 113 ShutDown(); |
158 } | 114 } |
159 | 115 |
160 void DataSender::SendInternal() { | |
161 while (!pending_sends_.empty() && available_buffer_capacity_) { | |
162 if (pending_sends_.front()->SendData(sink_.get(), | |
163 &available_buffer_capacity_)) { | |
164 sends_awaiting_ack_.push(pending_sends_.front()); | |
165 pending_sends_.pop(); | |
166 } | |
167 } | |
168 } | |
169 | |
170 void DataSender::RunCancelCallback() { | 116 void DataSender::RunCancelCallback() { |
171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); | 117 DCHECK(sends_awaiting_ack_.empty()); |
172 if (pending_cancel_.is_null()) | 118 if (pending_cancel_.is_null()) |
173 return; | 119 return; |
174 | 120 |
175 base::MessageLoop::current()->PostTask(FROM_HERE, | 121 base::MessageLoop::current()->PostTask(FROM_HERE, |
176 base::Bind(pending_cancel_)); | 122 base::Bind(pending_cancel_)); |
177 pending_cancel_.Reset(); | 123 pending_cancel_.Reset(); |
178 } | 124 } |
179 | 125 |
180 void DataSender::ShutDown() { | 126 void DataSender::ShutDown() { |
181 shut_down_ = true; | 127 shut_down_ = true; |
182 while (!pending_sends_.empty()) { | |
183 pending_sends_.front()->DispatchFatalError(); | |
184 pending_sends_.pop(); | |
185 } | |
186 while (!sends_awaiting_ack_.empty()) { | 128 while (!sends_awaiting_ack_.empty()) { |
187 sends_awaiting_ack_.front()->DispatchFatalError(); | 129 sends_awaiting_ack_.front()->DispatchFatalError(); |
188 sends_awaiting_ack_.pop(); | 130 sends_awaiting_ack_.pop(); |
189 } | 131 } |
190 RunCancelCallback(); | 132 RunCancelCallback(); |
191 } | 133 } |
192 | 134 |
193 DataSender::PendingSend::PendingSend(const base::StringPiece& data, | 135 DataSender::PendingSend::PendingSend(const base::StringPiece& data, |
194 const DataSentCallback& callback, | 136 const DataSentCallback& callback, |
195 const SendErrorCallback& error_callback, | 137 const SendErrorCallback& error_callback, |
196 int32_t fatal_error_value) | 138 DataSender* sender) |
197 : data_(data), | 139 : data_(data), |
198 callback_(callback), | 140 callback_(callback), |
199 error_callback_(error_callback), | 141 error_callback_(error_callback), |
200 fatal_error_value_(fatal_error_value), | 142 sender_(sender) { |
201 bytes_sent_(0), | |
202 bytes_acked_(0) { | |
203 } | 143 } |
204 | 144 |
205 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) { | 145 void DataSender::PendingSend::OnDataSent(uint32_t num_bytes, int32_t error) { |
206 ReportBytesSentInternal(num_bytes); | 146 if (error) { |
207 if (bytes_acked_ < data_.size()) | 147 base::MessageLoop::current()->PostTask( |
208 return false; | 148 FROM_HERE, base::Bind(error_callback_, num_bytes, error)); |
209 | 149 sender_->SendFailed(error); |
210 base::MessageLoop::current()->PostTask(FROM_HERE, | 150 } else { |
211 base::Bind(callback_, bytes_acked_)); | 151 DCHECK(num_bytes == data_.size()); |
212 return true; | |
213 } | |
214 | |
215 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes, | |
216 int32_t error) { | |
217 ReportBytesSentInternal(num_bytes); | |
218 if (*num_bytes > 0) { | |
219 base::MessageLoop::current()->PostTask(FROM_HERE, | 152 base::MessageLoop::current()->PostTask(FROM_HERE, |
220 base::Bind(callback_, bytes_acked_)); | 153 base::Bind(callback_, num_bytes)); |
221 return 0; | 154 sender_->SendComplete(); |
222 } | 155 } |
223 base::MessageLoop::current()->PostTask( | |
224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); | |
225 return bytes_sent_ - bytes_acked_; | |
226 } | 156 } |
227 | 157 |
228 void DataSender::PendingSend::DispatchFatalError() { | 158 void DataSender::PendingSend::DispatchFatalError() { |
229 base::MessageLoop::current()->PostTask( | 159 base::MessageLoop::current()->PostTask( |
230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); | 160 FROM_HERE, base::Bind(error_callback_, 0, sender_->fatal_error_value_)); |
231 } | 161 } |
232 | 162 |
233 bool DataSender::PendingSend::SendData(serial::DataSink* sink, | 163 void DataSender::PendingSend::SendData() { |
234 uint32_t* available_buffer_size) { | 164 uint32_t num_bytes_to_send = static_cast<uint32_t>(data_.size()); |
235 uint32_t num_bytes_to_send = | |
236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_), | |
237 *available_buffer_size); | |
238 mojo::Array<uint8_t> bytes(num_bytes_to_send); | 165 mojo::Array<uint8_t> bytes(num_bytes_to_send); |
239 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send); | 166 memcpy(&bytes[0], data_.data(), num_bytes_to_send); |
240 bytes_sent_ += num_bytes_to_send; | 167 sender_->sink_->OnData( |
241 *available_buffer_size -= num_bytes_to_send; | 168 bytes.Pass(), |
242 sink->OnData(bytes.Pass()); | 169 base::Bind(&DataSender::PendingSend::OnDataSent, base::Unretained(this))); |
243 return bytes_sent_ == data_.size(); | |
244 } | |
245 | |
246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { | |
247 bytes_acked_ += *num_bytes; | |
248 if (bytes_acked_ > bytes_sent_) { | |
249 *num_bytes = bytes_acked_ - bytes_sent_; | |
250 bytes_acked_ = bytes_sent_; | |
251 } else { | |
252 *num_bytes = 0; | |
253 } | |
254 } | 170 } |
255 | 171 |
256 } // namespace device | 172 } // namespace device |
OLD | NEW |