Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(260)

Side by Side Diff: extensions/renderer/resources/data_receiver.js

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: split out bug fix Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 define('data_receiver', [ 5 define('data_receiver', [
6 'async_waiter',
7 'device/serial/data_stream.mojom', 6 'device/serial/data_stream.mojom',
8 'device/serial/data_stream_serialization.mojom', 7 'device/serial/data_stream_serialization.mojom',
9 'mojo/public/js/bindings/core', 8 'mojo/public/js/bindings/core',
10 'mojo/public/js/bindings/router', 9 'mojo/public/js/bindings/router',
11 ], function(asyncWaiter, dataStream, serialization, core, router) { 10 ], function(dataStream, serialization, core, router) {
12 /** 11 /**
13 * @module data_receiver 12 * @module data_receiver
14 */ 13 */
15 14
16 /** 15 /**
17 * A pending receive operation. 16 * A pending receive operation.
18 * @constructor 17 * @constructor
19 * @alias module:data_receiver~PendingReceive 18 * @alias module:data_receiver~PendingReceive
20 * @private 19 * @private
21 */ 20 */
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
58 */ 57 */
59 PendingReceive.prototype.dispatchData = function(data) { 58 PendingReceive.prototype.dispatchData = function(data) {
60 this.dataCallback_(data); 59 this.dataCallback_(data);
61 }; 60 };
62 61
63 /** 62 /**
64 * Dispatches an error if the offset of the error has been reached. 63 * Dispatches an error if the offset of the error has been reached.
65 * @param {!PendingReceiveError} error The error to dispatch. 64 * @param {!PendingReceiveError} error The error to dispatch.
66 * @param {number} bytesReceived The number of bytes that have been received. 65 * @param {number} bytesReceived The number of bytes that have been received.
67 */ 66 */
68 PendingReceive.prototype.dispatchError = function(error, bytesReceived) { 67 PendingReceive.prototype.dispatchError = function(error) {
69 if (bytesReceived != error.offset) 68 if (error.queuePosition > 0)
70 return false; 69 return false;
71 70
72 var e = new Error(); 71 var e = new Error();
73 e.error = error.error; 72 e.error = error.error;
74 this.errorCallback_(e); 73 this.errorCallback_(e);
75 return true; 74 return true;
76 }; 75 };
77 76
78 /** 77 /**
79 * Unconditionally dispatches an error. 78 * Unconditionally dispatches an error.
80 * @param {number} error The error to dispatch. 79 * @param {number} error The error to dispatch.
81 */ 80 */
82 PendingReceive.prototype.dispatchFatalError = function(error) { 81 PendingReceive.prototype.dispatchFatalError = function(error) {
83 var e = new Error(); 82 var e = new Error();
84 e.error = error; 83 e.error = error;
85 this.errorCallback_(e); 84 this.errorCallback_(e);
86 }; 85 };
87 86
88 /** 87 /**
89 * A DataReceiver that receives data from a DataSource. 88 * A DataReceiver that receives data from a DataSource.
90 * @param {!MojoHandle} handle The handle to the DataSource. 89 * @param {!MojoHandle} handle The handle to the DataSource.
91 * @param {number} bufferSize How large a buffer the data pipe should use. 90 * @param {number} bufferSize How large a buffer to use.
92 * @param {number} fatalErrorValue The receive error value to report in the 91 * @param {number} fatalErrorValue The receive error value to report in the
93 * event of a fatal error. 92 * event of a fatal error.
94 * @constructor 93 * @constructor
95 * @alias module:data_receiver.DataReceiver 94 * @alias module:data_receiver.DataReceiver
96 */ 95 */
97 function DataReceiver(handle, bufferSize, fatalErrorValue) { 96 function DataReceiver(handle, bufferSize, fatalErrorValue) {
98 var dataPipeOptions = { 97 this.init_(handle, fatalErrorValue, 0, null, [], false);
99 flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 98 this.source_.init(bufferSize);
100 elementNumBytes: 1,
101 capacityNumBytes: bufferSize,
102 };
103 var receivePipe = core.createDataPipe(dataPipeOptions);
104 this.init_(
105 handle, receivePipe.consumerHandle, fatalErrorValue, 0, null, false);
106 this.source_.init(receivePipe.producerHandle);
107 } 99 }
108 100
109 DataReceiver.prototype = 101 DataReceiver.prototype =
110 $Object.create(dataStream.DataSourceClient.stubClass.prototype); 102 $Object.create(dataStream.DataSourceClient.stubClass.prototype);
111 103
112 /** 104 /**
113 * Closes this DataReceiver. 105 * Closes this DataReceiver.
114 */ 106 */
115 DataReceiver.prototype.close = function() { 107 DataReceiver.prototype.close = function() {
116 if (this.shutDown_) 108 if (this.shutDown_)
117 return; 109 return;
118 this.shutDown_ = true; 110 this.shutDown_ = true;
119 this.router_.close(); 111 this.router_.close();
120 this.waiter_.stop();
121 core.close(this.receivePipe_);
122 if (this.receive_) { 112 if (this.receive_) {
123 this.receive_.dispatchFatalError(this.fatalErrorValue_); 113 this.receive_.dispatchFatalError(this.fatalErrorValue_);
124 this.receive_ = null; 114 this.receive_ = null;
125 } 115 }
126 }; 116 };
127 117
128 /** 118 /**
129 * Initialize this DataReceiver. 119 * Initialize this DataReceiver.
130 * @param {!MojoHandle} source A handle to the DataSource 120 * @param {!MojoHandle} source A handle to the DataSource
131 * @param {!MojoHandle} dataPipe A handle to use for receiving data from the
132 * DataSource.
133 * @param {number} fatalErrorValue The error to dispatch in the event of a 121 * @param {number} fatalErrorValue The error to dispatch in the event of a
134 * fatal error. 122 * fatal error.
135 * @param {number} bytesReceived The number of bytes already received. 123 * @param {number} bytesReceived The number of bytes already received.
136 * @param {PendingReceiveError} pendingError The pending error if there is 124 * @param {PendingReceiveError} pendingError The pending error if there is
137 * one. 125 * one.
126 * @param {ArrayBuffer[]} pendingData Data received from the DataSource not
Ken Rockot(use gerrit already) 2014/10/28 04:59:33 At least the current version of closure compiler d
Sam McNally 2014/10/28 05:11:10 Done.
127 * yet requested by the client.
138 * @param {boolean} paused Whether the DataSource is paused. 128 * @param {boolean} paused Whether the DataSource is paused.
139 * @private 129 * @private
140 */ 130 */
141 DataReceiver.prototype.init_ = function(source, 131 DataReceiver.prototype.init_ = function(source,
142 dataPipe,
143 fatalErrorValue, 132 fatalErrorValue,
144 bytesReceived, 133 bytesReceived,
145 pendingError, 134 pendingError,
135 pendingData,
146 paused) { 136 paused) {
147 /** 137 /**
148 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the 138 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
149 * connection to the DataSource. 139 * connection to the DataSource.
150 * @private 140 * @private
151 */ 141 */
152 this.router_ = new router.Router(source); 142 this.router_ = new router.Router(source);
153 /** 143 /**
154 * The connection to the DataSource. 144 * The connection to the DataSource.
155 * @private 145 * @private
156 */ 146 */
157 this.source_ = new dataStream.DataSource.proxyClass(this.router_); 147 this.source_ = new dataStream.DataSource.proxyClass(this.router_);
158 this.router_.setIncomingReceiver(this); 148 this.router_.setIncomingReceiver(this);
159 /** 149 /**
160 * The handle to the data pipe to use for receiving data.
161 * @private
162 */
163 this.receivePipe_ = dataPipe;
164 /**
165 * The current receive operation. 150 * The current receive operation.
166 * @type {module:data_receiver~PendingReceive} 151 * @type {module:data_receiver~PendingReceive}
167 * @private 152 * @private
168 */ 153 */
169 this.receive_ = null; 154 this.receive_ = null;
170 /** 155 /**
171 * The error to be dispatched in the event of a fatal error. 156 * The error to be dispatched in the event of a fatal error.
172 * @const {number} 157 * @const {number}
173 * @private 158 * @private
174 */ 159 */
175 this.fatalErrorValue_ = fatalErrorValue; 160 this.fatalErrorValue_ = fatalErrorValue;
176 /** 161 /**
177 * The async waiter used to wait for
178 * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| to
179 * be readable.
180 * @type {!module:async_waiter.AsyncWaiter}
181 * @private
182 */
183 this.waiter_ = new asyncWaiter.AsyncWaiter(this.receivePipe_,
184 core.HANDLE_SIGNAL_READABLE,
185 this.onHandleReady_.bind(this));
186 /**
187 * The number of bytes received from the DataSource.
188 * @type {number}
189 * @private
190 */
191 this.bytesReceived_ = bytesReceived;
192 /**
193 * The pending error if there is one. 162 * The pending error if there is one.
194 * @type {PendingReceiveError} 163 * @type {PendingReceiveError}
195 * @private 164 * @private
196 */ 165 */
197 this.pendingError_ = pendingError; 166 this.pendingError_ = pendingError;
198 /** 167 /**
199 * Whether the DataSource is paused. 168 * Whether the DataSource is paused.
200 * @type {boolean} 169 * @type {boolean}
201 * @private 170 * @private
202 */ 171 */
203 this.paused_ = paused; 172 this.paused_ = paused;
204 /** 173 /**
174 * A queue of data that has been received from the DataSource, but not
175 * consumed by the client.
176 * @type {module:data_receiver~PendingData[]}
177 * @private
178 */
179 this.pendingDataBuffers_ = pendingData;
180 /**
205 * Whether this DataReceiver has shut down. 181 * Whether this DataReceiver has shut down.
206 * @type {boolean} 182 * @type {boolean}
207 * @private 183 * @private
208 */ 184 */
209 this.shutDown_ = false; 185 this.shutDown_ = false;
210 }; 186 };
211 187
212 /** 188 /**
213 * Serializes this DataReceiver. 189 * Serializes this DataReceiver.
214 * This will cancel a receive if one is in progress. 190 * This will cancel a receive if one is in progress.
215 * @return {!Promise.<SerializedDataReceiver>} A promise that will resolve to 191 * @return {!Promise.<SerializedDataReceiver>} A promise that will resolve to
216 * the serialization of this DataReceiver. If this DataReceiver has shut 192 * the serialization of this DataReceiver. If this DataReceiver has shut
217 * down, the promise will resolve to null. 193 * down, the promise will resolve to null.
218 */ 194 */
219 DataReceiver.prototype.serialize = function() { 195 DataReceiver.prototype.serialize = function() {
220 if (this.shutDown_) 196 if (this.shutDown_)
221 return Promise.resolve(null); 197 return Promise.resolve(null);
222 198
223 this.waiter_.stop();
224 if (this.receive_) { 199 if (this.receive_) {
225 this.receive_.dispatchFatalError(this.fatalErrorValue_); 200 this.receive_.dispatchFatalError(this.fatalErrorValue_);
226 this.receive_ = null; 201 this.receive_ = null;
227 } 202 }
228 var serialized = new serialization.SerializedDataReceiver(); 203 var serialized = new serialization.SerializedDataReceiver();
229 serialized.source = this.router_.connector_.handle_; 204 serialized.source = this.router_.connector_.handle_;
230 serialized.data_pipe = this.receivePipe_;
231 serialized.fatal_error_value = this.fatalErrorValue_; 205 serialized.fatal_error_value = this.fatalErrorValue_;
232 serialized.bytes_received = this.bytesReceived_;
233 serialized.paused = this.paused_; 206 serialized.paused = this.paused_;
234 serialized.pending_error = this.pendingError_; 207 serialized.pending_error = this.pendingError_;
208 serialized.pending_data = [];
209 $Array.forEach(this.pendingDataBuffers_, function(buffer) {
210 serialized.pending_data.push(new Uint8Array(buffer));
211 });
235 this.router_.connector_.handle_ = null; 212 this.router_.connector_.handle_ = null;
236 this.router_.close(); 213 this.router_.close();
237 this.shutDown_ = true; 214 this.shutDown_ = true;
238 return Promise.resolve(serialized); 215 return Promise.resolve(serialized);
239 }; 216 };
240 217
241 /** 218 /**
242 * Deserializes a SerializedDataReceiver. 219 * Deserializes a SerializedDataReceiver.
243 * @param {SerializedDataReceiver} serialized The serialized DataReceiver. 220 * @param {SerializedDataReceiver} serialized The serialized DataReceiver.
244 * @return {!DataReceiver} The deserialized DataReceiver. 221 * @return {!DataReceiver} The deserialized DataReceiver.
245 */ 222 */
246 DataReceiver.deserialize = function(serialized) { 223 DataReceiver.deserialize = function(serialized) {
247 var receiver = $Object.create(DataReceiver.prototype); 224 var receiver = $Object.create(DataReceiver.prototype);
248 receiver.deserialize_(serialized); 225 receiver.deserialize_(serialized);
249 return receiver; 226 return receiver;
250 }; 227 };
251 228
252 /** 229 /**
253 * Deserializes a SerializedDataReceiver into this DataReceiver. 230 * Deserializes a SerializedDataReceiver into this DataReceiver.
254 * @param {SerializedDataReceiver} serialized The serialized DataReceiver. 231 * @param {SerializedDataReceiver} serialized The serialized DataReceiver.
255 * @private 232 * @private
256 */ 233 */
257 DataReceiver.prototype.deserialize_ = function(serialized) { 234 DataReceiver.prototype.deserialize_ = function(serialized) {
258 if (!serialized) { 235 if (!serialized) {
259 this.shutDown_ = true; 236 this.shutDown_ = true;
260 return; 237 return;
261 } 238 }
239 var pendingData = [];
240 $Array.forEach(serialized.pending_data, function(data) {
241 var buffer = new Uint8Array(data.length);
242 buffer.set(data);
243 pendingData.push(buffer.buffer);
244 });
262 this.init_(serialized.source, 245 this.init_(serialized.source,
263 serialized.data_pipe,
264 serialized.fatal_error_value, 246 serialized.fatal_error_value,
265 serialized.bytes_received, 247 serialized.bytes_received,
266 serialized.pending_error, 248 serialized.pending_error,
249 pendingData,
267 serialized.paused); 250 serialized.paused);
268 }; 251 };
269 252
270 /** 253 /**
271 * Receive data from the DataSource. 254 * Receive data from the DataSource.
272 * @return {Promise.<ArrayBuffer>} A promise to the received data. If an error 255 * @return {Promise.<ArrayBuffer>} A promise to the received data. If an error
273 * occurs, the promise will reject with an Error object with a property 256 * occurs, the promise will reject with an Error object with a property
274 * error containing the error code. 257 * error containing the error code.
275 * @throws Will throw if this has encountered a fatal error or another receive 258 * @throws Will throw if this has encountered a fatal error or another receive
276 * is in progress. 259 * is in progress.
277 */ 260 */
278 DataReceiver.prototype.receive = function() { 261 DataReceiver.prototype.receive = function() {
279 if (this.shutDown_) 262 if (this.shutDown_)
280 throw new Error('DataReceiver has been closed'); 263 throw new Error('DataReceiver has been closed');
281 if (this.receive_) 264 if (this.receive_)
282 throw new Error('Receive already in progress.'); 265 throw new Error('Receive already in progress.');
283 var receive = new PendingReceive(); 266 var receive = new PendingReceive();
284 var promise = receive.getPromise(); 267 var promise = receive.getPromise();
285 if (this.pendingError_ && 268 if (this.pendingError_ &&
286 receive.dispatchError(this.pendingError_, this.bytesReceived_)) { 269 receive.dispatchError(this.pendingError_)) {
287 this.pendingError_ = null; 270 this.pendingError_ = null;
288 this.paused_ = true; 271 this.paused_ = true;
289 return promise; 272 return promise;
290 } 273 }
291 if (this.paused_) { 274 if (this.paused_) {
292 this.source_.resume(); 275 this.source_.resume();
293 this.paused_ = false; 276 this.paused_ = false;
294 } 277 }
295 this.receive_ = receive; 278 this.receive_ = receive;
296 this.waiter_.start(); 279 this.dispatchData_();
297 return promise; 280 return promise;
298 }; 281 };
299 282
300 /** 283 DataReceiver.prototype.dispatchData_ = function() {
301 * Invoked when 284 if (!this.receive_) {
302 * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| is
303 * ready to read. Reads from the data pipe if the wait is successful.
304 * @param {number} waitResult The result of the asynchronous wait.
305 * @private
306 */
307 DataReceiver.prototype.onHandleReady_ = function(waitResult) {
308 if (waitResult != core.RESULT_OK || !this.receive_) {
309 this.close(); 285 this.close();
310 return; 286 return;
311 } 287 }
312 var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE); 288 if (this.pendingDataBuffers_.length) {
313 if (result.result == core.RESULT_OK) { 289 this.receive_.dispatchData(this.pendingDataBuffers_[0]);
314 // TODO(sammc): Handle overflow in the same fashion as the C++ receiver. 290 this.source_.reportBytesReceived(this.pendingDataBuffers_[0].byteLength);
315 this.bytesReceived_ += result.buffer.byteLength;
316 this.receive_.dispatchData(result.buffer);
317 this.receive_ = null; 291 this.receive_ = null;
318 } else if (result.result == core.RESULT_SHOULD_WAIT) { 292 this.pendingDataBuffers_.shift();
319 this.waiter_.start(); 293 if (this.pendingError_)
320 } else { 294 this.pendingError_.queuePosition--;
321 this.close();
322 } 295 }
323 }; 296 };
324 297
325 /** 298 /**
326 * Invoked by the DataSource when an error is encountered. 299 * Invoked by the DataSource when an error is encountered.
327 * @param {number} offset The location at which the error occurred. 300 * @param {number} offset The location at which the error occurred.
328 * @param {number} error The error that occurred. 301 * @param {number} error The error that occurred.
329 * @private 302 * @private
330 */ 303 */
331 DataReceiver.prototype.onError = function(offset, error) { 304 DataReceiver.prototype.onError = function(error) {
332 if (this.shutDown_) 305 if (this.shutDown_)
333 return; 306 return;
334 307
335 var pendingError = new serialization.PendingReceiveError(); 308 var pendingError = new serialization.PendingReceiveError();
336 pendingError.error = error; 309 pendingError.error = error;
337 pendingError.offset = offset; 310 pendingError.queuePosition = this.pendingDataBuffers_.length;
338 if (this.receive_ && 311 if (this.receive_ && this.receive_.dispatchError(pendingError)) {
339 this.receive_.dispatchError(pendingError, this.bytesReceived_)) {
340 this.receive_ = null; 312 this.receive_ = null;
341 this.waiter_.stop();
342 this.paused_ = true; 313 this.paused_ = true;
343 return; 314 return;
344 } 315 }
345 this.pendingError_ = pendingError; 316 this.pendingError_ = pendingError;
346 }; 317 };
347 318
319 DataReceiver.prototype.onData = function(data) {
320 var buffer = new ArrayBuffer(data.length);
321 var uintView = new Uint8Array(buffer);
322 uintView.set(data);
323 this.pendingDataBuffers_.push(buffer);
324 if (this.receive_)
325 this.dispatchData_();
326 };
327
348 return {DataReceiver: DataReceiver}; 328 return {DataReceiver: DataReceiver};
349 }); 329 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698