OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 define('data_sender', [ | 5 define('data_sender', [ |
6 'async_waiter', | |
7 'device/serial/data_stream.mojom', | 6 'device/serial/data_stream.mojom', |
8 'device/serial/data_stream_serialization.mojom', | 7 'device/serial/data_stream_serialization.mojom', |
9 'mojo/public/js/bindings/core', | 8 'mojo/public/js/bindings/core', |
10 'mojo/public/js/bindings/router', | 9 'mojo/public/js/bindings/router', |
11 ], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) { | 10 ], function(dataStreamMojom, serialization, core, routerModule) { |
12 /** | 11 /** |
13 * @module data_sender | 12 * @module data_sender |
14 */ | 13 */ |
15 | 14 |
16 /** | 15 /** |
17 * A pending send operation. | 16 * A pending send operation. |
18 * @param {!ArrayBuffer} data The data to be sent. | 17 * @param {!ArrayBuffer} data The data to be sent. |
19 * @constructor | 18 * @constructor |
20 * @alias module:data_sender~PendingSend | 19 * @alias module:data_sender~PendingSend |
21 * @private | 20 * @private |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
137 if (this.bytesReceivedBySink_ > this.length_) { | 136 if (this.bytesReceivedBySink_ > this.length_) { |
138 result.bytesUnreported = this.bytesReceivedBySink_ - this.length_; | 137 result.bytesUnreported = this.bytesReceivedBySink_ - this.length_; |
139 this.bytesReceivedBySink_ = this.length_; | 138 this.bytesReceivedBySink_ = this.length_; |
140 } | 139 } |
141 result.done = false; | 140 result.done = false; |
142 return result; | 141 return result; |
143 }; | 142 }; |
144 | 143 |
145 /** | 144 /** |
146 * Writes pending data into the data pipe. | 145 * Writes pending data into the data pipe. |
147 * @param {!MojoHandle} handle The handle to the data pipe. | 146 * @param {!DataSink} sink The DataSink to receive the data. |
148 * @return {number} The Mojo result corresponding to the outcome: | 147 * @return {number} The Mojo result corresponding to the outcome: |
149 * <ul> | 148 * <ul> |
150 * <li>RESULT_OK if the write completes successfully; | 149 * <li>RESULT_OK if the write completes successfully; |
151 * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or | 150 * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or |
152 * <li>the data pipe error if the write failed. | 151 * <li>the data pipe error if the write failed. |
153 * </ul> | 152 * </ul> |
154 */ | 153 */ |
155 PendingSend.prototype.sendData = function(handle) { | 154 PendingSend.prototype.sendData = function(sink, availableBufferCapacity) { |
156 var result = core.writeData( | 155 var numBytesToSend = |
157 handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE); | 156 Math.min(availableBufferCapacity, this.data_.byteLength); |
158 if (result.result != core.RESULT_OK) | 157 sink.acceptData( |
159 return result.result; | 158 new Uint8Array(this.data_, 0, numBytesToSend)); |
160 this.data_ = this.data_.slice(result.numBytes); | 159 this.data_ = this.data_.slice(numBytesToSend); |
161 return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK; | 160 return { |
| 161 completed: this.data_.byteLength == 0, |
| 162 remainingBufferCapacity: availableBufferCapacity - numBytesToSend, |
| 163 }; |
162 }; | 164 }; |
163 | 165 |
164 /** | 166 /** |
165 * A DataSender that sends data to a DataSink. | 167 * A DataSender that sends data to a DataSink. |
166 * @param {!MojoHandle} handle The handle to the DataSink. | 168 * @param {!MojoHandle} handle The handle to the DataSink. |
167 * @param {number} bufferSize How large a buffer the data pipe should use. | 169 * @param {number} bufferSize How large a buffer to use for data. |
168 * @param {number} fatalErrorValue The send error value to report in the | 170 * @param {number} fatalErrorValue The send error value to report in the |
169 * event of a fatal error. | 171 * event of a fatal error. |
170 * @constructor | 172 * @constructor |
171 * @alias module:data_sender.DataSender | 173 * @alias module:data_sender.DataSender |
172 */ | 174 */ |
173 function DataSender(handle, bufferSize, fatalErrorValue) { | 175 function DataSender(handle, bufferSize, fatalErrorValue) { |
174 var dataPipeOptions = { | 176 this.init_(handle, fatalErrorValue, bufferSize); |
175 flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 177 this.sink_.init(bufferSize); |
176 elementNumBytes: 1, | |
177 capacityNumBytes: bufferSize, | |
178 }; | |
179 var sendPipe = core.createDataPipe(dataPipeOptions); | |
180 this.init_(handle, sendPipe.producerHandle, fatalErrorValue); | |
181 this.sink_.init(sendPipe.consumerHandle); | |
182 } | 178 } |
183 | 179 |
184 DataSender.prototype = | 180 DataSender.prototype = |
185 $Object.create(dataStreamMojom.DataSinkClient.stubClass.prototype); | 181 $Object.create(dataStreamMojom.DataSinkClient.stubClass.prototype); |
186 | 182 |
187 /** | 183 /** |
188 * Closes this DataSender. | 184 * Closes this DataSender. |
189 */ | 185 */ |
190 DataSender.prototype.close = function() { | 186 DataSender.prototype.close = function() { |
191 if (this.shutDown_) | 187 if (this.shutDown_) |
192 return; | 188 return; |
193 this.shutDown_ = true; | 189 this.shutDown_ = true; |
194 this.waiter_.stop(); | |
195 this.router_.close(); | 190 this.router_.close(); |
196 core.close(this.sendPipe_); | |
197 while (this.pendingSends_.length) { | 191 while (this.pendingSends_.length) { |
198 this.pendingSends_.pop().reportBytesSentAndError( | 192 this.pendingSends_.pop().reportBytesSentAndError( |
199 0, this.fatalErrorValue_); | 193 0, this.fatalErrorValue_); |
200 } | 194 } |
201 while (this.sendsAwaitingAck_.length) { | 195 while (this.sendsAwaitingAck_.length) { |
202 this.sendsAwaitingAck_.pop().reportBytesSentAndError( | 196 this.sendsAwaitingAck_.pop().reportBytesSentAndError( |
203 0, this.fatalErrorValue_); | 197 0, this.fatalErrorValue_); |
204 } | 198 } |
205 this.callCancelCallback_(); | 199 this.callCancelCallback_(); |
206 }; | 200 }; |
207 | 201 |
208 /** | 202 /** |
209 * Initialize this DataSender. | 203 * Initialize this DataSender. |
210 * @param {!MojoHandle} sink A handle to the DataSink | 204 * @param {!MojoHandle} sink A handle to the DataSink |
211 * @param {!MojoHandle} dataPipe A handle to use for sending data to the | |
212 * DataSink. | |
213 * @param {number} fatalErrorValue The error to dispatch in the event of a | 205 * @param {number} fatalErrorValue The error to dispatch in the event of a |
214 * fatal error. | 206 * fatal error. |
| 207 * @param {number} bufferSize The size of the send buffer. |
215 * @private | 208 * @private |
216 */ | 209 */ |
217 DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) { | 210 DataSender.prototype.init_ = function(sink, fatalErrorValue, bufferSize) { |
218 /** | |
219 * The handle to the data pipe to use for sending data. | |
220 * @private | |
221 */ | |
222 this.sendPipe_ = dataPipe; | |
223 /** | 211 /** |
224 * The error to be dispatched in the event of a fatal error. | 212 * The error to be dispatched in the event of a fatal error. |
225 * @const {number} | 213 * @const {number} |
226 * @private | 214 * @private |
227 */ | 215 */ |
228 this.fatalErrorValue_ = fatalErrorValue; | 216 this.fatalErrorValue_ = fatalErrorValue; |
229 /** | 217 /** |
230 * Whether this DataSender has shut down. | 218 * Whether this DataSender has shut down. |
231 * @type {boolean} | 219 * @type {boolean} |
232 * @private | 220 * @private |
233 */ | 221 */ |
234 this.shutDown_ = false; | 222 this.shutDown_ = false; |
235 /** | 223 /** |
236 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the | 224 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the |
237 * connection to the DataSink. | 225 * connection to the DataSink. |
238 * @private | 226 * @private |
239 */ | 227 */ |
240 this.router_ = new routerModule.Router(sink); | 228 this.router_ = new routerModule.Router(sink); |
241 /** | 229 /** |
242 * The connection to the DataSink. | 230 * The connection to the DataSink. |
243 * @private | 231 * @private |
244 */ | 232 */ |
245 this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_); | 233 this.sink_ = new dataStreamMojom.DataSink.proxyClass(this.router_); |
246 this.router_.setIncomingReceiver(this); | 234 this.router_.setIncomingReceiver(this); |
247 /** | 235 /** |
248 * The async waiter used to wait for | 236 * A queue of sends that have not fully sent their data to the DataSink. |
249 * {@link module:data_sender.DataSender#sendPipe_} to be writable. | |
250 * @type {!module:async_waiter.AsyncWaiter} | |
251 * @private | |
252 */ | |
253 this.waiter_ = new asyncWaiter.AsyncWaiter( | |
254 this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE, | |
255 this.onHandleReady_.bind(this)); | |
256 /** | |
257 * A queue of sends that have not fully written their data to the data pipe. | |
258 * @type {!module:data_sender~PendingSend[]} | 237 * @type {!module:data_sender~PendingSend[]} |
259 * @private | 238 * @private |
260 */ | 239 */ |
261 this.pendingSends_ = []; | 240 this.pendingSends_ = []; |
262 /** | 241 /** |
263 * A queue of sends that have written their data to the data pipe, but have | 242 * A queue of sends that have sent their data to the DataSink, but have not |
264 * not been received by the DataSink. | 243 * been received by the DataSink. |
265 * @type {!module:data_sender~PendingSend[]} | 244 * @type {!module:data_sender~PendingSend[]} |
266 * @private | 245 * @private |
267 */ | 246 */ |
268 this.sendsAwaitingAck_ = []; | 247 this.sendsAwaitingAck_ = []; |
269 | 248 |
270 /** | 249 /** |
271 * The callback that will resolve a pending cancel if one is in progress. | 250 * The callback that will resolve a pending cancel if one is in progress. |
272 * @type {?Function} | 251 * @type {?Function} |
273 * @private | 252 * @private |
274 */ | 253 */ |
275 this.pendingCancel_ = null; | 254 this.pendingCancel_ = null; |
276 | 255 |
277 /** | 256 /** |
278 * The promise that will be resolved when a pending cancel completes if one | 257 * The promise that will be resolved when a pending cancel completes if one |
279 * is in progress. | 258 * is in progress. |
280 * @type {Promise} | 259 * @type {Promise} |
281 * @private | 260 * @private |
282 */ | 261 */ |
283 this.cancelPromise_ = null; | 262 this.cancelPromise_ = null; |
| 263 /** |
| 264 * The available send buffer capacity. |
| 265 * @type {number} |
| 266 * @private |
| 267 */ |
| 268 this.availableBufferCapacity_ = bufferSize; |
284 }; | 269 }; |
285 | 270 |
286 /** | 271 /** |
287 * Serializes this DataSender. | 272 * Serializes this DataSender. |
288 * This will cancel any sends in progress before the returned promise | 273 * This will cancel any sends in progress before the returned promise |
289 * resolves. | 274 * resolves. |
290 * @return {!Promise.<SerializedDataSender>} A promise that will resolve to | 275 * @return {!Promise.<SerializedDataSender>} A promise that will resolve to |
291 * the serialization of this DataSender. If this DataSender has shut down, | 276 * the serialization of this DataSender. If this DataSender has shut down, |
292 * the promise will resolve to null. | 277 * the promise will resolve to null. |
293 */ | 278 */ |
294 DataSender.prototype.serialize = function() { | 279 DataSender.prototype.serialize = function() { |
295 if (this.shutDown_) | 280 if (this.shutDown_) |
296 return Promise.resolve(null); | 281 return Promise.resolve(null); |
297 | 282 |
298 var readyToSerialize = Promise.resolve(); | 283 var readyToSerialize = Promise.resolve(); |
299 if (this.pendingSends_.length) { | 284 if (this.pendingSends_.length || this.sendsAwaitingAck_.length) { |
300 if (this.pendingCancel_) | 285 if (this.pendingCancel_) |
301 readyToSerialize = this.cancelPromise_; | 286 readyToSerialize = this.cancelPromise_; |
302 else | 287 else |
303 readyToSerialize = this.cancel(this.fatalErrorValue_); | 288 readyToSerialize = this.cancel(this.fatalErrorValue_); |
304 } | 289 } |
305 return readyToSerialize.then(function() { | 290 return readyToSerialize.then(function() { |
306 this.waiter_.stop(); | |
307 var serialized = new serialization.SerializedDataSender(); | 291 var serialized = new serialization.SerializedDataSender(); |
308 serialized.sink = this.router_.connector_.handle_, | 292 serialized.sink = this.router_.connector_.handle_; |
309 serialized.data_pipe = this.sendPipe_, | 293 serialized.fatal_error_value = this.fatalErrorValue_; |
310 serialized.fatal_error_value = this.fatalErrorValue_, | 294 serialized.buffer_size = this.availableBufferCapacity_; |
311 this.router_.connector_.handle_ = null; | 295 this.router_.connector_.handle_ = null; |
312 this.router_.close(); | 296 this.router_.close(); |
313 this.shutDown_ = true; | 297 this.shutDown_ = true; |
314 return serialized; | 298 return serialized; |
315 }.bind(this)); | 299 }.bind(this)); |
316 }; | 300 }; |
317 | 301 |
318 /** | 302 /** |
319 * Deserializes a SerializedDataSender. | 303 * Deserializes a SerializedDataSender. |
320 * @param {SerializedDataSender} serialized The serialized DataSender. | 304 * @param {SerializedDataSender} serialized The serialized DataSender. |
321 * @return {!DataSender} The deserialized DataSender. | 305 * @return {!DataSender} The deserialized DataSender. |
322 */ | 306 */ |
323 DataSender.deserialize = function(serialized) { | 307 DataSender.deserialize = function(serialized) { |
324 var sender = $Object.create(DataSender.prototype); | 308 var sender = $Object.create(DataSender.prototype); |
325 sender.deserialize_(serialized); | 309 sender.deserialize_(serialized); |
326 return sender; | 310 return sender; |
327 }; | 311 }; |
328 | 312 |
329 /** | 313 /** |
330 * Deserializes a SerializedDataSender into this DataSender. | 314 * Deserializes a SerializedDataSender into this DataSender. |
331 * @param {SerializedDataSender} serialized The serialized DataSender. | 315 * @param {SerializedDataSender} serialized The serialized DataSender. |
332 * @private | 316 * @private |
333 */ | 317 */ |
334 DataSender.prototype.deserialize_ = function(serialized) { | 318 DataSender.prototype.deserialize_ = function(serialized) { |
335 if (!serialized) { | 319 if (!serialized) { |
336 this.shutDown_ = true; | 320 this.shutDown_ = true; |
337 return; | 321 return; |
338 } | 322 } |
339 this.init_( | 323 this.init_( |
340 serialized.sink, serialized.data_pipe, serialized.fatal_error_value); | 324 serialized.sink, serialized.fatal_error_value, serialized.buffer_size); |
341 }; | 325 }; |
342 | 326 |
343 /** | 327 /** |
344 * Sends data to the DataSink. | 328 * Sends data to the DataSink. |
345 * @return {!Promise.<number>} A promise to the number of bytes sent. If an | 329 * @return {!Promise.<number>} A promise to the number of bytes sent. If an |
346 * error occurs, the promise will reject with an Error object with a | 330 * error occurs, the promise will reject with an Error object with a |
347 * property error containing the error code. | 331 * property error containing the error code. |
348 * @throws Will throw if this has encountered a fatal error or a cancel is in | 332 * @throws Will throw if this has encountered a fatal error or a cancel is in |
349 * progress. | 333 * progress. |
350 */ | 334 */ |
351 DataSender.prototype.send = function(data) { | 335 DataSender.prototype.send = function(data) { |
352 if (this.shutDown_) | 336 if (this.shutDown_) |
353 throw new Error('DataSender has been closed'); | 337 throw new Error('DataSender has been closed'); |
354 if (this.pendingCancel_) | 338 if (this.pendingCancel_) |
355 throw new Error('Cancel in progress'); | 339 throw new Error('Cancel in progress'); |
356 var send = new PendingSend(data); | 340 var send = new PendingSend(data); |
357 this.pendingSends_.push(send); | 341 this.pendingSends_.push(send); |
358 if (!this.waiter_.isWaiting()) | 342 this.sendInternal_(); |
359 this.waiter_.start(); | |
360 return send.getPromise(); | 343 return send.getPromise(); |
361 }; | 344 }; |
362 | 345 |
| 346 DataSender.prototype.sendInternal_ = function() { |
| 347 while (this.pendingSends_.length && this.availableBufferCapacity_) { |
| 348 var result = this.pendingSends_[0].sendData( |
| 349 this.sink_, this.availableBufferCapacity_); |
| 350 this.availableBufferCapacity_ = result.remainingBufferCapacity; |
| 351 if (result.completed) { |
| 352 this.sendsAwaitingAck_.push(this.pendingSends_.shift()); |
| 353 } |
| 354 } |
| 355 } |
| 356 |
363 /** | 357 /** |
364 * Requests the cancellation of any in-progress sends. Calls to | 358 * Requests the cancellation of any in-progress sends. Calls to |
365 * [send()]{@link module:data_sender.DataSender#send} will fail until the | 359 * [send()]{@link module:data_sender.DataSender#send} will fail until the |
366 * cancel has completed. | 360 * cancel has completed. |
367 * @param {number} error The error to report for cancelled sends. | 361 * @param {number} error The error to report for cancelled sends. |
368 * @return {!Promise} A promise that will resolve when the cancel completes. | 362 * @return {!Promise} A promise that will resolve when the cancel completes. |
369 * @throws Will throw if this has encountered a fatal error or another cancel | 363 * @throws Will throw if this has encountered a fatal error or another cancel |
370 * is in progress. | 364 * is in progress. |
371 */ | 365 */ |
372 DataSender.prototype.cancel = function(error) { | 366 DataSender.prototype.cancel = function(error) { |
373 if (this.shutDown_) | 367 if (this.shutDown_) |
374 throw new Error('DataSender has been closed'); | 368 throw new Error('DataSender has been closed'); |
375 if (this.pendingCancel_) | 369 if (this.pendingCancel_) |
376 throw new Error('Cancel already in progress'); | 370 throw new Error('Cancel already in progress'); |
377 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) | 371 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) |
378 return Promise.resolve(); | 372 return Promise.resolve(); |
379 | 373 |
380 this.sink_.cancel(error); | 374 this.sink_.cancel(error); |
381 this.cancelPromise_ = new Promise(function(resolve) { | 375 this.cancelPromise_ = new Promise(function(resolve) { |
382 this.pendingCancel_ = resolve; | 376 this.pendingCancel_ = resolve; |
383 }.bind(this)); | 377 }.bind(this)); |
384 return this.cancelPromise_; | 378 return this.cancelPromise_; |
385 }; | 379 }; |
386 | 380 |
387 /** | 381 /** |
388 * Invoked when | |
389 * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to | |
390 * write. Writes to the data pipe if the wait is successful. | |
391 * @param {number} waitResult The result of the asynchronous wait. | |
392 * @private | |
393 */ | |
394 DataSender.prototype.onHandleReady_ = function(result) { | |
395 if (result != core.RESULT_OK) { | |
396 this.close(); | |
397 return; | |
398 } | |
399 while (this.pendingSends_.length) { | |
400 var result = this.pendingSends_[0].sendData(this.sendPipe_); | |
401 if (result == core.RESULT_OK) { | |
402 this.sendsAwaitingAck_.push(this.pendingSends_.shift()); | |
403 } else if (result == core.RESULT_SHOULD_WAIT) { | |
404 this.waiter_.start(); | |
405 return; | |
406 } else { | |
407 this.close(); | |
408 return; | |
409 } | |
410 } | |
411 }; | |
412 | |
413 /** | |
414 * Calls and clears the pending cancel callback if one is pending. | 382 * Calls and clears the pending cancel callback if one is pending. |
415 * @private | 383 * @private |
416 */ | 384 */ |
417 DataSender.prototype.callCancelCallback_ = function() { | 385 DataSender.prototype.callCancelCallback_ = function() { |
418 if (this.pendingCancel_) { | 386 if (this.pendingCancel_) { |
419 this.cancelPromise_ = null; | 387 this.cancelPromise_ = null; |
420 this.pendingCancel_(); | 388 this.pendingCancel_(); |
421 this.pendingCancel_ = null; | 389 this.pendingCancel_ = null; |
422 } | 390 } |
423 }; | 391 }; |
424 | 392 |
425 /** | 393 /** |
426 * Invoked by the DataSink to report that data has been successfully sent. | 394 * Invoked by the DataSink to report that data has been successfully sent. |
427 * @param {number} numBytes The number of bytes sent. | 395 * @param {number} numBytes The number of bytes sent. |
428 * @private | 396 * @private |
429 */ | 397 */ |
430 DataSender.prototype.reportBytesSent = function(numBytes) { | 398 DataSender.prototype.reportBytesSent = function(numBytes) { |
| 399 this.availableBufferCapacity_ += numBytes; |
431 while (numBytes > 0 && this.sendsAwaitingAck_.length) { | 400 while (numBytes > 0 && this.sendsAwaitingAck_.length) { |
432 var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes); | 401 var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes); |
433 numBytes = result.bytesUnreported; | 402 numBytes = result.bytesUnreported; |
434 if (result.done) | 403 if (result.done) |
435 this.sendsAwaitingAck_.shift(); | 404 this.sendsAwaitingAck_.shift(); |
436 } | 405 } |
437 if (numBytes > 0 && this.pendingSends_.length) { | 406 if (numBytes > 0 && this.pendingSends_.length) { |
438 var result = this.pendingSends_[0].reportBytesSent(numBytes); | 407 var result = this.pendingSends_[0].reportBytesSent(numBytes); |
439 numBytes = result.bytesUnreported; | 408 numBytes = result.bytesUnreported; |
440 } | 409 } |
441 // A cancel is completed when all of the sends that were in progress have | 410 // A cancel is completed when all of the sends that were in progress have |
442 // completed or failed. This is the case where all sends complete | 411 // completed or failed. This is the case where all sends complete |
443 // successfully. | 412 // successfully. |
444 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) | 413 if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0) |
445 this.callCancelCallback_(); | 414 this.callCancelCallback_(); |
| 415 |
| 416 this.sendInternal_(); |
446 }; | 417 }; |
447 | 418 |
448 /** | 419 /** |
449 * Invoked by the DataSink to report an error in sending data. | 420 * Invoked by the DataSink to report an error in sending data. |
450 * @param {number} numBytes The number of bytes sent. | 421 * @param {number} numBytes The number of bytes sent. |
451 * @param {number} error The error reported by the DataSink. | 422 * @param {number} error The error reported by the DataSink. |
452 * @private | 423 * @private |
453 */ | 424 */ |
454 DataSender.prototype.reportBytesSentAndError = function(numBytes, error) { | 425 DataSender.prototype.reportBytesSentAndError = function(numBytes, error) { |
455 var bytesToFlush = 0; | 426 this.availableBufferCapacity_ += numBytes; |
456 while (this.sendsAwaitingAck_.length) { | 427 while (this.sendsAwaitingAck_.length) { |
457 var result = this.sendsAwaitingAck_[0].reportBytesSentAndError( | 428 var result = this.sendsAwaitingAck_[0].reportBytesSentAndError( |
458 numBytes, error); | 429 numBytes, error); |
459 numBytes = result.bytesUnreported; | 430 numBytes = result.bytesUnreported; |
460 this.sendsAwaitingAck_.shift(); | 431 this.sendsAwaitingAck_.shift(); |
461 bytesToFlush += result.bytesToFlush; | 432 this.availableBufferCapacity_ += result.bytesToFlush; |
462 } | 433 } |
463 while (this.pendingSends_.length) { | 434 while (this.pendingSends_.length) { |
464 var result = this.pendingSends_[0].reportBytesSentAndError( | 435 var result = this.pendingSends_[0].reportBytesSentAndError( |
465 numBytes, error); | 436 numBytes, error); |
466 numBytes = result.bytesUnreported; | 437 numBytes = result.bytesUnreported; |
467 this.pendingSends_.shift(); | 438 this.pendingSends_.shift(); |
468 // Note: Only the first PendingSend in |pendingSends_| will have data to | 439 // Note: Only the first PendingSend in |pendingSends_| will have data to |
469 // flush as only the first can have written data to the data pipe. | 440 // flush as only the first can have sent data to the DataSink. |
470 bytesToFlush += result.bytesToFlush; | 441 this.availableBufferCapacity_ += result.bytesToFlush; |
471 } | 442 } |
472 this.callCancelCallback_(); | 443 this.callCancelCallback_(); |
473 return Promise.resolve({bytes_to_flush: bytesToFlush}); | 444 return Promise.resolve(); |
474 }; | 445 }; |
475 | 446 |
476 return {DataSender: DataSender}; | 447 return {DataSender: DataSender}; |
477 }); | 448 }); |
OLD | NEW |