OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
6 'use strict'; | 6 'use strict'; |
7 | 7 |
8 const _reader = v8.createPrivateSymbol('[[reader]]'); | 8 const _reader = v8.createPrivateSymbol('[[reader]]'); |
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
10 const _controller = v8.createPrivateSymbol('[[controller]]'); | 10 const _controller = v8.createPrivateSymbol('[[controller]]'); |
(...skipping 382 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
393 } | 393 } |
394 | 394 |
395 if (stream[_controller] !== undefined) { | 395 if (stream[_controller] !== undefined) { |
396 throw new TypeError(streamErrors.illegalConstructor); | 396 throw new TypeError(streamErrors.illegalConstructor); |
397 } | 397 } |
398 | 398 |
399 this[_controlledReadableStream] = stream; | 399 this[_controlledReadableStream] = stream; |
400 | 400 |
401 this[_underlyingSource] = underlyingSource; | 401 this[_underlyingSource] = underlyingSource; |
402 | 402 |
403 this[_queue] = new Queue(); | 403 this[_queue] = new binding.SimpleQueue(); |
404 this[_totalQueuedSize] = 0; | 404 this[_totalQueuedSize] = 0; |
405 | 405 |
406 this[_readableStreamDefaultControllerBits] = 0b0; | 406 this[_readableStreamDefaultControllerBits] = 0b0; |
407 if (isExternallyControlled === true) { | 407 if (isExternallyControlled === true) { |
408 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; | 408 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; |
409 } | 409 } |
410 | 410 |
411 const normalizedStrategy = | 411 const normalizedStrategy = |
412 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | 412 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
413 this[_strategySize] = normalizedStrategy.size; | 413 this[_strategySize] = normalizedStrategy.size; |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
494 } | 494 } |
495 if (state === STATE_CLOSED) { | 495 if (state === STATE_CLOSED) { |
496 throw new TypeError(errErrorClosedStream); | 496 throw new TypeError(errErrorClosedStream); |
497 } | 497 } |
498 | 498 |
499 return ReadableStreamDefaultControllerError(this, e); | 499 return ReadableStreamDefaultControllerError(this, e); |
500 } | 500 } |
501 } | 501 } |
502 | 502 |
503 function ReadableStreamDefaultControllerCancel(controller, reason) { | 503 function ReadableStreamDefaultControllerCancel(controller, reason) { |
504 controller[_queue] = new Queue(); | 504 controller[_queue] = new binding.SimpleQueue(); |
505 | 505 |
506 const underlyingSource = controller[_underlyingSource]; | 506 const underlyingSource = controller[_underlyingSource]; |
507 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour
ce.cancel'); | 507 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour
ce.cancel'); |
508 } | 508 } |
509 | 509 |
510 function ReadableStreamDefaultControllerPull(controller) { | 510 function ReadableStreamDefaultControllerPull(controller) { |
511 const stream = controller[_controlledReadableStream]; | 511 const stream = controller[_controlledReadableStream]; |
512 | 512 |
513 if (controller[_queue].length > 0) { | 513 if (controller[_queue].length > 0) { |
514 const chunk = DequeueValue(controller); | 514 const chunk = DequeueValue(controller); |
(...skipping 23 matching lines...) Expand all Loading... |
538 constructor(stream) { | 538 constructor(stream) { |
539 if (IsReadableStream(stream) === false) { | 539 if (IsReadableStream(stream) === false) { |
540 throw new TypeError(errReaderConstructorBadArgument); | 540 throw new TypeError(errReaderConstructorBadArgument); |
541 } | 541 } |
542 if (IsReadableStreamLocked(stream) === true) { | 542 if (IsReadableStreamLocked(stream) === true) { |
543 throw new TypeError(errReaderConstructorStreamAlreadyLocked); | 543 throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
544 } | 544 } |
545 | 545 |
546 ReadableStreamReaderGenericInitialize(this, stream); | 546 ReadableStreamReaderGenericInitialize(this, stream); |
547 | 547 |
548 this[_readRequests] = new Queue(); | 548 this[_readRequests] = new binding.SimpleQueue(); |
549 } | 549 } |
550 | 550 |
551 get closed() { | 551 get closed() { |
552 if (IsReadableStreamDefaultReader(this) === false) { | 552 if (IsReadableStreamDefaultReader(this) === false) { |
553 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); | 553 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
554 } | 554 } |
555 | 555 |
556 return this[_closedPromise]; | 556 return this[_closedPromise]; |
557 } | 557 } |
558 | 558 |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
681 function ReadableStreamGetState(stream) { | 681 function ReadableStreamGetState(stream) { |
682 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; | 682 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; |
683 } | 683 } |
684 | 684 |
685 function ReadableStreamSetState(stream, state) { | 685 function ReadableStreamSetState(stream, state) { |
686 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | | 686 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | |
687 (state << STATE_BITS_OFFSET); | 687 (state << STATE_BITS_OFFSET); |
688 } | 688 } |
689 | 689 |
690 function ReadableStreamDefaultControllerError(controller, e) { | 690 function ReadableStreamDefaultControllerError(controller, e) { |
691 controller[_queue] = new Queue(); | 691 controller[_queue] = new binding.SimpleQueue(); |
692 const stream = controller[_controlledReadableStream]; | 692 const stream = controller[_controlledReadableStream]; |
693 ReadableStreamError(stream, e); | 693 ReadableStreamError(stream, e); |
694 } | 694 } |
695 | 695 |
696 function ReadableStreamError(stream, e) { | 696 function ReadableStreamError(stream, e) { |
697 stream[_storedError] = e; | 697 stream[_storedError] = e; |
698 ReadableStreamSetState(stream, STATE_ERRORED); | 698 ReadableStreamSetState(stream, STATE_ERRORED); |
699 | 699 |
700 const reader = stream[_reader]; | 700 const reader = stream[_reader]; |
701 if (reader === undefined) { | 701 if (reader === undefined) { |
702 return undefined; | 702 return undefined; |
703 } | 703 } |
704 | 704 |
705 if (IsReadableStreamDefaultReader(reader) === true) { | 705 if (IsReadableStreamDefaultReader(reader) === true) { |
706 reader[_readRequests].forEach(request => v8.rejectPromise(request, e)); | 706 reader[_readRequests].forEach(request => v8.rejectPromise(request, e)); |
707 reader[_readRequests] = new Queue(); | 707 reader[_readRequests] = new binding.SimpleQueue(); |
708 } | 708 } |
709 | 709 |
710 v8.rejectPromise(reader[_closedPromise], e); | 710 v8.rejectPromise(reader[_closedPromise], e); |
711 v8.markPromiseAsHandled(reader[_closedPromise]); | 711 v8.markPromiseAsHandled(reader[_closedPromise]); |
712 } | 712 } |
713 | 713 |
714 function ReadableStreamClose(stream) { | 714 function ReadableStreamClose(stream) { |
715 ReadableStreamSetState(stream, STATE_CLOSED); | 715 ReadableStreamSetState(stream, STATE_CLOSED); |
716 | 716 |
717 const reader = stream[_reader]; | 717 const reader = stream[_reader]; |
718 if (reader === undefined) { | 718 if (reader === undefined) { |
719 return undefined; | 719 return undefined; |
720 } | 720 } |
721 | 721 |
722 if (IsReadableStreamDefaultReader(reader) === true) { | 722 if (IsReadableStreamDefaultReader(reader) === true) { |
723 reader[_readRequests].forEach(request => | 723 reader[_readRequests].forEach(request => |
724 v8.resolvePromise(request, CreateIterResultObject(undefined, true))); | 724 v8.resolvePromise(request, CreateIterResultObject(undefined, true))); |
725 reader[_readRequests] = new Queue(); | 725 reader[_readRequests] = new binding.SimpleQueue(); |
726 } | 726 } |
727 | 727 |
728 v8.resolvePromise(reader[_closedPromise], undefined); | 728 v8.resolvePromise(reader[_closedPromise], undefined); |
729 } | 729 } |
730 | 730 |
731 function ReadableStreamDefaultControllerGetDesiredSize(controller) { | 731 function ReadableStreamDefaultControllerGetDesiredSize(controller) { |
732 const queueSize = GetTotalQueueSize(controller); | 732 const queueSize = GetTotalQueueSize(controller); |
733 return controller[_strategyHWM] - queueSize; | 733 return controller[_strategyHWM] - queueSize; |
734 } | 734 } |
735 | 735 |
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
984 const cancelResult = ReadableStreamCancel(stream, compositeReason); | 984 const cancelResult = ReadableStreamCancel(stream, compositeReason); |
985 v8.resolvePromise(promise, cancelResult); | 985 v8.resolvePromise(promise, cancelResult); |
986 } | 986 } |
987 | 987 |
988 return promise; | 988 return promise; |
989 } | 989 } |
990 } | 990 } |
991 | 991 |
992 // | 992 // |
993 // Queue-with-sizes | 993 // Queue-with-sizes |
994 // Modified from taking the queue (as in the spec) to taking the stream, so we | |
995 // can modify the queue size alongside. | |
996 // | 994 // |
997 | 995 |
998 // Simple queue structure. Avoids scalability issues with using | |
999 // InternalPackedArray directly by using multiple arrays | |
1000 // in a linked list and keeping the array size bounded. | |
1001 const QUEUE_MAX_ARRAY_SIZE = 16384; | |
1002 class Queue { | |
1003 constructor() { | |
1004 this.front = { | |
1005 elements: new v8.InternalPackedArray(), | |
1006 next: undefined, | |
1007 }; | |
1008 this.back = this.front; | |
1009 // The cursor is used to avoid calling InternalPackedArray.shift(). | |
1010 this.cursor = 0; | |
1011 this.size = 0; | |
1012 } | |
1013 | |
1014 get length() { | |
1015 return this.size; | |
1016 } | |
1017 | |
1018 push(element) { | |
1019 ++this.size; | |
1020 if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) { | |
1021 const oldBack = this.back; | |
1022 this.back = { | |
1023 elements: new v8.InternalPackedArray(), | |
1024 next: undefined, | |
1025 }; | |
1026 oldBack.next = this.back; | |
1027 } | |
1028 this.back.elements.push(element); | |
1029 } | |
1030 | |
1031 shift() { | |
1032 // assert(this.size > 0); | |
1033 --this.size; | |
1034 if (this.front.elements.length === this.cursor) { | |
1035 // assert(this.cursor === QUEUE_MAX_ARRAY_SIZE); | |
1036 // assert(this.front.next !== undefined); | |
1037 this.front = this.front.next; | |
1038 this.cursor = 0; | |
1039 } | |
1040 const element = this.front.elements[this.cursor]; | |
1041 // Permit shifted element to be garbage collected. | |
1042 this.front.elements[this.cursor] = undefined; | |
1043 ++this.cursor; | |
1044 | |
1045 return element; | |
1046 } | |
1047 | |
1048 forEach(callback) { | |
1049 let i = this.cursor; | |
1050 let node = this.front; | |
1051 let elements = node.elements; | |
1052 while (i !== elements.length || node.next !== undefined) { | |
1053 if (i === elements.length) { | |
1054 // assert(node.next !== undefined); | |
1055 // assert(i === QUEUE_MAX_ARRAY_SIZE); | |
1056 node = node.next; | |
1057 elements = node.elements; | |
1058 i = 0; | |
1059 } | |
1060 callback(elements[i]); | |
1061 ++i; | |
1062 } | |
1063 } | |
1064 } | |
1065 | |
1066 function DequeueValue(controller) { | 996 function DequeueValue(controller) { |
1067 const result = controller[_queue].shift(); | 997 const result = controller[_queue].shift(); |
1068 controller[_totalQueuedSize] -= result.size; | 998 controller[_totalQueuedSize] -= result.size; |
1069 return result.value; | 999 return result.value; |
1070 } | 1000 } |
1071 | 1001 |
1072 function EnqueueValueWithSize(controller, value, size) { | 1002 function EnqueueValueWithSize(controller, value, size) { |
1073 size = Number(size); | 1003 size = Number(size); |
1074 if (Number_isNaN(size) || size === +Infinity || size < 0) { | 1004 if (Number_isNaN(size) || size === +Infinity || size < 0) { |
1075 throw new RangeError(streamErrors.invalidSize); | 1005 throw new RangeError(streamErrors.invalidSize); |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1177 (underlyingSource, strategy) => { | 1107 (underlyingSource, strategy) => { |
1178 return new ReadableStream( | 1108 return new ReadableStream( |
1179 underlyingSource, strategy, createWithExternalControllerSentinel); | 1109 underlyingSource, strategy, createWithExternalControllerSentinel); |
1180 }; | 1110 }; |
1181 | 1111 |
1182 // Temporary exports while pipeTo() and pipeThrough() are behind flags | 1112 // Temporary exports while pipeTo() and pipeThrough() are behind flags |
1183 binding.ReadableStream_prototype_pipeThrough = | 1113 binding.ReadableStream_prototype_pipeThrough = |
1184 ReadableStream_prototype_pipeThrough; | 1114 ReadableStream_prototype_pipeThrough; |
1185 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; | 1115 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; |
1186 }); | 1116 }); |
OLD | NEW |