Index: Source/core/streams/ReadableStream2.js |
diff --git a/Source/core/streams/ReadableStream2.js b/Source/core/streams/ReadableStream2.js |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4f0b24c21448bf82fd9f681a23b6b64d70112b27 |
--- /dev/null |
+++ b/Source/core/streams/ReadableStream2.js |
@@ -0,0 +1,784 @@ |
+(function(global, exports) { |
+ 'use strict'; |
+ |
+ // V8 "Imports": |
+ // - %CreatePrivateOwnSymbol |
+ // - %_CallFunction |
+ // - %AddNamedProperty |
+ // - %HasOwnProperty |
+ // - $promiseThen, $promiseCreate, $promiseResolve, $promiseReject |
+ |
+ const readableStreamClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]'); |
+ const readableStreamCloseRequested = %CreatePrivateOwnSymbol('[[closeRequested]]'); |
+ const readableStreamController = %CreatePrivateOwnSymbol('[[controller]]'); |
+ const readableStreamPullAgain = %CreatePrivateOwnSymbol('[[pullAgain]]'); |
+ const readableStreamPulling = %CreatePrivateOwnSymbol('[[pulling]]'); |
+ const readableStreamQueue = %CreatePrivateOwnSymbol('[[queue]]'); |
+ const readableStreamQueueSize = %CreatePrivateOwnSymbol('[[queue]] total size'); |
+ const readableStreamReader = %CreatePrivateOwnSymbol('[[reader]]'); |
+ const readableStreamStarted = %CreatePrivateOwnSymbol('[[started]]'); |
+ const readableStreamState = %CreatePrivateOwnSymbol('[[state]]'); |
+ const readableStreamStoredError = %CreatePrivateOwnSymbol('[[storedError]]'); |
+ const readableStreamStrategySize = %CreatePrivateOwnSymbol('[[strategySize]]'); |
+ const readableStreamStrategyHWM = %CreatePrivateOwnSymbol('[[strategyHWM]]'); |
+ const readableStreamUnderlyingSource = %CreatePrivateOwnSymbol('[[underlyingSource]]'); |
+ |
+ const readableStreamControllerControlledReadableStream = %CreatePrivateOwnSymbol('[[controlledReadableStream]]'); |
+ |
+ const readableStreamReaderClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]'); |
+ const readableStreamReaderOwnerReadableStream = %CreatePrivateOwnSymbol('[[ownerReadableStream]]'); |
+ const readableStreamReaderReadRequests = %CreatePrivateOwnSymbol('[[readRequests]]'); |
+ const readableStreamReaderState = %CreatePrivateOwnSymbol('[[state]]'); |
+ const readableStreamReaderStoredError = %CreatePrivateOwnSymbol('[[storedError]]'); |
+ |
+ const createWithExternalControllerSentinel = %CreatePrivateOwnSymbol( |
+ 'flag for UA-controlled ReadableStreams to pass'); |
+ |
+ const undefined = void 0; |
+ const DONT_ENUM = 2; |
+ |
+ const STATE_READABLE = 0; |
+ const STATE_CLOSED = 1; |
+ const STATE_ERRORED = 2; |
+ |
+ const TypeError = global.TypeError; |
+ const RangeError = global.RangeError; |
+ const Promise = global.Promise; |
+ |
+ const Number = global.Number; |
+ const Number_isNaN = global.Number.isNaN; |
+ const Number_isFinite = global.Number.isFinite; |
+ |
+ // TODO(domenic): update to use InternalPackedArray once yangguo gives us access in extras |
+ const InternalPackedArray = global.Array; |
+ |
+ function thenPromise(promise, f, r) { |
+ return %_CallFunction(promise, f, r, $promiseThen); |
+ } |
+ |
+ // Manually create "bound" versions since Function.prototype.bind is slow. |
+ const Promise_resolve = (function() { |
+ const unbound = Promise.resolve; |
+ return function(x) { |
+ return %_CallFunction(Promise, x, unbound); |
+ }; |
+ })(); |
+ |
+ const Promise_reject = (function() { |
+ const unbound = Promise.reject; |
+ return function(r) { |
+ return %_CallFunction(Promise, r, unbound); |
+ }; |
+ })(); |
+ |
+ class ReadableStream { |
+ constructor(underlyingSource, strategy) { |
+ if (underlyingSource === undefined) { |
+ underlyingSource = {}; |
+ } |
+ if (strategy === undefined) { |
+ strategy = {}; |
+ } |
+ const size = strategy.size; |
+ let highWaterMark = strategy.highWaterMark; |
+ if (highWaterMark === undefined) { |
+ highWaterMark = 1; |
+ } |
+ |
+ const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
+ |
+ this[readableStreamUnderlyingSource] = underlyingSource; |
+ |
+ this[readableStreamQueue] = new InternalPackedArray(); |
+ this[readableStreamQueueSize] = 0; |
+ |
+ // TODO(domenic) consolidate booleans into a bit field? |
+ // TODO(domenic) use integers for state? (or put in bit field?) |
+ this[readableStreamState] = STATE_READABLE; |
+ this[readableStreamStarted] = false; |
+ this[readableStreamCloseRequested] = false; |
+ this[readableStreamPulling] = false; |
+ this[readableStreamPullAgain] = false; |
+ this[readableStreamReader] = undefined; |
+ |
+ this[readableStreamStoredError] = undefined; |
+ this[readableStreamStrategySize] = normalizedStrategy.size; |
+ this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; |
+ |
+ // Avoid allocating a controller if the stream is going to be controlled externally (i.e. from C++) anyway. |
+ // All calls to underlyingSource methods will disregard their controller argument in such situations |
+ // (but see below). |
+ const controller = arguments[2] === createWithExternalControllerSentinel ? |
+ null : |
+ new ReadableStreamController(this); |
+ this[readableStreamController] = controller; |
+ |
+ // We need to pass ourself to the underlyingSource start method for externally-controlled streams. We |
+ // use the now-useless controller argument to do so. |
+ const argToStart = arguments[2] === createWithExternalControllerSentinel ? this : controller; |
+ |
+ const that = this; |
+ const startResult = CallOrNoop(underlyingSource, 'start', argToStart, 'underlyingSource.start'); |
+ thenPromise(Promise_resolve(startResult), |
+ function() { |
+ that[readableStreamStarted] = true; |
+ RequestReadableStreamPull(that); |
+ }, |
+ function(r) { |
+ if (that[readableStreamState] === STATE_READABLE) { |
+ return ErrorReadableStream(that, r); |
+ } |
+ } |
+ ); |
+ } |
+ |
+ cancel(reason) { |
+ if (IsReadableStream(this) === false) { |
+ return Promise_reject(new TypeError( |
+ 'ReadableStream.prototype.cancel can only be used on a ReadableStream')); |
+ } |
+ |
+ if (IsReadableStreamLocked(this) === true) { |
+ return Promise_reject(new TypeError( |
+ 'Cannot cancel a stream that already has a reader')); |
+ } |
+ |
+ return CancelReadableStream(this, reason); |
+ } |
+ |
+ getReader() { |
+ if (IsReadableStream(this) === false) { |
+ throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream'); |
+ } |
+ |
+ return AcquireReadableStreamReader(this); |
+ } |
+ |
+ tee() { |
+ if (IsReadableStream(this) === false) { |
+ throw new TypeError('ReadableStream.prototype.tee can only be used on a ReadableStream'); |
+ } |
+ |
+ return TeeReadableStream(this); |
+ } |
+ } |
+ |
+ class ReadableStreamController { |
+ constructor(stream) { |
+ if (IsReadableStream(stream) === false) { |
+ throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance'); |
+ } |
+ |
+ if (stream[readableStreamController] !== undefined) { |
+ throw new TypeError( |
+ 'ReadableStreamController instances can only be created by the ReadableStream constructor'); |
+ } |
+ |
+ this[readableStreamControllerControlledReadableStream] = stream; |
+ } |
+ |
+ get desiredSize() { |
+ if (IsReadableStreamController(this) === false) { |
+ throw new TypeError( |
+ 'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController'); |
+ } |
+ |
+ return GetReadableStreamDesiredSize(this[readableStreamControllerControlledReadableStream]); |
+ } |
+ |
+ close() { |
+ if (IsReadableStreamController(this) === false) { |
+ throw new TypeError( |
+ 'ReadableStreamController.prototype.close can only be used on a ReadableStreamController'); |
+ } |
+ |
+ const stream = this[readableStreamControllerControlledReadableStream]; |
+ |
+ if (stream[readableStreamCloseRequested] === true) { |
+ throw new TypeError('The stream has already been closed; do not close it again!'); |
+ } |
+ if (stream[readableStreamState] === STATE_ERRORED) { |
+ throw new TypeError('The stream is in an errored state and cannot be closed'); |
+ } |
+ |
+ return CloseReadableStream(stream); |
+ } |
+ |
+ enqueue(chunk) { |
+ if (IsReadableStreamController(this) === false) { |
+ throw new TypeError( |
+ 'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController'); |
+ } |
+ |
+ const stream = this[readableStreamControllerControlledReadableStream]; |
+ |
+ if (stream[readableStreamState] === STATE_ERRORED) { |
+ throw stream[readableStreamStoredError]; |
+ } |
+ |
+ if (stream[readableStreamCloseRequested] === true) { |
+ throw new TypeError('stream is closed or draining'); |
+ } |
+ |
+ return EnqueueInReadableStream(stream, chunk); |
+ } |
+ |
+ error(e) { |
+ if (IsReadableStreamController(this) === false) { |
+ throw new TypeError( |
+ 'ReadableStreamController.prototype.error can only be used on a ReadableStreamController'); |
+ } |
+ |
+ const stream = this[readableStreamControllerControlledReadableStream]; |
+ |
+ const state = stream[readableStreamState]; |
+ if (state !== STATE_READABLE) { |
+ throw new TypeError(`The stream is ${state} and so cannot be errored`); |
+ } |
+ |
+ return ErrorReadableStream(stream, e); |
+ } |
+ } |
+ |
+ class ReadableStreamReader { |
+ constructor(stream) { |
+ if (IsReadableStream(stream) === false) { |
+ throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance'); |
+ } |
+ if (IsReadableStreamLocked(stream) === true) { |
+ throw new TypeError('This stream has already been locked for exclusive reading by another reader'); |
+ } |
+ |
+ stream[readableStreamReader] = this; |
+ this[readableStreamReaderOwnerReadableStream] = stream; |
+ |
+ // TODO(domenic): use integers for state? |
+ this[readableStreamReaderState] = STATE_READABLE; |
+ this[readableStreamReaderStoredError] = undefined; |
+ this[readableStreamReaderReadRequests] = new InternalPackedArray(); |
+ this[readableStreamReaderClosedPromise] = $promiseCreate(); |
+ |
+ const streamState = stream[readableStreamState]; |
+ if (streamState === STATE_CLOSED || streamState === STATE_ERRORED) { |
+ ReleaseReadableStreamReader(this); |
+ } |
+ } |
+ |
+ get closed() { |
+ if (IsReadableStreamReader(this) === false) { |
+ return Promise_reject( |
+ new TypeError('ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader')); |
+ } |
+ |
+ return this[readableStreamReaderClosedPromise]; |
+ } |
+ |
+ cancel(reason) { |
+ if (IsReadableStreamReader(this) === false) { |
+ return Promise_reject( |
+ new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); |
+ } |
+ |
+ const state = this[readableStreamReaderState]; |
+ if (state === STATE_CLOSED) { |
+ return Promise_resolve(undefined); |
+ } |
+ |
+ if (state === STATE_ERRORED) { |
+ return Promise_reject(this[readableStreamReaderStoredError]); |
+ } |
+ |
+ return CancelReadableStream(this[readableStreamReaderOwnerReadableStream], reason); |
+ } |
+ |
+ read() { |
+ if (IsReadableStreamReader(this) === false) { |
+ return Promise_reject( |
+ new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader')); |
+ } |
+ |
+ return ReadFromReadableStreamReader(this); |
+ } |
+ |
+ releaseLock() { |
+ if (IsReadableStreamReader(this) === false) { |
+ throw new TypeError( |
+ 'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader'); |
+ } |
+ |
+ if (this[readableStreamReaderOwnerReadableStream] === undefined) { |
+ return undefined; |
+ } |
+ |
+ if (this[readableStreamReaderReadRequests].length > 0) { |
+ throw new TypeError( |
+ 'Tried to release a reader lock when that reader has pending read() calls un-settled'); |
+ } |
+ |
+ return ReleaseReadableStreamReader(this); |
+ } |
+ } |
+ |
+ // |
+ // Readable stream abstract operations |
+ // |
+ |
+ function AcquireReadableStreamReader(stream) { |
+ return new ReadableStreamReader(stream); |
+ } |
+ |
+ function CancelReadableStream(stream, reason) { |
+ const state = stream[readableStreamState]; |
+ if (state === STATE_CLOSED) { |
+ return Promise_resolve(undefined); |
+ } |
+ if (state === STATE_ERRORED) { |
+ return Promise_reject(stream[readableStreamStoredError]); |
+ } |
+ |
+ stream[readableStreamQueue] = new InternalPackedArray(); |
+ FinishClosingReadableStream(stream); |
+ |
+ const underlyingSource = stream[readableStreamUnderlyingSource]; |
+ const sourceCancelPromise = PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); |
+ return thenPromise(sourceCancelPromise, function() { return undefined; }); |
+ } |
+ |
+ function CloseReadableStream(stream) { |
+ if (stream[readableStreamState] === STATE_CLOSED) { |
+ return undefined; |
+ } |
+ |
+ stream[readableStreamCloseRequested] = true; |
+ |
+ if (stream[readableStreamQueue].length === 0) { |
+ return FinishClosingReadableStream(stream); |
+ } |
+ } |
+ |
+ |
+ function EnqueueInReadableStream(stream, chunk) { |
+ if (stream[readableStreamState] === STATE_CLOSED) { |
+ return undefined; |
+ } |
+ |
+ if (IsReadableStreamLocked(stream) === true && |
+ stream[readableStreamReader][readableStreamReaderReadRequests].length > 0) { |
+ const readRequest = stream[readableStreamReader][readableStreamReaderReadRequests].shift(); |
+ $promiseResolve(readRequest, CreateIterResultObject(chunk, false)); |
+ } else { |
+ let chunkSize = 1; |
+ |
+ const strategySize = stream[readableStreamStrategySize]; |
+ if (strategySize !== undefined) { |
+ try { |
+ chunkSize = strategySize(chunk); |
+ } catch (chunkSizeE) { |
+ ErrorReadableStream(stream, chunkSizeE); |
+ throw chunkSizeE; |
+ } |
+ } |
+ |
+ try { |
+ EnqueueValueWithSize(stream, chunk, chunkSize); |
+ } catch (enqueueE) { |
+ ErrorReadableStream(stream, enqueueE); |
+ throw enqueueE; |
+ } |
+ } |
+ |
+ RequestReadableStreamPull(stream); |
+ } |
+ |
+ function ErrorReadableStream(stream, e) { |
+ stream[readableStreamQueue] = new InternalPackedArray(); |
+ stream[readableStreamStoredError] = e; |
+ stream[readableStreamState] = STATE_ERRORED; |
+ |
+ if (IsReadableStreamLocked(stream) === true) { |
+ return ReleaseReadableStreamReader(stream[readableStreamReader]); |
+ } |
+ } |
+ |
+ function FinishClosingReadableStream(stream) { |
+ stream[readableStreamState] = STATE_CLOSED; |
+ |
+ if (IsReadableStreamLocked(stream) === true) { |
+ return ReleaseReadableStreamReader(stream[readableStreamReader]); |
+ } |
+ } |
+ |
+ function GetReadableStreamDesiredSize(stream) { |
+ const queueSize = GetTotalQueueSize(stream); |
+ return stream[readableStreamStrategyHWM] - queueSize; |
+ } |
+ |
+ function IsReadableStream(x) { |
+ return %HasOwnProperty(x, readableStreamUnderlyingSource); |
+ } |
+ |
+ function IsReadableStreamLocked(stream) { |
+ return stream[readableStreamReader] !== undefined; |
+ } |
+ |
+ function IsReadableStreamController(x) { |
+ return %HasOwnProperty(x, readableStreamControllerControlledReadableStream); |
+ } |
+ |
+ function IsReadableStreamReader(x) { |
+ return %HasOwnProperty(x, readableStreamReaderOwnerReadableStream); |
+ } |
+ |
+ function ReadFromReadableStreamReader(reader) { |
+ const state = reader[readableStreamReaderState]; |
+ if (state === STATE_CLOSED) { |
+ return Promise_resolve(CreateIterResultObject(undefined, true)); |
+ } |
+ |
+ if (state === STATE_ERRORED) { |
+ return Promise_reject(reader[readableStreamReaderStoredError]); |
+ } |
+ |
+ const ownerReadableStream = reader[readableStreamReaderOwnerReadableStream]; |
+ const queue = ownerReadableStream[readableStreamQueue]; |
+ if (queue.length > 0) { |
+ const chunk = DequeueValue(ownerReadableStream); |
+ |
+ if (ownerReadableStream[readableStreamCloseRequested] === true && queue.length === 0) { |
+ FinishClosingReadableStream(ownerReadableStream); |
+ } else { |
+ RequestReadableStreamPull(ownerReadableStream); |
+ } |
+ |
+ return Promise_resolve(CreateIterResultObject(chunk, false)); |
+ } else { |
+ const readRequest = $promiseCreate(); |
+ |
+ reader[readableStreamReaderReadRequests].push(readRequest); |
+ RequestReadableStreamPull(ownerReadableStream); |
+ return readRequest; |
+ } |
+ } |
+ |
+ function ReleaseReadableStreamReader(reader) { |
+ const ownerReadableStream = reader[readableStreamReaderOwnerReadableStream]; |
+ if (ownerReadableStream[readableStreamState] === STATE_ERRORED) { |
+ reader[readableStreamReaderState] = STATE_ERRORED; |
+ |
+ const e = ownerReadableStream[readableStreamStoredError]; |
+ reader[readableStreamReaderStoredError] = e; |
+ $promiseReject(reader[readableStreamReaderClosedPromise], e); |
+ |
+ const readRequests = reader[readableStreamReaderReadRequests]; |
+ for (let i = 0; i < readRequests.length; ++i) { |
+ $promiseReject(readRequests[i], e); |
+ } |
+ } else { |
+ reader[readableStreamReaderState] = STATE_CLOSED; |
+ $promiseResolve(reader[readableStreamReaderClosedPromise], undefined); |
+ |
+ const readRequests = reader[readableStreamReaderReadRequests]; |
+ for (let i = 0; i < readRequests.length; ++i) { |
+ $promiseResolve(readRequests[i], CreateIterResultObject(undefined, true)); |
+ } |
+ } |
+ |
+ reader[readableStreamReaderReadRequests] = new InternalPackedArray(); |
+ ownerReadableStream[readableStreamReader] = undefined; |
+ reader[readableStreamReaderOwnerReadableStream] = undefined; |
+ } |
+ |
+ function RequestReadableStreamPull(stream) { |
+ const shouldPull = ShouldReadableStreamPull(stream); |
+ if (shouldPull === false) { |
+ return undefined; |
+ } |
+ |
+ if (stream[readableStreamPulling] === true) { |
+ stream[readableStreamPullAgain] = true; |
+ return undefined; |
+ } |
+ |
+ stream[readableStreamPulling] = true; |
+ |
+ const underlyingSource = stream[readableStreamUnderlyingSource]; |
+ const controller = stream[readableStreamController]; |
+ const pullPromise = PromiseCallOrNoop(underlyingSource, 'pull', controller, 'underlyingSource.pull'); |
+ |
+ thenPromise(pullPromise, |
+ function() { |
+ stream[readableStreamPulling] = false; |
+ |
+ if (stream[readableStreamPullAgain] === true) { |
+ stream[readableStreamPullAgain] = false; |
+ return RequestReadableStreamPull(stream); |
+ } |
+ }, |
+ function(e) { |
+ if (stream[readableStreamState] === STATE_READABLE) { |
+ return ErrorReadableStream(stream, e); |
+ } |
+ } |
+ ); |
+ } |
+ |
+ function ShouldReadableStreamPull(stream) { |
+ const state = stream[readableStreamState]; |
+ if (state === STATE_CLOSED || state === STATE_ERRORED) { |
+ return false; |
+ } |
+ |
+ if (stream[readableStreamCloseRequested] === true) { |
+ return false; |
+ } |
+ |
+ if (stream[readableStreamStarted] === false) { |
+ return false; |
+ } |
+ |
+ if (IsReadableStreamLocked(stream) === true) { |
+ const reader = stream[readableStreamReader]; |
+ const readRequests = reader[readableStreamReaderReadRequests]; |
+ if (readRequests.length > 0) { |
+ return true; |
+ } |
+ } |
+ |
+ const desiredSize = GetReadableStreamDesiredSize(stream); |
+ if (desiredSize > 0) { |
+ return true; |
+ } |
+ |
+ return false; |
+ } |
+ |
+ // Potential future optimization: use class instances for the underlying sources, so that we don't re-create |
+ // closures every time. |
+ |
+ function TeeReadableStream(stream) { // shouldClone argument from spec not supported yet |
+ const reader = AcquireReadableStreamReader(stream); |
+ |
+ let closedOrErrored = false; |
+ let canceled1 = false; |
+ let canceled2 = false; |
+ let reason1; |
+ let reason2; |
+ let promise = $promiseCreate(); |
+ |
+ const branch1 = new ReadableStream({ |
+ pull, |
+ cancel: cancel1 |
+ }); |
+ |
+ const branch2 = new ReadableStream({ |
+ pull, |
+ cancel: cancel2 |
+ }); |
+ |
+ thenPromise(reader[readableStreamReaderClosedPromise], undefined, function (r) { |
+ if (closedOrErrored === true) { |
+ return; |
+ } |
+ |
+ ErrorReadableStream(branch1, r); |
+ ErrorReadableStream(branch2, r); |
+ closedOrErrored = true; |
+ }); |
+ |
+ return [branch1, branch2]; |
+ |
+ |
+ function pull() { |
+ return thenPromise(ReadFromReadableStreamReader(reader), function (result) { |
+ const value = result.value; |
+ const done = result.done; |
+ |
+ if (done === true && closedOrErrored === false) { |
+ CloseReadableStream(branch1); |
+ CloseReadableStream(branch2); |
+ closedOrErrored = true; |
+ } |
+ |
+ if (closedOrErrored === true) { |
+ return; |
+ } |
+ |
+ if (canceled1 === false) { |
+ EnqueueInReadableStream(branch1, value); |
+ } |
+ |
+ if (canceled2 === false) { |
+ EnqueueInReadableStream(branch2, value); |
+ } |
+ }); |
+ } |
+ |
+ function cancel1(reason) { |
+ canceled1 = true; |
+ reason1 = reason; |
+ |
+ if (canceled2 === true) { |
+ const compositeReason = [reason1, reason2]; |
+ const cancelResult = CancelReadableStream(stream, compositeReason); |
+ $promiseResolve(promise, cancelResult); |
+ } |
+ |
+ return promise; |
+ } |
+ |
+ function cancel2(reason) { |
+ canceled2 = true; |
+ reason2 = reason; |
+ |
+ if (canceled1 === true) { |
+ const compositeReason = [reason1, reason2]; |
+ const cancelResult = CancelReadableStream(stream, compositeReason); |
+ $promiseResolve(promise, cancelResult); |
+ } |
+ |
+ return promise; |
+ } |
+ } |
+ |
+ // |
+ // Queue-with-sizes |
+ // Modified from taking the queue (as in the spec) to taking the stream, so we can modify the queue size alongside. |
+ // |
+ |
+ function DequeueValue(stream) { |
+ const result = stream[readableStreamQueue].shift(); |
+ stream[readableStreamQueueSize] -= result.size; |
+ return result.value; |
+ } |
+ |
+ function EnqueueValueWithSize(stream, value, size) { |
+ size = Number(size); |
+ if (!Number_isFinite(size)) { |
+ throw new RangeError('size must be a finite, non-NaN number.'); |
+ } |
+ |
+ stream[readableStreamQueueSize] += size; |
+ stream[readableStreamQueue].push({ value, size }); |
+ } |
+ |
+ function GetTotalQueueSize(stream) { |
+ return stream[readableStreamQueueSize]; |
+ } |
+ |
+ // |
+ // Other helpers |
+ // |
+ |
+ function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { |
+ if (size !== undefined && typeof size !== 'function') { |
+ throw new TypeError('size property of a queuing strategy must be a function'); |
+ } |
+ |
+ highWaterMark = Number(highWaterMark); |
+ if (Number_isNaN(highWaterMark)) { |
+ throw new TypeError('highWaterMark property of a queuing strategy must be convertible to a non-NaN number'); |
+ } |
+ if (highWaterMark < 0) { |
+ throw new RangeError('highWaterMark property of a queuing strategy must be nonnegative'); |
+ } |
+ |
+ return { size, highWaterMark }; |
+ } |
+ |
+ function CallOrNoop(O, P, arg, nameForError) { // Modified from InvokeOrNoop in spec |
+ const method = O[P]; |
+ if (method === undefined) { |
+ return undefined; |
+ } |
+ if (typeof method !== 'function') { |
+ throw new TypeError(`${nameForError} must be a function or undefined`); |
+ } |
+ |
+ return %_CallFunction(O, arg, method); |
+ } |
+ |
+ |
+ function PromiseCallOrNoop(O, P, arg, nameForError) { // Modified from PromiseInvokeOrNoop in spec |
+ let method; |
+ try { |
+ method = O[P]; |
+ } catch (methodE) { |
+ return Promise_reject(methodE); |
+ } |
+ |
+ if (method === undefined) { |
+ return Promise_resolve(undefined); |
+ } |
+ |
+ if (typeof method !== 'function') { |
+ return Promise_reject(`${nameForError} must be a function or undefined`); |
+ } |
+ |
+ try { |
+ return Promise_resolve(%_CallFunction(O, arg, method)); |
+ } catch (e) { |
+ return Promise_reject(e); |
+ } |
+ } |
+ |
+ function CreateIterResultObject(value, done) { |
+ return { value, done }; |
+ } |
+ |
+ |
+ // |
+ // Additions to the global |
+ // |
+ |
+ %AddNamedProperty(global, 'ReadableStream', ReadableStream, DONT_ENUM); |
+ |
+ // |
+ // Exports for Blink to use |
+ // |
+ |
+ exports.ReadableStream = ReadableStream; |
+ exports.createWithExternalControllerSentinel = createWithExternalControllerSentinel; |
+ exports.ErrorReadableStream = ErrorReadableStream; |
+ exports.EnqueueInReadableStream = EnqueueInReadableStream; |
+ exports.CloseReadableStream = CloseReadableStream; |
+ exports.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; |
+ exports.IsReadableStreamLocked = IsReadableStreamLocked; |
+ |
+ exports.$readAllInternal = function(stream) { |
+ if (IsReadableStreamLocked(stream) === true) { |
+ throw new TypeError("Cannot read the queue of a locked stream"); |
+ } |
+ |
+ // Make sure to lock while doing anything that could call author code (e.g. calling pull). |
+ const reader = AcquireReadableStreamReader(stream); |
+ |
+ RequestReadableStreamPull(); |
+ |
+ const result = new InternalPackedArray(); |
+ |
+ const queue = stream[readableStreamQueue]; |
+ while (queue.length > 0) { |
+ result.push(DequeueValue(ownerReadableStream)); |
+ } |
+ |
+ if (stream[readableStreamCloseRequested] === true) { |
+ FinishClosingReadableStream(stream); |
+ } else { |
+ ReleaseReadableStreamReader(reader); |
+ } |
+ |
+ return result; |
+ }; |
+ |
+ exports.$stateInternal = function(stream) { |
+ return stream[readableStreamState]; |
+ }; |
+ |
+ exports.$storedErrorInternal = function(stream) { |
+ return stream[readableStreamStoredError]; |
+ }; |
+ |
+ exports.$lockForeverInternal = function(stream) { |
+ AcquireReadableStreamReader(stream); |
+ }; |
+}); |