OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
6 'use strict'; | 6 'use strict'; |
7 | 7 |
8 const _reader = v8.createPrivateSymbol('[[reader]]'); | 8 const _reader = v8.createPrivateSymbol('[[reader]]'); |
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
10 const _controller = v8.createPrivateSymbol('[[controller]]'); | 10 const _controller = v8.createPrivateSymbol('[[controller]]'); |
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
188 } | 188 } |
189 | 189 |
190 if (stream[_controller] !== undefined) { | 190 if (stream[_controller] !== undefined) { |
191 throw new TypeError(streamErrors.illegalConstructor); | 191 throw new TypeError(streamErrors.illegalConstructor); |
192 } | 192 } |
193 | 193 |
194 this[_controlledReadableStream] = stream; | 194 this[_controlledReadableStream] = stream; |
195 | 195 |
196 this[_underlyingSource] = underlyingSource; | 196 this[_underlyingSource] = underlyingSource; |
197 | 197 |
198 this[_queue] = new v8.InternalPackedArray(); | 198 this[_queue] = new Queue(); |
199 this[_totalQueuedSize] = 0; | 199 this[_totalQueuedSize] = 0; |
200 | 200 |
201 this[_readableStreamDefaultControllerBits] = 0b0; | 201 this[_readableStreamDefaultControllerBits] = 0b0; |
202 if (isExternallyControlled === true) { | 202 if (isExternallyControlled === true) { |
203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; | 203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; |
204 } | 204 } |
205 | 205 |
206 const normalizedStrategy = | 206 const normalizedStrategy = |
207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | 207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
208 this[_strategySize] = normalizedStrategy.size; | 208 this[_strategySize] = normalizedStrategy.size; |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
289 } | 289 } |
290 if (state === STATE_CLOSED) { | 290 if (state === STATE_CLOSED) { |
291 throw new TypeError(errErrorClosedStream); | 291 throw new TypeError(errErrorClosedStream); |
292 } | 292 } |
293 | 293 |
294 return ReadableStreamDefaultControllerError(this, e); | 294 return ReadableStreamDefaultControllerError(this, e); |
295 } | 295 } |
296 } | 296 } |
297 | 297 |
298 function ReadableStreamDefaultControllerCancel(controller, reason) { | 298 function ReadableStreamDefaultControllerCancel(controller, reason) { |
299 controller[_queue] = new v8.InternalPackedArray(); | 299 controller[_queue] = new Queue(); |
300 | 300 |
301 const underlyingSource = controller[_underlyingSource]; | 301 const underlyingSource = controller[_underlyingSource]; |
302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour
ce.cancel'); | 302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour
ce.cancel'); |
303 } | 303 } |
304 | 304 |
305 function ReadableStreamDefaultControllerPull(controller) { | 305 function ReadableStreamDefaultControllerPull(controller) { |
306 const stream = controller[_controlledReadableStream]; | 306 const stream = controller[_controlledReadableStream]; |
307 | 307 |
308 if (controller[_queue].length > 0) { | 308 if (controller[_queue].length > 0) { |
309 const chunk = DequeueValue(controller); | 309 const chunk = DequeueValue(controller); |
(...skipping 23 matching lines...) Expand all Loading... |
333 constructor(stream) { | 333 constructor(stream) { |
334 if (IsReadableStream(stream) === false) { | 334 if (IsReadableStream(stream) === false) { |
335 throw new TypeError(errReaderConstructorBadArgument); | 335 throw new TypeError(errReaderConstructorBadArgument); |
336 } | 336 } |
337 if (IsReadableStreamLocked(stream) === true) { | 337 if (IsReadableStreamLocked(stream) === true) { |
338 throw new TypeError(errReaderConstructorStreamAlreadyLocked); | 338 throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
339 } | 339 } |
340 | 340 |
341 ReadableStreamReaderGenericInitialize(this, stream); | 341 ReadableStreamReaderGenericInitialize(this, stream); |
342 | 342 |
343 this[_readRequests] = new v8.InternalPackedArray(); | 343 this[_readRequests] = new Queue(); |
344 } | 344 } |
345 | 345 |
346 get closed() { | 346 get closed() { |
347 if (IsReadableStreamDefaultReader(this) === false) { | 347 if (IsReadableStreamDefaultReader(this) === false) { |
348 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); | 348 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
349 } | 349 } |
350 | 350 |
351 return this[_closedPromise]; | 351 return this[_closedPromise]; |
352 } | 352 } |
353 | 353 |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
476 function ReadableStreamGetState(stream) { | 476 function ReadableStreamGetState(stream) { |
477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; | 477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; |
478 } | 478 } |
479 | 479 |
480 function ReadableStreamSetState(stream, state) { | 480 function ReadableStreamSetState(stream, state) { |
481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | | 481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | |
482 (state << STATE_BITS_OFFSET); | 482 (state << STATE_BITS_OFFSET); |
483 } | 483 } |
484 | 484 |
485 function ReadableStreamDefaultControllerError(controller, e) { | 485 function ReadableStreamDefaultControllerError(controller, e) { |
486 controller[_queue] = new v8.InternalPackedArray(); | 486 controller[_queue] = new Queue(); |
487 const stream = controller[_controlledReadableStream]; | 487 const stream = controller[_controlledReadableStream]; |
488 ReadableStreamError(stream, e); | 488 ReadableStreamError(stream, e); |
489 } | 489 } |
490 | 490 |
491 function ReadableStreamError(stream, e) { | 491 function ReadableStreamError(stream, e) { |
492 stream[_storedError] = e; | 492 stream[_storedError] = e; |
493 ReadableStreamSetState(stream, STATE_ERRORED); | 493 ReadableStreamSetState(stream, STATE_ERRORED); |
494 | 494 |
495 const reader = stream[_reader]; | 495 const reader = stream[_reader]; |
496 if (reader === undefined) { | 496 if (reader === undefined) { |
497 return undefined; | 497 return undefined; |
498 } | 498 } |
499 | 499 |
500 if (IsReadableStreamDefaultReader(reader) === true) { | 500 if (IsReadableStreamDefaultReader(reader) === true) { |
501 const readRequests = reader[_readRequests]; | 501 reader[_readRequests].forEach(request => v8.rejectPromise(request, e)); |
502 for (let i = 0; i < readRequests.length; i++) { | 502 reader[_readRequests] = new Queue(); |
503 v8.rejectPromise(readRequests[i], e); | |
504 } | |
505 reader[_readRequests] = new v8.InternalPackedArray(); | |
506 } | 503 } |
507 | 504 |
508 v8.rejectPromise(reader[_closedPromise], e); | 505 v8.rejectPromise(reader[_closedPromise], e); |
509 v8.markPromiseAsHandled(reader[_closedPromise]); | 506 v8.markPromiseAsHandled(reader[_closedPromise]); |
510 } | 507 } |
511 | 508 |
512 function ReadableStreamClose(stream) { | 509 function ReadableStreamClose(stream) { |
513 ReadableStreamSetState(stream, STATE_CLOSED); | 510 ReadableStreamSetState(stream, STATE_CLOSED); |
514 | 511 |
515 const reader = stream[_reader]; | 512 const reader = stream[_reader]; |
516 if (reader === undefined) { | 513 if (reader === undefined) { |
517 return undefined; | 514 return undefined; |
518 } | 515 } |
519 | 516 |
520 if (IsReadableStreamDefaultReader(reader) === true) { | 517 if (IsReadableStreamDefaultReader(reader) === true) { |
521 const readRequests = reader[_readRequests]; | 518 reader[_readRequests].forEach(request => |
522 for (let i = 0; i < readRequests.length; i++) { | 519 v8.resolvePromise(request, CreateIterResultObject(undefined, true))); |
523 v8.resolvePromise( | 520 reader[_readRequests] = new Queue(); |
524 readRequests[i], CreateIterResultObject(undefined, true)); | |
525 } | |
526 reader[_readRequests] = new v8.InternalPackedArray(); | |
527 } | 521 } |
528 | 522 |
529 v8.resolvePromise(reader[_closedPromise], undefined); | 523 v8.resolvePromise(reader[_closedPromise], undefined); |
530 } | 524 } |
531 | 525 |
532 function ReadableStreamDefaultControllerGetDesiredSize(controller) { | 526 function ReadableStreamDefaultControllerGetDesiredSize(controller) { |
533 const queueSize = GetTotalQueueSize(controller); | 527 const queueSize = GetTotalQueueSize(controller); |
534 return controller[_strategyHWM] - queueSize; | 528 return controller[_strategyHWM] - queueSize; |
535 } | 529 } |
536 | 530 |
(...skipping 252 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
789 return promise; | 783 return promise; |
790 } | 784 } |
791 } | 785 } |
792 | 786 |
793 // | 787 // |
794 // Queue-with-sizes | 788 // Queue-with-sizes |
795 // Modified from taking the queue (as in the spec) to taking the stream, so we | 789 // Modified from taking the queue (as in the spec) to taking the stream, so we |
796 // can modify the queue size alongside. | 790 // can modify the queue size alongside. |
797 // | 791 // |
798 | 792 |
| 793 // Simple queue structure. Avoids scalability issues with using |
| 794 // InternalPackedArray directly by using multiple arrays |
| 795 // in a linked list and keeping the array size bounded. |
| 796 const QUEUE_MAX_ARRAY_SIZE = 16384; |
| 797 class Queue { |
| 798 constructor() { |
| 799 this.front = { |
| 800 elements: new v8.InternalPackedArray(), |
| 801 next: undefined, |
| 802 }; |
| 803 this.back = this.front; |
| 804 // The cursor is used to avoid calling InternalPackedArray.shift(). |
| 805 this.cursor = 0; |
| 806 this.size = 0; |
| 807 } |
| 808 |
| 809 get length() { |
| 810 return this.size; |
| 811 } |
| 812 |
| 813 push(element) { |
| 814 ++this.size; |
| 815 if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) { |
| 816 const oldBack = this.back; |
| 817 this.back = { |
| 818 elements: new v8.InternalPackedArray(), |
| 819 next: undefined, |
| 820 }; |
| 821 oldBack.next = this.back; |
| 822 } |
| 823 this.back.elements.push(element); |
| 824 } |
| 825 |
| 826 shift() { |
| 827 // assert(this.size > 0); |
| 828 --this.size; |
| 829 if (this.front.elements.length === this.cursor) { |
| 830 // assert(this.cursor === QUEUE_MAX_ARRAY_SIZE); |
| 831 // assert(this.front.next !== undefined); |
| 832 this.front = this.front.next; |
| 833 this.cursor = 0; |
| 834 } |
| 835 const element = this.front.elements[this.cursor]; |
| 836 // Permit shifted element to be garbage collected. |
| 837 this.front.elements[this.cursor] = undefined; |
| 838 ++this.cursor; |
| 839 |
| 840 return element; |
| 841 } |
| 842 |
| 843 forEach(callback) { |
| 844 let i = this.cursor; |
| 845 let node = this.front; |
| 846 let elements = node.elements; |
| 847 while (i !== elements.length || node.next !== undefined) { |
| 848 if (i === elements.length) { |
| 849 // assert(node.next !== undefined); |
| 850 // assert(i === QUEUE_MAX_ARRAY_SIZE); |
| 851 node = node.next; |
| 852 elements = node.elements; |
| 853 i = 0; |
| 854 } |
| 855 callback(elements[i]); |
| 856 ++i; |
| 857 } |
| 858 } |
| 859 } |
| 860 |
799 function DequeueValue(controller) { | 861 function DequeueValue(controller) { |
800 const result = controller[_queue].shift(); | 862 const result = controller[_queue].shift(); |
801 controller[_totalQueuedSize] -= result.size; | 863 controller[_totalQueuedSize] -= result.size; |
802 return result.value; | 864 return result.value; |
803 } | 865 } |
804 | 866 |
805 function EnqueueValueWithSize(controller, value, size) { | 867 function EnqueueValueWithSize(controller, value, size) { |
806 size = Number(size); | 868 size = Number(size); |
807 if (Number_isNaN(size) || size === +Infinity || size < 0) { | 869 if (Number_isNaN(size) || size === +Infinity || size < 0) { |
808 throw new RangeError(streamErrors.invalidSize); | 870 throw new RangeError(streamErrors.invalidSize); |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC
ontrollerGetDesiredSize; | 967 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC
ontrollerGetDesiredSize; |
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll
erEnqueue; | 968 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll
erEnqueue; |
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController
Error; | 969 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController
Error; |
908 | 970 |
909 binding.createReadableStreamWithExternalController = | 971 binding.createReadableStreamWithExternalController = |
910 (underlyingSource, strategy) => { | 972 (underlyingSource, strategy) => { |
911 return new ReadableStream( | 973 return new ReadableStream( |
912 underlyingSource, strategy, createWithExternalControllerSentinel); | 974 underlyingSource, strategy, createWithExternalControllerSentinel); |
913 }; | 975 }; |
914 }); | 976 }); |
OLD | NEW |