| Index: third_party/WebKit/Source/core/streams/ReadableStream.js
|
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js
|
| index 644af5008f268f77c1451710fb000ffdbdcca3fc..c02dce5727a1ce6b946ca710ddf5d64c70ba7e83 100644
|
| --- a/third_party/WebKit/Source/core/streams/ReadableStream.js
|
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js
|
| @@ -5,42 +5,53 @@
|
| (function(global, binding, v8) {
|
| 'use strict';
|
|
|
| - const readableStreamController = v8.createPrivateSymbol('[[controller]]');
|
| - const readableStreamQueue = v8.createPrivateSymbol('[[queue]]');
|
| - const readableStreamQueueSize =
|
| - v8.createPrivateSymbol('[[queue]] total size');
|
| const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
|
| - const readableStreamState = v8.createPrivateSymbol('[[state]]');
|
| const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
|
| - const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]');
|
| - const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
|
| - const readableStreamUnderlyingSource =
|
| - v8.createPrivateSymbol('[[underlyingSource]]');
|
| -
|
| - const readableStreamControllerControlledReadableStream =
|
| - v8.createPrivateSymbol('[[controlledReadableStream]]');
|
| + const readableStreamController = v8.createPrivateSymbol('[[controller]]');
|
|
|
| const readableStreamReaderClosedPromise =
|
| v8.createPrivateSymbol('[[closedPromise]]');
|
| const readableStreamReaderOwnerReadableStream =
|
| v8.createPrivateSymbol('[[ownerReadableStream]]');
|
| - const readableStreamReaderReadRequests =
|
| +
|
| + const readableStreamDefaultReaderReadRequests =
|
| v8.createPrivateSymbol('[[readRequests]]');
|
|
|
| const createWithExternalControllerSentinel =
|
| v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
|
|
|
| + const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]');
|
| + const DISTURBED = 0b1;
|
| + // The 2nd and 3rd bit are for [[state]].
|
| + const STATE_MASK = 0b110;
|
| + const STATE_BITS_OFFSET = 1;
|
| const STATE_READABLE = 0;
|
| const STATE_CLOSED = 1;
|
| const STATE_ERRORED = 2;
|
|
|
| - const readableStreamBits = v8.createPrivateSymbol(
|
| - 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]], [[disturbed]]');
|
| + const readableStreamDefaultControllerUnderlyingSource =
|
| + v8.createPrivateSymbol('[[underlyingSource]]');
|
| + const readableStreamDefaultControllerControlledReadableStream =
|
| + v8.createPrivateSymbol('[[controlledReadableStream]]');
|
| + const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]]');
|
| + const readableStreamDefaultControllerQueueSize =
|
| + v8.createPrivateSymbol('[[queue]] total size');
|
| + const readableStreamDefaultControllerStrategySize =
|
| + v8.createPrivateSymbol('[[strategySize]]');
|
| + const readableStreamDefaultControllerStrategyHWM =
|
| + v8.createPrivateSymbol('[[strategyHWM]]');
|
| +
|
| + const readableStreamDefaultControllerBits = v8.createPrivateSymbol(
|
| + 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]]');
|
| const STARTED = 0b1;
|
| const CLOSE_REQUESTED = 0b10;
|
| const PULLING = 0b100;
|
| const PULL_AGAIN = 0b1000;
|
| - const DISTURBED = 0b10000;
|
| + const EXTERNALLY_CONTROLLED = 0b10000;
|
| +
|
| + const readableStreamControllerCancel =
|
| + v8.createPrivateSymbol('[[InternalCancel]]');
|
| + const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]');
|
|
|
| const undefined = global.undefined;
|
| const Infinity = global.Infinity;
|
| @@ -65,7 +76,7 @@
|
| const errIllegalConstructor = 'Illegal constructor';
|
| const errCancelLockedStream =
|
| 'Cannot cancel a readable stream that is locked to a reader';
|
| - const errEnqueueInCloseRequestedStream =
|
| + const errEnqueueCloseRequestedStream =
|
| 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed';
|
| const errCancelReleasedReader =
|
| 'This readable stream reader has been released and cannot be used to cancel its previous owner stream';
|
| @@ -73,10 +84,15 @@
|
| 'This readable stream reader has been released and cannot be used to read from its previous owner stream';
|
| const errCloseCloseRequestedStream =
|
| 'Cannot close a readable stream that has already been requested to be closed';
|
| + const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable stream';
|
| + const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readable stream';
|
| + const errCloseClosedStream = 'Cannot close a closed readable stream';
|
| const errCloseErroredStream = 'Cannot close an errored readable stream';
|
| const errErrorClosedStream = 'Cannot error a close readable stream';
|
| const errErrorErroredStream =
|
| 'Cannot error a readable stream that is already errored';
|
| + const errGetReaderNotByteStream = 'This readable stream does not support BYOB readers';
|
| + const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or "byob"';
|
| const errReaderConstructorBadArgument =
|
| 'ReadableStreamReader constructor argument is not a readable stream';
|
| const errReaderConstructorStreamAlreadyLocked =
|
| @@ -106,50 +122,28 @@
|
| highWaterMark = 1;
|
| }
|
|
|
| - const normalizedStrategy =
|
| - ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
|
| -
|
| - this[readableStreamUnderlyingSource] = underlyingSource;
|
| -
|
| - this[readableStreamQueue] = new v8.InternalPackedArray();
|
| - this[readableStreamQueueSize] = 0;
|
| -
|
| - this[readableStreamState] = STATE_READABLE;
|
| this[readableStreamBits] = 0b0;
|
| + ReadableStreamSetState(this, STATE_READABLE);
|
| this[readableStreamReader] = undefined;
|
| this[readableStreamStoredError] = undefined;
|
|
|
| - this[readableStreamStrategySize] = normalizedStrategy.size;
|
| - this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
|
| -
|
| // Avoid allocating the controller if the stream is going to be controlled
|
| // externally (i.e. from C++) anyway. All calls to underlyingSource
|
| // methods will disregard their controller argument in such situations
|
| // (but see below).
|
|
|
| - const isControlledExternally =
|
| - arguments[2] === createWithExternalControllerSentinel;
|
| - const controller =
|
| - isControlledExternally ? null : new ReadableStreamController(this);
|
| - this[readableStreamController] = controller;
|
| + this[readableStreamController] = undefined;
|
|
|
| - // We need to pass ourself to the underlyingSource start method for
|
| - // externally-controlled streams. We use the now-useless controller
|
| - // argument to do so.
|
| - const argToStart = isControlledExternally ? this : controller;
|
| + const type = underlyingSource.type;
|
| + const typeString = String(type);
|
| + if (typeString === 'bytes') {
|
| + throw new RangeError('bytes type is not yet implemented');
|
| + } else if (type !== undefined) {
|
| + throw new RangeError('Invalid type is specified');
|
| + }
|
|
|
| - const startResult = CallOrNoop(
|
| - underlyingSource, 'start', argToStart, 'underlyingSource.start');
|
| - thenPromise(Promise_resolve(startResult),
|
| - () => {
|
| - this[readableStreamBits] |= STARTED;
|
| - RequestReadableStreamPull(this);
|
| - },
|
| - r => {
|
| - if (this[readableStreamState] === STATE_READABLE) {
|
| - return ErrorReadableStream(this, r);
|
| - }
|
| - });
|
| + this[readableStreamController] =
|
| + new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark, arguments[2] === createWithExternalControllerSentinel);
|
| }
|
|
|
| get locked() {
|
| @@ -169,15 +163,27 @@
|
| return Promise_reject(new TypeError(errCancelLockedStream));
|
| }
|
|
|
| - return CancelReadableStream(this, reason);
|
| + return ReadableStreamCancel(this, reason);
|
| }
|
|
|
| - getReader() {
|
| + getReader({ mode } = {}) {
|
| if (IsReadableStream(this) === false) {
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| - return AcquireReadableStreamReader(this);
|
| + if (mode === 'byob') {
|
| + if (IsReadableByteStreamDefaultController(this[readableStreamController]) === false) {
|
| + throw new TypeError(errGetReaderNotByteStream);
|
| + }
|
| +
|
| + return AcquireReadableStreamBYOBReader(this);
|
| + }
|
| +
|
| + if (mode === undefined) {
|
| + return AcquireReadableStreamDefaultReader(this);
|
| + }
|
| +
|
| + throw new RangeError(errGetReaderBadMode);
|
| }
|
|
|
| tee() {
|
| @@ -185,12 +191,12 @@
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| - return TeeReadableStream(this);
|
| + return ReadableStreamTee(this);
|
| }
|
| }
|
|
|
| - class ReadableStreamController {
|
| - constructor(stream) {
|
| + class ReadableStreamDefaultController {
|
| + constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
|
| if (IsReadableStream(stream) === false) {
|
| throw new TypeError(errIllegalConstructor);
|
| }
|
| @@ -199,75 +205,145 @@
|
| throw new TypeError(errIllegalConstructor);
|
| }
|
|
|
| - this[readableStreamControllerControlledReadableStream] = stream;
|
| + this[readableStreamDefaultControllerControlledReadableStream] = stream;
|
| +
|
| + this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource;
|
| +
|
| + this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
|
| + this[readableStreamDefaultControllerQueueSize] = 0;
|
| +
|
| + this[readableStreamDefaultControllerBits] = 0b0;
|
| + if (isExternallyControlled === true) {
|
| + this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
|
| + }
|
| +
|
| + const normalizedStrategy =
|
| + ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
|
| + this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.size;
|
| + this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.highWaterMark;
|
| +
|
| + const controller = this;
|
| +
|
| + const startResult = CallOrNoop(
|
| + underlyingSource, 'start', this, 'underlyingSource.start');
|
| + thenPromise(Promise_resolve(startResult),
|
| + () => {
|
| + controller[readableStreamDefaultControllerBits] |= STARTED;
|
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller);
|
| + },
|
| + r => {
|
| + if (ReadableStreamGetState(stream) === STATE_READABLE) {
|
| + ReadableStreamDefaultControllerError(controller, r);
|
| + }
|
| + });
|
| }
|
|
|
| get desiredSize() {
|
| - if (IsReadableStreamController(this) === false) {
|
| + if (IsReadableStreamDefaultController(this) === false) {
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| - return GetReadableStreamDesiredSize(
|
| - this[readableStreamControllerControlledReadableStream]);
|
| + return ReadableStreamDefaultControllerGetDesiredSize(this);
|
| }
|
|
|
| close() {
|
| - if (IsReadableStreamController(this) === false) {
|
| + if (IsReadableStreamDefaultController(this) === false) {
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| - const stream = this[readableStreamControllerControlledReadableStream];
|
| + const stream = this[readableStreamDefaultControllerControlledReadableStream];
|
|
|
| - if (stream[readableStreamBits] & CLOSE_REQUESTED) {
|
| + if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
|
| throw new TypeError(errCloseCloseRequestedStream);
|
| }
|
| - if (stream[readableStreamState] === STATE_ERRORED) {
|
| +
|
| + const state = ReadableStreamGetState(stream);
|
| + if (state === STATE_ERRORED) {
|
| throw new TypeError(errCloseErroredStream);
|
| }
|
| + if (state === STATE_CLOSED) {
|
| + throw new TypeError(errCloseClosedStream);
|
| + }
|
|
|
| - return CloseReadableStream(stream);
|
| + return ReadableStreamDefaultControllerClose(this);
|
| }
|
|
|
| enqueue(chunk) {
|
| - if (IsReadableStreamController(this) === false) {
|
| + if (IsReadableStreamDefaultController(this) === false) {
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| - const stream = this[readableStreamControllerControlledReadableStream];
|
| + const stream = this[readableStreamDefaultControllerControlledReadableStream];
|
|
|
| - if (stream[readableStreamState] === STATE_ERRORED) {
|
| - throw stream[readableStreamStoredError];
|
| + if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
|
| + throw new TypeError(errEnqueueCloseRequestedStream);
|
| }
|
|
|
| - if (stream[readableStreamBits] & CLOSE_REQUESTED) {
|
| - throw new TypeError(errEnqueueInCloseRequestedStream);
|
| + const state = ReadableStreamGetState(stream);
|
| + if (state === STATE_ERRORED) {
|
| + throw new TypeError(errEnqueueErroredStream);
|
| + }
|
| + if (state === STATE_CLOSED) {
|
| + throw new TypeError(errEnqueueClosedStream);
|
| }
|
|
|
| - return EnqueueInReadableStream(stream, chunk);
|
| + return ReadableStreamDefaultControllerEnqueue(this, chunk);
|
| }
|
|
|
| error(e) {
|
| - if (IsReadableStreamController(this) === false) {
|
| + if (IsReadableStreamDefaultController(this) === false) {
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| - const stream = this[readableStreamControllerControlledReadableStream];
|
| + const stream = this[readableStreamDefaultControllerControlledReadableStream];
|
|
|
| - const state = stream[readableStreamState];
|
| - if (state !== STATE_READABLE) {
|
| - if (state === STATE_ERRORED) {
|
| - throw new TypeError(errErrorErroredStream);
|
| - }
|
| - if (state === STATE_CLOSED) {
|
| - throw new TypeError(errErrorClosedStream);
|
| - }
|
| + const state = ReadableStreamGetState(stream);
|
| + if (state === STATE_ERRORED) {
|
| + throw new TypeError(errErrorErroredStream);
|
| + }
|
| + if (state === STATE_CLOSED) {
|
| + throw new TypeError(errErrorClosedStream);
|
| + }
|
| +
|
| + return ReadableStreamDefaultControllerError(this, e);
|
| + }
|
| + }
|
| +
|
| + function ReadableStreamDefaultControllerCancel(controller, reason) {
|
| + controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
|
| +
|
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
|
| + return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
|
| + }
|
| +
|
| + function ReadableStreamDefaultControllerPull(controller) {
|
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream];
|
| +
|
| + if (controller[readableStreamDefaultControllerQueue].length > 0) {
|
| + const chunk = DequeueValue(controller);
|
| +
|
| + if ((controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) &&
|
| + controller[readableStreamDefaultControllerQueue].length === 0) {
|
| + ReadableStreamClose(stream);
|
| + } else {
|
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller);
|
| }
|
|
|
| - return ErrorReadableStream(stream, e);
|
| + return Promise_resolve(CreateIterResultObject(chunk, false));
|
| }
|
| +
|
| + const pendingPromise = ReadableStreamAddReadRequest(stream);
|
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller);
|
| + return pendingPromise;
|
| + }
|
| +
|
| + function ReadableStreamAddReadRequest(stream) {
|
| + const promise = v8.createPromise();
|
| + stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(promise);
|
| + return promise;
|
| }
|
|
|
| - class ReadableStreamReader {
|
| + class ReadableStreamDefaultReader {
|
| constructor(stream) {
|
| if (IsReadableStream(stream) === false) {
|
| throw new TypeError(errReaderConstructorBadArgument);
|
| @@ -276,36 +352,13 @@
|
| throw new TypeError(errReaderConstructorStreamAlreadyLocked);
|
| }
|
|
|
| - // TODO(yhirano): Remove this when we don't need hasPendingActivity in
|
| - // blink::UnderlyingSourceBase.
|
| - if (stream[readableStreamController] === null) {
|
| - // The stream is created with an external controller (i.e. made in
|
| - // Blink).
|
| - const underlyingSource = stream[readableStreamUnderlyingSource];
|
| - callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
|
| - }
|
| -
|
| - this[readableStreamReaderOwnerReadableStream] = stream;
|
| - stream[readableStreamReader] = this;
|
| -
|
| - this[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
|
| + ReadableStreamReaderGenericInitialize(this, stream);
|
|
|
| - switch (stream[readableStreamState]) {
|
| - case STATE_READABLE:
|
| - this[readableStreamReaderClosedPromise] = v8.createPromise();
|
| - break;
|
| - case STATE_CLOSED:
|
| - this[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
|
| - break;
|
| - case STATE_ERRORED:
|
| - this[readableStreamReaderClosedPromise] =
|
| - Promise_reject(stream[readableStreamStoredError]);
|
| - break;
|
| - }
|
| + this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray();
|
| }
|
|
|
| get closed() {
|
| - if (IsReadableStreamReader(this) === false) {
|
| + if (IsReadableStreamDefaultReader(this) === false) {
|
| return Promise_reject(new TypeError(errIllegalInvocation));
|
| }
|
|
|
| @@ -313,7 +366,7 @@
|
| }
|
|
|
| cancel(reason) {
|
| - if (IsReadableStreamReader(this) === false) {
|
| + if (IsReadableStreamDefaultReader(this) === false) {
|
| return Promise_reject(new TypeError(errIllegalInvocation));
|
| }
|
|
|
| @@ -322,11 +375,11 @@
|
| return Promise_reject(new TypeError(errCancelReleasedReader));
|
| }
|
|
|
| - return CancelReadableStream(stream, reason);
|
| + return ReadableStreamReaderGenericCancel(this, reason);
|
| }
|
|
|
| read() {
|
| - if (IsReadableStreamReader(this) === false) {
|
| + if (IsReadableStreamDefaultReader(this) === false) {
|
| return Promise_reject(new TypeError(errIllegalInvocation));
|
| }
|
|
|
| @@ -334,11 +387,11 @@
|
| return Promise_reject(new TypeError(errReadReleasedReader));
|
| }
|
|
|
| - return ReadFromReadableStreamReader(this);
|
| + return ReadableStreamDefaultReaderRead(this);
|
| }
|
|
|
| releaseLock() {
|
| - if (IsReadableStreamReader(this) === false) {
|
| + if (IsReadableStreamDefaultReader(this) === false) {
|
| throw new TypeError(errIllegalInvocation);
|
| }
|
|
|
| @@ -347,45 +400,30 @@
|
| return undefined;
|
| }
|
|
|
| - if (this[readableStreamReaderReadRequests].length > 0) {
|
| + if (this[readableStreamDefaultReaderReadRequests].length > 0) {
|
| throw new TypeError(errReleaseReaderWithPendingRead);
|
| }
|
|
|
| - // TODO(yhirano): Remove this when we don't need hasPendingActivity in
|
| - // blink::UnderlyingSourceBase.
|
| - if (stream[readableStreamController] === null) {
|
| - // The stream is created with an external controller (i.e. made in
|
| - // Blink).
|
| - const underlyingSource = stream[readableStreamUnderlyingSource];
|
| - callFunction(underlyingSource.notifyLockReleased, underlyingSource);
|
| - }
|
| -
|
| - if (stream[readableStreamState] === STATE_READABLE) {
|
| - v8.rejectPromise(this[readableStreamReaderClosedPromise],
|
| - new TypeError(errReleasedReaderClosedPromise));
|
| - } else {
|
| - this[readableStreamReaderClosedPromise] =
|
| - Promise_reject(new TypeError(errReleasedReaderClosedPromise));
|
| - }
|
| -
|
| - this[readableStreamReaderOwnerReadableStream][readableStreamReader] =
|
| - undefined;
|
| - this[readableStreamReaderOwnerReadableStream] = undefined;
|
| + ReadableStreamReaderGenericRelease(this);
|
| }
|
| }
|
|
|
| + function ReadableStreamReaderGenericCancel(reader, reason) {
|
| + return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason);
|
| + }
|
| +
|
| //
|
| // Readable stream abstract operations
|
| //
|
|
|
| - function AcquireReadableStreamReader(stream) {
|
| - return new ReadableStreamReader(stream);
|
| + function AcquireReadableStreamDefaultReader(stream) {
|
| + return new ReadableStreamDefaultReader(stream);
|
| }
|
|
|
| - function CancelReadableStream(stream, reason) {
|
| + function ReadableStreamCancel(stream, reason) {
|
| stream[readableStreamBits] |= DISTURBED;
|
|
|
| - const state = stream[readableStreamState];
|
| + const state = ReadableStreamGetState(stream);
|
| if (state === STATE_CLOSED) {
|
| return Promise_resolve(undefined);
|
| }
|
| @@ -393,112 +431,126 @@
|
| return Promise_reject(stream[readableStreamStoredError]);
|
| }
|
|
|
| - stream[readableStreamQueue] = new v8.InternalPackedArray();
|
| - FinishClosingReadableStream(stream);
|
| + ReadableStreamClose(stream);
|
|
|
| - const underlyingSource = stream[readableStreamUnderlyingSource];
|
| - const sourceCancelPromise = PromiseCallOrNoop(
|
| - underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
|
| + const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[readableStreamController], reason);
|
| return thenPromise(sourceCancelPromise, () => undefined);
|
| }
|
|
|
| - function CloseReadableStream(stream) {
|
| - if (stream[readableStreamState] === STATE_CLOSED) {
|
| - return undefined;
|
| - }
|
| + function ReadableStreamDefaultControllerClose(controller) {
|
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream];
|
|
|
| - stream[readableStreamBits] |= CLOSE_REQUESTED;
|
| + controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
|
|
|
| - if (stream[readableStreamQueue].length === 0) {
|
| - return FinishClosingReadableStream(stream);
|
| + if (controller[readableStreamDefaultControllerQueue].length === 0) {
|
| + ReadableStreamClose(stream);
|
| }
|
| }
|
|
|
| - function EnqueueInReadableStream(stream, chunk) {
|
| - if (stream[readableStreamState] === STATE_CLOSED) {
|
| - return undefined;
|
| - }
|
| + function ReadableStreamFulfillReadRequest(stream, chunk, done) {
|
| + const reader = stream[readableStreamReader];
|
| +
|
| + const readRequest =
|
| + stream[readableStreamReader][readableStreamDefaultReaderReadRequests]
|
| + .shift();
|
| + v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done));
|
| + }
|
| +
|
| + function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
|
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream];
|
|
|
| - if (IsReadableStreamLocked(stream) === true &&
|
| - stream[readableStreamReader][readableStreamReaderReadRequests].length >
|
| - 0) {
|
| - const readRequest =
|
| - stream[readableStreamReader][readableStreamReaderReadRequests]
|
| - .shift();
|
| - v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false));
|
| + if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
|
| + ReadableStreamFulfillReadRequest(stream, chunk, false);
|
| } else {
|
| let chunkSize = 1;
|
|
|
| - const strategySize = stream[readableStreamStrategySize];
|
| + const strategySize = controller[readableStreamDefaultControllerStrategySize];
|
| if (strategySize !== undefined) {
|
| try {
|
| chunkSize = strategySize(chunk);
|
| } catch (chunkSizeE) {
|
| - if (stream[readableStreamState] === STATE_READABLE) {
|
| - ErrorReadableStream(stream, chunkSizeE);
|
| + if (ReadableStreamGetState(stream) === STATE_READABLE) {
|
| + ReadableStreamDefaultControllerError(controller, chunkSizeE);
|
| }
|
| throw chunkSizeE;
|
| }
|
| }
|
|
|
| try {
|
| - EnqueueValueWithSize(stream, chunk, chunkSize);
|
| + EnqueueValueWithSize(controller, chunk, chunkSize);
|
| } catch (enqueueE) {
|
| - if (stream[readableStreamState] === STATE_READABLE) {
|
| - ErrorReadableStream(stream, enqueueE);
|
| + if (ReadableStreamGetState(stream) === STATE_READABLE) {
|
| + ReadableStreamDefaultControllerError(controller, enqueueE);
|
| }
|
| throw enqueueE;
|
| }
|
| }
|
|
|
| - RequestReadableStreamPull(stream);
|
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller);
|
| + }
|
| +
|
| + function ReadableStreamGetState(stream) {
|
| + return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
|
| + }
|
| +
|
| + function ReadableStreamSetState(stream, state) {
|
| + stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) |
|
| + (state << STATE_BITS_OFFSET);
|
| + }
|
| +
|
| + function ReadableStreamDefaultControllerError(controller, e) {
|
| + controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
|
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream];
|
| + ReadableStreamError(stream, e);
|
| }
|
|
|
| - function ErrorReadableStream(stream, e) {
|
| - stream[readableStreamQueue] = new v8.InternalPackedArray();
|
| + function ReadableStreamError(stream, e) {
|
| stream[readableStreamStoredError] = e;
|
| - stream[readableStreamState] = STATE_ERRORED;
|
| + ReadableStreamSetState(stream, STATE_ERRORED);
|
|
|
| const reader = stream[readableStreamReader];
|
| if (reader === undefined) {
|
| return undefined;
|
| }
|
|
|
| - const readRequests = reader[readableStreamReaderReadRequests];
|
| - for (let i = 0; i < readRequests.length; ++i) {
|
| - v8.rejectPromise(readRequests[i], e);
|
| + if (IsReadableStreamDefaultReader(reader) === true) {
|
| + const readRequests = reader[readableStreamDefaultReaderReadRequests];
|
| + for (let i = 0; i < readRequests.length; i++) {
|
| + v8.rejectPromise(readRequests[i], e);
|
| + }
|
| + reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray();
|
| }
|
| - reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
|
|
|
| v8.rejectPromise(reader[readableStreamReaderClosedPromise], e);
|
| }
|
|
|
| - function FinishClosingReadableStream(stream) {
|
| - stream[readableStreamState] = STATE_CLOSED;
|
| + function ReadableStreamClose(stream) {
|
| + ReadableStreamSetState(stream, STATE_CLOSED);
|
|
|
| const reader = stream[readableStreamReader];
|
| if (reader === undefined) {
|
| return undefined;
|
| }
|
|
|
| -
|
| - const readRequests = reader[readableStreamReaderReadRequests];
|
| - for (let i = 0; i < readRequests.length; ++i) {
|
| - v8.resolvePromise(
|
| - readRequests[i], CreateIterResultObject(undefined, true));
|
| + if (IsReadableStreamDefaultReader(reader) === true) {
|
| + const readRequests = reader[readableStreamDefaultReaderReadRequests];
|
| + for (let i = 0; i < readRequests.length; i++) {
|
| + v8.resolvePromise(
|
| + readRequests[i], CreateIterResultObject(undefined, true));
|
| + }
|
| + reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray();
|
| }
|
| - reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
|
|
|
| v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined);
|
| }
|
|
|
| - function GetReadableStreamDesiredSize(stream) {
|
| - const queueSize = GetTotalQueueSize(stream);
|
| - return stream[readableStreamStrategyHWM] - queueSize;
|
| + function ReadableStreamDefaultControllerGetDesiredSize(controller) {
|
| + const queueSize = GetTotalQueueSize(controller);
|
| + return controller[readableStreamDefaultControllerStrategyHWM] - queueSize;
|
| }
|
|
|
| function IsReadableStream(x) {
|
| - return hasOwnProperty(x, readableStreamUnderlyingSource);
|
| + return hasOwnProperty(x, readableStreamController);
|
| }
|
|
|
| function IsReadableStreamDisturbed(stream) {
|
| @@ -509,115 +561,145 @@
|
| return stream[readableStreamReader] !== undefined;
|
| }
|
|
|
| - function IsReadableStreamController(x) {
|
| - return hasOwnProperty(x, readableStreamControllerControlledReadableStream);
|
| + function IsReadableStreamDefaultController(x) {
|
| + return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableStream);
|
| + }
|
| +
|
| + function IsReadableStreamDefaultReader(x) {
|
| + return hasOwnProperty(x, readableStreamDefaultReaderReadRequests);
|
| }
|
|
|
| function IsReadableStreamReadable(stream) {
|
| - return stream[readableStreamState] === STATE_READABLE;
|
| + return ReadableStreamGetState(stream) === STATE_READABLE;
|
| }
|
|
|
| function IsReadableStreamClosed(stream) {
|
| - return stream[readableStreamState] === STATE_CLOSED;
|
| + return ReadableStreamGetState(stream) === STATE_CLOSED;
|
| }
|
|
|
| function IsReadableStreamErrored(stream) {
|
| - return stream[readableStreamState] === STATE_ERRORED;
|
| + return ReadableStreamGetState(stream) === STATE_ERRORED;
|
| + }
|
| +
|
| + function ReadableStreamReaderGenericInitialize(reader, stream) {
|
| + // TODO(yhirano): Remove this when we don't need hasPendingActivity in
|
| + // blink::UnderlyingSourceBase.
|
| + const controller = stream[readableStreamController];
|
| + if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
|
| + // The stream is created with an external controller (i.e. made in
|
| + // Blink).
|
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
|
| + callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
|
| + }
|
| +
|
| + reader[readableStreamReaderOwnerReadableStream] = stream;
|
| + stream[readableStreamReader] = reader;
|
| +
|
| + switch (ReadableStreamGetState(stream)) {
|
| + case STATE_READABLE:
|
| + reader[readableStreamReaderClosedPromise] = v8.createPromise();
|
| + break;
|
| + case STATE_CLOSED:
|
| + reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
|
| + break;
|
| + case STATE_ERRORED:
|
| + reader[readableStreamReaderClosedPromise] =
|
| + Promise_reject(stream[readableStreamStoredError]);
|
| + break;
|
| + }
|
| }
|
|
|
| - function IsReadableStreamReader(x) {
|
| - return hasOwnProperty(x, readableStreamReaderOwnerReadableStream);
|
| + function ReadableStreamReaderGenericRelease(reader) {
|
| + // TODO(yhirano): Remove this when we don't need hasPendingActivity in
|
| + // blink::UnderlyingSourceBase.
|
| + const controller = reader[readableStreamReaderOwnerReadableStream][readableStreamController];
|
| + if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
|
| + // The stream is created with an external controller (i.e. made in
|
| + // Blink).
|
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
|
| + callFunction(underlyingSource.notifyLockReleased, underlyingSource);
|
| + }
|
| +
|
| + if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream]) === STATE_READABLE) {
|
| + v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError(errReleasedReaderClosedPromise));
|
| + } else {
|
| + reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(errReleasedReaderClosedPromise));
|
| + }
|
| +
|
| + reader[readableStreamReaderOwnerReadableStream][readableStreamReader] =
|
| + undefined;
|
| + reader[readableStreamReaderOwnerReadableStream] = undefined;
|
| }
|
|
|
| - function ReadFromReadableStreamReader(reader) {
|
| + function ReadableStreamDefaultReaderRead(reader) {
|
| const stream = reader[readableStreamReaderOwnerReadableStream];
|
| stream[readableStreamBits] |= DISTURBED;
|
|
|
| - if (stream[readableStreamState] === STATE_CLOSED) {
|
| + if (ReadableStreamGetState(stream) === STATE_CLOSED) {
|
| return Promise_resolve(CreateIterResultObject(undefined, true));
|
| }
|
|
|
| - if (stream[readableStreamState] === STATE_ERRORED) {
|
| + if (ReadableStreamGetState(stream) === STATE_ERRORED) {
|
| return Promise_reject(stream[readableStreamStoredError]);
|
| }
|
|
|
| - const queue = stream[readableStreamQueue];
|
| - if (queue.length > 0) {
|
| - const chunk = DequeueValue(stream);
|
| -
|
| - if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) {
|
| - FinishClosingReadableStream(stream);
|
| - } else {
|
| - RequestReadableStreamPull(stream);
|
| - }
|
| -
|
| - return Promise_resolve(CreateIterResultObject(chunk, false));
|
| - } else {
|
| - const readRequest = v8.createPromise();
|
| -
|
| - reader[readableStreamReaderReadRequests].push(readRequest);
|
| - RequestReadableStreamPull(stream);
|
| - return readRequest;
|
| - }
|
| + return ReadableStreamDefaultControllerPull(stream[readableStreamController]);
|
| }
|
|
|
| - function RequestReadableStreamPull(stream) {
|
| - const shouldPull = ShouldReadableStreamPull(stream);
|
| + function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
|
| + const shouldPull = ReadableStreamDefaultControllerShouldCallPull(controller);
|
| if (shouldPull === false) {
|
| return undefined;
|
| }
|
|
|
| - if (stream[readableStreamBits] & PULLING) {
|
| - stream[readableStreamBits] |= PULL_AGAIN;
|
| + if (controller[readableStreamDefaultControllerBits] & PULLING) {
|
| + controller[readableStreamDefaultControllerBits] |= PULL_AGAIN;
|
| return undefined;
|
| }
|
|
|
| - stream[readableStreamBits] |= PULLING;
|
| + controller[readableStreamDefaultControllerBits] |= PULLING;
|
|
|
| - const underlyingSource = stream[readableStreamUnderlyingSource];
|
| - const controller = stream[readableStreamController];
|
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
|
| const pullPromise = PromiseCallOrNoop(
|
| underlyingSource, 'pull', controller, 'underlyingSource.pull');
|
|
|
| thenPromise(pullPromise,
|
| () => {
|
| - stream[readableStreamBits] &= ~PULLING;
|
| + controller[readableStreamDefaultControllerBits] &= ~PULLING;
|
|
|
| - if (stream[readableStreamBits] & PULL_AGAIN) {
|
| - stream[readableStreamBits] &= ~PULL_AGAIN;
|
| - return RequestReadableStreamPull(stream);
|
| + if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) {
|
| + controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
|
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller);
|
| }
|
| },
|
| e => {
|
| - if (stream[readableStreamState] === STATE_READABLE) {
|
| - return ErrorReadableStream(stream, e);
|
| + if (ReadableStreamGetState(controller[readableStreamDefaultControllerControlledReadableStream]) === STATE_READABLE) {
|
| + ReadableStreamDefaultControllerError(controller, e);
|
| }
|
| });
|
| }
|
|
|
| - function ShouldReadableStreamPull(stream) {
|
| - const state = stream[readableStreamState];
|
| + function ReadableStreamDefaultControllerShouldCallPull(controller) {
|
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream];
|
| +
|
| + const state = ReadableStreamGetState(stream);
|
| if (state === STATE_CLOSED || state === STATE_ERRORED) {
|
| return false;
|
| }
|
|
|
| - if (stream[readableStreamBits] & CLOSE_REQUESTED) {
|
| + if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
|
| return false;
|
| }
|
|
|
| - if (!(stream[readableStreamBits] & STARTED)) {
|
| + if (!(controller[readableStreamDefaultControllerBits] & STARTED)) {
|
| return false;
|
| }
|
|
|
| - if (IsReadableStreamLocked(stream) === true) {
|
| - const reader = stream[readableStreamReader];
|
| - const readRequests = reader[readableStreamReaderReadRequests];
|
| - if (readRequests.length > 0) {
|
| - return true;
|
| - }
|
| + if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
|
| + return true;
|
| }
|
|
|
| - const desiredSize = GetReadableStreamDesiredSize(stream);
|
| + const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller);
|
| if (desiredSize > 0) {
|
| return true;
|
| }
|
| @@ -625,13 +707,19 @@
|
| return false;
|
| }
|
|
|
| + function ReadableStreamGetNumReadRequests(stream) {
|
| + const reader = stream[readableStreamReader];
|
| + const readRequests = reader[readableStreamDefaultReaderReadRequests];
|
| + return readRequests.length;
|
| + }
|
| +
|
| // Potential future optimization: use class instances for the underlying
|
| // sources, so that we don't re-create
|
| // closures every time.
|
|
|
| // TODO(domenic): shouldClone argument from spec not supported yet
|
| - function TeeReadableStream(stream) {
|
| - const reader = AcquireReadableStreamReader(stream);
|
| + function ReadableStreamTee(stream) {
|
| + const reader = AcquireReadableStreamDefaultReader(stream);
|
|
|
| let closedOrErrored = false;
|
| let canceled1 = false;
|
| @@ -640,9 +728,12 @@
|
| let reason2;
|
| let promise = v8.createPromise();
|
|
|
| - const branch1 = new ReadableStream({pull, cancel: cancel1});
|
| + const branch1Stream = new ReadableStream({pull, cancel: cancel1});
|
| +
|
| + const branch2Stream = new ReadableStream({pull, cancel: cancel2});
|
|
|
| - const branch2 = new ReadableStream({pull, cancel: cancel2});
|
| + const branch1 = branch1Stream[readableStreamController];
|
| + const branch2 = branch2Stream[readableStreamController];
|
|
|
| thenPromise(
|
| reader[readableStreamReaderClosedPromise], undefined, function(r) {
|
| @@ -650,23 +741,26 @@
|
| return;
|
| }
|
|
|
| - ErrorReadableStream(branch1, r);
|
| - ErrorReadableStream(branch2, r);
|
| + ReadableStreamDefaultControllerError(branch1, r);
|
| + ReadableStreamDefaultControllerError(branch2, r);
|
| closedOrErrored = true;
|
| });
|
|
|
| - return [branch1, branch2];
|
| -
|
| + return [branch1Stream, branch2Stream];
|
|
|
| function pull() {
|
| return thenPromise(
|
| - ReadFromReadableStreamReader(reader), function(result) {
|
| + ReadableStreamDefaultReaderRead(reader), function(result) {
|
| const value = result.value;
|
| const done = result.done;
|
|
|
| if (done === true && closedOrErrored === false) {
|
| - CloseReadableStream(branch1);
|
| - CloseReadableStream(branch2);
|
| + if (canceled1 === false) {
|
| + ReadableStreamDefaultControllerClose(branch1);
|
| + }
|
| + if (canceled2 === false) {
|
| + ReadableStreamDefaultControllerClose(branch2);
|
| + }
|
| closedOrErrored = true;
|
| }
|
|
|
| @@ -675,11 +769,11 @@
|
| }
|
|
|
| if (canceled1 === false) {
|
| - EnqueueInReadableStream(branch1, value);
|
| + ReadableStreamDefaultControllerEnqueue(branch1, value);
|
| }
|
|
|
| if (canceled2 === false) {
|
| - EnqueueInReadableStream(branch2, value);
|
| + ReadableStreamDefaultControllerEnqueue(branch2, value);
|
| }
|
| });
|
| }
|
| @@ -690,7 +784,7 @@
|
|
|
| if (canceled2 === true) {
|
| const compositeReason = [reason1, reason2];
|
| - const cancelResult = CancelReadableStream(stream, compositeReason);
|
| + const cancelResult = ReadableStreamCancel(stream, compositeReason);
|
| v8.resolvePromise(promise, cancelResult);
|
| }
|
|
|
| @@ -703,7 +797,7 @@
|
|
|
| if (canceled1 === true) {
|
| const compositeReason = [reason1, reason2];
|
| - const cancelResult = CancelReadableStream(stream, compositeReason);
|
| + const cancelResult = ReadableStreamCancel(stream, compositeReason);
|
| v8.resolvePromise(promise, cancelResult);
|
| }
|
|
|
| @@ -717,23 +811,23 @@
|
| // can modify the queue size alongside.
|
| //
|
|
|
| - function DequeueValue(stream) {
|
| - const result = stream[readableStreamQueue].shift();
|
| - stream[readableStreamQueueSize] -= result.size;
|
| + function DequeueValue(controller) {
|
| + const result = controller[readableStreamDefaultControllerQueue].shift();
|
| + controller[readableStreamDefaultControllerQueueSize] -= result.size;
|
| return result.value;
|
| }
|
|
|
| - function EnqueueValueWithSize(stream, value, size) {
|
| + function EnqueueValueWithSize(controller, value, size) {
|
| size = Number(size);
|
| if (Number_isNaN(size) || size === +Infinity || size < 0) {
|
| throw new RangeError(errInvalidSize);
|
| }
|
|
|
| - stream[readableStreamQueueSize] += size;
|
| - stream[readableStreamQueue].push({value, size});
|
| + controller[readableStreamDefaultControllerQueueSize] += size;
|
| + controller[readableStreamDefaultControllerQueue].push({value, size});
|
| }
|
|
|
| - function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; }
|
| + function GetTotalQueueSize(controller) { return controller[readableStreamDefaultControllerQueueSize]; }
|
|
|
| //
|
| // Other helpers
|
| @@ -783,7 +877,7 @@
|
| }
|
|
|
| if (typeof method !== 'function') {
|
| - return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError));
|
| + return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameForError)));
|
| }
|
|
|
| try {
|
| @@ -811,20 +905,20 @@
|
| // Exports to Blink
|
| //
|
|
|
| - binding.AcquireReadableStreamReader = AcquireReadableStreamReader;
|
| + binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReader;
|
| binding.IsReadableStream = IsReadableStream;
|
| binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
|
| binding.IsReadableStreamLocked = IsReadableStreamLocked;
|
| binding.IsReadableStreamReadable = IsReadableStreamReadable;
|
| binding.IsReadableStreamClosed = IsReadableStreamClosed;
|
| binding.IsReadableStreamErrored = IsReadableStreamErrored;
|
| - binding.IsReadableStreamReader = IsReadableStreamReader;
|
| - binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader;
|
| + binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader;
|
| + binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead;
|
|
|
| - binding.CloseReadableStream = CloseReadableStream;
|
| - binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize;
|
| - binding.EnqueueInReadableStream = EnqueueInReadableStream;
|
| - binding.ErrorReadableStream = ErrorReadableStream;
|
| + binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultControllerClose;
|
| + binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultControllerGetDesiredSize;
|
| + binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControllerEnqueue;
|
| + binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultControllerError;
|
|
|
| binding.createReadableStreamWithExternalController =
|
| (underlyingSource, strategy) => {
|
|
|