OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
6 'use strict'; | 6 'use strict'; |
7 | 7 |
8 const _reader = v8.createPrivateSymbol('[[reader]]'); | 8 const _reader = v8.createPrivateSymbol('[[reader]]'); |
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
10 const _controller = v8.createPrivateSymbol('[[controller]]'); | 10 const _controller = v8.createPrivateSymbol('[[controller]]'); |
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
188 } | 188 } |
189 | 189 |
190 if (stream[_controller] !== undefined) { | 190 if (stream[_controller] !== undefined) { |
191 throw new TypeError(streamErrors.illegalConstructor); | 191 throw new TypeError(streamErrors.illegalConstructor); |
192 } | 192 } |
193 | 193 |
194 this[_controlledReadableStream] = stream; | 194 this[_controlledReadableStream] = stream; |
195 | 195 |
196 this[_underlyingSource] = underlyingSource; | 196 this[_underlyingSource] = underlyingSource; |
197 | 197 |
198 this[_queue] = new v8.InternalPackedArray(); | 198 this[_queue] = new Queue(); |
199 this[_totalQueuedSize] = 0; | 199 this[_totalQueuedSize] = 0; |
200 | 200 |
201 this[_readableStreamDefaultControllerBits] = 0b0; | 201 this[_readableStreamDefaultControllerBits] = 0b0; |
202 if (isExternallyControlled === true) { | 202 if (isExternallyControlled === true) { |
203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; | 203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; |
204 } | 204 } |
205 | 205 |
206 const normalizedStrategy = | 206 const normalizedStrategy = |
207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | 207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
208 this[_strategySize] = normalizedStrategy.size; | 208 this[_strategySize] = normalizedStrategy.size; |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
289 } | 289 } |
290 if (state === STATE_CLOSED) { | 290 if (state === STATE_CLOSED) { |
291 throw new TypeError(errErrorClosedStream); | 291 throw new TypeError(errErrorClosedStream); |
292 } | 292 } |
293 | 293 |
294 return ReadableStreamDefaultControllerError(this, e); | 294 return ReadableStreamDefaultControllerError(this, e); |
295 } | 295 } |
296 } | 296 } |
297 | 297 |
298 function ReadableStreamDefaultControllerCancel(controller, reason) { | 298 function ReadableStreamDefaultControllerCancel(controller, reason) { |
299 controller[_queue] = new v8.InternalPackedArray(); | 299 controller[_queue] = new Queue(); |
300 | 300 |
301 const underlyingSource = controller[_underlyingSource]; | 301 const underlyingSource = controller[_underlyingSource]; |
302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); | 302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); |
303 } | 303 } |
304 | 304 |
305 function ReadableStreamDefaultControllerPull(controller) { | 305 function ReadableStreamDefaultControllerPull(controller) { |
306 const stream = controller[_controlledReadableStream]; | 306 const stream = controller[_controlledReadableStream]; |
307 | 307 |
308 if (controller[_queue].length > 0) { | 308 if (controller[_queue].length > 0) { |
309 const chunk = DequeueValue(controller); | 309 const chunk = DequeueValue(controller); |
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
476 function ReadableStreamGetState(stream) { | 476 function ReadableStreamGetState(stream) { |
477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; | 477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; |
478 } | 478 } |
479 | 479 |
480 function ReadableStreamSetState(stream, state) { | 480 function ReadableStreamSetState(stream, state) { |
481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | | 481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | |
482 (state << STATE_BITS_OFFSET); | 482 (state << STATE_BITS_OFFSET); |
483 } | 483 } |
484 | 484 |
485 function ReadableStreamDefaultControllerError(controller, e) { | 485 function ReadableStreamDefaultControllerError(controller, e) { |
486 controller[_queue] = new v8.InternalPackedArray(); | 486 controller[_queue] = new Queue(); |
487 const stream = controller[_controlledReadableStream]; | 487 const stream = controller[_controlledReadableStream]; |
488 ReadableStreamError(stream, e); | 488 ReadableStreamError(stream, e); |
489 } | 489 } |
490 | 490 |
491 function ReadableStreamError(stream, e) { | 491 function ReadableStreamError(stream, e) { |
492 stream[_storedError] = e; | 492 stream[_storedError] = e; |
493 ReadableStreamSetState(stream, STATE_ERRORED); | 493 ReadableStreamSetState(stream, STATE_ERRORED); |
494 | 494 |
495 const reader = stream[_reader]; | 495 const reader = stream[_reader]; |
496 if (reader === undefined) { | 496 if (reader === undefined) { |
(...skipping 292 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
789 return promise; | 789 return promise; |
790 } | 790 } |
791 } | 791 } |
792 | 792 |
793 // | 793 // |
794 // Queue-with-sizes | 794 // Queue-with-sizes |
795 // Modified from taking the queue (as in the spec) to taking the stream, so we | 795 // Modified from taking the queue (as in the spec) to taking the stream, so we |
796 // can modify the queue size alongside. | 796 // can modify the queue size alongside. |
797 // | 797 // |
798 | 798 |
799 // Simple queue structure. Avoids scalability issues with using | |
800 // InternalPackedArray directly by using multiple arrays | |
801 // in a linked list and keeping the size of each below 32768 elements. | |
802 class Queue { | |
803 constructor() { | |
804 this.front = { | |
805 elements: new v8.InternalPackedArray(), | |
806 next: undefined, | |
807 }; | |
808 this.back = this.front; | |
809 this.size = 0; | |
810 } | |
811 | |
812 get length() { | |
yhirano
2017/01/17 08:23:37
How about having |empty| predicate?
Adam Rice
2017/01/17 08:33:39
I am hoping to get feedback from someone more know
| |
813 return this.size; | |
814 } | |
815 | |
816 push(element) { | |
817 ++this.size; | |
818 if (this.back.elements.length === 32767) { | |
819 const oldBack = this.back; | |
820 this.back = { | |
821 elements: new v8.InternalPackedArray(), | |
822 next: undefined, | |
823 }; | |
824 oldBack.next = this.back; | |
825 } | |
826 this.back.elements.push(element); | |
827 } | |
828 | |
829 shift() { | |
830 --this.size; | |
831 const element = this.front.elements.shift(); | |
832 if (this.front.elements.length === 0 && this.front.next !== undefined) { | |
833 this.front = this.front.next; | |
834 } | |
835 return element; | |
836 } | |
837 } | |
838 | |
799 function DequeueValue(controller) { | 839 function DequeueValue(controller) { |
800 const result = controller[_queue].shift(); | 840 const result = controller[_queue].shift(); |
801 controller[_totalQueuedSize] -= result.size; | 841 controller[_totalQueuedSize] -= result.size; |
802 return result.value; | 842 return result.value; |
803 } | 843 } |
804 | 844 |
805 function EnqueueValueWithSize(controller, value, size) { | 845 function EnqueueValueWithSize(controller, value, size) { |
806 size = Number(size); | 846 size = Number(size); |
807 if (Number_isNaN(size) || size === +Infinity || size < 0) { | 847 if (Number_isNaN(size) || size === +Infinity || size < 0) { |
808 throw new RangeError(streamErrors.invalidSize); | 848 throw new RangeError(streamErrors.invalidSize); |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; | 945 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; |
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; | 946 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; |
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; | 947 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; |
908 | 948 |
909 binding.createReadableStreamWithExternalController = | 949 binding.createReadableStreamWithExternalController = |
910 (underlyingSource, strategy) => { | 950 (underlyingSource, strategy) => { |
911 return new ReadableStream( | 951 return new ReadableStream( |
912 underlyingSource, strategy, createWithExternalControllerSentinel); | 952 underlyingSource, strategy, createWithExternalControllerSentinel); |
913 }; | 953 }; |
914 }); | 954 }); |
OLD | NEW |