Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(12)

Side by Side Diff: device/serial/data_sink_receiver.cc

Issue 889283002: Remove Client= from device/serial/data_stream.mojom. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: This time without racing message pipes Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « device/serial/data_sink_receiver.h ('k') | device/serial/data_sink_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "device/serial/data_sink_receiver.h" 5 #include "device/serial/data_sink_receiver.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
(...skipping 27 matching lines...) Expand all
38 // Whether this receive has been cancelled. 38 // Whether this receive has been cancelled.
39 bool cancelled_; 39 bool cancelled_;
40 40
41 // If |cancelled_|, contains the cancellation error to report. 41 // If |cancelled_|, contains the cancellation error to report.
42 int32_t cancellation_error_; 42 int32_t cancellation_error_;
43 }; 43 };
44 44
45 // A frame of data received from the client. 45 // A frame of data received from the client.
46 class DataSinkReceiver::DataFrame { 46 class DataSinkReceiver::DataFrame {
47 public: 47 public:
48 explicit DataFrame(mojo::Array<uint8_t> data); 48 explicit DataFrame(mojo::Array<uint8_t> data,
49 const mojo::Callback<void(uint32_t, int32_t)>& callback);
49 50
50 // Returns the number of uncomsumed bytes remaining of this data frame. 51 // Returns the number of unconsumed bytes remaining of this data frame.
51 uint32_t GetRemainingBytes(); 52 uint32_t GetRemainingBytes();
52 53
53 // Returns a pointer to the remaining data to be consumed. 54 // Returns a pointer to the remaining data to be consumed.
54 const char* GetData(); 55 const char* GetData();
55 56
56 // Reports that |bytes_read| bytes have been consumed. 57 // Reports that |bytes_read| bytes have been consumed.
57 void OnDataConsumed(uint32_t bytes_read); 58 void OnDataConsumed(uint32_t bytes_read);
58 59
60 // Reports that an error occurred.
61 void ReportError(uint32_t bytes_read, int32_t error);
62
59 private: 63 private:
60 mojo::Array<uint8_t> data_; 64 mojo::Array<uint8_t> data_;
61 uint32_t offset_; 65 uint32_t offset_;
66 const mojo::Callback<void(uint32_t, int32_t)> callback_;
62 }; 67 };
63 68
64 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, 69 DataSinkReceiver::DataSinkReceiver(
65 const CancelCallback& cancel_callback, 70 mojo::InterfaceRequest<serial::DataSink> request,
66 const ErrorCallback& error_callback) 71 const ReadyCallback& ready_callback,
67 : ready_callback_(ready_callback), 72 const CancelCallback& cancel_callback,
73 const ErrorCallback& error_callback)
74 : binding_(this, request.Pass()),
75 ready_callback_(ready_callback),
68 cancel_callback_(cancel_callback), 76 cancel_callback_(cancel_callback),
69 error_callback_(error_callback), 77 error_callback_(error_callback),
70 flush_pending_(false), 78 current_error_(0),
71 buffer_in_use_(NULL), 79 buffer_in_use_(NULL),
72 initialized_(false),
73 available_buffer_capacity_(0),
74 shut_down_(false), 80 shut_down_(false),
75 weak_factory_(this) { 81 weak_factory_(this) {
82 binding_.set_error_handler(this);
76 } 83 }
77 84
78 void DataSinkReceiver::ShutDown() { 85 void DataSinkReceiver::ShutDown() {
79 shut_down_ = true; 86 shut_down_ = true;
80 } 87 }
81 88
82 DataSinkReceiver::~DataSinkReceiver() { 89 DataSinkReceiver::~DataSinkReceiver() {
83 } 90 }
84 91
85 void DataSinkReceiver::Init(uint32_t buffer_size) {
86 if (initialized_) {
87 ShutDown();
88 return;
89 }
90 initialized_ = true;
91 available_buffer_capacity_ = buffer_size;
92 }
93
94 void DataSinkReceiver::Cancel(int32_t error) { 92 void DataSinkReceiver::Cancel(int32_t error) {
95 // If we have sent a ReportBytesSentAndError but have not received the 93 // If we have sent a ReportBytesSentAndError but have not received the
96 // response, that ReportBytesSentAndError message will appear to the 94 // response, that ReportBytesSentAndError message will appear to the
97 // DataSinkClient to be caused by this Cancel message. In that case, we ignore 95 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
98 // the cancel. 96 // the cancel.
99 if (flush_pending_) 97 if (current_error_)
100 return; 98 return;
101 99
102 // If there is a buffer is in use, mark the buffer as cancelled and notify the 100 // If there is a buffer is in use, mark the buffer as cancelled and notify the
103 // client by calling |cancel_callback_|. The sink implementation may or may 101 // client by calling |cancel_callback_|. The sink implementation may or may
104 // not take the cancellation into account when deciding what error (if any) to 102 // not take the cancellation into account when deciding what error (if any) to
105 // return. If the sink returns an error, we ignore the cancellation error. 103 // return. If the sink returns an error, we ignore the cancellation error.
106 // Otherwise, if the sink does not report an error, we override that with the 104 // Otherwise, if the sink does not report an error, we override that with the
107 // cancellation error. Once a cancellation has been received, the next report 105 // cancellation error. Once a cancellation has been received, the next report
108 // sent to the client will always contain an error; the error returned by the 106 // sent to the client will always contain an error; the error returned by the
109 // sink or the cancellation error if the sink does not return an error. 107 // sink or the cancellation error if the sink does not return an error.
110 if (buffer_in_use_) { 108 if (buffer_in_use_) {
111 buffer_in_use_->Cancel(error); 109 buffer_in_use_->Cancel(error);
112 if (!cancel_callback_.is_null()) 110 if (!cancel_callback_.is_null())
113 cancel_callback_.Run(error); 111 cancel_callback_.Run(error);
114 return; 112 return;
115 } 113 }
116 ReportBytesSentAndError(0, error); 114 ReportError(0, error);
117 } 115 }
118 116
119 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) { 117 void DataSinkReceiver::OnData(
120 if (!initialized_) { 118 mojo::Array<uint8_t> data,
121 ShutDown(); 119 const mojo::Callback<void(uint32_t, int32_t)>& callback) {
120 if (current_error_) {
121 callback.Run(0, current_error_);
122 return; 122 return;
123 } 123 }
124 if (data.size() > available_buffer_capacity_) { 124 pending_data_buffers_.push(
125 ShutDown(); 125 linked_ptr<DataFrame>(new DataFrame(data.Pass(), callback)));
126 return; 126 if (!buffer_in_use_)
127 }
128 available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
129 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
130 if (!buffer_in_use_ && !flush_pending_)
131 RunReadyCallback(); 127 RunReadyCallback();
132 } 128 }
133 129
134 void DataSinkReceiver::OnConnectionError() { 130 void DataSinkReceiver::OnConnectionError() {
135 DispatchFatalError(); 131 DispatchFatalError();
136 } 132 }
137 133
138 void DataSinkReceiver::RunReadyCallback() { 134 void DataSinkReceiver::RunReadyCallback() {
139 DCHECK(!shut_down_ && !flush_pending_); 135 DCHECK(!shut_down_ && !current_error_);
140 // If data arrives while a call to RunReadyCallback() is posted, we can be 136 // If data arrives while a call to RunReadyCallback() is posted, we can be
141 // called with buffer_in_use_ already set. 137 // called with buffer_in_use_ already set.
142 if (buffer_in_use_) 138 if (buffer_in_use_)
143 return; 139 return;
144 buffer_in_use_ = 140 buffer_in_use_ =
145 new Buffer(this, 141 new Buffer(this,
146 pending_data_buffers_.front()->GetData(), 142 pending_data_buffers_.front()->GetData(),
147 pending_data_buffers_.front()->GetRemainingBytes()); 143 pending_data_buffers_.front()->GetRemainingBytes());
148 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); 144 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
149 } 145 }
150 146
151 void DataSinkReceiver::Done(uint32_t bytes_read) { 147 void DataSinkReceiver::Done(uint32_t bytes_read) {
152 if (!DoneInternal(bytes_read)) 148 if (!DoneInternal(bytes_read))
153 return; 149 return;
154 client()->ReportBytesSent(bytes_read); 150 pending_data_buffers_.front()->OnDataConsumed(bytes_read);
151 if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
152 pending_data_buffers_.pop();
155 if (!pending_data_buffers_.empty()) { 153 if (!pending_data_buffers_.empty()) {
156 base::MessageLoop::current()->PostTask( 154 base::MessageLoop::current()->PostTask(
157 FROM_HERE, 155 FROM_HERE,
158 base::Bind(&DataSinkReceiver::RunReadyCallback, 156 base::Bind(&DataSinkReceiver::RunReadyCallback,
159 weak_factory_.GetWeakPtr())); 157 weak_factory_.GetWeakPtr()));
160 } 158 }
161 } 159 }
162 160
163 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { 161 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
164 if (!DoneInternal(bytes_read)) 162 if (!DoneInternal(bytes_read))
165 return; 163 return;
166 ReportBytesSentAndError(bytes_read, error); 164 ReportError(bytes_read, error);
167 } 165 }
168 166
169 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { 167 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
170 if (shut_down_) 168 if (shut_down_)
171 return false; 169 return false;
172 170
173 DCHECK(buffer_in_use_); 171 DCHECK(buffer_in_use_);
174 buffer_in_use_ = NULL; 172 buffer_in_use_ = NULL;
175 available_buffer_capacity_ += bytes_read;
176 pending_data_buffers_.front()->OnDataConsumed(bytes_read);
177 if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
178 pending_data_buffers_.pop();
179 return true; 173 return true;
180 } 174 }
181 175
182 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, 176 void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) {
183 int32_t error) {
184 // When we encounter an error, we must discard the data from any send buffers 177 // When we encounter an error, we must discard the data from any send buffers
185 // transmitted by the DataSinkClient before it receives this error. 178 // transmitted by the DataSink client before it receives this error.
186 flush_pending_ = true; 179 DCHECK(error);
187 client()->ReportBytesSentAndError( 180 current_error_ = error;
188 bytes_read, 181 while (!pending_data_buffers_.empty()) {
189 error, 182 pending_data_buffers_.front()->ReportError(bytes_read, error);
190 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr())); 183 pending_data_buffers_.pop();
184 bytes_read = 0;
185 }
191 } 186 }
192 187
193 void DataSinkReceiver::DoFlush() { 188 void DataSinkReceiver::ClearError() {
194 DCHECK(flush_pending_); 189 current_error_ = 0;
195 flush_pending_ = false;
196 while (!pending_data_buffers_.empty()) {
197 available_buffer_capacity_ +=
198 pending_data_buffers_.front()->GetRemainingBytes();
199 pending_data_buffers_.pop();
200 }
201 } 190 }
202 191
203 void DataSinkReceiver::DispatchFatalError() { 192 void DataSinkReceiver::DispatchFatalError() {
204 if (shut_down_) 193 if (shut_down_)
205 return; 194 return;
206 195
207 ShutDown(); 196 ShutDown();
208 if (!error_callback_.is_null()) 197 if (!error_callback_.is_null())
209 error_callback_.Run(); 198 error_callback_.Run();
210 } 199 }
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 243
255 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, 244 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
256 int32_t error) { 245 int32_t error) {
257 scoped_refptr<DataSinkReceiver> receiver = receiver_; 246 scoped_refptr<DataSinkReceiver> receiver = receiver_;
258 receiver_ = nullptr; 247 receiver_ = nullptr;
259 receiver->DoneWithError(bytes_read, error); 248 receiver->DoneWithError(bytes_read, error);
260 buffer_ = NULL; 249 buffer_ = NULL;
261 buffer_size_ = 0; 250 buffer_size_ = 0;
262 } 251 }
263 252
264 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data) 253 DataSinkReceiver::DataFrame::DataFrame(
265 : data_(data.Pass()), offset_(0) { 254 mojo::Array<uint8_t> data,
255 const mojo::Callback<void(uint32_t, int32_t)>& callback)
256 : data_(data.Pass()), offset_(0), callback_(callback) {
266 DCHECK_LT(0u, data_.size()); 257 DCHECK_LT(0u, data_.size());
267 } 258 }
268 259
269 // Returns the number of uncomsumed bytes remaining of this data frame. 260 // Returns the number of uncomsumed bytes remaining of this data frame.
270 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { 261 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() {
271 return static_cast<uint32_t>(data_.size() - offset_); 262 return static_cast<uint32_t>(data_.size() - offset_);
272 } 263 }
273 264
274 // Returns a pointer to the remaining data to be consumed. 265 // Returns a pointer to the remaining data to be consumed.
275 const char* DataSinkReceiver::DataFrame::GetData() { 266 const char* DataSinkReceiver::DataFrame::GetData() {
276 DCHECK_LT(offset_, data_.size()); 267 DCHECK_LT(offset_, data_.size());
277 return reinterpret_cast<const char*>(&data_[0]) + offset_; 268 return reinterpret_cast<const char*>(&data_[0]) + offset_;
278 } 269 }
279 270
280 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { 271 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
281 offset_ += bytes_read; 272 offset_ += bytes_read;
282 DCHECK_LE(offset_, data_.size()); 273 DCHECK_LE(offset_, data_.size());
274 if (offset_ == data_.size())
275 callback_.Run(offset_, 0);
276 }
277 void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read,
278 int32_t error) {
279 offset_ += bytes_read;
280 DCHECK_LE(offset_, data_.size());
281 callback_.Run(offset_, error);
283 } 282 }
284 283
285 } // namespace device 284 } // namespace device
OLDNEW
« no previous file with comments | « device/serial/data_sink_receiver.h ('k') | device/serial/data_sink_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698