Index: extensions/renderer/resources/data_sender.js |
diff --git a/extensions/renderer/resources/data_sender.js b/extensions/renderer/resources/data_sender.js |
index 4b77a25f7ba7a35349aeaf8a0c2f984ed5ce7ed4..2e12afaa01d44e3cb6755fc439a65b16c84c9c8a 100644 |
--- a/extensions/renderer/resources/data_sender.js |
+++ b/extensions/renderer/resources/data_sender.js |
@@ -3,12 +3,11 @@ |
// found in the LICENSE file. |
define('data_sender', [ |
- 'async_waiter', |
'device/serial/data_stream.mojom', |
'device/serial/data_stream_serialization.mojom', |
'mojo/public/js/bindings/core', |
'mojo/public/js/bindings/router', |
-], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) { |
+], function(dataStreamMojom, serialization, core, routerModule) { |
/** |
* @module data_sender |
*/ |
@@ -144,41 +143,38 @@ define('data_sender', [ |
/** |
* Writes pending data into the data pipe. |
- * @param {!MojoHandle} handle The handle to the data pipe. |
- * @return {number} The Mojo result corresponding to the outcome: |
- * <ul> |
- * <li>RESULT_OK if the write completes successfully; |
- * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or |
- * <li>the data pipe error if the write failed. |
- * </ul> |
+ * @param {!DataSink} sink The DataSink to receive the data. |
+ * @param {number} availableBufferCapacity The maximum number of bytes to |
+ * send. |
+ * @return {object} result The send result. |
Ken Rockot(use gerrit already)
2014/10/28 04:59:33
nit: !Object
Sam McNally
2014/10/28 05:11:11
Done.
|
+ * @return {boolean} result.completed Whether all of the pending data was |
+ * sent. |
+ * @return {number} result.remainingBufferCapacity The remaining send buffer |
+ * capacity. |
*/ |
- PendingSend.prototype.sendData = function(handle) { |
- var result = core.writeData( |
- handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE); |
- if (result.result != core.RESULT_OK) |
- return result.result; |
- this.data_ = this.data_.slice(result.numBytes); |
- return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK; |
+ PendingSend.prototype.sendData = function(sink, availableBufferCapacity) { |
+ var numBytesToSend = |
+ Math.min(availableBufferCapacity, this.data_.byteLength); |
+ sink.onData(new Uint8Array(this.data_, 0, numBytesToSend)); |
+ this.data_ = this.data_.slice(numBytesToSend); |
+ return { |
+ completed: this.data_.byteLength == 0, |
+ remainingBufferCapacity: availableBufferCapacity - numBytesToSend, |
+ }; |
}; |
/** |
* A DataSender that sends data to a DataSink. |
* @param {!MojoHandle} handle The handle to the DataSink. |
- * @param {number} bufferSize How large a buffer the data pipe should use. |
+ * @param {number} bufferSize How large a buffer to use for data. |
* @param {number} fatalErrorValue The send error value to report in the |
* event of a fatal error. |
* @constructor |
* @alias module:data_sender.DataSender |
*/ |
function DataSender(handle, bufferSize, fatalErrorValue) { |
- var dataPipeOptions = { |
- flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
- elementNumBytes: 1, |
- capacityNumBytes: bufferSize, |
- }; |
- var sendPipe = core.createDataPipe(dataPipeOptions); |
- this.init_(handle, sendPipe.producerHandle, fatalErrorValue); |
- this.sink_.init(sendPipe.consumerHandle); |
+ this.init_(handle, fatalErrorValue, bufferSize); |
+ this.sink_.init(bufferSize); |
} |
DataSender.prototype = |
@@ -191,9 +187,7 @@ define('data_sender', [ |
if (this.shutDown_) |
return; |
this.shutDown_ = true; |
- this.waiter_.stop(); |
this.router_.close(); |
- core.close(this.sendPipe_); |
while (this.pendingSends_.length) { |
this.pendingSends_.pop().reportBytesSentAndError( |
0, this.fatalErrorValue_); |
@@ -208,18 +202,12 @@ define('data_sender', [ |
/** |
* Initialize this DataSender. |
* @param {!MojoHandle} sink A handle to the DataSink |
- * @param {!MojoHandle} dataPipe A handle to use for sending data to the |
- * DataSink. |
* @param {number} fatalErrorValue The error to dispatch in the event of a |
* fatal error. |
+ * @param {number} bufferSize The size of the send buffer. |
* @private |
*/ |
- DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) { |
- /** |
- * The handle to the data pipe to use for sending data. |
- * @private |
- */ |
- this.sendPipe_ = dataPipe; |
+ DataSender.prototype.init_ = function(sink, fatalErrorValue, bufferSize) { |
/** |
* The error to be dispatched in the event of a fatal error. |
* @const {number} |
@@ -245,23 +233,14 @@ define('data_sender', [ |
this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_); |
this.router_.setIncomingReceiver(this); |
/** |
- * The async waiter used to wait for |
- * {@link module:data_sender.DataSender#sendPipe_} to be writable. |
- * @type {!module:async_waiter.AsyncWaiter} |
- * @private |
- */ |
- this.waiter_ = new asyncWaiter.AsyncWaiter( |
- this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE, |
- this.onHandleReady_.bind(this)); |
- /** |
- * A queue of sends that have not fully written their data to the data pipe. |
+ * A queue of sends that have not fully sent their data to the DataSink. |
* @type {!module:data_sender~PendingSend[]} |
* @private |
*/ |
this.pendingSends_ = []; |
/** |
- * A queue of sends that have written their data to the data pipe, but have |
- * not been received by the DataSink. |
+ * A queue of sends that have sent their data to the DataSink, but have not |
+ * been received by the DataSink. |
* @type {!module:data_sender~PendingSend[]} |
* @private |
*/ |
@@ -281,6 +260,12 @@ define('data_sender', [ |
* @private |
*/ |
this.cancelPromise_ = null; |
+ /** |
+ * The available send buffer capacity. |
+ * @type {number} |
+ * @private |
+ */ |
+ this.availableBufferCapacity_ = bufferSize; |
}; |
/** |
@@ -303,11 +288,10 @@ define('data_sender', [ |
readyToSerialize = this.cancel(this.fatalErrorValue_); |
} |
return readyToSerialize.then(function() { |
- this.waiter_.stop(); |
var serialized = new serialization.SerializedDataSender(); |
- serialized.sink = this.router_.connector_.handle_, |
- serialized.data_pipe = this.sendPipe_, |
- serialized.fatal_error_value = this.fatalErrorValue_, |
+ serialized.sink = this.router_.connector_.handle_; |
+ serialized.fatal_error_value = this.fatalErrorValue_; |
+ serialized.buffer_size = this.availableBufferCapacity_; |
this.router_.connector_.handle_ = null; |
this.router_.close(); |
this.shutDown_ = true; |
@@ -337,7 +321,7 @@ define('data_sender', [ |
return; |
} |
this.init_( |
- serialized.sink, serialized.data_pipe, serialized.fatal_error_value); |
+ serialized.sink, serialized.fatal_error_value, serialized.buffer_size); |
}; |
/** |
@@ -355,11 +339,21 @@ define('data_sender', [ |
throw new Error('Cancel in progress'); |
var send = new PendingSend(data); |
this.pendingSends_.push(send); |
- if (!this.waiter_.isWaiting()) |
- this.waiter_.start(); |
+ this.sendInternal_(); |
return send.getPromise(); |
}; |
+ DataSender.prototype.sendInternal_ = function() { |
+ while (this.pendingSends_.length && this.availableBufferCapacity_) { |
+ var result = this.pendingSends_[0].sendData( |
+ this.sink_, this.availableBufferCapacity_); |
+ this.availableBufferCapacity_ = result.remainingBufferCapacity; |
+ if (result.completed) { |
+ this.sendsAwaitingAck_.push(this.pendingSends_.shift()); |
+ } |
+ } |
+ }; |
+ |
/** |
* Requests the cancellation of any in-progress sends. Calls to |
* [send()]{@link module:data_sender.DataSender#send} will fail until the |
@@ -385,32 +379,6 @@ define('data_sender', [ |
}; |
/** |
- * Invoked when |
- * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to |
- * write. Writes to the data pipe if the wait is successful. |
- * @param {number} waitResult The result of the asynchronous wait. |
- * @private |
- */ |
- DataSender.prototype.onHandleReady_ = function(result) { |
- if (result != core.RESULT_OK) { |
- this.close(); |
- return; |
- } |
- while (this.pendingSends_.length) { |
- var result = this.pendingSends_[0].sendData(this.sendPipe_); |
- if (result == core.RESULT_OK) { |
- this.sendsAwaitingAck_.push(this.pendingSends_.shift()); |
- } else if (result == core.RESULT_SHOULD_WAIT) { |
- this.waiter_.start(); |
- return; |
- } else { |
- this.close(); |
- return; |
- } |
- } |
- }; |
- |
- /** |
* Calls and clears the pending cancel callback if one is pending. |
* @private |
*/ |
@@ -428,6 +396,7 @@ define('data_sender', [ |
* @private |
*/ |
DataSender.prototype.reportBytesSent = function(numBytes) { |
+ this.availableBufferCapacity_ += numBytes; |
while (numBytes > 0 && this.sendsAwaitingAck_.length) { |
var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes); |
numBytes = result.bytesUnreported; |
@@ -443,6 +412,8 @@ define('data_sender', [ |
// successfully. |
if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) |
this.callCancelCallback_(); |
+ |
+ this.sendInternal_(); |
}; |
/** |
@@ -452,13 +423,13 @@ define('data_sender', [ |
* @private |
*/ |
DataSender.prototype.reportBytesSentAndError = function(numBytes, error) { |
- var bytesToFlush = 0; |
+ this.availableBufferCapacity_ += numBytes; |
while (this.sendsAwaitingAck_.length) { |
var result = this.sendsAwaitingAck_[0].reportBytesSentAndError( |
numBytes, error); |
numBytes = result.bytesUnreported; |
this.sendsAwaitingAck_.shift(); |
- bytesToFlush += result.bytesToFlush; |
+ this.availableBufferCapacity_ += result.bytesToFlush; |
} |
while (this.pendingSends_.length) { |
var result = this.pendingSends_[0].reportBytesSentAndError( |
@@ -466,11 +437,11 @@ define('data_sender', [ |
numBytes = result.bytesUnreported; |
this.pendingSends_.shift(); |
// Note: Only the first PendingSend in |pendingSends_| will have data to |
- // flush as only the first can have written data to the data pipe. |
- bytesToFlush += result.bytesToFlush; |
+ // flush as only the first can have sent data to the DataSink. |
+ this.availableBufferCapacity_ += result.bytesToFlush; |
} |
this.callCancelCallback_(); |
- return Promise.resolve({bytes_to_flush: bytesToFlush}); |
+ return Promise.resolve(); |
}; |
return {DataSender: DataSender}; |