Index: extensions/renderer/resources/data_receiver.js |
diff --git a/extensions/renderer/resources/data_receiver.js b/extensions/renderer/resources/data_receiver.js |
index b4fc5431f8a26a2f189313c7bb43e68053adab94..190b290d82c07a4196f0655e04d912c3fb705ff1 100644 |
--- a/extensions/renderer/resources/data_receiver.js |
+++ b/extensions/renderer/resources/data_receiver.js |
@@ -3,12 +3,11 @@ |
// found in the LICENSE file. |
define('data_receiver', [ |
- 'async_waiter', |
'device/serial/data_stream.mojom', |
'device/serial/data_stream_serialization.mojom', |
'mojo/public/js/bindings/core', |
'mojo/public/js/bindings/router', |
-], function(asyncWaiter, dataStream, serialization, core, router) { |
+], function(dataStream, serialization, core, router) { |
/** |
* @module data_receiver |
*/ |
@@ -65,8 +64,8 @@ define('data_receiver', [ |
* @param {!PendingReceiveError} error The error to dispatch. |
* @param {number} bytesReceived The number of bytes that have been received. |
*/ |
- PendingReceive.prototype.dispatchError = function(error, bytesReceived) { |
- if (bytesReceived != error.offset) |
+ PendingReceive.prototype.dispatchError = function(error) { |
+ if (error.queuePosition > 0) |
return false; |
var e = new Error(); |
@@ -88,22 +87,15 @@ define('data_receiver', [ |
/** |
* A DataReceiver that receives data from a DataSource. |
* @param {!MojoHandle} handle The handle to the DataSource. |
- * @param {number} bufferSize How large a buffer the data pipe should use. |
+ * @param {number} bufferSize How large a buffer to use. |
* @param {number} fatalErrorValue The receive error value to report in the |
* event of a fatal error. |
* @constructor |
* @alias module:data_receiver.DataReceiver |
*/ |
function DataReceiver(handle, bufferSize, fatalErrorValue) { |
- var dataPipeOptions = { |
- flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
- elementNumBytes: 1, |
- capacityNumBytes: bufferSize, |
- }; |
- var receivePipe = core.createDataPipe(dataPipeOptions); |
- this.init_( |
- handle, receivePipe.consumerHandle, fatalErrorValue, 0, null, false); |
- this.source_.init(receivePipe.producerHandle); |
+ this.init_(handle, fatalErrorValue, 0, null, [], false); |
+ this.source_.init(bufferSize); |
} |
DataReceiver.prototype = |
@@ -117,8 +109,6 @@ define('data_receiver', [ |
return; |
this.shutDown_ = true; |
this.router_.close(); |
- this.waiter_.stop(); |
- core.close(this.receivePipe_); |
if (this.receive_) { |
this.receive_.dispatchFatalError(this.fatalErrorValue_); |
this.receive_ = null; |
@@ -128,8 +118,6 @@ define('data_receiver', [ |
/** |
* Initialize this DataReceiver. |
* @param {!MojoHandle} source A handle to the DataSource |
- * @param {!MojoHandle} dataPipe A handle to use for receiving data from the |
- * DataSource. |
* @param {number} fatalErrorValue The error to dispatch in the event of a |
* fatal error. |
* @param {number} bytesReceived The number of bytes already received. |
@@ -139,10 +127,10 @@ define('data_receiver', [ |
* @private |
*/ |
DataReceiver.prototype.init_ = function(source, |
- dataPipe, |
fatalErrorValue, |
bytesReceived, |
pendingError, |
+ pendingData, |
paused) { |
/** |
* The [Router]{@link module:mojo/public/js/bindings/router.Router} for the |
@@ -157,11 +145,6 @@ define('data_receiver', [ |
this.source_ = new dataStream.DataSource.proxyClass(this.router_); |
this.router_.setIncomingReceiver(this); |
/** |
- * The handle to the data pipe to use for receiving data. |
- * @private |
- */ |
- this.receivePipe_ = dataPipe; |
- /** |
* The current receive operation. |
* @type {module:data_receiver~PendingReceive} |
* @private |
@@ -174,22 +157,6 @@ define('data_receiver', [ |
*/ |
this.fatalErrorValue_ = fatalErrorValue; |
/** |
- * The async waiter used to wait for |
- * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| to |
- * be readable. |
- * @type {!module:async_waiter.AsyncWaiter} |
- * @private |
- */ |
- this.waiter_ = new asyncWaiter.AsyncWaiter(this.receivePipe_, |
- core.HANDLE_SIGNAL_READABLE, |
- this.onHandleReady_.bind(this)); |
- /** |
- * The number of bytes received from the DataSource. |
- * @type {number} |
- * @private |
- */ |
- this.bytesReceived_ = bytesReceived; |
- /** |
* The pending error if there is one. |
* @type {PendingReceiveError} |
* @private |
@@ -202,6 +169,13 @@ define('data_receiver', [ |
*/ |
this.paused_ = paused; |
/** |
+ * A queue of data that has been received from the DataSource, but not |
+ * consumed by the client. |
+ * @type {module:data_receiver~PendingData[]} |
+ * @private |
+ */ |
+ this.pendingDataBuffers_ = pendingData; |
+ /** |
* Whether this DataReceiver has shut down. |
* @type {boolean} |
* @private |
@@ -220,18 +194,19 @@ define('data_receiver', [ |
if (this.shutDown_) |
return Promise.resolve(null); |
- this.waiter_.stop(); |
if (this.receive_) { |
this.receive_.dispatchFatalError(this.fatalErrorValue_); |
this.receive_ = null; |
} |
var serialized = new serialization.SerializedDataReceiver(); |
serialized.source = this.router_.connector_.handle_; |
- serialized.data_pipe = this.receivePipe_; |
serialized.fatal_error_value = this.fatalErrorValue_; |
- serialized.bytes_received = this.bytesReceived_; |
serialized.paused = this.paused_; |
serialized.pending_error = this.pendingError_; |
+ serialized.pending_data = []; |
+ $Array.forEach(this.pendingDataBuffers_, function(buffer) { |
+ serialized.pending_data.push(new Uint8Array(buffer)); |
+ }); |
this.router_.connector_.handle_ = null; |
this.router_.close(); |
this.shutDown_ = true; |
@@ -259,11 +234,17 @@ define('data_receiver', [ |
this.shutDown_ = true; |
return; |
} |
+ var pendingData = []; |
+ $Array.forEach(serialized.pending_data, function(data) { |
+ var buffer = new Uint8Array(data.length); |
+ buffer.set(data); |
+ pendingData.push(buffer.buffer); |
+ }); |
this.init_(serialized.source, |
- serialized.data_pipe, |
serialized.fatal_error_value, |
serialized.bytes_received, |
serialized.pending_error, |
+ pendingData, |
serialized.paused); |
}; |
@@ -283,7 +264,7 @@ define('data_receiver', [ |
var receive = new PendingReceive(); |
var promise = receive.getPromise(); |
if (this.pendingError_ && |
- receive.dispatchError(this.pendingError_, this.bytesReceived_)) { |
+ receive.dispatchError(this.pendingError_)) { |
this.pendingError_ = null; |
this.paused_ = true; |
return promise; |
@@ -293,32 +274,21 @@ define('data_receiver', [ |
this.paused_ = false; |
} |
this.receive_ = receive; |
- this.waiter_.start(); |
+ this.dispatchData_(); |
return promise; |
}; |
- /** |
- * Invoked when |
- * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| is |
- * ready to read. Reads from the data pipe if the wait is successful. |
- * @param {number} waitResult The result of the asynchronous wait. |
- * @private |
- */ |
- DataReceiver.prototype.onHandleReady_ = function(waitResult) { |
- if (waitResult != core.RESULT_OK || !this.receive_) { |
+ DataReceiver.prototype.dispatchData_ = function() { |
+ if (!this.receive_) { |
this.close(); |
return; |
} |
- var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE); |
- if (result.result == core.RESULT_OK) { |
- // TODO(sammc): Handle overflow in the same fashion as the C++ receiver. |
- this.bytesReceived_ += result.buffer.byteLength; |
- this.receive_.dispatchData(result.buffer); |
+ if (this.pendingDataBuffers_.length) { |
+ this.receive_.dispatchData(this.pendingDataBuffers_[0]); |
this.receive_ = null; |
- } else if (result.result == core.RESULT_SHOULD_WAIT) { |
- this.waiter_.start(); |
- } else { |
- this.close(); |
+ this.pendingDataBuffers_.shift(); |
+ if (this.pendingError_) |
+ this.pendingError_.queuePosition--; |
} |
}; |
@@ -328,22 +298,29 @@ define('data_receiver', [ |
* @param {number} error The error that occurred. |
* @private |
*/ |
- DataReceiver.prototype.onError = function(offset, error) { |
+ DataReceiver.prototype.onError = function(error) { |
if (this.shutDown_) |
return; |
var pendingError = new serialization.PendingReceiveError(); |
pendingError.error = error; |
- pendingError.offset = offset; |
- if (this.receive_ && |
- this.receive_.dispatchError(pendingError, this.bytesReceived_)) { |
+ pendingError.queuePosition = this.pendingDataBuffers_.length; |
+ if (this.receive_ && this.receive_.dispatchError(pendingError)) { |
this.receive_ = null; |
- this.waiter_.stop(); |
this.paused_ = true; |
return; |
} |
this.pendingError_ = pendingError; |
}; |
+ DataReceiver.prototype.onData = function(data) { |
+ var buffer = new ArrayBuffer(data.length); |
+ var uintView = new Uint8Array(buffer); |
+ uintView.set(data); |
+ this.pendingDataBuffers_.push(buffer); |
+ if (this.receive_) |
+ this.dispatchData_(); |
+ }; |
+ |
return {DataReceiver: DataReceiver}; |
}); |