Chromium Code Reviews| Index: extensions/renderer/resources/data_sender.js |
| diff --git a/extensions/renderer/resources/data_sender.js b/extensions/renderer/resources/data_sender.js |
| index 63c97c0ef31a2df2848764aebd72f5e269e4e8a8..2c830e26faa5870d7d10009d23409c23d2f8cbfa 100644 |
| --- a/extensions/renderer/resources/data_sender.js |
| +++ b/extensions/renderer/resources/data_sender.js |
| @@ -3,12 +3,11 @@ |
| // found in the LICENSE file. |
| define('data_sender', [ |
| - 'async_waiter', |
| 'device/serial/data_stream.mojom', |
| 'device/serial/data_stream_serialization.mojom', |
| 'mojo/public/js/bindings/core', |
| 'mojo/public/js/bindings/router', |
| -], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) { |
| +], function(dataStreamMojom, serialization, core, routerModule) { |
| /** |
| * @module data_sender |
| */ |
| @@ -144,7 +143,7 @@ define('data_sender', [ |
| /** |
| * Writes pending data into the data pipe. |
| - * @param {!MojoHandle} handle The handle to the data pipe. |
| + * @param {!DataSink} sink The DataSink to receive the data. |
| * @return {number} The Mojo result corresponding to the outcome: |
| * <ul> |
| * <li>RESULT_OK if the write completes successfully; |
| @@ -152,33 +151,29 @@ define('data_sender', [ |
| * <li>the data pipe error if the write failed. |
| * </ul> |
| */ |
| - PendingSend.prototype.sendData = function(handle) { |
| - var result = core.writeData( |
| - handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE); |
| - if (result.result != core.RESULT_OK) |
| - return result.result; |
| - this.data_ = this.data_.slice(result.numBytes); |
| - return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK; |
| + PendingSend.prototype.sendData = function(sink, availableBufferCapacity) { |
|
raymes
2014/10/27 03:02:23
Please comment availableBufferCapacity
Sam McNally
2014/10/27 05:39:14
Done.
|
| + var numBytesToSend = |
| + Math.min(availableBufferCapacity, this.data_.byteLength); |
| + sink.onData(new Uint8Array(this.data_, 0, numBytesToSend)); |
| + this.data_ = this.data_.slice(numBytesToSend); |
| + return { |
| + completed: this.data_.byteLength == 0, |
| + remainingBufferCapacity: availableBufferCapacity - numBytesToSend, |
| + }; |
|
raymes
2014/10/27 03:02:23
This no longer returns a number either.
Sam McNally
2014/10/27 05:39:14
Done.
|
| }; |
| /** |
| * A DataSender that sends data to a DataSink. |
| * @param {!MojoHandle} handle The handle to the DataSink. |
| - * @param {number} bufferSize How large a buffer the data pipe should use. |
| + * @param {number} bufferSize How large a buffer to use for data. |
| * @param {number} fatalErrorValue The send error value to report in the |
| * event of a fatal error. |
| * @constructor |
| * @alias module:data_sender.DataSender |
| */ |
| function DataSender(handle, bufferSize, fatalErrorValue) { |
| - var dataPipeOptions = { |
| - flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
| - elementNumBytes: 1, |
| - capacityNumBytes: bufferSize, |
| - }; |
| - var sendPipe = core.createDataPipe(dataPipeOptions); |
| - this.init_(handle, sendPipe.producerHandle, fatalErrorValue); |
| - this.sink_.init(sendPipe.consumerHandle); |
| + this.init_(handle, fatalErrorValue, bufferSize); |
| + this.sink_.init(bufferSize); |
| } |
| DataSender.prototype = |
| @@ -191,9 +186,7 @@ define('data_sender', [ |
| if (this.shutDown_) |
| return; |
| this.shutDown_ = true; |
| - this.waiter_.stop(); |
| this.router_.close(); |
| - core.close(this.sendPipe_); |
| while (this.pendingSends_.length) { |
| this.pendingSends_.pop().reportBytesSentAndError( |
| 0, this.fatalErrorValue_); |
| @@ -208,18 +201,12 @@ define('data_sender', [ |
| /** |
| * Initialize this DataSender. |
| * @param {!MojoHandle} sink A handle to the DataSink |
| - * @param {!MojoHandle} dataPipe A handle to use for sending data to the |
| - * DataSink. |
| * @param {number} fatalErrorValue The error to dispatch in the event of a |
| * fatal error. |
| + * @param {number} bufferSize The size of the send buffer. |
| * @private |
| */ |
| - DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) { |
| - /** |
| - * The handle to the data pipe to use for sending data. |
| - * @private |
| - */ |
| - this.sendPipe_ = dataPipe; |
| + DataSender.prototype.init_ = function(sink, fatalErrorValue, bufferSize) { |
| /** |
| * The error to be dispatched in the event of a fatal error. |
| * @const {number} |
| @@ -245,23 +232,14 @@ define('data_sender', [ |
| this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_); |
| this.router_.setIncomingReceiver(this); |
| /** |
| - * The async waiter used to wait for |
| - * {@link module:data_sender.DataSender#sendPipe_} to be writable. |
| - * @type {!module:async_waiter.AsyncWaiter} |
| - * @private |
| - */ |
| - this.waiter_ = new asyncWaiter.AsyncWaiter( |
| - this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE, |
| - this.onHandleReady_.bind(this)); |
| - /** |
| - * A queue of sends that have not fully written their data to the data pipe. |
| + * A queue of sends that have not fully sent their data to the DataSink. |
| * @type {!module:data_sender~PendingSend[]} |
| * @private |
| */ |
| this.pendingSends_ = []; |
| /** |
| - * A queue of sends that have written their data to the data pipe, but have |
| - * not been received by the DataSink. |
| + * A queue of sends that have sent their data to the DataSink, but have not |
| + * been received by the DataSink. |
| * @type {!module:data_sender~PendingSend[]} |
| * @private |
| */ |
| @@ -281,6 +259,12 @@ define('data_sender', [ |
| * @private |
| */ |
| this.cancelPromise_ = null; |
| + /** |
| + * The available send buffer capacity. |
| + * @type {number} |
| + * @private |
| + */ |
| + this.availableBufferCapacity_ = bufferSize; |
| }; |
| /** |
| @@ -296,18 +280,17 @@ define('data_sender', [ |
| return Promise.resolve(null); |
| var readyToSerialize = Promise.resolve(); |
| - if (this.pendingSends_.length) { |
| + if (this.pendingSends_.length || this.sendsAwaitingAck_.length) { |
|
raymes
2014/10/27 03:02:23
Was this a bug before?
Sam McNally
2014/10/27 05:39:14
Yes.
raymes
2014/10/27 22:57:29
We need to make sure it gets merged when we revert
Sam McNally
2014/10/27 23:33:57
Done.
|
| if (this.pendingCancel_) |
| readyToSerialize = this.cancelPromise_; |
| else |
| readyToSerialize = this.cancel(this.fatalErrorValue_); |
| } |
| return readyToSerialize.then(function() { |
| - this.waiter_.stop(); |
| var serialized = new serialization.SerializedDataSender(); |
| - serialized.sink = this.router_.connector_.handle_, |
| - serialized.data_pipe = this.sendPipe_, |
| - serialized.fatal_error_value = this.fatalErrorValue_, |
| + serialized.sink = this.router_.connector_.handle_; |
| + serialized.fatal_error_value = this.fatalErrorValue_; |
| + serialized.buffer_size = this.availableBufferCapacity_; |
| this.router_.connector_.handle_ = null; |
| this.router_.close(); |
| this.shutDown_ = true; |
| @@ -337,7 +320,7 @@ define('data_sender', [ |
| return; |
| } |
| this.init_( |
| - serialized.sink, serialized.data_pipe, serialized.fatal_error_value); |
| + serialized.sink, serialized.fatal_error_value, serialized.buffer_size); |
| }; |
| /** |
| @@ -355,11 +338,21 @@ define('data_sender', [ |
| throw new Error('Cancel in progress'); |
| var send = new PendingSend(data); |
| this.pendingSends_.push(send); |
| - if (!this.waiter_.isWaiting()) |
| - this.waiter_.start(); |
| + this.sendInternal_(); |
| return send.getPromise(); |
| }; |
| + DataSender.prototype.sendInternal_ = function() { |
| + while (this.pendingSends_.length && this.availableBufferCapacity_) { |
| + var result = this.pendingSends_[0].sendData( |
| + this.sink_, this.availableBufferCapacity_); |
| + this.availableBufferCapacity_ = result.remainingBufferCapacity; |
| + if (result.completed) { |
| + this.sendsAwaitingAck_.push(this.pendingSends_.shift()); |
| + } |
| + } |
| + } |
| + |
| /** |
| * Requests the cancellation of any in-progress sends. Calls to |
| * [send()]{@link module:data_sender.DataSender#send} will fail until the |
| @@ -385,32 +378,6 @@ define('data_sender', [ |
| }; |
| /** |
| - * Invoked when |
| - * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to |
| - * write. Writes to the data pipe if the wait is successful. |
| - * @param {number} waitResult The result of the asynchronous wait. |
| - * @private |
| - */ |
| - DataSender.prototype.onHandleReady_ = function(result) { |
| - if (result != core.RESULT_OK) { |
| - this.close(); |
| - return; |
| - } |
| - while (this.pendingSends_.length) { |
| - var result = this.pendingSends_[0].sendData(this.sendPipe_); |
| - if (result == core.RESULT_OK) { |
| - this.sendsAwaitingAck_.push(this.pendingSends_.shift()); |
| - } else if (result == core.RESULT_SHOULD_WAIT) { |
| - this.waiter_.start(); |
| - return; |
| - } else { |
| - this.close(); |
| - return; |
| - } |
| - } |
| - }; |
| - |
| - /** |
| * Calls and clears the pending cancel callback if one is pending. |
| * @private |
| */ |
| @@ -428,6 +395,7 @@ define('data_sender', [ |
| * @private |
| */ |
| DataSender.prototype.reportBytesSent = function(numBytes) { |
| + this.availableBufferCapacity_ += numBytes; |
|
raymes
2014/10/27 03:02:23
Should this be -=? I'm not sure if we should test
Sam McNally
2014/10/27 05:39:14
No. Buffer space is freed up as the data is passed
raymes
2014/10/27 22:57:29
I don't see where the availableBufferCapacity_ is
Sam McNally
2014/10/27 23:33:57
It's set in sendInternal_().
|
| while (numBytes > 0 && this.sendsAwaitingAck_.length) { |
| var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes); |
| numBytes = result.bytesUnreported; |
| @@ -443,6 +411,8 @@ define('data_sender', [ |
| // successfully. |
| if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) |
| this.callCancelCallback_(); |
| + |
| + this.sendInternal_(); |
| }; |
| /** |
| @@ -452,13 +422,13 @@ define('data_sender', [ |
| * @private |
| */ |
| DataSender.prototype.reportBytesSentAndError = function(numBytes, error) { |
| - var bytesToFlush = 0; |
| + this.availableBufferCapacity_ += numBytes; |
| while (this.sendsAwaitingAck_.length) { |
| var result = this.sendsAwaitingAck_[0].reportBytesSentAndError( |
| numBytes, error); |
| numBytes = result.bytesUnreported; |
| this.sendsAwaitingAck_.shift(); |
| - bytesToFlush += result.bytesToFlush; |
| + this.availableBufferCapacity_ += result.bytesToFlush; |
| } |
| while (this.pendingSends_.length) { |
| var result = this.pendingSends_[0].reportBytesSentAndError( |
| @@ -466,11 +436,11 @@ define('data_sender', [ |
| numBytes = result.bytesUnreported; |
| this.pendingSends_.shift(); |
| // Note: Only the first PendingSend in |pendingSends_| will have data to |
| - // flush as only the first can have written data to the data pipe. |
| - bytesToFlush += result.bytesToFlush; |
| + // flush as only the first can have sent data to the DataSink. |
| + this.availableBufferCapacity_ += result.bytesToFlush; |
| } |
| this.callCancelCallback_(); |
| - return Promise.resolve({bytes_to_flush: bytesToFlush}); |
| + return Promise.resolve(); |
| }; |
| return {DataSender: DataSender}; |