| Index: Source/core/streams/ReadableStream2.js
|
| diff --git a/Source/core/streams/ReadableStream2.js b/Source/core/streams/ReadableStream2.js
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..4f0b24c21448bf82fd9f681a23b6b64d70112b27
|
| --- /dev/null
|
| +++ b/Source/core/streams/ReadableStream2.js
|
| @@ -0,0 +1,784 @@
|
| +(function(global, exports) {
|
| + 'use strict';
|
| +
|
| + // V8 "Imports":
|
| + // - %CreatePrivateOwnSymbol
|
| + // - %_CallFunction
|
| + // - %AddNamedProperty
|
| + // - %HasOwnProperty
|
| + // - $promiseThen, $promiseCreate, $promiseResolve, $promiseReject
|
| +
|
| + const readableStreamClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]');
|
| + const readableStreamCloseRequested = %CreatePrivateOwnSymbol('[[closeRequested]]');
|
| + const readableStreamController = %CreatePrivateOwnSymbol('[[controller]]');
|
| + const readableStreamPullAgain = %CreatePrivateOwnSymbol('[[pullAgain]]');
|
| + const readableStreamPulling = %CreatePrivateOwnSymbol('[[pulling]]');
|
| + const readableStreamQueue = %CreatePrivateOwnSymbol('[[queue]]');
|
| + const readableStreamQueueSize = %CreatePrivateOwnSymbol('[[queue]] total size');
|
| + const readableStreamReader = %CreatePrivateOwnSymbol('[[reader]]');
|
| + const readableStreamStarted = %CreatePrivateOwnSymbol('[[started]]');
|
| + const readableStreamState = %CreatePrivateOwnSymbol('[[state]]');
|
| + const readableStreamStoredError = %CreatePrivateOwnSymbol('[[storedError]]');
|
| + const readableStreamStrategySize = %CreatePrivateOwnSymbol('[[strategySize]]');
|
| + const readableStreamStrategyHWM = %CreatePrivateOwnSymbol('[[strategyHWM]]');
|
| + const readableStreamUnderlyingSource = %CreatePrivateOwnSymbol('[[underlyingSource]]');
|
| +
|
| + const readableStreamControllerControlledReadableStream = %CreatePrivateOwnSymbol('[[controlledReadableStream]]');
|
| +
|
| + const readableStreamReaderClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]');
|
| + const readableStreamReaderOwnerReadableStream = %CreatePrivateOwnSymbol('[[ownerReadableStream]]');
|
| + const readableStreamReaderReadRequests = %CreatePrivateOwnSymbol('[[readRequests]]');
|
| + const readableStreamReaderState = %CreatePrivateOwnSymbol('[[state]]');
|
| + const readableStreamReaderStoredError = %CreatePrivateOwnSymbol('[[storedError]]');
|
| +
|
| + const createWithExternalControllerSentinel = %CreatePrivateOwnSymbol(
|
| + 'flag for UA-controlled ReadableStreams to pass');
|
| +
|
| + const undefined = void 0;
|
| + const DONT_ENUM = 2;
|
| +
|
| + const STATE_READABLE = 0;
|
| + const STATE_CLOSED = 1;
|
| + const STATE_ERRORED = 2;
|
| +
|
| + const TypeError = global.TypeError;
|
| + const RangeError = global.RangeError;
|
| + const Promise = global.Promise;
|
| +
|
| + const Number = global.Number;
|
| + const Number_isNaN = global.Number.isNaN;
|
| + const Number_isFinite = global.Number.isFinite;
|
| +
|
| + // TODO(domenic): update to use InternalPackedArray once yangguo gives us access in extras
|
| + const InternalPackedArray = global.Array;
|
| +
|
| + function thenPromise(promise, f, r) {
|
| + return %_CallFunction(promise, f, r, $promiseThen);
|
| + }
|
| +
|
| + // Manually create "bound" versions since Function.prototype.bind is slow.
|
| + const Promise_resolve = (function() {
|
| + const unbound = Promise.resolve;
|
| + return function(x) {
|
| + return %_CallFunction(Promise, x, unbound);
|
| + };
|
| + })();
|
| +
|
| + const Promise_reject = (function() {
|
| + const unbound = Promise.reject;
|
| + return function(r) {
|
| + return %_CallFunction(Promise, r, unbound);
|
| + };
|
| + })();
|
| +
|
| + class ReadableStream {
|
| + constructor(underlyingSource, strategy) {
|
| + if (underlyingSource === undefined) {
|
| + underlyingSource = {};
|
| + }
|
| + if (strategy === undefined) {
|
| + strategy = {};
|
| + }
|
| + const size = strategy.size;
|
| + let highWaterMark = strategy.highWaterMark;
|
| + if (highWaterMark === undefined) {
|
| + highWaterMark = 1;
|
| + }
|
| +
|
| + const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
|
| +
|
| + this[readableStreamUnderlyingSource] = underlyingSource;
|
| +
|
| + this[readableStreamQueue] = new InternalPackedArray();
|
| + this[readableStreamQueueSize] = 0;
|
| +
|
| + // TODO(domenic) consolidate booleans into a bit field?
|
| + // TODO(domenic) use integers for state? (or put in bit field?)
|
| + this[readableStreamState] = STATE_READABLE;
|
| + this[readableStreamStarted] = false;
|
| + this[readableStreamCloseRequested] = false;
|
| + this[readableStreamPulling] = false;
|
| + this[readableStreamPullAgain] = false;
|
| + this[readableStreamReader] = undefined;
|
| +
|
| + this[readableStreamStoredError] = undefined;
|
| + this[readableStreamStrategySize] = normalizedStrategy.size;
|
| + this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
|
| +
|
| + // Avoid allocating a controller if the stream is going to be controlled externally (i.e. from C++) anyway.
|
| + // All calls to underlyingSource methods will disregard their controller argument in such situations
|
| + // (but see below).
|
| + const controller = arguments[2] === createWithExternalControllerSentinel ?
|
| + null :
|
| + new ReadableStreamController(this);
|
| + this[readableStreamController] = controller;
|
| +
|
| + // We need to pass ourself to the underlyingSource start method for externally-controlled streams. We
|
| + // use the now-useless controller argument to do so.
|
| + const argToStart = arguments[2] === createWithExternalControllerSentinel ? this : controller;
|
| +
|
| + const that = this;
|
| + const startResult = CallOrNoop(underlyingSource, 'start', argToStart, 'underlyingSource.start');
|
| + thenPromise(Promise_resolve(startResult),
|
| + function() {
|
| + that[readableStreamStarted] = true;
|
| + RequestReadableStreamPull(that);
|
| + },
|
| + function(r) {
|
| + if (that[readableStreamState] === STATE_READABLE) {
|
| + return ErrorReadableStream(that, r);
|
| + }
|
| + }
|
| + );
|
| + }
|
| +
|
| + cancel(reason) {
|
| + if (IsReadableStream(this) === false) {
|
| + return Promise_reject(new TypeError(
|
| + 'ReadableStream.prototype.cancel can only be used on a ReadableStream'));
|
| + }
|
| +
|
| + if (IsReadableStreamLocked(this) === true) {
|
| + return Promise_reject(new TypeError(
|
| + 'Cannot cancel a stream that already has a reader'));
|
| + }
|
| +
|
| + return CancelReadableStream(this, reason);
|
| + }
|
| +
|
| + getReader() {
|
| + if (IsReadableStream(this) === false) {
|
| + throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
|
| + }
|
| +
|
| + return AcquireReadableStreamReader(this);
|
| + }
|
| +
|
| + tee() {
|
| + if (IsReadableStream(this) === false) {
|
| + throw new TypeError('ReadableStream.prototype.tee can only be used on a ReadableStream');
|
| + }
|
| +
|
| + return TeeReadableStream(this);
|
| + }
|
| + }
|
| +
|
| + class ReadableStreamController {
|
| + constructor(stream) {
|
| + if (IsReadableStream(stream) === false) {
|
| + throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance');
|
| + }
|
| +
|
| + if (stream[readableStreamController] !== undefined) {
|
| + throw new TypeError(
|
| + 'ReadableStreamController instances can only be created by the ReadableStream constructor');
|
| + }
|
| +
|
| + this[readableStreamControllerControlledReadableStream] = stream;
|
| + }
|
| +
|
| + get desiredSize() {
|
| + if (IsReadableStreamController(this) === false) {
|
| + throw new TypeError(
|
| + 'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController');
|
| + }
|
| +
|
| + return GetReadableStreamDesiredSize(this[readableStreamControllerControlledReadableStream]);
|
| + }
|
| +
|
| + close() {
|
| + if (IsReadableStreamController(this) === false) {
|
| + throw new TypeError(
|
| + 'ReadableStreamController.prototype.close can only be used on a ReadableStreamController');
|
| + }
|
| +
|
| + const stream = this[readableStreamControllerControlledReadableStream];
|
| +
|
| + if (stream[readableStreamCloseRequested] === true) {
|
| + throw new TypeError('The stream has already been closed; do not close it again!');
|
| + }
|
| + if (stream[readableStreamState] === STATE_ERRORED) {
|
| + throw new TypeError('The stream is in an errored state and cannot be closed');
|
| + }
|
| +
|
| + return CloseReadableStream(stream);
|
| + }
|
| +
|
| + enqueue(chunk) {
|
| + if (IsReadableStreamController(this) === false) {
|
| + throw new TypeError(
|
| + 'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
|
| + }
|
| +
|
| + const stream = this[readableStreamControllerControlledReadableStream];
|
| +
|
| + if (stream[readableStreamState] === STATE_ERRORED) {
|
| + throw stream[readableStreamStoredError];
|
| + }
|
| +
|
| + if (stream[readableStreamCloseRequested] === true) {
|
| + throw new TypeError('stream is closed or draining');
|
| + }
|
| +
|
| + return EnqueueInReadableStream(stream, chunk);
|
| + }
|
| +
|
| + error(e) {
|
| + if (IsReadableStreamController(this) === false) {
|
| + throw new TypeError(
|
| + 'ReadableStreamController.prototype.error can only be used on a ReadableStreamController');
|
| + }
|
| +
|
| + const stream = this[readableStreamControllerControlledReadableStream];
|
| +
|
| + const state = stream[readableStreamState];
|
| + if (state !== STATE_READABLE) {
|
| + throw new TypeError(`The stream is ${state} and so cannot be errored`);
|
| + }
|
| +
|
| + return ErrorReadableStream(stream, e);
|
| + }
|
| + }
|
| +
|
| + class ReadableStreamReader {
|
| + constructor(stream) {
|
| + if (IsReadableStream(stream) === false) {
|
| + throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance');
|
| + }
|
| + if (IsReadableStreamLocked(stream) === true) {
|
| + throw new TypeError('This stream has already been locked for exclusive reading by another reader');
|
| + }
|
| +
|
| + stream[readableStreamReader] = this;
|
| + this[readableStreamReaderOwnerReadableStream] = stream;
|
| +
|
| + // TODO(domenic): use integers for state?
|
| + this[readableStreamReaderState] = STATE_READABLE;
|
| + this[readableStreamReaderStoredError] = undefined;
|
| + this[readableStreamReaderReadRequests] = new InternalPackedArray();
|
| + this[readableStreamReaderClosedPromise] = $promiseCreate();
|
| +
|
| + const streamState = stream[readableStreamState];
|
| + if (streamState === STATE_CLOSED || streamState === STATE_ERRORED) {
|
| + ReleaseReadableStreamReader(this);
|
| + }
|
| + }
|
| +
|
| + get closed() {
|
| + if (IsReadableStreamReader(this) === false) {
|
| + return Promise_reject(
|
| + new TypeError('ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader'));
|
| + }
|
| +
|
| + return this[readableStreamReaderClosedPromise];
|
| + }
|
| +
|
| + cancel(reason) {
|
| + if (IsReadableStreamReader(this) === false) {
|
| + return Promise_reject(
|
| + new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader'));
|
| + }
|
| +
|
| + const state = this[readableStreamReaderState];
|
| + if (state === STATE_CLOSED) {
|
| + return Promise_resolve(undefined);
|
| + }
|
| +
|
| + if (state === STATE_ERRORED) {
|
| + return Promise_reject(this[readableStreamReaderStoredError]);
|
| + }
|
| +
|
| + return CancelReadableStream(this[readableStreamReaderOwnerReadableStream], reason);
|
| + }
|
| +
|
| + read() {
|
| + if (IsReadableStreamReader(this) === false) {
|
| + return Promise_reject(
|
| + new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
|
| + }
|
| +
|
| + return ReadFromReadableStreamReader(this);
|
| + }
|
| +
|
| + releaseLock() {
|
| + if (IsReadableStreamReader(this) === false) {
|
| + throw new TypeError(
|
| + 'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader');
|
| + }
|
| +
|
| + if (this[readableStreamReaderOwnerReadableStream] === undefined) {
|
| + return undefined;
|
| + }
|
| +
|
| + if (this[readableStreamReaderReadRequests].length > 0) {
|
| + throw new TypeError(
|
| + 'Tried to release a reader lock when that reader has pending read() calls un-settled');
|
| + }
|
| +
|
| + return ReleaseReadableStreamReader(this);
|
| + }
|
| + }
|
| +
|
| + //
|
| + // Readable stream abstract operations
|
| + //
|
| +
|
| + function AcquireReadableStreamReader(stream) {
|
| + return new ReadableStreamReader(stream);
|
| + }
|
| +
|
| + function CancelReadableStream(stream, reason) {
|
| + const state = stream[readableStreamState];
|
| + if (state === STATE_CLOSED) {
|
| + return Promise_resolve(undefined);
|
| + }
|
| + if (state === STATE_ERRORED) {
|
| + return Promise_reject(stream[readableStreamStoredError]);
|
| + }
|
| +
|
| + stream[readableStreamQueue] = new InternalPackedArray();
|
| + FinishClosingReadableStream(stream);
|
| +
|
| + const underlyingSource = stream[readableStreamUnderlyingSource];
|
| + const sourceCancelPromise = PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
|
| + return thenPromise(sourceCancelPromise, function() { return undefined; });
|
| + }
|
| +
|
| + function CloseReadableStream(stream) {
|
| + if (stream[readableStreamState] === STATE_CLOSED) {
|
| + return undefined;
|
| + }
|
| +
|
| + stream[readableStreamCloseRequested] = true;
|
| +
|
| + if (stream[readableStreamQueue].length === 0) {
|
| + return FinishClosingReadableStream(stream);
|
| + }
|
| + }
|
| +
|
| +
|
| + function EnqueueInReadableStream(stream, chunk) {
|
| + if (stream[readableStreamState] === STATE_CLOSED) {
|
| + return undefined;
|
| + }
|
| +
|
| + if (IsReadableStreamLocked(stream) === true &&
|
| + stream[readableStreamReader][readableStreamReaderReadRequests].length > 0) {
|
| + const readRequest = stream[readableStreamReader][readableStreamReaderReadRequests].shift();
|
| + $promiseResolve(readRequest, CreateIterResultObject(chunk, false));
|
| + } else {
|
| + let chunkSize = 1;
|
| +
|
| + const strategySize = stream[readableStreamStrategySize];
|
| + if (strategySize !== undefined) {
|
| + try {
|
| + chunkSize = strategySize(chunk);
|
| + } catch (chunkSizeE) {
|
| + ErrorReadableStream(stream, chunkSizeE);
|
| + throw chunkSizeE;
|
| + }
|
| + }
|
| +
|
| + try {
|
| + EnqueueValueWithSize(stream, chunk, chunkSize);
|
| + } catch (enqueueE) {
|
| + ErrorReadableStream(stream, enqueueE);
|
| + throw enqueueE;
|
| + }
|
| + }
|
| +
|
| + RequestReadableStreamPull(stream);
|
| + }
|
| +
|
| + function ErrorReadableStream(stream, e) {
|
| + stream[readableStreamQueue] = new InternalPackedArray();
|
| + stream[readableStreamStoredError] = e;
|
| + stream[readableStreamState] = STATE_ERRORED;
|
| +
|
| + if (IsReadableStreamLocked(stream) === true) {
|
| + return ReleaseReadableStreamReader(stream[readableStreamReader]);
|
| + }
|
| + }
|
| +
|
| + function FinishClosingReadableStream(stream) {
|
| + stream[readableStreamState] = STATE_CLOSED;
|
| +
|
| + if (IsReadableStreamLocked(stream) === true) {
|
| + return ReleaseReadableStreamReader(stream[readableStreamReader]);
|
| + }
|
| + }
|
| +
|
| + function GetReadableStreamDesiredSize(stream) {
|
| + const queueSize = GetTotalQueueSize(stream);
|
| + return stream[readableStreamStrategyHWM] - queueSize;
|
| + }
|
| +
|
| + function IsReadableStream(x) {
|
| + return %HasOwnProperty(x, readableStreamUnderlyingSource);
|
| + }
|
| +
|
| + function IsReadableStreamLocked(stream) {
|
| + return stream[readableStreamReader] !== undefined;
|
| + }
|
| +
|
| + function IsReadableStreamController(x) {
|
| + return %HasOwnProperty(x, readableStreamControllerControlledReadableStream);
|
| + }
|
| +
|
| + function IsReadableStreamReader(x) {
|
| + return %HasOwnProperty(x, readableStreamReaderOwnerReadableStream);
|
| + }
|
| +
|
| + function ReadFromReadableStreamReader(reader) {
|
| + const state = reader[readableStreamReaderState];
|
| + if (state === STATE_CLOSED) {
|
| + return Promise_resolve(CreateIterResultObject(undefined, true));
|
| + }
|
| +
|
| + if (state === STATE_ERRORED) {
|
| + return Promise_reject(reader[readableStreamReaderStoredError]);
|
| + }
|
| +
|
| + const ownerReadableStream = reader[readableStreamReaderOwnerReadableStream];
|
| + const queue = ownerReadableStream[readableStreamQueue];
|
| + if (queue.length > 0) {
|
| + const chunk = DequeueValue(ownerReadableStream);
|
| +
|
| + if (ownerReadableStream[readableStreamCloseRequested] === true && queue.length === 0) {
|
| + FinishClosingReadableStream(ownerReadableStream);
|
| + } else {
|
| + RequestReadableStreamPull(ownerReadableStream);
|
| + }
|
| +
|
| + return Promise_resolve(CreateIterResultObject(chunk, false));
|
| + } else {
|
| + const readRequest = $promiseCreate();
|
| +
|
| + reader[readableStreamReaderReadRequests].push(readRequest);
|
| + RequestReadableStreamPull(ownerReadableStream);
|
| + return readRequest;
|
| + }
|
| + }
|
| +
|
| + function ReleaseReadableStreamReader(reader) {
|
| + const ownerReadableStream = reader[readableStreamReaderOwnerReadableStream];
|
| + if (ownerReadableStream[readableStreamState] === STATE_ERRORED) {
|
| + reader[readableStreamReaderState] = STATE_ERRORED;
|
| +
|
| + const e = ownerReadableStream[readableStreamStoredError];
|
| + reader[readableStreamReaderStoredError] = e;
|
| + $promiseReject(reader[readableStreamReaderClosedPromise], e);
|
| +
|
| + const readRequests = reader[readableStreamReaderReadRequests];
|
| + for (let i = 0; i < readRequests.length; ++i) {
|
| + $promiseReject(readRequests[i], e);
|
| + }
|
| + } else {
|
| + reader[readableStreamReaderState] = STATE_CLOSED;
|
| + $promiseResolve(reader[readableStreamReaderClosedPromise], undefined);
|
| +
|
| + const readRequests = reader[readableStreamReaderReadRequests];
|
| + for (let i = 0; i < readRequests.length; ++i) {
|
| + $promiseResolve(readRequests[i], CreateIterResultObject(undefined, true));
|
| + }
|
| + }
|
| +
|
| + reader[readableStreamReaderReadRequests] = new InternalPackedArray();
|
| + ownerReadableStream[readableStreamReader] = undefined;
|
| + reader[readableStreamReaderOwnerReadableStream] = undefined;
|
| + }
|
| +
|
| + function RequestReadableStreamPull(stream) {
|
| + const shouldPull = ShouldReadableStreamPull(stream);
|
| + if (shouldPull === false) {
|
| + return undefined;
|
| + }
|
| +
|
| + if (stream[readableStreamPulling] === true) {
|
| + stream[readableStreamPullAgain] = true;
|
| + return undefined;
|
| + }
|
| +
|
| + stream[readableStreamPulling] = true;
|
| +
|
| + const underlyingSource = stream[readableStreamUnderlyingSource];
|
| + const controller = stream[readableStreamController];
|
| + const pullPromise = PromiseCallOrNoop(underlyingSource, 'pull', controller, 'underlyingSource.pull');
|
| +
|
| + thenPromise(pullPromise,
|
| + function() {
|
| + stream[readableStreamPulling] = false;
|
| +
|
| + if (stream[readableStreamPullAgain] === true) {
|
| + stream[readableStreamPullAgain] = false;
|
| + return RequestReadableStreamPull(stream);
|
| + }
|
| + },
|
| + function(e) {
|
| + if (stream[readableStreamState] === STATE_READABLE) {
|
| + return ErrorReadableStream(stream, e);
|
| + }
|
| + }
|
| + );
|
| + }
|
| +
|
| + function ShouldReadableStreamPull(stream) {
|
| + const state = stream[readableStreamState];
|
| + if (state === STATE_CLOSED || state === STATE_ERRORED) {
|
| + return false;
|
| + }
|
| +
|
| + if (stream[readableStreamCloseRequested] === true) {
|
| + return false;
|
| + }
|
| +
|
| + if (stream[readableStreamStarted] === false) {
|
| + return false;
|
| + }
|
| +
|
| + if (IsReadableStreamLocked(stream) === true) {
|
| + const reader = stream[readableStreamReader];
|
| + const readRequests = reader[readableStreamReaderReadRequests];
|
| + if (readRequests.length > 0) {
|
| + return true;
|
| + }
|
| + }
|
| +
|
| + const desiredSize = GetReadableStreamDesiredSize(stream);
|
| + if (desiredSize > 0) {
|
| + return true;
|
| + }
|
| +
|
| + return false;
|
| + }
|
| +
|
| + // Potential future optimization: use class instances for the underlying sources, so that we don't re-create
|
| + // closures every time.
|
| +
|
| + function TeeReadableStream(stream) { // shouldClone argument from spec not supported yet
|
| + const reader = AcquireReadableStreamReader(stream);
|
| +
|
| + let closedOrErrored = false;
|
| + let canceled1 = false;
|
| + let canceled2 = false;
|
| + let reason1;
|
| + let reason2;
|
| + let promise = $promiseCreate();
|
| +
|
| + const branch1 = new ReadableStream({
|
| + pull,
|
| + cancel: cancel1
|
| + });
|
| +
|
| + const branch2 = new ReadableStream({
|
| + pull,
|
| + cancel: cancel2
|
| + });
|
| +
|
| + thenPromise(reader[readableStreamReaderClosedPromise], undefined, function (r) {
|
| + if (closedOrErrored === true) {
|
| + return;
|
| + }
|
| +
|
| + ErrorReadableStream(branch1, r);
|
| + ErrorReadableStream(branch2, r);
|
| + closedOrErrored = true;
|
| + });
|
| +
|
| + return [branch1, branch2];
|
| +
|
| +
|
| + function pull() {
|
| + return thenPromise(ReadFromReadableStreamReader(reader), function (result) {
|
| + const value = result.value;
|
| + const done = result.done;
|
| +
|
| + if (done === true && closedOrErrored === false) {
|
| + CloseReadableStream(branch1);
|
| + CloseReadableStream(branch2);
|
| + closedOrErrored = true;
|
| + }
|
| +
|
| + if (closedOrErrored === true) {
|
| + return;
|
| + }
|
| +
|
| + if (canceled1 === false) {
|
| + EnqueueInReadableStream(branch1, value);
|
| + }
|
| +
|
| + if (canceled2 === false) {
|
| + EnqueueInReadableStream(branch2, value);
|
| + }
|
| + });
|
| + }
|
| +
|
| + function cancel1(reason) {
|
| + canceled1 = true;
|
| + reason1 = reason;
|
| +
|
| + if (canceled2 === true) {
|
| + const compositeReason = [reason1, reason2];
|
| + const cancelResult = CancelReadableStream(stream, compositeReason);
|
| + $promiseResolve(promise, cancelResult);
|
| + }
|
| +
|
| + return promise;
|
| + }
|
| +
|
| + function cancel2(reason) {
|
| + canceled2 = true;
|
| + reason2 = reason;
|
| +
|
| + if (canceled1 === true) {
|
| + const compositeReason = [reason1, reason2];
|
| + const cancelResult = CancelReadableStream(stream, compositeReason);
|
| + $promiseResolve(promise, cancelResult);
|
| + }
|
| +
|
| + return promise;
|
| + }
|
| + }
|
| +
|
| + //
|
| + // Queue-with-sizes
|
| + // Modified from taking the queue (as in the spec) to taking the stream, so we can modify the queue size alongside.
|
| + //
|
| +
|
| + function DequeueValue(stream) {
|
| + const result = stream[readableStreamQueue].shift();
|
| + stream[readableStreamQueueSize] -= result.size;
|
| + return result.value;
|
| + }
|
| +
|
| + function EnqueueValueWithSize(stream, value, size) {
|
| + size = Number(size);
|
| + if (!Number_isFinite(size)) {
|
| + throw new RangeError('size must be a finite, non-NaN number.');
|
| + }
|
| +
|
| + stream[readableStreamQueueSize] += size;
|
| + stream[readableStreamQueue].push({ value, size });
|
| + }
|
| +
|
| + function GetTotalQueueSize(stream) {
|
| + return stream[readableStreamQueueSize];
|
| + }
|
| +
|
| + //
|
| + // Other helpers
|
| + //
|
| +
|
| + function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
|
| + if (size !== undefined && typeof size !== 'function') {
|
| + throw new TypeError('size property of a queuing strategy must be a function');
|
| + }
|
| +
|
| + highWaterMark = Number(highWaterMark);
|
| + if (Number_isNaN(highWaterMark)) {
|
| + throw new TypeError('highWaterMark property of a queuing strategy must be convertible to a non-NaN number');
|
| + }
|
| + if (highWaterMark < 0) {
|
| + throw new RangeError('highWaterMark property of a queuing strategy must be nonnegative');
|
| + }
|
| +
|
| + return { size, highWaterMark };
|
| + }
|
| +
|
| + function CallOrNoop(O, P, arg, nameForError) { // Modified from InvokeOrNoop in spec
|
| + const method = O[P];
|
| + if (method === undefined) {
|
| + return undefined;
|
| + }
|
| + if (typeof method !== 'function') {
|
| + throw new TypeError(`${nameForError} must be a function or undefined`);
|
| + }
|
| +
|
| + return %_CallFunction(O, arg, method);
|
| + }
|
| +
|
| +
|
| + function PromiseCallOrNoop(O, P, arg, nameForError) { // Modified from PromiseInvokeOrNoop in spec
|
| + let method;
|
| + try {
|
| + method = O[P];
|
| + } catch (methodE) {
|
| + return Promise_reject(methodE);
|
| + }
|
| +
|
| + if (method === undefined) {
|
| + return Promise_resolve(undefined);
|
| + }
|
| +
|
| + if (typeof method !== 'function') {
|
| + return Promise_reject(`${nameForError} must be a function or undefined`);
|
| + }
|
| +
|
| + try {
|
| + return Promise_resolve(%_CallFunction(O, arg, method));
|
| + } catch (e) {
|
| + return Promise_reject(e);
|
| + }
|
| + }
|
| +
|
| + function CreateIterResultObject(value, done) {
|
| + return { value, done };
|
| + }
|
| +
|
| +
|
| + //
|
| + // Additions to the global
|
| + //
|
| +
|
| + %AddNamedProperty(global, 'ReadableStream', ReadableStream, DONT_ENUM);
|
| +
|
| + //
|
| + // Exports for Blink to use
|
| + //
|
| +
|
| + exports.ReadableStream = ReadableStream;
|
| + exports.createWithExternalControllerSentinel = createWithExternalControllerSentinel;
|
| + exports.ErrorReadableStream = ErrorReadableStream;
|
| + exports.EnqueueInReadableStream = EnqueueInReadableStream;
|
| + exports.CloseReadableStream = CloseReadableStream;
|
| + exports.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize;
|
| + exports.IsReadableStreamLocked = IsReadableStreamLocked;
|
| +
|
| + exports.$readAllInternal = function(stream) {
|
| + if (IsReadableStreamLocked(stream) === true) {
|
| + throw new TypeError("Cannot read the queue of a locked stream");
|
| + }
|
| +
|
| + // Make sure to lock while doing anything that could call author code (e.g. calling pull).
|
| + const reader = AcquireReadableStreamReader(stream);
|
| +
|
| + RequestReadableStreamPull();
|
| +
|
| + const result = new InternalPackedArray();
|
| +
|
| + const queue = stream[readableStreamQueue];
|
| + while (queue.length > 0) {
|
| + result.push(DequeueValue(ownerReadableStream));
|
| + }
|
| +
|
| + if (stream[readableStreamCloseRequested] === true) {
|
| + FinishClosingReadableStream(stream);
|
| + } else {
|
| + ReleaseReadableStreamReader(reader);
|
| + }
|
| +
|
| + return result;
|
| + };
|
| +
|
| + exports.$stateInternal = function(stream) {
|
| + return stream[readableStreamState];
|
| + };
|
| +
|
| + exports.$storedErrorInternal = function(stream) {
|
| + return stream[readableStreamStoredError];
|
| + };
|
| +
|
| + exports.$lockForeverInternal = function(stream) {
|
| + AcquireReadableStreamReader(stream);
|
| + };
|
| +});
|
|
|