Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/ReadableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..be25fa219310f57f96428135b37fce191e95807d |
| --- /dev/null |
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| @@ -0,0 +1,761 @@ |
| +(function(global, binding, v8) { |
| + 'use strict'; |
| + |
| + // TODO(domenic): factor out error messages, and reuse existing Blink or V8 |
| + // ones? |
| + |
| + const readableStreamCloseRequested = |
| + v8.createPrivateSymbol('[[closeRequested]]'); |
| + const readableStreamController = v8.createPrivateSymbol('[[controller]]'); |
| + const readableStreamPullAgain = v8.createPrivateSymbol('[[pullAgain]]'); |
| + const readableStreamPulling = v8.createPrivateSymbol('[[pulling]]'); |
| + const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); |
| + const readableStreamQueueSize = |
| + v8.createPrivateSymbol('[[queue]] total size'); |
| + const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); |
| + const readableStreamStarted = v8.createPrivateSymbol('[[started]]'); |
| + const readableStreamState = v8.createPrivateSymbol('[[state]]'); |
| + const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); |
| + const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); |
| + const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); |
| + const readableStreamUnderlyingSource = |
| + v8.createPrivateSymbol('[[underlyingSource]]'); |
| + const readableStreamDisturbed = v8.createPrivateSymbol('[[disturbed]]'); |
| + |
| + const readableStreamControllerControlledReadableStream = |
| + v8.createPrivateSymbol('[[controlledReadableStream]]'); |
| + |
| + const readableStreamReaderClosedPromise = |
| + v8.createPrivateSymbol('[[closedPromise]]'); |
| + const readableStreamReaderOwnerReadableStream = |
| + v8.createPrivateSymbol('[[ownerReadableStream]]'); |
| + const readableStreamReaderReadRequests = |
| + v8.createPrivateSymbol('[[readRequests]]'); |
| + |
| + const STATE_READABLE = 0; |
| + const STATE_CLOSED = 1; |
| + const STATE_ERRORED = 2; |
| + |
| + const undefined = global.undefined; |
| + const Infinity = global.Infinity; |
| + |
| + const defineProperty = global.Object.defineProperty; |
| + const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| + const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| + |
| + const TypeError = global.TypeError; |
| + const RangeError = global.RangeError; |
| + |
| + const Number = global.Number; |
| + const Number_isNaN = Number.isNaN; |
| + const Number_isFinite = Number.isFinite; |
| + |
| + const Promise = global.Promise; |
| + const thenPromise = v8.uncurryThis(Promise.prototype.then); |
| + const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); |
| + const Promise_reject = v8.simpleBind(Promise.reject, Promise); |
| + |
| + class ReadableStream { |
| + constructor() { |
| + // TODO(domenic): when V8 gets default parameters and destructuring, all |
| + // this can be cleaned up. |
| + const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; |
| + const strategy = arguments[1] === undefined ? {} : arguments[1]; |
| + const size = strategy.size; |
| + let highWaterMark = strategy.highWaterMark; |
| + if (highWaterMark === undefined) { |
| + highWaterMark = 1; |
| + } |
| + |
| + const normalizedStrategy = |
| + ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| + |
| + this[readableStreamUnderlyingSource] = underlyingSource; |
| + |
| + this[readableStreamQueue] = new v8.InternalPackedArray(); |
| + this[readableStreamQueueSize] = 0; |
| + |
| + // TODO(domenic) consolidate booleans into a bit field? |
| + // TODO(domenic) use integers for state? (or put in bit field?) |
| + this[readableStreamState] = STATE_READABLE; |
| + this[readableStreamStarted] = false; |
| + this[readableStreamCloseRequested] = false; |
| + this[readableStreamPulling] = false; |
| + this[readableStreamPullAgain] = false; |
| + this[readableStreamReader] = undefined; |
| + this[readableStreamStoredError] = undefined; |
| + |
| + this[readableStreamDisturbed] = false; |
| + |
| + this[readableStreamStrategySize] = normalizedStrategy.size; |
| + this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; |
| + |
| + const controller = new ReadableStreamController(this); |
| + this[readableStreamController] = controller; |
| + |
| + const that = this; |
| + const startResult = CallOrNoop( |
| + underlyingSource, 'start', controller, 'underlyingSource.start'); |
| + thenPromise(Promise_resolve(startResult), |
| + function() { |
| + that[readableStreamStarted] = true; |
| + RequestReadableStreamPull(that); |
| + }, |
| + function(r) { |
| + if (that[readableStreamState] === STATE_READABLE) { |
| + return ErrorReadableStream(that, r); |
| + } |
| + }); |
| + } |
| + |
| + get locked() { |
| + if (IsReadableStream(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStream.prototype.locked can only be used on a ReadableStream'); |
| + } |
| + |
| + return IsReadableStreamLocked(this); |
| + } |
| + |
| + cancel(reason) { |
| + if (IsReadableStream(this) === false) { |
| + return Promise_reject(new TypeError( |
| + 'ReadableStream.prototype.cancel can only be used on a ReadableStream')); |
| + } |
| + |
| + if (IsReadableStreamLocked(this) === true) { |
| + return Promise_reject( |
| + new TypeError('Cannot cancel a stream that already has a reader')); |
| + } |
| + |
| + return CancelReadableStream(this, reason); |
| + } |
| + |
| + getReader() { |
| + if (IsReadableStream(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStream.prototype.getReader can only be used on a ReadableStream'); |
| + } |
| + |
| + return AcquireReadableStreamReader(this); |
| + } |
| + |
| + tee() { |
| + if (IsReadableStream(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStream.prototype.tee can only be used on a ReadableStream'); |
| + } |
| + |
| + return TeeReadableStream(this); |
| + } |
| + } |
| + |
| + class ReadableStreamController { |
| + constructor(stream) { |
| + if (IsReadableStream(stream) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamController can only be constructed with a ReadableStream instance'); |
| + } |
| + |
| + if (stream[readableStreamController] !== undefined) { |
| + throw new TypeError( |
| + 'ReadableStreamController instances can only be created by the ReadableStream constructor'); |
| + } |
| + |
| + this[readableStreamControllerControlledReadableStream] = stream; |
| + } |
| + |
| + get desiredSize() { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController'); |
| + } |
| + |
| + return GetReadableStreamDesiredSize( |
| + this[readableStreamControllerControlledReadableStream]); |
| + } |
| + |
| + close() { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamController.prototype.close can only be used on a ReadableStreamController'); |
| + } |
| + |
| + const stream = this[readableStreamControllerControlledReadableStream]; |
| + |
| + if (stream[readableStreamCloseRequested] === true) { |
| + throw new TypeError( |
| + 'The stream has already been closed; do not close it again!'); |
| + } |
| + if (stream[readableStreamState] === STATE_ERRORED) { |
| + throw new TypeError( |
| + 'The stream is in an errored state and cannot be closed'); |
| + } |
| + |
| + return CloseReadableStream(stream); |
| + } |
| + |
| + enqueue(chunk) { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController'); |
| + } |
| + |
| + const stream = this[readableStreamControllerControlledReadableStream]; |
| + |
| + if (stream[readableStreamState] === STATE_ERRORED) { |
| + throw stream[readableStreamStoredError]; |
| + } |
| + |
| + if (stream[readableStreamCloseRequested] === true) { |
| + throw new TypeError('stream is closed or draining'); |
| + } |
| + |
| + return EnqueueInReadableStream(stream, chunk); |
| + } |
| + |
| + error(e) { |
| + if (IsReadableStreamController(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamController.prototype.error can only be used on a ReadableStreamController'); |
| + } |
| + |
| + const stream = this[readableStreamControllerControlledReadableStream]; |
| + |
| + const state = stream[readableStreamState]; |
| + if (state !== STATE_READABLE) { |
| + throw new TypeError(`The stream is ${state} and so cannot be errored`); |
| + } |
| + |
| + return ErrorReadableStream(stream, e); |
| + } |
| + } |
| + |
| + class ReadableStreamReader { |
| + constructor(stream) { |
| + if (IsReadableStream(stream) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamReader can only be constructed with a ReadableStream instance'); |
| + } |
| + if (IsReadableStreamLocked(stream) === true) { |
| + throw new TypeError( |
| + 'This stream has already been locked for exclusive reading by another reader'); |
| + } |
| + |
| + this[readableStreamReaderOwnerReadableStream] = stream; |
| + stream[readableStreamReader] = this; |
| + |
| + this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| + |
| + switch (stream[readableStreamState]) { |
| + case STATE_READABLE: |
| + this[readableStreamReaderClosedPromise] = v8.createPromise(); |
| + break; |
| + case STATE_CLOSED: |
| + this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); |
| + break; |
| + case STATE_ERRORED: |
| + this[readableStreamReaderClosedPromise] = |
| + Promise_reject(stream[readableStreamStoredError]); |
| + break; |
| + } |
| + } |
| + |
| + get closed() { |
| + if (IsReadableStreamReader(this) === false) { |
| + return Promise_reject(new TypeError( |
| + 'ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader')); |
| + } |
| + |
| + return this[readableStreamReaderClosedPromise]; |
| + } |
| + |
| + cancel(reason) { |
| + if (IsReadableStreamReader(this) === false) { |
| + return Promise_reject(new TypeError( |
| + 'ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader')); |
| + } |
| + |
| + const stream = this[readableStreamReaderOwnerReadableStream]; |
| + if (stream === undefined) { |
| + return Promise_reject( |
| + new TypeError('Cannot cancel a stream using a released reader')); |
| + } |
| + |
| + return CancelReadableStream(stream, reason); |
| + } |
| + |
| + read() { |
| + if (IsReadableStreamReader(this) === false) { |
| + return Promise_reject(new TypeError( |
| + 'ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader')); |
| + } |
| + |
| + if (this[readableStreamReaderOwnerReadableStream] === undefined) { |
| + return Promise_reject( |
| + new TypeError('Cannot read from a released reader')); |
| + } |
| + |
| + return ReadFromReadableStreamReader(this); |
| + } |
| + |
| + releaseLock() { |
| + if (IsReadableStreamReader(this) === false) { |
| + throw new TypeError( |
| + 'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader'); |
| + } |
| + |
| + const stream = this[readableStreamReaderOwnerReadableStream]; |
| + if (stream === undefined) { |
| + return undefined; |
| + } |
| + |
| + if (this[readableStreamReaderReadRequests].length > 0) { |
| + throw new TypeError( |
| + 'Tried to release a reader lock when that reader has pending read() calls un-settled'); |
| + } |
| + |
| + if (stream[readableStreamState] === STATE_READABLE) { |
| + v8.rejectPromise( |
| + this[readableStreamReaderClosedPromise], |
| + new TypeError( |
| + 'Reader was released and can no longer be used to monitor the stream\'s closedness')); |
| + } else { |
| + this[readableStreamReaderClosedPromise] = Promise_reject(new TypeError( |
| + 'Reader was released and can no longer be used to monitor the stream\'s closedness')); |
| + } |
| + |
| + this[readableStreamReaderOwnerReadableStream][readableStreamReader] = |
| + undefined; |
| + this[readableStreamReaderOwnerReadableStream] = undefined; |
| + } |
| + } |
| + |
| + // |
| + // Readable stream abstract operations |
| + // |
| + |
| + function AcquireReadableStreamReader(stream) { |
| + return new ReadableStreamReader(stream); |
| + } |
| + |
| + function CancelReadableStream(stream, reason) { |
| + stream[readableStreamDisturbed] = true; |
| + |
| + const state = stream[readableStreamState]; |
| + if (state === STATE_CLOSED) { |
| + return Promise_resolve(undefined); |
| + } |
| + if (state === STATE_ERRORED) { |
| + return Promise_reject(stream[readableStreamStoredError]); |
| + } |
| + |
| + stream[readableStreamQueue] = new v8.InternalPackedArray(); |
| + FinishClosingReadableStream(stream); |
| + |
| + const underlyingSource = stream[readableStreamUnderlyingSource]; |
| + const sourceCancelPromise = PromiseCallOrNoop( |
| + underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); |
| + return thenPromise(sourceCancelPromise, function() { return undefined; }); |
| + } |
| + |
| + function CloseReadableStream(stream) { |
| + if (stream[readableStreamState] === STATE_CLOSED) { |
| + return undefined; |
| + } |
| + |
| + stream[readableStreamCloseRequested] = true; |
| + |
| + if (stream[readableStreamQueue].length === 0) { |
| + return FinishClosingReadableStream(stream); |
| + } |
| + } |
| + |
| + function EnqueueInReadableStream(stream, chunk) { |
| + if (stream[readableStreamState] === STATE_CLOSED) { |
| + return undefined; |
| + } |
| + |
| + if (IsReadableStreamLocked(stream) === true && |
| + stream[readableStreamReader][readableStreamReaderReadRequests].length > |
| + 0) { |
| + const readRequest = |
| + stream[readableStreamReader][readableStreamReaderReadRequests] |
| + .shift(); |
| + v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); |
| + } else { |
| + let chunkSize = 1; |
| + |
| + const strategySize = stream[readableStreamStrategySize]; |
| + if (strategySize !== undefined) { |
| + try { |
| + chunkSize = strategySize(chunk); |
| + } catch (chunkSizeE) { |
| + ErrorReadableStream(stream, chunkSizeE); |
| + throw chunkSizeE; |
| + } |
| + } |
| + |
| + try { |
| + EnqueueValueWithSize(stream, chunk, chunkSize); |
| + } catch (enqueueE) { |
| + ErrorReadableStream(stream, enqueueE); |
| + throw enqueueE; |
| + } |
| + } |
| + |
| + RequestReadableStreamPull(stream); |
| + } |
| + |
| + function ErrorReadableStream(stream, e) { |
| + stream[readableStreamQueue] = new v8.InternalPackedArray(); |
| + stream[readableStreamStoredError] = e; |
| + stream[readableStreamState] = STATE_ERRORED; |
| + |
| + const reader = stream[readableStreamReader]; |
| + if (reader === undefined) { |
| + return undefined; |
| + } |
| + |
| + const readRequests = reader[readableStreamReaderReadRequests]; |
| + for (let i = 0; i < readRequests.length; ++i) { |
| + v8.rejectPromise(readRequests[i], e); |
| + } |
| + reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| + |
| + v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); |
| + } |
| + |
| + function FinishClosingReadableStream(stream) { |
| + stream[readableStreamState] = STATE_CLOSED; |
| + |
| + const reader = stream[readableStreamReader]; |
| + if (reader === undefined) { |
| + return undefined; |
| + } |
| + |
| + |
| + const readRequests = reader[readableStreamReaderReadRequests]; |
| + for (let i = 0; i < readRequests.length; ++i) { |
| + v8.resolvePromise( |
| + readRequests[i], CreateIterResultObject(undefined, true)); |
| + } |
| + reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| + |
| + v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); |
| + } |
| + |
| + function GetReadableStreamDesiredSize(stream) { |
| + const queueSize = GetTotalQueueSize(stream); |
| + return stream[readableStreamStrategyHWM] - queueSize; |
| + } |
| + |
| + // TODO can we use `in` instead of hasOwnProperty? |
|
yhirano
2015/10/14 10:28:49
TODO(domenic):
domenic
2015/10/14 16:25:31
Removed; I remembered why it was invalid to use `i
|
| + function IsReadableStream(x) { |
| + return hasOwnProperty(x, readableStreamUnderlyingSource); |
| + } |
| + |
| + function IsReadableStreamDisturbed(stream) { |
| + return stream[readableStreamDisturbed]; |
| + } |
| + |
| + function IsReadableStreamLocked(stream) { |
| + return stream[readableStreamReader] !== undefined; |
| + } |
| + |
| + function IsReadableStreamController(x) { |
| + return hasOwnProperty(x, readableStreamControllerControlledReadableStream); |
| + } |
| + |
| + function IsReadableStreamReader(x) { |
| + return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); |
| + } |
| + |
| + function ReadFromReadableStreamReader(reader) { |
| + const stream = reader[readableStreamReaderOwnerReadableStream]; |
| + stream[readableStreamDisturbed] = true; |
| + |
| + if (stream[readableStreamState] === STATE_CLOSED) { |
| + return Promise_resolve(CreateIterResultObject(undefined, true)); |
| + } |
| + |
| + if (stream[readableStreamState] === STATE_ERRORED) { |
| + return Promise_reject(stream[readableStreamStoredError]); |
| + } |
| + |
| + const queue = stream[readableStreamQueue]; |
| + if (queue.length > 0) { |
| + const chunk = DequeueValue(stream); |
| + |
| + if (stream[readableStreamCloseRequested] === true && |
| + queue.length === 0) { |
| + FinishClosingReadableStream(stream); |
| + } else { |
| + RequestReadableStreamPull(stream); |
| + } |
| + |
| + return Promise_resolve(CreateIterResultObject(chunk, false)); |
| + } else { |
| + const readRequest = v8.createPromise(); |
| + |
| + reader[readableStreamReaderReadRequests].push(readRequest); |
| + RequestReadableStreamPull(stream); |
| + return readRequest; |
| + } |
| + } |
| + |
| + function RequestReadableStreamPull(stream) { |
| + const shouldPull = ShouldReadableStreamPull(stream); |
| + if (shouldPull === false) { |
| + return undefined; |
| + } |
| + |
| + if (stream[readableStreamPulling] === true) { |
| + stream[readableStreamPullAgain] = true; |
| + return undefined; |
| + } |
| + |
| + stream[readableStreamPulling] = true; |
| + |
| + const underlyingSource = stream[readableStreamUnderlyingSource]; |
| + const controller = stream[readableStreamController]; |
| + const pullPromise = PromiseCallOrNoop( |
| + underlyingSource, 'pull', controller, 'underlyingSource.pull'); |
| + |
| + thenPromise(pullPromise, |
| + function() { |
| + stream[readableStreamPulling] = false; |
| + |
| + if (stream[readableStreamPullAgain] === true) { |
| + stream[readableStreamPullAgain] = false; |
| + return RequestReadableStreamPull(stream); |
| + } |
| + }, |
| + function(e) { |
| + if (stream[readableStreamState] === STATE_READABLE) { |
| + return ErrorReadableStream(stream, e); |
| + } |
| + }); |
| + } |
| + |
| + function ShouldReadableStreamPull(stream) { |
| + const state = stream[readableStreamState]; |
| + if (state === STATE_CLOSED || state === STATE_ERRORED) { |
| + return false; |
| + } |
| + |
| + if (stream[readableStreamCloseRequested] === true) { |
| + return false; |
| + } |
| + |
| + if (stream[readableStreamStarted] === false) { |
| + return false; |
| + } |
| + |
| + if (IsReadableStreamLocked(stream) === true) { |
| + const reader = stream[readableStreamReader]; |
| + const readRequests = reader[readableStreamReaderReadRequests]; |
| + if (readRequests.length > 0) { |
| + return true; |
| + } |
| + } |
| + |
| + const desiredSize = GetReadableStreamDesiredSize(stream); |
| + if (desiredSize > 0) { |
| + return true; |
| + } |
| + |
| + return false; |
| + } |
| + |
| + // Potential future optimization: use class instances for the underlying |
| + // sources, so that we don't re-create |
| + // closures every time. |
| + |
| + function TeeReadableStream( |
| + stream) { // shouldClone argument from spec not supported yet |
|
yhirano
2015/10/14 10:28:49
TODO(domenic):
domenic
2015/10/14 16:25:31
done
|
| + const reader = AcquireReadableStreamReader(stream); |
| + |
| + let closedOrErrored = false; |
| + let canceled1 = false; |
| + let canceled2 = false; |
| + let reason1; |
| + let reason2; |
| + let promise = v8.createPromise(); |
| + |
| + const branch1 = new ReadableStream({pull, cancel: cancel1}); |
| + |
| + const branch2 = new ReadableStream({pull, cancel: cancel2}); |
| + |
| + thenPromise( |
| + reader[readableStreamReaderClosedPromise], undefined, function(r) { |
| + if (closedOrErrored === true) { |
| + return; |
| + } |
| + |
| + ErrorReadableStream(branch1, r); |
| + ErrorReadableStream(branch2, r); |
| + closedOrErrored = true; |
| + }); |
| + |
| + return [branch1, branch2]; |
| + |
| + |
| + function pull() { |
| + return thenPromise( |
| + ReadFromReadableStreamReader(reader), function(result) { |
| + const value = result.value; |
| + const done = result.done; |
| + |
| + if (done === true && closedOrErrored === false) { |
| + CloseReadableStream(branch1); |
| + CloseReadableStream(branch2); |
| + closedOrErrored = true; |
| + } |
| + |
| + if (closedOrErrored === true) { |
| + return; |
| + } |
| + |
| + if (canceled1 === false) { |
| + EnqueueInReadableStream(branch1, value); |
| + } |
| + |
| + if (canceled2 === false) { |
| + EnqueueInReadableStream(branch2, value); |
| + } |
| + }); |
| + } |
| + |
| + function cancel1(reason) { |
| + canceled1 = true; |
| + reason1 = reason; |
| + |
| + if (canceled2 === true) { |
| + const compositeReason = [reason1, reason2]; |
| + const cancelResult = CancelReadableStream(stream, compositeReason); |
| + v8.resolvePromise(promise, cancelResult); |
| + } |
| + |
| + return promise; |
| + } |
| + |
| + function cancel2(reason) { |
| + canceled2 = true; |
| + reason2 = reason; |
| + |
| + if (canceled1 === true) { |
| + const compositeReason = [reason1, reason2]; |
| + const cancelResult = CancelReadableStream(stream, compositeReason); |
| + v8.resolvePromise(promise, cancelResult); |
| + } |
| + |
| + return promise; |
| + } |
| + } |
| + |
| + // |
| + // Queue-with-sizes |
| + // Modified from taking the queue (as in the spec) to taking the stream, so we |
| + // can modify the queue size alongside. |
| + // |
| + |
| + function DequeueValue(stream) { |
| + const result = stream[readableStreamQueue].shift(); |
| + stream[readableStreamQueueSize] -= result.size; |
| + return result.value; |
| + } |
| + |
| + function EnqueueValueWithSize(stream, value, size) { |
| + size = Number(size); |
| + if (Number_isNaN(size) || size === +Infinity || size < 0) { |
| + throw new RangeError('size must be a finite, non-NaN, non-negative number.'); |
| + } |
| + |
| + stream[readableStreamQueueSize] += size; |
| + stream[readableStreamQueue].push({value, size}); |
| + } |
| + |
| + function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } |
| + |
| + // |
| + // Other helpers |
| + // |
| + |
| + function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { |
| + if (size !== undefined && typeof size !== 'function') { |
| + throw new TypeError( |
| + 'size property of a queuing strategy must be a function'); |
| + } |
| + |
| + highWaterMark = Number(highWaterMark); |
| + if (Number_isNaN(highWaterMark)) { |
| + throw new TypeError( |
| + 'highWaterMark property of a queuing strategy must be convertible to a non-NaN number'); |
| + } |
| + if (highWaterMark < 0) { |
| + throw new RangeError( |
| + 'highWaterMark property of a queuing strategy must be nonnegative'); |
| + } |
| + |
| + return {size, highWaterMark}; |
| + } |
| + |
| + // Modified from InvokeOrNoop in spec |
| + function CallOrNoop(O, P, arg, nameForError) { |
| + const method = O[P]; |
| + if (method === undefined) { |
| + return undefined; |
| + } |
| + if (typeof method !== 'function') { |
| + throw new TypeError(`${nameForError} must be a function or undefined`); |
| + } |
| + |
| + return callFunction(method, O, arg); |
| + } |
| + |
| + |
| + // Modified from PromiseInvokeOrNoop in spec |
| + function PromiseCallOrNoop(O, P, arg, nameForError) { |
| + let method; |
| + try { |
| + method = O[P]; |
| + } catch (methodE) { |
| + return Promise_reject(methodE); |
| + } |
| + |
| + if (method === undefined) { |
| + return Promise_resolve(undefined); |
| + } |
| + |
| + if (typeof method !== 'function') { |
| + return Promise_reject(`${nameForError} must be a function or undefined`); |
| + } |
| + |
| + try { |
| + return Promise_resolve(callFunction(method, O, arg)); |
| + } catch (e) { |
| + return Promise_reject(e); |
| + } |
| + } |
| + |
| + function CreateIterResultObject(value, done) { return {value, done}; } |
| + |
| + |
| + // |
| + // Additions to the global |
| + // |
| + |
| + defineProperty(global, 'ReadableStream', { |
| + value: ReadableStream, |
| + enumerable: false, |
| + configurable: true, |
| + writable: true |
| + }); |
| + |
| + // |
| + // Exports to Blink |
| + // |
| + |
| + binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; |
| +}); |