Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 define('data_sender', [ | 5 define('data_sender', [ |
| 6 'async_waiter', | |
| 7 'device/serial/data_stream.mojom', | 6 'device/serial/data_stream.mojom', |
| 8 'device/serial/data_stream_serialization.mojom', | 7 'device/serial/data_stream_serialization.mojom', |
| 9 'mojo/public/js/bindings/core', | 8 'mojo/public/js/bindings/core', |
| 10 'mojo/public/js/bindings/router', | 9 'mojo/public/js/bindings/router', |
| 11 ], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) { | 10 ], function(dataStreamMojom, serialization, core, routerModule) { |
| 12 /** | 11 /** |
| 13 * @module data_sender | 12 * @module data_sender |
| 14 */ | 13 */ |
| 15 | 14 |
| 16 /** | 15 /** |
| 17 * A pending send operation. | 16 * A pending send operation. |
| 18 * @param {!ArrayBuffer} data The data to be sent. | 17 * @param {!ArrayBuffer} data The data to be sent. |
| 19 * @constructor | 18 * @constructor |
| 20 * @alias module:data_sender~PendingSend | 19 * @alias module:data_sender~PendingSend |
| 21 * @private | 20 * @private |
| (...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 137 if (this.bytesReceivedBySink_ > this.length_) { | 136 if (this.bytesReceivedBySink_ > this.length_) { |
| 138 result.bytesUnreported = this.bytesReceivedBySink_ - this.length_; | 137 result.bytesUnreported = this.bytesReceivedBySink_ - this.length_; |
| 139 this.bytesReceivedBySink_ = this.length_; | 138 this.bytesReceivedBySink_ = this.length_; |
| 140 } | 139 } |
| 141 result.done = false; | 140 result.done = false; |
| 142 return result; | 141 return result; |
| 143 }; | 142 }; |
| 144 | 143 |
| 145 /** | 144 /** |
| 146 * Writes pending data into the data pipe. | 145 * Writes pending data into the data pipe. |
| 147 * @param {!MojoHandle} handle The handle to the data pipe. | 146 * @param {!DataSink} sink The DataSink to receive the data. |
| 148 * @return {number} The Mojo result corresponding to the outcome: | 147 * @return {number} The Mojo result corresponding to the outcome: |
| 149 * <ul> | 148 * <ul> |
| 150 * <li>RESULT_OK if the write completes successfully; | 149 * <li>RESULT_OK if the write completes successfully; |
| 151 * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or | 150 * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or |
| 152 * <li>the data pipe error if the write failed. | 151 * <li>the data pipe error if the write failed. |
| 153 * </ul> | 152 * </ul> |
| 154 */ | 153 */ |
| 155 PendingSend.prototype.sendData = function(handle) { | 154 PendingSend.prototype.sendData = function(sink, availableBufferCapacity) { |
|
raymes
2014/10/27 03:02:23
Please comment availableBufferCapacity
Sam McNally
2014/10/27 05:39:14
Done.
| |
| 156 var result = core.writeData( | 155 var numBytesToSend = |
| 157 handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE); | 156 Math.min(availableBufferCapacity, this.data_.byteLength); |
| 158 if (result.result != core.RESULT_OK) | 157 sink.onData(new Uint8Array(this.data_, 0, numBytesToSend)); |
| 159 return result.result; | 158 this.data_ = this.data_.slice(numBytesToSend); |
| 160 this.data_ = this.data_.slice(result.numBytes); | 159 return { |
| 161 return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK; | 160 completed: this.data_.byteLength == 0, |
| 161 remainingBufferCapacity: availableBufferCapacity - numBytesToSend, | |
| 162 }; | |
|
raymes
2014/10/27 03:02:23
This no longer returns a number either.
Sam McNally
2014/10/27 05:39:14
Done.
| |
| 162 }; | 163 }; |
| 163 | 164 |
| 164 /** | 165 /** |
| 165 * A DataSender that sends data to a DataSink. | 166 * A DataSender that sends data to a DataSink. |
| 166 * @param {!MojoHandle} handle The handle to the DataSink. | 167 * @param {!MojoHandle} handle The handle to the DataSink. |
| 167 * @param {number} bufferSize How large a buffer the data pipe should use. | 168 * @param {number} bufferSize How large a buffer to use for data. |
| 168 * @param {number} fatalErrorValue The send error value to report in the | 169 * @param {number} fatalErrorValue The send error value to report in the |
| 169 * event of a fatal error. | 170 * event of a fatal error. |
| 170 * @constructor | 171 * @constructor |
| 171 * @alias module:data_sender.DataSender | 172 * @alias module:data_sender.DataSender |
| 172 */ | 173 */ |
| 173 function DataSender(handle, bufferSize, fatalErrorValue) { | 174 function DataSender(handle, bufferSize, fatalErrorValue) { |
| 174 var dataPipeOptions = { | 175 this.init_(handle, fatalErrorValue, bufferSize); |
| 175 flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 176 this.sink_.init(bufferSize); |
| 176 elementNumBytes: 1, | |
| 177 capacityNumBytes: bufferSize, | |
| 178 }; | |
| 179 var sendPipe = core.createDataPipe(dataPipeOptions); | |
| 180 this.init_(handle, sendPipe.producerHandle, fatalErrorValue); | |
| 181 this.sink_.init(sendPipe.consumerHandle); | |
| 182 } | 177 } |
| 183 | 178 |
| 184 DataSender.prototype = | 179 DataSender.prototype = |
| 185 $Object.create(dataStreamMojom.DataSinkClient.stubClass.prototype); | 180 $Object.create(dataStreamMojom.DataSinkClient.stubClass.prototype); |
| 186 | 181 |
| 187 /** | 182 /** |
| 188 * Closes this DataSender. | 183 * Closes this DataSender. |
| 189 */ | 184 */ |
| 190 DataSender.prototype.close = function() { | 185 DataSender.prototype.close = function() { |
| 191 if (this.shutDown_) | 186 if (this.shutDown_) |
| 192 return; | 187 return; |
| 193 this.shutDown_ = true; | 188 this.shutDown_ = true; |
| 194 this.waiter_.stop(); | |
| 195 this.router_.close(); | 189 this.router_.close(); |
| 196 core.close(this.sendPipe_); | |
| 197 while (this.pendingSends_.length) { | 190 while (this.pendingSends_.length) { |
| 198 this.pendingSends_.pop().reportBytesSentAndError( | 191 this.pendingSends_.pop().reportBytesSentAndError( |
| 199 0, this.fatalErrorValue_); | 192 0, this.fatalErrorValue_); |
| 200 } | 193 } |
| 201 while (this.sendsAwaitingAck_.length) { | 194 while (this.sendsAwaitingAck_.length) { |
| 202 this.sendsAwaitingAck_.pop().reportBytesSentAndError( | 195 this.sendsAwaitingAck_.pop().reportBytesSentAndError( |
| 203 0, this.fatalErrorValue_); | 196 0, this.fatalErrorValue_); |
| 204 } | 197 } |
| 205 this.callCancelCallback_(); | 198 this.callCancelCallback_(); |
| 206 }; | 199 }; |
| 207 | 200 |
| 208 /** | 201 /** |
| 209 * Initialize this DataSender. | 202 * Initialize this DataSender. |
| 210 * @param {!MojoHandle} sink A handle to the DataSink | 203 * @param {!MojoHandle} sink A handle to the DataSink |
| 211 * @param {!MojoHandle} dataPipe A handle to use for sending data to the | |
| 212 * DataSink. | |
| 213 * @param {number} fatalErrorValue The error to dispatch in the event of a | 204 * @param {number} fatalErrorValue The error to dispatch in the event of a |
| 214 * fatal error. | 205 * fatal error. |
| 206 * @param {number} bufferSize The size of the send buffer. | |
| 215 * @private | 207 * @private |
| 216 */ | 208 */ |
| 217 DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) { | 209 DataSender.prototype.init_ = function(sink, fatalErrorValue, bufferSize) { |
| 218 /** | |
| 219 * The handle to the data pipe to use for sending data. | |
| 220 * @private | |
| 221 */ | |
| 222 this.sendPipe_ = dataPipe; | |
| 223 /** | 210 /** |
| 224 * The error to be dispatched in the event of a fatal error. | 211 * The error to be dispatched in the event of a fatal error. |
| 225 * @const {number} | 212 * @const {number} |
| 226 * @private | 213 * @private |
| 227 */ | 214 */ |
| 228 this.fatalErrorValue_ = fatalErrorValue; | 215 this.fatalErrorValue_ = fatalErrorValue; |
| 229 /** | 216 /** |
| 230 * Whether this DataSender has shut down. | 217 * Whether this DataSender has shut down. |
| 231 * @type {boolean} | 218 * @type {boolean} |
| 232 * @private | 219 * @private |
| 233 */ | 220 */ |
| 234 this.shutDown_ = false; | 221 this.shutDown_ = false; |
| 235 /** | 222 /** |
| 236 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the | 223 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the |
| 237 * connection to the DataSink. | 224 * connection to the DataSink. |
| 238 * @private | 225 * @private |
| 239 */ | 226 */ |
| 240 this.router_ = new routerModule.Router(sink); | 227 this.router_ = new routerModule.Router(sink); |
| 241 /** | 228 /** |
| 242 * The connection to the DataSink. | 229 * The connection to the DataSink. |
| 243 * @private | 230 * @private |
| 244 */ | 231 */ |
| 245 this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_); | 232 this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_); |
| 246 this.router_.setIncomingReceiver(this); | 233 this.router_.setIncomingReceiver(this); |
| 247 /** | 234 /** |
| 248 * The async waiter used to wait for | 235 * A queue of sends that have not fully sent their data to the DataSink. |
| 249 * {@link module:data_sender.DataSender#sendPipe_} to be writable. | |
| 250 * @type {!module:async_waiter.AsyncWaiter} | |
| 251 * @private | |
| 252 */ | |
| 253 this.waiter_ = new asyncWaiter.AsyncWaiter( | |
| 254 this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE, | |
| 255 this.onHandleReady_.bind(this)); | |
| 256 /** | |
| 257 * A queue of sends that have not fully written their data to the data pipe. | |
| 258 * @type {!module:data_sender~PendingSend[]} | 236 * @type {!module:data_sender~PendingSend[]} |
| 259 * @private | 237 * @private |
| 260 */ | 238 */ |
| 261 this.pendingSends_ = []; | 239 this.pendingSends_ = []; |
| 262 /** | 240 /** |
| 263 * A queue of sends that have written their data to the data pipe, but have | 241 * A queue of sends that have sent their data to the DataSink, but have not |
| 264 * not been received by the DataSink. | 242 * been received by the DataSink. |
| 265 * @type {!module:data_sender~PendingSend[]} | 243 * @type {!module:data_sender~PendingSend[]} |
| 266 * @private | 244 * @private |
| 267 */ | 245 */ |
| 268 this.sendsAwaitingAck_ = []; | 246 this.sendsAwaitingAck_ = []; |
| 269 | 247 |
| 270 /** | 248 /** |
| 271 * The callback that will resolve a pending cancel if one is in progress. | 249 * The callback that will resolve a pending cancel if one is in progress. |
| 272 * @type {?Function} | 250 * @type {?Function} |
| 273 * @private | 251 * @private |
| 274 */ | 252 */ |
| 275 this.pendingCancel_ = null; | 253 this.pendingCancel_ = null; |
| 276 | 254 |
| 277 /** | 255 /** |
| 278 * The promise that will be resolved when a pending cancel completes if one | 256 * The promise that will be resolved when a pending cancel completes if one |
| 279 * is in progress. | 257 * is in progress. |
| 280 * @type {Promise} | 258 * @type {Promise} |
| 281 * @private | 259 * @private |
| 282 */ | 260 */ |
| 283 this.cancelPromise_ = null; | 261 this.cancelPromise_ = null; |
| 262 /** | |
| 263 * The available send buffer capacity. | |
| 264 * @type {number} | |
| 265 * @private | |
| 266 */ | |
| 267 this.availableBufferCapacity_ = bufferSize; | |
| 284 }; | 268 }; |
| 285 | 269 |
| 286 /** | 270 /** |
| 287 * Serializes this DataSender. | 271 * Serializes this DataSender. |
| 288 * This will cancel any sends in progress before the returned promise | 272 * This will cancel any sends in progress before the returned promise |
| 289 * resolves. | 273 * resolves. |
| 290 * @return {!Promise.<SerializedDataSender>} A promise that will resolve to | 274 * @return {!Promise.<SerializedDataSender>} A promise that will resolve to |
| 291 * the serialization of this DataSender. If this DataSender has shut down, | 275 * the serialization of this DataSender. If this DataSender has shut down, |
| 292 * the promise will resolve to null. | 276 * the promise will resolve to null. |
| 293 */ | 277 */ |
| 294 DataSender.prototype.serialize = function() { | 278 DataSender.prototype.serialize = function() { |
| 295 if (this.shutDown_) | 279 if (this.shutDown_) |
| 296 return Promise.resolve(null); | 280 return Promise.resolve(null); |
| 297 | 281 |
| 298 var readyToSerialize = Promise.resolve(); | 282 var readyToSerialize = Promise.resolve(); |
| 299 if (this.pendingSends_.length) { | 283 if (this.pendingSends_.length || this.sendsAwaitingAck_.length) { |
|
raymes
2014/10/27 03:02:23
Was this a bug before?
Sam McNally
2014/10/27 05:39:14
Yes.
raymes
2014/10/27 22:57:29
We need to make sure it gets merged when we revert
Sam McNally
2014/10/27 23:33:57
Done.
| |
| 300 if (this.pendingCancel_) | 284 if (this.pendingCancel_) |
| 301 readyToSerialize = this.cancelPromise_; | 285 readyToSerialize = this.cancelPromise_; |
| 302 else | 286 else |
| 303 readyToSerialize = this.cancel(this.fatalErrorValue_); | 287 readyToSerialize = this.cancel(this.fatalErrorValue_); |
| 304 } | 288 } |
| 305 return readyToSerialize.then(function() { | 289 return readyToSerialize.then(function() { |
| 306 this.waiter_.stop(); | |
| 307 var serialized = new serialization.SerializedDataSender(); | 290 var serialized = new serialization.SerializedDataSender(); |
| 308 serialized.sink = this.router_.connector_.handle_, | 291 serialized.sink = this.router_.connector_.handle_; |
| 309 serialized.data_pipe = this.sendPipe_, | 292 serialized.fatal_error_value = this.fatalErrorValue_; |
| 310 serialized.fatal_error_value = this.fatalErrorValue_, | 293 serialized.buffer_size = this.availableBufferCapacity_; |
| 311 this.router_.connector_.handle_ = null; | 294 this.router_.connector_.handle_ = null; |
| 312 this.router_.close(); | 295 this.router_.close(); |
| 313 this.shutDown_ = true; | 296 this.shutDown_ = true; |
| 314 return serialized; | 297 return serialized; |
| 315 }.bind(this)); | 298 }.bind(this)); |
| 316 }; | 299 }; |
| 317 | 300 |
| 318 /** | 301 /** |
| 319 * Deserializes a SerializedDataSender. | 302 * Deserializes a SerializedDataSender. |
| 320 * @param {SerializedDataSender} serialized The serialized DataSender. | 303 * @param {SerializedDataSender} serialized The serialized DataSender. |
| 321 * @return {!DataSender} The deserialized DataSender. | 304 * @return {!DataSender} The deserialized DataSender. |
| 322 */ | 305 */ |
| 323 DataSender.deserialize = function(serialized) { | 306 DataSender.deserialize = function(serialized) { |
| 324 var sender = $Object.create(DataSender.prototype); | 307 var sender = $Object.create(DataSender.prototype); |
| 325 sender.deserialize_(serialized); | 308 sender.deserialize_(serialized); |
| 326 return sender; | 309 return sender; |
| 327 }; | 310 }; |
| 328 | 311 |
| 329 /** | 312 /** |
| 330 * Deserializes a SerializedDataSender into this DataSender. | 313 * Deserializes a SerializedDataSender into this DataSender. |
| 331 * @param {SerializedDataSender} serialized The serialized DataSender. | 314 * @param {SerializedDataSender} serialized The serialized DataSender. |
| 332 * @private | 315 * @private |
| 333 */ | 316 */ |
| 334 DataSender.prototype.deserialize_ = function(serialized) { | 317 DataSender.prototype.deserialize_ = function(serialized) { |
| 335 if (!serialized) { | 318 if (!serialized) { |
| 336 this.shutDown_ = true; | 319 this.shutDown_ = true; |
| 337 return; | 320 return; |
| 338 } | 321 } |
| 339 this.init_( | 322 this.init_( |
| 340 serialized.sink, serialized.data_pipe, serialized.fatal_error_value); | 323 serialized.sink, serialized.fatal_error_value, serialized.buffer_size); |
| 341 }; | 324 }; |
| 342 | 325 |
| 343 /** | 326 /** |
| 344 * Sends data to the DataSink. | 327 * Sends data to the DataSink. |
| 345 * @return {!Promise.<number>} A promise to the number of bytes sent. If an | 328 * @return {!Promise.<number>} A promise to the number of bytes sent. If an |
| 346 * error occurs, the promise will reject with an Error object with a | 329 * error occurs, the promise will reject with an Error object with a |
| 347 * property error containing the error code. | 330 * property error containing the error code. |
| 348 * @throws Will throw if this has encountered a fatal error or a cancel is in | 331 * @throws Will throw if this has encountered a fatal error or a cancel is in |
| 349 * progress. | 332 * progress. |
| 350 */ | 333 */ |
| 351 DataSender.prototype.send = function(data) { | 334 DataSender.prototype.send = function(data) { |
| 352 if (this.shutDown_) | 335 if (this.shutDown_) |
| 353 throw new Error('DataSender has been closed'); | 336 throw new Error('DataSender has been closed'); |
| 354 if (this.pendingCancel_) | 337 if (this.pendingCancel_) |
| 355 throw new Error('Cancel in progress'); | 338 throw new Error('Cancel in progress'); |
| 356 var send = new PendingSend(data); | 339 var send = new PendingSend(data); |
| 357 this.pendingSends_.push(send); | 340 this.pendingSends_.push(send); |
| 358 if (!this.waiter_.isWaiting()) | 341 this.sendInternal_(); |
| 359 this.waiter_.start(); | |
| 360 return send.getPromise(); | 342 return send.getPromise(); |
| 361 }; | 343 }; |
| 362 | 344 |
| 345 DataSender.prototype.sendInternal_ = function() { | |
| 346 while (this.pendingSends_.length && this.availableBufferCapacity_) { | |
| 347 var result = this.pendingSends_[0].sendData( | |
| 348 this.sink_, this.availableBufferCapacity_); | |
| 349 this.availableBufferCapacity_ = result.remainingBufferCapacity; | |
| 350 if (result.completed) { | |
| 351 this.sendsAwaitingAck_.push(this.pendingSends_.shift()); | |
| 352 } | |
| 353 } | |
| 354 } | |
| 355 | |
| 363 /** | 356 /** |
| 364 * Requests the cancellation of any in-progress sends. Calls to | 357 * Requests the cancellation of any in-progress sends. Calls to |
| 365 * [send()]{@link module:data_sender.DataSender#send} will fail until the | 358 * [send()]{@link module:data_sender.DataSender#send} will fail until the |
| 366 * cancel has completed. | 359 * cancel has completed. |
| 367 * @param {number} error The error to report for cancelled sends. | 360 * @param {number} error The error to report for cancelled sends. |
| 368 * @return {!Promise} A promise that will resolve when the cancel completes. | 361 * @return {!Promise} A promise that will resolve when the cancel completes. |
| 369 * @throws Will throw if this has encountered a fatal error or another cancel | 362 * @throws Will throw if this has encountered a fatal error or another cancel |
| 370 * is in progress. | 363 * is in progress. |
| 371 */ | 364 */ |
| 372 DataSender.prototype.cancel = function(error) { | 365 DataSender.prototype.cancel = function(error) { |
| 373 if (this.shutDown_) | 366 if (this.shutDown_) |
| 374 throw new Error('DataSender has been closed'); | 367 throw new Error('DataSender has been closed'); |
| 375 if (this.pendingCancel_) | 368 if (this.pendingCancel_) |
| 376 throw new Error('Cancel already in progress'); | 369 throw new Error('Cancel already in progress'); |
| 377 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) | 370 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) |
| 378 return Promise.resolve(); | 371 return Promise.resolve(); |
| 379 | 372 |
| 380 this.sink_.cancel(error); | 373 this.sink_.cancel(error); |
| 381 this.cancelPromise_ = new Promise(function(resolve) { | 374 this.cancelPromise_ = new Promise(function(resolve) { |
| 382 this.pendingCancel_ = resolve; | 375 this.pendingCancel_ = resolve; |
| 383 }.bind(this)); | 376 }.bind(this)); |
| 384 return this.cancelPromise_; | 377 return this.cancelPromise_; |
| 385 }; | 378 }; |
| 386 | 379 |
| 387 /** | 380 /** |
| 388 * Invoked when | |
| 389 * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to | |
| 390 * write. Writes to the data pipe if the wait is successful. | |
| 391 * @param {number} waitResult The result of the asynchronous wait. | |
| 392 * @private | |
| 393 */ | |
| 394 DataSender.prototype.onHandleReady_ = function(result) { | |
| 395 if (result != core.RESULT_OK) { | |
| 396 this.close(); | |
| 397 return; | |
| 398 } | |
| 399 while (this.pendingSends_.length) { | |
| 400 var result = this.pendingSends_[0].sendData(this.sendPipe_); | |
| 401 if (result == core.RESULT_OK) { | |
| 402 this.sendsAwaitingAck_.push(this.pendingSends_.shift()); | |
| 403 } else if (result == core.RESULT_SHOULD_WAIT) { | |
| 404 this.waiter_.start(); | |
| 405 return; | |
| 406 } else { | |
| 407 this.close(); | |
| 408 return; | |
| 409 } | |
| 410 } | |
| 411 }; | |
| 412 | |
| 413 /** | |
| 414 * Calls and clears the pending cancel callback if one is pending. | 381 * Calls and clears the pending cancel callback if one is pending. |
| 415 * @private | 382 * @private |
| 416 */ | 383 */ |
| 417 DataSender.prototype.callCancelCallback_ = function() { | 384 DataSender.prototype.callCancelCallback_ = function() { |
| 418 if (this.pendingCancel_) { | 385 if (this.pendingCancel_) { |
| 419 this.cancelPromise_ = null; | 386 this.cancelPromise_ = null; |
| 420 this.pendingCancel_(); | 387 this.pendingCancel_(); |
| 421 this.pendingCancel_ = null; | 388 this.pendingCancel_ = null; |
| 422 } | 389 } |
| 423 }; | 390 }; |
| 424 | 391 |
| 425 /** | 392 /** |
| 426 * Invoked by the DataSink to report that data has been successfully sent. | 393 * Invoked by the DataSink to report that data has been successfully sent. |
| 427 * @param {number} numBytes The number of bytes sent. | 394 * @param {number} numBytes The number of bytes sent. |
| 428 * @private | 395 * @private |
| 429 */ | 396 */ |
| 430 DataSender.prototype.reportBytesSent = function(numBytes) { | 397 DataSender.prototype.reportBytesSent = function(numBytes) { |
| 398 this.availableBufferCapacity_ += numBytes; | |
|
raymes
2014/10/27 03:02:23
Should this be -=? I'm not sure if we should test
Sam McNally
2014/10/27 05:39:14
No. Buffer space is freed up as the data is passed
raymes
2014/10/27 22:57:29
I don't see where the availableBufferCapacity_ is
Sam McNally
2014/10/27 23:33:57
It's set in sendInternal_().
| |
| 431 while (numBytes > 0 && this.sendsAwaitingAck_.length) { | 399 while (numBytes > 0 && this.sendsAwaitingAck_.length) { |
| 432 var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes); | 400 var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes); |
| 433 numBytes = result.bytesUnreported; | 401 numBytes = result.bytesUnreported; |
| 434 if (result.done) | 402 if (result.done) |
| 435 this.sendsAwaitingAck_.shift(); | 403 this.sendsAwaitingAck_.shift(); |
| 436 } | 404 } |
| 437 if (numBytes > 0 && this.pendingSends_.length) { | 405 if (numBytes > 0 && this.pendingSends_.length) { |
| 438 var result = this.pendingSends_[0].reportBytesSent(numBytes); | 406 var result = this.pendingSends_[0].reportBytesSent(numBytes); |
| 439 numBytes = result.bytesUnreported; | 407 numBytes = result.bytesUnreported; |
| 440 } | 408 } |
| 441 // A cancel is completed when all of the sends that were in progress have | 409 // A cancel is completed when all of the sends that were in progress have |
| 442 // completed or failed. This is the case where all sends complete | 410 // completed or failed. This is the case where all sends complete |
| 443 // successfully. | 411 // successfully. |
| 444 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) | 412 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) |
| 445 this.callCancelCallback_(); | 413 this.callCancelCallback_(); |
| 414 | |
| 415 this.sendInternal_(); | |
| 446 }; | 416 }; |
| 447 | 417 |
| 448 /** | 418 /** |
| 449 * Invoked by the DataSink to report an error in sending data. | 419 * Invoked by the DataSink to report an error in sending data. |
| 450 * @param {number} numBytes The number of bytes sent. | 420 * @param {number} numBytes The number of bytes sent. |
| 451 * @param {number} error The error reported by the DataSink. | 421 * @param {number} error The error reported by the DataSink. |
| 452 * @private | 422 * @private |
| 453 */ | 423 */ |
| 454 DataSender.prototype.reportBytesSentAndError = function(numBytes, error) { | 424 DataSender.prototype.reportBytesSentAndError = function(numBytes, error) { |
| 455 var bytesToFlush = 0; | 425 this.availableBufferCapacity_ += numBytes; |
| 456 while (this.sendsAwaitingAck_.length) { | 426 while (this.sendsAwaitingAck_.length) { |
| 457 var result = this.sendsAwaitingAck_[0].reportBytesSentAndError( | 427 var result = this.sendsAwaitingAck_[0].reportBytesSentAndError( |
| 458 numBytes, error); | 428 numBytes, error); |
| 459 numBytes = result.bytesUnreported; | 429 numBytes = result.bytesUnreported; |
| 460 this.sendsAwaitingAck_.shift(); | 430 this.sendsAwaitingAck_.shift(); |
| 461 bytesToFlush += result.bytesToFlush; | 431 this.availableBufferCapacity_ += result.bytesToFlush; |
| 462 } | 432 } |
| 463 while (this.pendingSends_.length) { | 433 while (this.pendingSends_.length) { |
| 464 var result = this.pendingSends_[0].reportBytesSentAndError( | 434 var result = this.pendingSends_[0].reportBytesSentAndError( |
| 465 numBytes, error); | 435 numBytes, error); |
| 466 numBytes = result.bytesUnreported; | 436 numBytes = result.bytesUnreported; |
| 467 this.pendingSends_.shift(); | 437 this.pendingSends_.shift(); |
| 468 // Note: Only the first PendingSend in |pendingSends_| will have data to | 438 // Note: Only the first PendingSend in |pendingSends_| will have data to |
| 469 // flush as only the first can have written data to the data pipe. | 439 // flush as only the first can have sent data to the DataSink. |
| 470 bytesToFlush += result.bytesToFlush; | 440 this.availableBufferCapacity_ += result.bytesToFlush; |
| 471 } | 441 } |
| 472 this.callCancelCallback_(); | 442 this.callCancelCallback_(); |
| 473 return Promise.resolve({bytes_to_flush: bytesToFlush}); | 443 return Promise.resolve(); |
| 474 }; | 444 }; |
| 475 | 445 |
| 476 return {DataSender: DataSender}; | 446 return {DataSender: DataSender}; |
| 477 }); | 447 }); |
| OLD | NEW |