Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Side by Side Diff: extensions/renderer/resources/data_sender.js

Issue 488003002: Add the JS data pipe client to be used to implement serial send. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@js-receive-pipe
Patch Set: address comments Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 define('data_sender', [
6 'async_waiter',
7 'device/serial/data_stream.mojom',
8 'mojo/public/js/bindings/core',
9 'mojo/public/js/bindings/router',
10 ], function(asyncWaiter, dataStreamMojom, core, routerModule) {
11 /**
12 * @module data_sender
13 */
14
15 /**
16 * A pending send operation.
17 * @param {ArrayBuffer} data The data to be sent.
18 * @constructor
19 * @alias module:data_sender~PendingSend
20 * @private
21 */
22 function PendingSend(data) {
23 /**
24 * The remaining data to be sent.
25 * @type {ArrayBuffer}
26 * @private
27 */
28 this.data_ = data;
29 /**
30 * The total length of data to be sent.
31 * @type {number}
32 * @private
33 */
34 this.length_ = data.byteLength;
35 /**
36 * The number of bytes that have been received by the DataSink.
37 * @type {number}
38 * @private
39 */
40 this.bytesReceivedBySink_ = 0;
41 /**
42 * The promise that will be resolved or rejected when this send completes
43 * or fails, respectively.
44 * @type {Promise.<number>}
45 * @private
46 */
47 this.promise_ = new Promise(function(resolve, reject) {
48 /**
49 * The callback to call on success.
50 * @type {Function}
51 * @private
52 */
53 this.successCallback_ = resolve;
54 /**
55 * The callback to call with the error on failure.
56 * @type {Function}
57 * @private
58 */
59 this.errorCallback_ = reject;
60 }.bind(this));
61 }
62
63 /**
64 * Returns the promise that will be resolved when this operation completes or
65 * rejected if an error occurs.
66 * @return {Promise.<number>} A promise to the number of bytes sent.
67 */
68 PendingSend.prototype.getPromise = function() {
69 return this.promise_;
70 };
71
72 /**
73 * @typedef module:data_sender~PendingSend.ReportBytesResult
74 * @property {number} bytesUnreported The number of bytes reported that were
75 * not part of the send.
76 * @property {boolean?} done Whether this send has completed.
77 * @property {number?} bytesToFlush The number of bytes to flush in the event
78 * of an error.
79 */
80
81 /**
82 * Invoked when the DataSink reports that bytes have been sent. Resolves the
83 * promise returned by
84 * [getPromise()]{@link module:data_sender~PendingSend#getPromise} once all
85 * bytes have been reported as sent.
86 * @param {number} numBytes The number of bytes sent.
87 * @return {module:data_sender~PendingSend.ReportBytesResult}
88 */
89 PendingSend.prototype.reportBytesSent = function(numBytes) {
90 var result = this.reportBytesSentInternal_(numBytes);
91 if (this.bytesReceivedBySink_ == this.length_) {
92 result.done = true;
93 this.successCallback_(this.bytesReceivedBySink_);
94 }
95 return result;
96 };
97
98 /**
99 * Invoked when the DataSink reports an error. Rejects the promise returned by
100 * [getPromise()]{@link module:data_sender~PendingSend#getPromise} unless the
101 * error occurred after this send, that is, unless numBytes is greater than
102 * the nubmer of outstanding bytes.
103 * @param {number} numBytes The number of bytes sent.
104 * @param {number} error The error reported by the DataSink.
105 * @return {module:data_sender~PendingSend.ReportBytesResult}
106 */
107 PendingSend.prototype.reportBytesSentAndError = function(numBytes, error) {
108 var result = this.reportBytesSentInternal_(numBytes);
109 // If there are remaining bytes to report, the error occurred after this
110 // PendingSend so we should report success.
111 if (result.bytesUnreported > 0) {
112 this.successCallback_(this.bytesReceivedBySink_);
113 result.bytesToFlush = 0;
114 return result;
115 }
116
117 var e = new Error();
118 e.error = error;
119 e.bytesSent = this.bytesReceivedBySink_;
120 this.errorCallback_(e);
121 this.done = true;
122 result.bytesToFlush =
123 this.length_ - this.data_.byteLength - this.bytesReceivedBySink_;
124 return result;
125 };
126
127 /**
128 * Updates the internal state in response to a report from the DataSink.
129 * @param {number} numBytes The number of bytes sent.
130 * @return {module:data_sender~PendingSend.ReportBytesResult}
131 * @private
132 */
133 PendingSend.prototype.reportBytesSentInternal_ = function(numBytes) {
134 this.bytesReceivedBySink_ += numBytes;
135 var result = {bytesUnreported: 0};
136 if (this.bytesReceivedBySink_ > this.length_) {
137 result.bytesUnreported = this.bytesReceivedBySink_ - this.length_;
138 this.bytesReceivedBySink_ = this.length_;
139 }
140 result.done = false;
141 return result;
142 };
143
144 /**
145 * Writes pending data into the data pipe.
146 * @param {MojoHandle} handle The handle to the data pipe.
147 * @return {number} The Mojo result corresponding to the outcome:
148 * <ul>
149 * <li>RESULT_OK if the write completes successfully;
150 * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or
151 * <li>the data pipe error if the write failed.
152 * </ul>
153 */
154 PendingSend.prototype.sendData = function(handle) {
155 var result = core.writeData(
156 handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE);
157 if (result.result != core.RESULT_OK)
158 return result.result;
159 this.data_ = this.data_.slice(result.numBytes);
160 return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK;
161 };
162
163 /**
164 * A DataSender that sends data to a DataSink.
165 * @param {MojoHandle} handle The handle to the DataSink.
166 * @param {number} bufferSize How large a buffer the data pipe should use.
167 * @param {number} fatalErrorValue The send error value to report in the
168 * event of a fatal error.
169 * @constructor
170 * @alias module:data_sender.DataSender
171 */
172 function DataSender(handle, bufferSize, fatalErrorValue) {
173 /**
174 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
175 * connection to the DataSink.
176 * @private
177 */
178 this.router_ = new routerModule.Router(handle);
179 /**
180 * The connection to the DataSink.
181 * @private
182 */
183 this.sink_ = new dataStreamMojom.DataSinkProxy(this.router_);
184 this.router_.setIncomingReceiver(this);
185 var dataPipeOptions = {
186 flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
187 elementNumBytes: 1,
188 capacityNumBytes: bufferSize,
189 };
190 var sendPipe = core.createDataPipe(dataPipeOptions);
191 this.sink_.init(sendPipe.consumerHandle);
192 /**
193 * The handle to the data pipe to use for sending data.
194 * @private
195 */
196 this.sendPipe_ = sendPipe.producerHandle;
197 /**
198 * The error to be dispatched in the event of a fatal error.
199 * @type {number}
200 * @private
201 */
202 this.fatalErrorValue_ = fatalErrorValue;
203 /**
204 * The async waiter used to wait for
205 * {@link module:data_sender.DataSender#sendPipe_} to be writable.
206 * @type module:async_waiter.AsyncWaiter
207 * @private
208 */
209 this.waiter_ = new asyncWaiter.AsyncWaiter(
210 this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE,
211 this.onHandleReady_.bind(this));
212 /**
213 * A queue of sends that have not fully written their data to the data pipe.
214 * @type module:data_sender~PendingSend[]
215 * @private
216 */
217 this.pendingSends_ = [];
218 /**
219 * A queue of sends that have written their data to the data pipe, but have
220 * not been received by the DataSink.
221 * @type module:data_sender~PendingSend[]
222 * @private
223 */
224 this.sendsAwaitingAck_ = [];
225 /**
226 * The callback that will resolve a pending cancel if one is in progress.
227 * @type Function
228 * @private
229 */
230 this.pendingCancel_ = null;
231 /**
232 * Whether this DataReceiver has shut down.
233 * @type {boolean}
234 * @private
235 */
236 this.shutDown_ = false;
237 }
238
239 DataSender.prototype =
240 $Object.create(dataStreamMojom.DataSinkClientStub.prototype);
241
242 /**
243 * Closes this DataSender.
244 */
245 DataSender.prototype.close = function() {
246 if (this.shutDown_)
247 return;
248 this.shutDown_ = true;
249 this.waiter_.stop();
250 this.router_.close();
251 core.close(this.sendPipe_);
252 while (this.pendingSends_.length) {
253 this.pendingSends_.pop().reportBytesSentAndError(
254 0, this.fatalErrorValue_);
255 }
256 while (this.sendsAwaitingAck_.length) {
257 this.sendsAwaitingAck_.pop().reportBytesSentAndError(
258 0, this.fatalErrorValue_);
259 }
260 if (this.pendingCancel_) {
261 this.pendingCancel_();
262 this.pendingCancel_ = null;
263 }
264 };
265
266 /**
267 * Sends data to the DataSink.
268 * @return {Promise.<number>} A promise to the number of bytes sent. If an
269 * error occurs, the promise will reject with an Error object with a
270 * property error containing the error code.
271 * @throws Will throw if this has encountered a fatal error or a cancel is in
272 * progress.
273 */
274 DataSender.prototype.send = function(data) {
275 if (this.shutDown_)
276 throw new Error('DataSender has been closed');
277 if (this.pendingCancel_)
278 throw new Error('Cancel in progress');
279 var send = new PendingSend(data);
280 this.pendingSends_.push(send);
281 if (!this.waiter_.isWaiting())
282 this.waiter_.start();
283 return send.getPromise();
284 };
285
286 /**
287 * Requests the cancellation of any in-progress sends. Calls to
288 * [send()]{@link module:data_sender.DataSender#send} will fail until the
289 * cancel has completed.
290 * @param {number} error The error to report for cancelled sends.
291 * @return {Promise} A promise that will resolve when the cancel completes.
292 * @throws Will throw if this has encountered a fatal error or another cancel
293 * is in progress.
294 */
295 DataSender.prototype.cancel = function(error) {
296 if (this.shutDown_)
297 throw new Error('DataSender has been closed');
298 if (this.pendingCancel_)
299 throw new Error('Cancel already in progress');
300 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
301 return Promise.resolve();
302
303 this.sink_.cancel(error);
304 return new Promise(function(resolve) {
305 this.pendingCancel_ = resolve;
306 }.bind(this));
307 };
308
309 /**
310 * Invoked when |handle_| is ready to write. Writes to the data pipe if the
311 * wait is successful.
312 * @param {number} waitResult The result of the asynchronous wait.
313 * @private
314 */
315 DataSender.prototype.onHandleReady_ = function(result) {
316 if (result != core.RESULT_OK) {
317 this.close();
318 return;
319 }
320 while (this.pendingSends_.length) {
321 var result = this.pendingSends_[0].sendData(this.sendPipe_);
322 if (result == core.RESULT_OK) {
323 this.sendsAwaitingAck_.push(this.pendingSends_.shift());
324 } else if (result == core.RESULT_SHOULD_WAIT) {
325 this.waiter_.start();
326 return;
327 } else {
328 this.close();
329 return;
330 }
331 }
332 };
333
334 /**
335 * Calls and clears the pending cancel callback if one is pending.
336 * @private
337 */
338 DataSender.prototype.callCancelCallback_ = function() {
339 if (this.pendingCancel_) {
340 this.pendingCancel_();
341 this.pendingCancel_ = null;
342 }
343 };
344
345 /**
346 * Invoked by the DataSink to report that data has been successfully sent.
347 * @param {number} numBytes The number of bytes sent.
348 * @private
349 */
350 DataSender.prototype.reportBytesSent = function(numBytes) {
351 while (numBytes > 0 && this.sendsAwaitingAck_.length) {
352 var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes);
353 numBytes = result.bytesUnreported;
354 if (result.done)
355 this.sendsAwaitingAck_.shift();
356 }
357 if (numBytes > 0 && this.pendingSends_.length) {
358 var result = this.pendingSends_[0].reportBytesSent(numBytes);
359 numBytes = result.bytesUnreported;
360 }
361 // A cancel is completed when all of the sends that were in progress have
362 // completed or failed. This is the case where all sends complete
363 // successfully.
364 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
365 this.callCancelCallback_();
366 };
367
368 /**
369 * Invoked by the DataSink to report an error in sending data.
370 * @param {number} numBytes The number of bytes sent.
371 * @param {number} error The error reported by the DataSink.
372 * @private
373 */
374 DataSender.prototype.reportBytesSentAndError = function(numBytes, error) {
375 var bytesToFlush = 0;
376 while (this.sendsAwaitingAck_.length) {
377 var result = this.sendsAwaitingAck_[0].reportBytesSentAndError(
378 numBytes, error);
379 numBytes = result.bytesUnreported;
380 this.sendsAwaitingAck_.shift();
381 bytesToFlush += result.bytesToFlush;
382 }
383 while (this.pendingSends_.length) {
384 var result = this.pendingSends_[0].reportBytesSentAndError(
385 numBytes, error);
386 numBytes = result.bytesUnreported;
387 this.pendingSends_.shift();
388 // Note: Only the first PendingSend in |pendingSends_| will have data to
389 // flush as only the first can have written data to the data pipe.
390 bytesToFlush += result.bytesToFlush;
391 }
392 this.callCancelCallback_();
393 return Promise.resolve({bytes_to_flush: bytesToFlush});
394 };
395
396 return {DataSender: DataSender};
397 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698