Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/ReadableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..e5869087d596d2c78933c42a815a71abad5f22b8 |
| --- /dev/null |
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| @@ -0,0 +1,761 @@ |
| +(function(global, binding, v8) { |
|
yhirano
2015/10/21 03:45:27
LICENSE
|
| + 'use strict'; |
| + |
| + const readableStreamController = v8.createPrivateSymbol('[[controller]]'); |
| + const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); |
| + const readableStreamQueueSize = |
| + v8.createPrivateSymbol('[[queue]] total size'); |
| + const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); |
| + const readableStreamState = v8.createPrivateSymbol('[[state]]'); |
| + const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); |
| + const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); |
| + const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); |
| + const readableStreamUnderlyingSource = |
| + v8.createPrivateSymbol('[[underlyingSource]]'); |
| + |
| + const readableStreamControllerControlledReadableStream = |
| + v8.createPrivateSymbol('[[controlledReadableStream]]'); |
| + |
| + const readableStreamReaderClosedPromise = |
| + v8.createPrivateSymbol('[[closedPromise]]'); |
| + const readableStreamReaderOwnerReadableStream = |
| + v8.createPrivateSymbol('[[ownerReadableStream]]'); |
| + const readableStreamReaderReadRequests = |
| + v8.createPrivateSymbol('[[readRequests]]'); |
| + |
| + const STATE_READABLE = 0; |
| + const STATE_CLOSED = 1; |
| + const STATE_ERRORED = 2; |
| + |
| + const readableStreamBits = v8.createPrivateSymbol( |
| + 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]], [[disturbed]]'); |
| + const STARTED = 0b1; |
| + const CLOSE_REQUESTED = 0b10; |
| + const PULLING = 0b100; |
| + const PULL_AGAIN = 0b1000; |
| + const DISTURBED = 0b10000; |
| + |
| + const undefined = global.undefined; |
| + const Infinity = global.Infinity; |
| + |
| + const defineProperty = global.Object.defineProperty; |
| + const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| + const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| + |
| + const TypeError = global.TypeError; |
| + const RangeError = global.RangeError; |
| + |
| + const Number = global.Number; |
| + const Number_isNaN = Number.isNaN; |
| + const Number_isFinite = Number.isFinite; |
| + |
| + const Promise = global.Promise; |
| + const thenPromise = v8.uncurryThis(Promise.prototype.then); |
| + const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); |
| + const Promise_reject = v8.simpleBind(Promise.reject, Promise); |
| + |
| + const errIllegalInvocation = 'Illegal invocation'; |
| + const errIllegalConstructor = 'Illegal constructor'; |
| + const errCancelLockedStream = |
| + 'Cannot cancel a readable stream that is locked to a reader'; |
| + const errEnqueueInCloseRequestedStream = |
| + 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; |
| + const errCancelReleasedReader = |
| + 'This readable stream reader has been released and cannot be used to cancel its previous owner stream'; |
| + const errReadReleasedReader = |
| + 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; |
| + const errCloseCloseRequestedStream = |
| + 'Cannot close a readable stream that has already been requested to be closed'; |
| + const errCloseErroredStream = 'Cannot close an errored readable stream'; |
| + const errErrorClosedStream = 'Cannot error a close readable stream'; |
| + const errErrorErroredStream = |
| + 'Cannot error a readable stream that is already errored'; |
| + const errReaderConstructorBadArgument = |
| + 'ReadableStreamReader constructor argument is not a readable stream'; |
| + const errReaderConstructorStreamAlreadyLocked = |
| + 'ReadableStreamReader constructor can only accept readable streams that are not yet locked to a reader'; |
| + const errReleaseReaderWithPendingRead = |
| + 'Cannot release a readable stream reader when it still has outstanding read() calls that have not yet settled'; |
| + const errReleasedReaderClosedPromise = |
| + 'This readable stream reader has been released and cannot be used to monitor the stream\'s state'; |
| + const errInvalidSize = |
| + 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; |
| + const errSizeNotAFunction = |
| + 'A queuing strategy\'s size property must be a function'; |
| + const errInvalidHWM = |
| + 'A queueing strategy\'s highWaterMark property must be a nonnegative, non-NaN number'; |
| + const errTmplMustBeFunctionOrUndefined = name => |
| + `${name} must be a function or undefined`; |
| + |
| + class ReadableStream { |
| + constructor() { |
| + // TODO(domenic): when V8 gets default parameters and destructuring, all |
| + // this can be cleaned up. |
| + const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; |
| + const strategy = arguments[1] === undefined ? {} : arguments[1]; |
| + const size = strategy.size; |
| + let highWaterMark = strategy.highWaterMark; |
| + if (highWaterMark === undefined) { |
| + highWaterMark = 1; |
| + } |
| + |
| + const normalizedStrategy = |
| + ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| + |
| + this[readableStreamUnderlyingSource] = underlyingSource; |
| + |
| + this[readableStreamQueue] = new v8.InternalPackedArray(); |
| + this[readableStreamQueueSize] = 0; |
| + |
| + this[readableStreamState] = STATE_READABLE; |
| + this[readableStreamBits] = 0b0; |
| + this[readableStreamReader] = undefined; |
| + this[readableStreamStoredError] = undefined; |
| + |
| + this[readableStreamStrategySize] = normalizedStrategy.size; |
| + this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; |
| + |
| + const controller = new ReadableStreamController(this); |
| + this[readableStreamController] = controller; |
| + |
| + const startResult = CallOrNoop( |
| + underlyingSource, 'start', controller, 'underlyingSource.start'); |
| + thenPromise(Promise_resolve(startResult), |
| + () => { |
| + this[readableStreamBits] |= STARTED; |
| + RequestReadableStreamPull(this); |
| + }, |
| + r => { |
| + if (this[readableStreamState] === STATE_READABLE) { |
| + return ErrorReadableStream(this, r); |
| + } |
| + }); |
| + } |
| + |
| + get locked() { |
| + if (IsReadableStream(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + return IsReadableStreamLocked(this); |
| + } |
| + |
| + cancel(reason) { |
| + if (IsReadableStream(this) === false) { |
| + return Promise_reject(new TypeError(errIllegalInvocation)); |
| + } |
| + |
| + if (IsReadableStreamLocked(this) === true) { |
| + return Promise_reject(new TypeError(errCancelLockedStream)); |
| + } |
| + |
| + return CancelReadableStream(this, reason); |
| + } |
| + |
| + getReader() { |
| + if (IsReadableStream(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + return AcquireReadableStreamReader(this); |
| + } |
| + |
| + tee() { |
| + if (IsReadableStream(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + return TeeReadableStream(this); |
| + } |
| + } |
| + |
| + class ReadableStreamController { |
| + constructor(stream) { |
| + if (IsReadableStream(stream) === false) { |
| + throw new TypeError(errIllegalConstructor); |
| + } |
| + |
| + if (stream[readableStreamController] !== undefined) { |
| + throw new TypeError(errIllegalConstructor); |
| + } |
| + |
| + this[readableStreamControllerControlledReadableStream] = stream; |
| + } |
| + |
| + get desiredSize() { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + return GetReadableStreamDesiredSize( |
| + this[readableStreamControllerControlledReadableStream]); |
| + } |
| + |
| + close() { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + const stream = this[readableStreamControllerControlledReadableStream]; |
| + |
| + if (stream[readableStreamBits] & CLOSE_REQUESTED) { |
| + throw new TypeError(errCloseCloseRequestedStream); |
| + } |
| + if (stream[readableStreamState] === STATE_ERRORED) { |
| + throw new TypeError(errCloseErroredStream); |
| + } |
| + |
| + return CloseReadableStream(stream); |
| + } |
| + |
| + enqueue(chunk) { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + const stream = this[readableStreamControllerControlledReadableStream]; |
| + |
| + if (stream[readableStreamState] === STATE_ERRORED) { |
| + throw stream[readableStreamStoredError]; |
| + } |
| + |
| + if (stream[readableStreamBits] & CLOSE_REQUESTED) { |
| + throw new TypeError(errEnqueueInCloseRequestedStream); |
| + } |
| + |
| + return EnqueueInReadableStream(stream, chunk); |
| + } |
| + |
| + error(e) { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + const stream = this[readableStreamControllerControlledReadableStream]; |
| + |
| + const state = stream[readableStreamState]; |
| + if (state !== STATE_READABLE) { |
| + if (state === STATE_ERRORED) { |
| + throw new TypeError(errErrorErroredStream); |
| + } |
| + if (state === STATE_CLOSED) { |
| + throw new TypeError(errErrorClosedStream); |
| + } |
| + } |
| + |
| + return ErrorReadableStream(stream, e); |
| + } |
| + } |
| + |
| + class ReadableStreamReader { |
| + constructor(stream) { |
| + if (IsReadableStream(stream) === false) { |
| + throw new TypeError(errReaderConstructorBadArgument); |
| + } |
| + if (IsReadableStreamLocked(stream) === true) { |
| + throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
| + } |
| + |
| + this[readableStreamReaderOwnerReadableStream] = stream; |
| + stream[readableStreamReader] = this; |
| + |
| + this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| + |
| + switch (stream[readableStreamState]) { |
| + case STATE_READABLE: |
| + this[readableStreamReaderClosedPromise] = v8.createPromise(); |
| + break; |
| + case STATE_CLOSED: |
| + this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); |
| + break; |
| + case STATE_ERRORED: |
| + this[readableStreamReaderClosedPromise] = |
| + Promise_reject(stream[readableStreamStoredError]); |
| + break; |
| + } |
| + } |
| + |
| + get closed() { |
| + if (IsReadableStreamReader(this) === false) { |
| + return Promise_reject(new TypeError(errIllegalInvocation)); |
| + } |
| + |
| + return this[readableStreamReaderClosedPromise]; |
| + } |
| + |
| + cancel(reason) { |
| + if (IsReadableStreamReader(this) === false) { |
| + return Promise_reject(new TypeError(errIllegalInvocation)); |
| + } |
| + |
| + const stream = this[readableStreamReaderOwnerReadableStream]; |
| + if (stream === undefined) { |
| + return Promise_reject(new TypeError(errCancelReleasedReader)); |
| + } |
| + |
| + return CancelReadableStream(stream, reason); |
| + } |
| + |
| + read() { |
| + if (IsReadableStreamReader(this) === false) { |
| + return Promise_reject(new TypeError(errIllegalInvocation)); |
| + } |
| + |
| + if (this[readableStreamReaderOwnerReadableStream] === undefined) { |
| + return Promise_reject(new TypeError(errReadReleasedReader)); |
| + } |
| + |
| + return ReadFromReadableStreamReader(this); |
| + } |
| + |
| + releaseLock() { |
| + if (IsReadableStreamReader(this) === false) { |
| + throw new TypeError(errIllegalInvocation); |
| + } |
| + |
| + const stream = this[readableStreamReaderOwnerReadableStream]; |
| + if (stream === undefined) { |
| + return undefined; |
| + } |
| + |
| + if (this[readableStreamReaderReadRequests].length > 0) { |
| + throw new TypeError(errReleaseReaderWithPendingRead); |
| + } |
| + |
| + if (stream[readableStreamState] === STATE_READABLE) { |
| + v8.rejectPromise(this[readableStreamReaderClosedPromise], |
| + new TypeError(errReleasedReaderClosedPromise)); |
| + } else { |
| + this[readableStreamReaderClosedPromise] = Promise_reject(new TypeError( |
| + errReleasedReaderClosedPromise)); |
| + } |
| + |
| + this[readableStreamReaderOwnerReadableStream][readableStreamReader] = |
| + undefined; |
| + this[readableStreamReaderOwnerReadableStream] = undefined; |
| + } |
| + } |
| + |
| + // |
| + // Readable stream abstract operations |
| + // |
| + |
| + function AcquireReadableStreamReader(stream) { |
| + return new ReadableStreamReader(stream); |
| + } |
| + |
| + function CancelReadableStream(stream, reason) { |
| + stream[readableStreamBits] |= DISTURBED; |
| + |
| + const state = stream[readableStreamState]; |
| + if (state === STATE_CLOSED) { |
| + return Promise_resolve(undefined); |
| + } |
| + if (state === STATE_ERRORED) { |
| + return Promise_reject(stream[readableStreamStoredError]); |
| + } |
| + |
| + stream[readableStreamQueue] = new v8.InternalPackedArray(); |
| + FinishClosingReadableStream(stream); |
| + |
| + const underlyingSource = stream[readableStreamUnderlyingSource]; |
| + const sourceCancelPromise = PromiseCallOrNoop( |
| + underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); |
| + return thenPromise(sourceCancelPromise, () => undefined); |
| + } |
| + |
| + function CloseReadableStream(stream) { |
| + if (stream[readableStreamState] === STATE_CLOSED) { |
| + return undefined; |
| + } |
| + |
| + stream[readableStreamBits] |= CLOSE_REQUESTED; |
| + |
| + if (stream[readableStreamQueue].length === 0) { |
| + return FinishClosingReadableStream(stream); |
| + } |
| + } |
| + |
| + function EnqueueInReadableStream(stream, chunk) { |
| + if (stream[readableStreamState] === STATE_CLOSED) { |
| + return undefined; |
| + } |
| + |
| + if (IsReadableStreamLocked(stream) === true && |
| + stream[readableStreamReader][readableStreamReaderReadRequests].length > |
| + 0) { |
| + const readRequest = |
| + stream[readableStreamReader][readableStreamReaderReadRequests] |
| + .shift(); |
| + v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); |
| + } else { |
| + let chunkSize = 1; |
| + |
| + const strategySize = stream[readableStreamStrategySize]; |
| + if (strategySize !== undefined) { |
| + try { |
| + chunkSize = strategySize(chunk); |
| + } catch (chunkSizeE) { |
| + ErrorReadableStream(stream, chunkSizeE); |
| + throw chunkSizeE; |
| + } |
| + } |
| + |
| + try { |
| + EnqueueValueWithSize(stream, chunk, chunkSize); |
| + } catch (enqueueE) { |
| + ErrorReadableStream(stream, enqueueE); |
| + throw enqueueE; |
| + } |
| + } |
| + |
| + RequestReadableStreamPull(stream); |
| + } |
| + |
| + function ErrorReadableStream(stream, e) { |
| + stream[readableStreamQueue] = new v8.InternalPackedArray(); |
| + stream[readableStreamStoredError] = e; |
| + stream[readableStreamState] = STATE_ERRORED; |
| + |
| + const reader = stream[readableStreamReader]; |
| + if (reader === undefined) { |
| + return undefined; |
| + } |
| + |
| + const readRequests = reader[readableStreamReaderReadRequests]; |
| + for (let i = 0; i < readRequests.length; ++i) { |
| + v8.rejectPromise(readRequests[i], e); |
| + } |
| + reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| + |
| + v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); |
| + } |
| + |
| + function FinishClosingReadableStream(stream) { |
| + stream[readableStreamState] = STATE_CLOSED; |
| + |
| + const reader = stream[readableStreamReader]; |
| + if (reader === undefined) { |
| + return undefined; |
| + } |
| + |
| + |
| + const readRequests = reader[readableStreamReaderReadRequests]; |
| + for (let i = 0; i < readRequests.length; ++i) { |
| + v8.resolvePromise( |
| + readRequests[i], CreateIterResultObject(undefined, true)); |
| + } |
| + reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| + |
| + v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); |
| + } |
| + |
| + function GetReadableStreamDesiredSize(stream) { |
| + const queueSize = GetTotalQueueSize(stream); |
| + return stream[readableStreamStrategyHWM] - queueSize; |
| + } |
| + |
| + function IsReadableStream(x) { |
| + return hasOwnProperty(x, readableStreamUnderlyingSource); |
| + } |
| + |
| + function IsReadableStreamDisturbed(stream) { |
| + return stream[readableStreamBits] & DISTURBED; |
| + } |
| + |
| + function IsReadableStreamLocked(stream) { |
| + return stream[readableStreamReader] !== undefined; |
| + } |
| + |
| + function IsReadableStreamController(x) { |
| + return hasOwnProperty(x, readableStreamControllerControlledReadableStream); |
| + } |
| + |
| + function IsReadableStreamReader(x) { |
| + return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); |
| + } |
| + |
| + function ReadFromReadableStreamReader(reader) { |
| + const stream = reader[readableStreamReaderOwnerReadableStream]; |
| + stream[readableStreamBits] |= DISTURBED; |
| + |
| + if (stream[readableStreamState] === STATE_CLOSED) { |
| + return Promise_resolve(CreateIterResultObject(undefined, true)); |
| + } |
| + |
| + if (stream[readableStreamState] === STATE_ERRORED) { |
| + return Promise_reject(stream[readableStreamStoredError]); |
| + } |
| + |
| + const queue = stream[readableStreamQueue]; |
| + if (queue.length > 0) { |
| + const chunk = DequeueValue(stream); |
| + |
| + if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) { |
| + FinishClosingReadableStream(stream); |
| + } else { |
| + RequestReadableStreamPull(stream); |
| + } |
| + |
| + return Promise_resolve(CreateIterResultObject(chunk, false)); |
| + } else { |
| + const readRequest = v8.createPromise(); |
| + |
| + reader[readableStreamReaderReadRequests].push(readRequest); |
| + RequestReadableStreamPull(stream); |
| + return readRequest; |
| + } |
| + } |
| + |
| + function RequestReadableStreamPull(stream) { |
| + const shouldPull = ShouldReadableStreamPull(stream); |
| + if (shouldPull === false) { |
| + return undefined; |
| + } |
| + |
| + if (stream[readableStreamBits] & PULLING) { |
| + stream[readableStreamBits] |= PULL_AGAIN; |
| + return undefined; |
| + } |
| + |
| + stream[readableStreamBits] |= PULLING; |
| + |
| + const underlyingSource = stream[readableStreamUnderlyingSource]; |
| + const controller = stream[readableStreamController]; |
| + const pullPromise = PromiseCallOrNoop( |
| + underlyingSource, 'pull', controller, 'underlyingSource.pull'); |
| + |
| + thenPromise(pullPromise, |
| + () => { |
| + stream[readableStreamBits] &= ~PULLING; |
| + |
| + if (stream[readableStreamBits] & PULL_AGAIN) { |
| + stream[readableStreamBits] &= ~PULL_AGAIN; |
| + return RequestReadableStreamPull(stream); |
| + } |
| + }, |
| + e => { |
| + if (stream[readableStreamState] === STATE_READABLE) { |
| + return ErrorReadableStream(stream, e); |
| + } |
| + }); |
| + } |
| + |
| + function ShouldReadableStreamPull(stream) { |
| + const state = stream[readableStreamState]; |
| + if (state === STATE_CLOSED || state === STATE_ERRORED) { |
| + return false; |
| + } |
| + |
| + if (stream[readableStreamBits] & CLOSE_REQUESTED) { |
| + return false; |
| + } |
| + |
| + if (!(stream[readableStreamBits] & STARTED)) { |
| + return false; |
| + } |
| + |
| + if (IsReadableStreamLocked(stream) === true) { |
| + const reader = stream[readableStreamReader]; |
| + const readRequests = reader[readableStreamReaderReadRequests]; |
| + if (readRequests.length > 0) { |
| + return true; |
| + } |
| + } |
| + |
| + const desiredSize = GetReadableStreamDesiredSize(stream); |
| + if (desiredSize > 0) { |
| + return true; |
| + } |
| + |
| + return false; |
| + } |
| + |
| + // Potential future optimization: use class instances for the underlying |
| + // sources, so that we don't re-create |
| + // closures every time. |
| + |
| + // TODO(domenic): shouldClone argument from spec not supported yet |
| + function TeeReadableStream(stream) { |
| + const reader = AcquireReadableStreamReader(stream); |
| + |
| + let closedOrErrored = false; |
| + let canceled1 = false; |
| + let canceled2 = false; |
| + let reason1; |
| + let reason2; |
| + let promise = v8.createPromise(); |
| + |
| + const branch1 = new ReadableStream({pull, cancel: cancel1}); |
| + |
| + const branch2 = new ReadableStream({pull, cancel: cancel2}); |
| + |
| + thenPromise( |
| + reader[readableStreamReaderClosedPromise], undefined, function(r) { |
| + if (closedOrErrored === true) { |
| + return; |
| + } |
| + |
| + ErrorReadableStream(branch1, r); |
| + ErrorReadableStream(branch2, r); |
| + closedOrErrored = true; |
| + }); |
| + |
| + return [branch1, branch2]; |
| + |
| + |
| + function pull() { |
| + return thenPromise( |
| + ReadFromReadableStreamReader(reader), function(result) { |
| + const value = result.value; |
| + const done = result.done; |
| + |
| + if (done === true && closedOrErrored === false) { |
| + CloseReadableStream(branch1); |
| + CloseReadableStream(branch2); |
| + closedOrErrored = true; |
| + } |
| + |
| + if (closedOrErrored === true) { |
| + return; |
| + } |
| + |
| + if (canceled1 === false) { |
| + EnqueueInReadableStream(branch1, value); |
| + } |
| + |
| + if (canceled2 === false) { |
| + EnqueueInReadableStream(branch2, value); |
| + } |
| + }); |
| + } |
| + |
| + function cancel1(reason) { |
| + canceled1 = true; |
| + reason1 = reason; |
| + |
| + if (canceled2 === true) { |
| + const compositeReason = [reason1, reason2]; |
| + const cancelResult = CancelReadableStream(stream, compositeReason); |
| + v8.resolvePromise(promise, cancelResult); |
| + } |
| + |
| + return promise; |
| + } |
| + |
| + function cancel2(reason) { |
| + canceled2 = true; |
| + reason2 = reason; |
| + |
| + if (canceled1 === true) { |
| + const compositeReason = [reason1, reason2]; |
| + const cancelResult = CancelReadableStream(stream, compositeReason); |
| + v8.resolvePromise(promise, cancelResult); |
| + } |
| + |
| + return promise; |
| + } |
| + } |
| + |
| + // |
| + // Queue-with-sizes |
| + // Modified from taking the queue (as in the spec) to taking the stream, so we |
| + // can modify the queue size alongside. |
| + // |
| + |
| + function DequeueValue(stream) { |
| + const result = stream[readableStreamQueue].shift(); |
| + stream[readableStreamQueueSize] -= result.size; |
| + return result.value; |
| + } |
| + |
| + function EnqueueValueWithSize(stream, value, size) { |
| + size = Number(size); |
| + if (Number_isNaN(size) || size === +Infinity || size < 0) { |
| + throw new RangeError(errInvalidSize); |
| + } |
| + |
| + stream[readableStreamQueueSize] += size; |
| + stream[readableStreamQueue].push({value, size}); |
| + } |
| + |
| + function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } |
| + |
| + // |
| + // Other helpers |
| + // |
| + |
| + function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { |
| + if (size !== undefined && typeof size !== 'function') { |
| + throw new TypeError(errSizeNotAFunction); |
| + } |
| + |
| + highWaterMark = Number(highWaterMark); |
| + if (Number_isNaN(highWaterMark)) { |
| + throw new TypeError(errInvalidHWM); |
| + } |
| + if (highWaterMark < 0) { |
| + throw new RangeError(errInvalidHWM); |
| + } |
| + |
| + return {size, highWaterMark}; |
| + } |
| + |
| + // Modified from InvokeOrNoop in spec |
| + function CallOrNoop(O, P, arg, nameForError) { |
| + const method = O[P]; |
| + if (method === undefined) { |
| + return undefined; |
| + } |
| + if (typeof method !== 'function') { |
| + throw new TypeError(errTmplMustBeFunctionOrUndefined(nameForError)); |
| + } |
| + |
| + return callFunction(method, O, arg); |
| + } |
| + |
| + |
| + // Modified from PromiseInvokeOrNoop in spec |
| + function PromiseCallOrNoop(O, P, arg, nameForError) { |
| + let method; |
| + try { |
| + method = O[P]; |
| + } catch (methodE) { |
| + return Promise_reject(methodE); |
| + } |
| + |
| + if (method === undefined) { |
| + return Promise_resolve(undefined); |
| + } |
| + |
| + if (typeof method !== 'function') { |
| + return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); |
| + } |
| + |
| + try { |
| + return Promise_resolve(callFunction(method, O, arg)); |
| + } catch (e) { |
| + return Promise_reject(e); |
| + } |
| + } |
| + |
| + function CreateIterResultObject(value, done) { return {value, done}; } |
| + |
| + |
| + // |
| + // Additions to the global |
| + // |
| + |
| + defineProperty(global, 'ReadableStream', { |
| + value: ReadableStream, |
| + enumerable: false, |
| + configurable: true, |
| + writable: true |
| + }); |
| + |
| + // |
| + // Exports to Blink |
| + // |
| + |
| + binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; |
| +}); |