Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
| 6 'use strict'; | 6 'use strict'; |
| 7 | 7 |
| 8 const _reader = v8.createPrivateSymbol('[[reader]]'); | 8 const _reader = v8.createPrivateSymbol('[[reader]]'); |
| 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
| 10 const _controller = v8.createPrivateSymbol('[[controller]]'); | 10 const _controller = v8.createPrivateSymbol('[[controller]]'); |
| (...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 188 } | 188 } |
| 189 | 189 |
| 190 if (stream[_controller] !== undefined) { | 190 if (stream[_controller] !== undefined) { |
| 191 throw new TypeError(streamErrors.illegalConstructor); | 191 throw new TypeError(streamErrors.illegalConstructor); |
| 192 } | 192 } |
| 193 | 193 |
| 194 this[_controlledReadableStream] = stream; | 194 this[_controlledReadableStream] = stream; |
| 195 | 195 |
| 196 this[_underlyingSource] = underlyingSource; | 196 this[_underlyingSource] = underlyingSource; |
| 197 | 197 |
| 198 this[_queue] = new v8.InternalPackedArray(); | 198 this[_queue] = new Queue(); |
| 199 this[_totalQueuedSize] = 0; | 199 this[_totalQueuedSize] = 0; |
| 200 | 200 |
| 201 this[_readableStreamDefaultControllerBits] = 0b0; | 201 this[_readableStreamDefaultControllerBits] = 0b0; |
| 202 if (isExternallyControlled === true) { | 202 if (isExternallyControlled === true) { |
| 203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; | 203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; |
| 204 } | 204 } |
| 205 | 205 |
| 206 const normalizedStrategy = | 206 const normalizedStrategy = |
| 207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | 207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| 208 this[_strategySize] = normalizedStrategy.size; | 208 this[_strategySize] = normalizedStrategy.size; |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 289 } | 289 } |
| 290 if (state === STATE_CLOSED) { | 290 if (state === STATE_CLOSED) { |
| 291 throw new TypeError(errErrorClosedStream); | 291 throw new TypeError(errErrorClosedStream); |
| 292 } | 292 } |
| 293 | 293 |
| 294 return ReadableStreamDefaultControllerError(this, e); | 294 return ReadableStreamDefaultControllerError(this, e); |
| 295 } | 295 } |
| 296 } | 296 } |
| 297 | 297 |
| 298 function ReadableStreamDefaultControllerCancel(controller, reason) { | 298 function ReadableStreamDefaultControllerCancel(controller, reason) { |
| 299 controller[_queue] = new v8.InternalPackedArray(); | 299 controller[_queue] = new Queue(); |
| 300 | 300 |
| 301 const underlyingSource = controller[_underlyingSource]; | 301 const underlyingSource = controller[_underlyingSource]; |
| 302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); | 302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); |
| 303 } | 303 } |
| 304 | 304 |
| 305 function ReadableStreamDefaultControllerPull(controller) { | 305 function ReadableStreamDefaultControllerPull(controller) { |
| 306 const stream = controller[_controlledReadableStream]; | 306 const stream = controller[_controlledReadableStream]; |
| 307 | 307 |
| 308 if (controller[_queue].length > 0) { | 308 if (controller[_queue].length > 0) { |
| 309 const chunk = DequeueValue(controller); | 309 const chunk = DequeueValue(controller); |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 333 constructor(stream) { | 333 constructor(stream) { |
| 334 if (IsReadableStream(stream) === false) { | 334 if (IsReadableStream(stream) === false) { |
| 335 throw new TypeError(errReaderConstructorBadArgument); | 335 throw new TypeError(errReaderConstructorBadArgument); |
| 336 } | 336 } |
| 337 if (IsReadableStreamLocked(stream) === true) { | 337 if (IsReadableStreamLocked(stream) === true) { |
| 338 throw new TypeError(errReaderConstructorStreamAlreadyLocked); | 338 throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
| 339 } | 339 } |
| 340 | 340 |
| 341 ReadableStreamReaderGenericInitialize(this, stream); | 341 ReadableStreamReaderGenericInitialize(this, stream); |
| 342 | 342 |
| 343 this[_readRequests] = new v8.InternalPackedArray(); | 343 this[_readRequests] = new Queue(); |
| 344 } | 344 } |
| 345 | 345 |
| 346 get closed() { | 346 get closed() { |
| 347 if (IsReadableStreamDefaultReader(this) === false) { | 347 if (IsReadableStreamDefaultReader(this) === false) { |
| 348 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); | 348 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| 349 } | 349 } |
| 350 | 350 |
| 351 return this[_closedPromise]; | 351 return this[_closedPromise]; |
| 352 } | 352 } |
| 353 | 353 |
| (...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 476 function ReadableStreamGetState(stream) { | 476 function ReadableStreamGetState(stream) { |
| 477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; | 477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; |
| 478 } | 478 } |
| 479 | 479 |
| 480 function ReadableStreamSetState(stream, state) { | 480 function ReadableStreamSetState(stream, state) { |
| 481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | | 481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | |
| 482 (state << STATE_BITS_OFFSET); | 482 (state << STATE_BITS_OFFSET); |
| 483 } | 483 } |
| 484 | 484 |
| 485 function ReadableStreamDefaultControllerError(controller, e) { | 485 function ReadableStreamDefaultControllerError(controller, e) { |
| 486 controller[_queue] = new v8.InternalPackedArray(); | 486 controller[_queue] = new Queue(); |
| 487 const stream = controller[_controlledReadableStream]; | 487 const stream = controller[_controlledReadableStream]; |
| 488 ReadableStreamError(stream, e); | 488 ReadableStreamError(stream, e); |
| 489 } | 489 } |
| 490 | 490 |
| 491 function ReadableStreamError(stream, e) { | 491 function ReadableStreamError(stream, e) { |
| 492 stream[_storedError] = e; | 492 stream[_storedError] = e; |
| 493 ReadableStreamSetState(stream, STATE_ERRORED); | 493 ReadableStreamSetState(stream, STATE_ERRORED); |
| 494 | 494 |
| 495 const reader = stream[_reader]; | 495 const reader = stream[_reader]; |
| 496 if (reader === undefined) { | 496 if (reader === undefined) { |
| 497 return undefined; | 497 return undefined; |
| 498 } | 498 } |
| 499 | 499 |
| 500 if (IsReadableStreamDefaultReader(reader) === true) { | 500 if (IsReadableStreamDefaultReader(reader) === true) { |
| 501 const readRequests = reader[_readRequests]; | 501 reader[_readRequests].forEach(request => v8.rejectPromise(request, e)); |
| 502 for (let i = 0; i < readRequests.length; i++) { | 502 reader[_readRequests] = new Queue(); |
| 503 v8.rejectPromise(readRequests[i], e); | |
| 504 } | |
| 505 reader[_readRequests] = new v8.InternalPackedArray(); | |
| 506 } | 503 } |
| 507 | 504 |
| 508 v8.rejectPromise(reader[_closedPromise], e); | 505 v8.rejectPromise(reader[_closedPromise], e); |
| 509 v8.markPromiseAsHandled(reader[_closedPromise]); | 506 v8.markPromiseAsHandled(reader[_closedPromise]); |
| 510 } | 507 } |
| 511 | 508 |
| 512 function ReadableStreamClose(stream) { | 509 function ReadableStreamClose(stream) { |
| 513 ReadableStreamSetState(stream, STATE_CLOSED); | 510 ReadableStreamSetState(stream, STATE_CLOSED); |
| 514 | 511 |
| 515 const reader = stream[_reader]; | 512 const reader = stream[_reader]; |
| 516 if (reader === undefined) { | 513 if (reader === undefined) { |
| 517 return undefined; | 514 return undefined; |
| 518 } | 515 } |
| 519 | 516 |
| 520 if (IsReadableStreamDefaultReader(reader) === true) { | 517 if (IsReadableStreamDefaultReader(reader) === true) { |
| 521 const readRequests = reader[_readRequests]; | 518 reader[_readRequests].forEach(request => |
| 522 for (let i = 0; i < readRequests.length; i++) { | 519 v8.resolvePromise(request, CreateIterResultObject(undefined, true))); |
| 523 v8.resolvePromise( | 520 reader[_readRequests] = new Queue(); |
| 524 readRequests[i], CreateIterResultObject(undefined, true)); | |
| 525 } | |
| 526 reader[_readRequests] = new v8.InternalPackedArray(); | |
| 527 } | 521 } |
| 528 | 522 |
| 529 v8.resolvePromise(reader[_closedPromise], undefined); | 523 v8.resolvePromise(reader[_closedPromise], undefined); |
| 530 } | 524 } |
| 531 | 525 |
| 532 function ReadableStreamDefaultControllerGetDesiredSize(controller) { | 526 function ReadableStreamDefaultControllerGetDesiredSize(controller) { |
| 533 const queueSize = GetTotalQueueSize(controller); | 527 const queueSize = GetTotalQueueSize(controller); |
| 534 return controller[_strategyHWM] - queueSize; | 528 return controller[_strategyHWM] - queueSize; |
| 535 } | 529 } |
| 536 | 530 |
| (...skipping 252 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 789 return promise; | 783 return promise; |
| 790 } | 784 } |
| 791 } | 785 } |
| 792 | 786 |
| 793 // | 787 // |
| 794 // Queue-with-sizes | 788 // Queue-with-sizes |
| 795 // Modified from taking the queue (as in the spec) to taking the stream, so we | 789 // Modified from taking the queue (as in the spec) to taking the stream, so we |
| 796 // can modify the queue size alongside. | 790 // can modify the queue size alongside. |
| 797 // | 791 // |
| 798 | 792 |
| 793 // Simple queue structure. Avoids scalability issues with using | |
| 794 // InternalPackedArray directly by using multiple arrays | |
| 795 // in a linked list and keeping the array size bounded. | |
| 796 class Queue { | |
| 797 constructor() { | |
| 798 this.front = { | |
| 799 elements: new v8.InternalPackedArray(), | |
| 800 next: undefined, | |
| 801 }; | |
| 802 this.back = this.front; | |
| 803 // The cursor is used to avoid calling InternalPackedArray.shift(). | |
| 804 this.cursor = 0; | |
| 805 this.size = 0; | |
| 806 } | |
| 807 | |
| 808 get length() { | |
| 809 return this.size; | |
| 810 } | |
| 811 | |
| 812 push(element) { | |
| 813 ++this.size; | |
| 814 if (this.back.elements.length === 16384) { | |
|
yhirano
2017/01/24 11:14:13
[optional] Having a constant might be good given t
Adam Rice
2017/01/24 12:33:02
I didn't do this because I wasn't sure that v8 wou
| |
| 815 const oldBack = this.back; | |
| 816 this.back = { | |
| 817 elements: new v8.InternalPackedArray(), | |
| 818 next: undefined, | |
| 819 }; | |
| 820 oldBack.next = this.back; | |
| 821 } | |
| 822 this.back.elements.push(element); | |
| 823 } | |
| 824 | |
| 825 shift() { | |
| 826 // assert(this.size > 0); | |
| 827 --this.size; | |
| 828 if (this.front.elements.length === this.cursor) { | |
| 829 // assert(this.cursor === MAX_ARRAY_LENGTH); | |
| 830 if (this.front.next !== undefined) { | |
|
yhirano
2017/01/24 11:14:13
this.front.next === undefined && this.front.elemen
Adam Rice
2017/01/24 12:33:02
You are right. Done.
| |
| 831 this.front = this.front.next; | |
| 832 } else { | |
| 833 // assert(this.front === this.back); | |
| 834 // assert(this.back.elements === MAX_ARRAY_LENGTH); | |
| 835 this.front.elements.length = 0; | |
| 836 } | |
| 837 this.cursor = 0; | |
| 838 } | |
| 839 const element = this.front.elements[this.cursor]; | |
| 840 // Permit shifted element to be garbage collected. | |
| 841 this.front.elements[this.cursor] = undefined; | |
| 842 ++this.cursor; | |
| 843 | |
| 844 return element; | |
| 845 } | |
| 846 | |
| 847 forEach(callback) { | |
| 848 let i = this.cursor; | |
| 849 let node = this.front; | |
| 850 let elements = node.elements; | |
| 851 while (i !== elements.length || node.next !== undefined) { | |
| 852 if (i === elements.length) { | |
| 853 // assert(node.next !== undefined); | |
| 854 // assert(i === MAX_ARRAY_LENGTH); | |
| 855 node = node.next; | |
| 856 elements = node.elements; | |
| 857 i = 0; | |
| 858 } | |
| 859 callback(elements[i]); | |
| 860 ++i; | |
| 861 } | |
| 862 } | |
| 863 } | |
| 864 | |
| 799 function DequeueValue(controller) { | 865 function DequeueValue(controller) { |
| 800 const result = controller[_queue].shift(); | 866 const result = controller[_queue].shift(); |
| 801 controller[_totalQueuedSize] -= result.size; | 867 controller[_totalQueuedSize] -= result.size; |
| 802 return result.value; | 868 return result.value; |
| 803 } | 869 } |
| 804 | 870 |
| 805 function EnqueueValueWithSize(controller, value, size) { | 871 function EnqueueValueWithSize(controller, value, size) { |
| 806 size = Number(size); | 872 size = Number(size); |
| 807 if (Number_isNaN(size) || size === +Infinity || size < 0) { | 873 if (Number_isNaN(size) || size === +Infinity || size < 0) { |
| 808 throw new RangeError(streamErrors.invalidSize); | 874 throw new RangeError(streamErrors.invalidSize); |
| (...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; | 971 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; |
| 906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; | 972 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; |
| 907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; | 973 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; |
| 908 | 974 |
| 909 binding.createReadableStreamWithExternalController = | 975 binding.createReadableStreamWithExternalController = |
| 910 (underlyingSource, strategy) => { | 976 (underlyingSource, strategy) => { |
| 911 return new ReadableStream( | 977 return new ReadableStream( |
| 912 underlyingSource, strategy, createWithExternalControllerSentinel); | 978 underlyingSource, strategy, createWithExternalControllerSentinel); |
| 913 }; | 979 }; |
| 914 }); | 980 }); |
| OLD | NEW |