Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/ReadableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| index 46eef5435553303d0e47641cef816609637f1980..1e6303646608fd40c891730dc9a6184b289130d3 100644 |
| --- a/third_party/WebKit/Source/core/streams/ReadableStream.js |
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| @@ -5,26 +5,17 @@ |
| (function(global, binding, v8) { |
| 'use strict'; |
| - const readableStreamController = v8.createPrivateSymbol('[[controller]]'); |
| - const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); |
| - const readableStreamQueueSize = |
| - v8.createPrivateSymbol('[[queue]] total size'); |
| const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); |
| const readableStreamState = v8.createPrivateSymbol('[[state]]'); |
| const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); |
| - const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); |
| - const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); |
| - const readableStreamUnderlyingSource = |
| - v8.createPrivateSymbol('[[underlyingSource]]'); |
| - |
| - const readableStreamControllerControlledReadableStream = |
| - v8.createPrivateSymbol('[[controlledReadableStream]]'); |
| + const readableStreamController = v8.createPrivateSymbol('[[controller]]'); |
| const readableStreamReaderClosedPromise = |
| v8.createPrivateSymbol('[[closedPromise]]'); |
| const readableStreamReaderOwnerReadableStream = |
| v8.createPrivateSymbol('[[ownerReadableStream]]'); |
| - const readableStreamReaderReadRequests = |
| + |
| + const readableStreamDefaultReaderReadRequests = |
| v8.createPrivateSymbol('[[readRequests]]'); |
| const createWithExternalControllerSentinel = |
| @@ -34,13 +25,32 @@ |
| const STATE_CLOSED = 1; |
| const STATE_ERRORED = 2; |
| - const readableStreamBits = v8.createPrivateSymbol( |
| - 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]], [[disturbed]]'); |
| + const readableStreamBits = v8.createPrivateSymbol('bit field for [[disturbed]]'); |
|
domenic
2016/04/27 21:12:23
It's a bit unfortunate that we need a whole new 64
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Oh, yes.
Combined with [[state]] instead.
|
| + const DISTURBED = 0b1; |
| + |
| + const readableStreamDefaultControllerUnderlyingSource = |
| + v8.createPrivateSymbol('[[underlyingSource]]'); |
| + const readableStreamDefaultControllerControlledReadableStream = |
| + v8.createPrivateSymbol('[[controlledReadableStream]]'); |
| + const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]]'); |
| + const readableStreamDefaultControllerQueueSize = |
| + v8.createPrivateSymbol('[[queue]] total size'); |
| + const readableStreamDefaultControllerStrategySize = |
| + v8.createPrivateSymbol('[[strategySize]]'); |
| + const readableStreamDefaultControllerStrategyHWM = |
| + v8.createPrivateSymbol('[[strategyHWM]]'); |
| + |
| + const readableStreamDefaultControllerBits = v8.createPrivateSymbol( |
| + 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]]'); |
| const STARTED = 0b1; |
| const CLOSE_REQUESTED = 0b10; |
| const PULLING = 0b100; |
| const PULL_AGAIN = 0b1000; |
| - const DISTURBED = 0b10000; |
| + const EXTERNALLY_CONTROLLED = 0b10000; |
| + |
| + const readableStreamControllerCancel = |
| + v8.createPrivateSymbol('[[InternalCancel]]'); |
| + const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]'); |
| const undefined = global.undefined; |
| const Infinity = global.Infinity; |
| @@ -65,7 +75,7 @@ |
| const errIllegalConstructor = 'Illegal constructor'; |
| const errCancelLockedStream = |
| 'Cannot cancel a readable stream that is locked to a reader'; |
| - const errEnqueueInCloseRequestedStream = |
| + const errEnqueueCloseRequestedStream = |
| 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; |
| const errCancelReleasedReader = |
| 'This readable stream reader has been released and cannot be used to cancel its previous owner stream'; |
| @@ -73,10 +83,15 @@ |
| 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; |
| const errCloseCloseRequestedStream = |
| 'Cannot close a readable stream that has already been requested to be closed'; |
| + const errEnqueueClosedStream = 'Cannot enqueue a closed readable stream'; |
|
domenic
2016/04/27 21:12:24
"Cannot enqueue a chunk into a closed readable str
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| + const errEnqueueErroredStream = 'Cannot enqueue an errored readable stream'; |
|
domenic
2016/04/27 21:12:24
"Cannot enqueue a chunk into an errored readable s
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| + const errCloseClosedStream = 'Cannot close a closed readable stream'; |
| const errCloseErroredStream = 'Cannot close an errored readable stream'; |
| const errErrorClosedStream = 'Cannot error a close readable stream'; |
| const errErrorErroredStream = |
| 'Cannot error a readable stream that is already errored'; |
| + const errGetReaderNotByteStream = 'Cannot get a ReadableStreamBYOBReader for a stream not constructed with a byte source'; |
|
domenic
2016/04/27 21:12:24
"This readable stream does not support BYOB reader
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| + const errGetReaderBadMode = 'Invalid mode is specified'; |
|
domenic
2016/04/27 21:12:24
'Invalid reader mode given: expected undefined or
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| const errReaderConstructorBadArgument = |
| 'ReadableStreamReader constructor argument is not a readable stream'; |
| const errReaderConstructorStreamAlreadyLocked = |
| @@ -106,50 +121,28 @@ |
| highWaterMark = 1; |
| } |
| - const normalizedStrategy = |
| - ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| - |
| - this[readableStreamUnderlyingSource] = underlyingSource; |
| - |
| - this[readableStreamQueue] = new v8.InternalPackedArray(); |
| - this[readableStreamQueueSize] = 0; |
| - |
| this[readableStreamState] = STATE_READABLE; |
| this[readableStreamBits] = 0b0; |
| this[readableStreamReader] = undefined; |
| this[readableStreamStoredError] = undefined; |
| - this[readableStreamStrategySize] = normalizedStrategy.size; |
| - this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; |
| - |
| // Avoid allocating the controller if the stream is going to be controlled |
| // externally (i.e. from C++) anyway. All calls to underlyingSource |
| // methods will disregard their controller argument in such situations |
| // (but see below). |
| - const isControlledExternally = |
| - arguments[2] === createWithExternalControllerSentinel; |
| - const controller = |
| - isControlledExternally ? null : new ReadableStreamController(this); |
| - this[readableStreamController] = controller; |
| + this[readableStreamController] = undefined; |
| - // We need to pass ourself to the underlyingSource start method for |
| - // externally-controlled streams. We use the now-useless controller |
| - // argument to do so. |
| - const argToStart = isControlledExternally ? this : controller; |
| + const type = underlyingSource['type']; |
|
domenic
2016/04/27 21:12:23
Nit: use .type notation
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| + const typeString = String(type); |
| + if (typeString === 'bytes') { |
| + throw new RangeError('bytes type is not yet implemented'); |
| + } else if (type !== undefined) { |
| + throw new RangeError('Invalid type is specified'); |
| + } |
| - const startResult = CallOrNoop( |
| - underlyingSource, 'start', argToStart, 'underlyingSource.start'); |
| - thenPromise(Promise_resolve(startResult), |
| - () => { |
| - this[readableStreamBits] |= STARTED; |
| - RequestReadableStreamPull(this); |
| - }, |
| - r => { |
| - if (this[readableStreamState] === STATE_READABLE) { |
| - return ErrorReadableStream(this, r); |
| - } |
| - }); |
| + this[readableStreamController] = |
| + new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark, arguments[2] === createWithExternalControllerSentinel); |
| } |
| get locked() { |
| @@ -169,15 +162,30 @@ |
| return Promise_reject(new TypeError(errCancelLockedStream)); |
| } |
| - return CancelReadableStream(this, reason); |
| + return ReadableStreamCancel(this, reason); |
| } |
| getReader() { |
|
domenic
2016/04/27 21:12:23
In recent versions of Chrome we can do this as `ge
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| + const options = arguments[0] === undefined ? {} : arguments[0]; |
| + |
| if (IsReadableStream(this) === false) { |
| throw new TypeError(errIllegalInvocation); |
| } |
| - return AcquireReadableStreamReader(this); |
| + const mode = options.mode; |
| + if (mode === 'byob') { |
| + if (IsReadableByteStreamDefaultController(this[readableStreamController]) === false) { |
| + throw new TypeError(errGetReaderNotByteStream); |
| + } |
| + |
| + return AcquireReadableStreamBYOBReader(this); |
| + } |
| + |
| + if (mode === undefined) { |
| + return AcquireReadableStreamDefaultReader(this); |
| + } |
| + |
| + throw new RangeError(errGetReaderBadMode);; |
| } |
| tee() { |
| @@ -185,12 +193,12 @@ |
| throw new TypeError(errIllegalInvocation); |
| } |
| - return TeeReadableStream(this); |
| + return ReadableStreamTee(this); |
| } |
| } |
| - class ReadableStreamController { |
| - constructor(stream) { |
| + class ReadableStreamDefaultController { |
| + constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) { |
| if (IsReadableStream(stream) === false) { |
| throw new TypeError(errIllegalConstructor); |
| } |
| @@ -199,75 +207,144 @@ |
| throw new TypeError(errIllegalConstructor); |
| } |
| - this[readableStreamControllerControlledReadableStream] = stream; |
| + this[readableStreamDefaultControllerControlledReadableStream] = stream; |
| + |
| + this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource; |
| + |
| + this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray(); |
| + this[readableStreamDefaultControllerQueueSize] = 0; |
| + |
| + this[readableStreamDefaultControllerBits] = 0b0; |
| + if (isExternallyControlled === true) { |
| + this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; |
| + } |
| + |
| + const normalizedStrategy = |
| + ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| + this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.size; |
| + this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.highWaterMark; |
| + |
| + const controller = this; |
| + |
| + const startResult = CallOrNoop( |
| + underlyingSource, 'start', this, 'underlyingSource.start'); |
| + thenPromise(Promise_resolve(startResult), |
| + () => { |
| + controller[readableStreamDefaultControllerBits] |= STARTED; |
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| + }, |
| + r => { |
| + if (stream[readableStreamState] === STATE_READABLE) { |
| + return ReadableStreamDefaultControllerError(controller, r); |
|
domenic
2016/04/27 21:12:24
Nit: no need for return here
tyoshino (SeeGerritForStatus)
2016/04/28 14:51:11
Done.
|
| + } |
| + }); |
| } |
| get desiredSize() { |
| - if (IsReadableStreamController(this) === false) { |
| + if (IsReadableStreamDefaultController(this) === false) { |
| throw new TypeError(errIllegalInvocation); |
| } |
| - return GetReadableStreamDesiredSize( |
| - this[readableStreamControllerControlledReadableStream]); |
| + return ReadableStreamDefaultControllerGetDesiredSize(this); |
| } |
| close() { |
| - if (IsReadableStreamController(this) === false) { |
| + if (IsReadableStreamDefaultController(this) === false) { |
| throw new TypeError(errIllegalInvocation); |
| } |
| - const stream = this[readableStreamControllerControlledReadableStream]; |
| + const stream = this[readableStreamDefaultControllerControlledReadableStream]; |
| - if (stream[readableStreamBits] & CLOSE_REQUESTED) { |
| + if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| throw new TypeError(errCloseCloseRequestedStream); |
| } |
| - if (stream[readableStreamState] === STATE_ERRORED) { |
| + |
| + const state = stream[readableStreamState]; |
| + if (state === STATE_ERRORED) { |
| throw new TypeError(errCloseErroredStream); |
| } |
| + if (state === STATE_CLOSED) { |
| + throw new TypeError(errCloseClosedStream); |
| + } |
| - return CloseReadableStream(stream); |
| + return ReadableStreamDefaultControllerClose(this); |
| } |
| enqueue(chunk) { |
| - if (IsReadableStreamController(this) === false) { |
| + if (IsReadableStreamDefaultController(this) === false) { |
| throw new TypeError(errIllegalInvocation); |
| } |
| - const stream = this[readableStreamControllerControlledReadableStream]; |
| + const stream = this[readableStreamDefaultControllerControlledReadableStream]; |
| - if (stream[readableStreamState] === STATE_ERRORED) { |
| - throw stream[readableStreamStoredError]; |
| + if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| + throw new TypeError(errEnqueueCloseRequestedStream); |
| } |
| - if (stream[readableStreamBits] & CLOSE_REQUESTED) { |
| - throw new TypeError(errEnqueueInCloseRequestedStream); |
| + const state = stream[readableStreamState]; |
| + if (state === STATE_ERRORED) { |
| + throw new TypeError(errEnqueueErroredStream); |
| + } |
| + if (state === STATE_CLOSED) { |
| + throw new TypeError(errEnqueueClosedStream); |
| } |
| - return EnqueueInReadableStream(stream, chunk); |
| + return ReadableStreamDefaultControllerEnqueue(this, chunk); |
| } |
| error(e) { |
| - if (IsReadableStreamController(this) === false) { |
| + if (IsReadableStreamDefaultController(this) === false) { |
| throw new TypeError(errIllegalInvocation); |
| } |
| - const stream = this[readableStreamControllerControlledReadableStream]; |
| + const stream = this[readableStreamDefaultControllerControlledReadableStream]; |
| const state = stream[readableStreamState]; |
| - if (state !== STATE_READABLE) { |
| - if (state === STATE_ERRORED) { |
| - throw new TypeError(errErrorErroredStream); |
| - } |
| - if (state === STATE_CLOSED) { |
| - throw new TypeError(errErrorClosedStream); |
| - } |
| + if (state === STATE_ERRORED) { |
| + throw new TypeError(errErrorErroredStream); |
| + } |
| + if (state === STATE_CLOSED) { |
| + throw new TypeError(errErrorClosedStream); |
| } |
| - return ErrorReadableStream(stream, e); |
| + return ReadableStreamDefaultControllerError(this, e); |
| } |
| } |
| - class ReadableStreamReader { |
| + function ReadableStreamDefaultControllerCancel(controller, reason) { |
| + controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray(); |
| + |
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource]; |
| + return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); |
| + } |
| + |
| + function ReadableStreamDefaultControllerPull(controller) { |
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream]; |
| + |
| + if (controller[readableStreamDefaultControllerQueue].length > 0) { |
| + const chunk = DequeueValue(controller); |
| + |
| + if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED && controller[readableStreamDefaultControllerQueue].length === 0) { |
| + ReadableStreamClose(stream); |
| + } else { |
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| + } |
| + |
| + return Promise_resolve(CreateIterResultObject(chunk, false)); |
| + } |
| + |
| + const pendingPromise = ReadableStreamAddReadRequest(stream); |
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| + return pendingPromise; |
| + } |
| + |
| + function ReadableStreamAddReadRequest(stream) { |
| + const promise = v8.createPromise(); |
| + stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(promise); |
| + return promise; |
| + } |
| + |
| + class ReadableStreamDefaultReader { |
| constructor(stream) { |
| if (IsReadableStream(stream) === false) { |
| throw new TypeError(errReaderConstructorBadArgument); |
| @@ -276,36 +353,13 @@ |
| throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
| } |
| - // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| - // blink::UnderlyingSourceBase. |
| - if (stream[readableStreamController] === null) { |
| - // The stream is created with an external controller (i.e. made in |
| - // Blink). |
| - const underlyingSource = stream[readableStreamUnderlyingSource]; |
| - callFunction(underlyingSource.notifyLockAcquired, underlyingSource); |
| - } |
| + ReadableStreamReaderGenericInitialize(this, stream); |
| - this[readableStreamReaderOwnerReadableStream] = stream; |
| - stream[readableStreamReader] = this; |
| - |
| - this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| - |
| - switch (stream[readableStreamState]) { |
| - case STATE_READABLE: |
| - this[readableStreamReaderClosedPromise] = v8.createPromise(); |
| - break; |
| - case STATE_CLOSED: |
| - this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); |
| - break; |
| - case STATE_ERRORED: |
| - this[readableStreamReaderClosedPromise] = |
| - Promise_reject(stream[readableStreamStoredError]); |
| - break; |
| - } |
| + this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray(); |
| } |
| get closed() { |
| - if (IsReadableStreamReader(this) === false) { |
| + if (IsReadableStreamDefaultReader(this) === false) { |
| return Promise_reject(new TypeError(errIllegalInvocation)); |
| } |
| @@ -313,7 +367,7 @@ |
| } |
| cancel(reason) { |
| - if (IsReadableStreamReader(this) === false) { |
| + if (IsReadableStreamDefaultReader(this) === false) { |
| return Promise_reject(new TypeError(errIllegalInvocation)); |
| } |
| @@ -322,11 +376,11 @@ |
| return Promise_reject(new TypeError(errCancelReleasedReader)); |
| } |
| - return CancelReadableStream(stream, reason); |
| + return ReadableStreamReaderGenericCancel(this, reason); |
| } |
| read() { |
| - if (IsReadableStreamReader(this) === false) { |
| + if (IsReadableStreamDefaultReader(this) === false) { |
| return Promise_reject(new TypeError(errIllegalInvocation)); |
| } |
| @@ -334,11 +388,11 @@ |
| return Promise_reject(new TypeError(errReadReleasedReader)); |
| } |
| - return ReadFromReadableStreamReader(this); |
| + return ReadableStreamDefaultReaderRead(this); |
| } |
| releaseLock() { |
| - if (IsReadableStreamReader(this) === false) { |
| + if (IsReadableStreamDefaultReader(this) === false) { |
| throw new TypeError(errIllegalInvocation); |
| } |
| @@ -347,42 +401,27 @@ |
| return undefined; |
| } |
| - if (this[readableStreamReaderReadRequests].length > 0) { |
| + if (this[readableStreamDefaultReaderReadRequests].length > 0) { |
| throw new TypeError(errReleaseReaderWithPendingRead); |
| } |
| - // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| - // blink::UnderlyingSourceBase. |
| - if (stream[readableStreamController] === null) { |
| - // The stream is created with an external controller (i.e. made in |
| - // Blink). |
| - const underlyingSource = stream[readableStreamUnderlyingSource]; |
| - callFunction(underlyingSource.notifyLockReleased, underlyingSource); |
| - } |
| - |
| - if (stream[readableStreamState] === STATE_READABLE) { |
| - v8.rejectPromise(this[readableStreamReaderClosedPromise], |
| - new TypeError(errReleasedReaderClosedPromise)); |
| - } else { |
| - this[readableStreamReaderClosedPromise] = |
| - Promise_reject(new TypeError(errReleasedReaderClosedPromise)); |
| - } |
| - |
| - this[readableStreamReaderOwnerReadableStream][readableStreamReader] = |
| - undefined; |
| - this[readableStreamReaderOwnerReadableStream] = undefined; |
| + ReadableStreamReaderGenericRelease(this); |
| } |
| } |
| + function ReadableStreamReaderGenericCancel(reader, reason) { |
| + return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason); |
| + } |
| + |
| // |
| // Readable stream abstract operations |
| // |
| - function AcquireReadableStreamReader(stream) { |
| - return new ReadableStreamReader(stream); |
| + function AcquireReadableStreamDefaultReader(stream) { |
| + return new ReadableStreamDefaultReader(stream); |
| } |
| - function CancelReadableStream(stream, reason) { |
| + function ReadableStreamCancel(stream, reason) { |
| stream[readableStreamBits] |= DISTURBED; |
| const state = stream[readableStreamState]; |
| @@ -393,69 +432,71 @@ |
| return Promise_reject(stream[readableStreamStoredError]); |
| } |
| - stream[readableStreamQueue] = new v8.InternalPackedArray(); |
| - FinishClosingReadableStream(stream); |
| + ReadableStreamClose(stream); |
| - const underlyingSource = stream[readableStreamUnderlyingSource]; |
| - const sourceCancelPromise = PromiseCallOrNoop( |
| - underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); |
| + const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[readableStreamController], reason); |
| return thenPromise(sourceCancelPromise, () => undefined); |
| } |
| - function CloseReadableStream(stream) { |
| - if (stream[readableStreamState] === STATE_CLOSED) { |
| - return undefined; |
| - } |
| + function ReadableStreamDefaultControllerClose(controller) { |
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream]; |
| - stream[readableStreamBits] |= CLOSE_REQUESTED; |
| + controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED; |
| - if (stream[readableStreamQueue].length === 0) { |
| - return FinishClosingReadableStream(stream); |
| + if (controller[readableStreamDefaultControllerQueue].length === 0) { |
| + ReadableStreamClose(stream); |
| } |
| } |
| - function EnqueueInReadableStream(stream, chunk) { |
| - if (stream[readableStreamState] === STATE_CLOSED) { |
| - return undefined; |
| - } |
| + function ReadableStreamFulfillReadRequest(stream, chunk, done) { |
| + const reader = stream[readableStreamReader]; |
| + |
| + const readRequest = |
| + stream[readableStreamReader][readableStreamDefaultReaderReadRequests] |
| + .shift(); |
| + v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done)); |
| + } |
| + |
| + function ReadableStreamDefaultControllerEnqueue(controller, chunk) { |
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream]; |
| - if (IsReadableStreamLocked(stream) === true && |
| - stream[readableStreamReader][readableStreamReaderReadRequests].length > |
| - 0) { |
| - const readRequest = |
| - stream[readableStreamReader][readableStreamReaderReadRequests] |
| - .shift(); |
| - v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); |
| + if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) { |
| + ReadableStreamFulfillReadRequest(stream, chunk, false); |
| } else { |
| let chunkSize = 1; |
| - const strategySize = stream[readableStreamStrategySize]; |
| + const strategySize = controller[readableStreamDefaultControllerStrategySize]; |
| if (strategySize !== undefined) { |
| try { |
| chunkSize = strategySize(chunk); |
| } catch (chunkSizeE) { |
| if (stream[readableStreamState] === STATE_READABLE) { |
| - ErrorReadableStream(stream, chunkSizeE); |
| + ReadableStreamDefaultControllerError(controller, chunkSizeE); |
| } |
| throw chunkSizeE; |
| } |
| } |
| try { |
| - EnqueueValueWithSize(stream, chunk, chunkSize); |
| + EnqueueValueWithSize(controller, chunk, chunkSize); |
| } catch (enqueueE) { |
| if (stream[readableStreamState] === STATE_READABLE) { |
| - ErrorReadableStream(stream, enqueueE); |
| + ReadableStreamDefaultControllerError(controller, enqueueE); |
| } |
| throw enqueueE; |
| } |
| } |
| - RequestReadableStreamPull(stream); |
| + ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| } |
| - function ErrorReadableStream(stream, e) { |
| - stream[readableStreamQueue] = new v8.InternalPackedArray(); |
| + function ReadableStreamDefaultControllerError(controller, e) { |
| + controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray(); |
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream]; |
| + ReadableStreamError(stream, e); |
| + } |
| + |
| + function ReadableStreamError(stream, e) { |
| stream[readableStreamStoredError] = e; |
| stream[readableStreamState] = STATE_ERRORED; |
| @@ -464,16 +505,24 @@ |
| return undefined; |
| } |
| - const readRequests = reader[readableStreamReaderReadRequests]; |
| - for (let i = 0; i < readRequests.length; ++i) { |
| - v8.rejectPromise(readRequests[i], e); |
| + if (IsReadableStreamDefaultReader(reader) === true) { |
| + const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| + for (let i = 0; i < readRequests.length; ++i) { |
| + v8.rejectPromise(readRequests[i], e); |
| + } |
| + reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray(); |
| + } else { |
| + const readIntoRequests = reader[readableStreamReaderReadIntoRequests]; |
| + for (let i = 0; i < readIntoRequests.length; ++i) { |
| + v8.rejectPromise(readIntoRequests[i], e); |
| + } |
| + reader[readableStreamReaderReadIntoRequests] = new v8.InternalPackedArray(); |
| } |
| - reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); |
| } |
| - function FinishClosingReadableStream(stream) { |
| + function ReadableStreamClose(stream) { |
| stream[readableStreamState] = STATE_CLOSED; |
| const reader = stream[readableStreamReader]; |
| @@ -481,24 +530,25 @@ |
| return undefined; |
| } |
| - |
| - const readRequests = reader[readableStreamReaderReadRequests]; |
| - for (let i = 0; i < readRequests.length; ++i) { |
| - v8.resolvePromise( |
| - readRequests[i], CreateIterResultObject(undefined, true)); |
| + if (IsReadableStreamDefaultReader(reader) === true) { |
| + const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| + for (let i = 0; i < readRequests.length; ++i) { |
| + v8.resolvePromise( |
| + readRequests[i], CreateIterResultObject(undefined, true)); |
| + } |
| + reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray(); |
| } |
| - reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); |
| v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); |
| } |
| - function GetReadableStreamDesiredSize(stream) { |
| - const queueSize = GetTotalQueueSize(stream); |
| - return stream[readableStreamStrategyHWM] - queueSize; |
| + function ReadableStreamDefaultControllerGetDesiredSize(controller) { |
| + const queueSize = GetTotalQueueSize(controller); |
| + return controller[readableStreamDefaultControllerStrategyHWM] - queueSize; |
| } |
| function IsReadableStream(x) { |
| - return hasOwnProperty(x, readableStreamUnderlyingSource); |
| + return hasOwnProperty(x, readableStreamController); |
| } |
| function IsReadableStreamDisturbed(stream) { |
| @@ -513,8 +563,12 @@ |
| return stream[readableStreamReader] !== undefined; |
| } |
| - function IsReadableStreamController(x) { |
| - return hasOwnProperty(x, readableStreamControllerControlledReadableStream); |
| + function IsReadableStreamDefaultController(x) { |
| + return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableStream); |
| + } |
| + |
| + function IsReadableStreamDefaultReader(x) { |
| + return hasOwnProperty(x, readableStreamDefaultReaderReadRequests); |
| } |
| function IsReadableStreamReadable(stream) { |
| @@ -529,11 +583,57 @@ |
| return stream[readableStreamState] === STATE_ERRORED; |
| } |
| - function IsReadableStreamReader(x) { |
| - return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); |
| + function ReadableStreamReaderGenericInitialize(reader, stream) { |
| + // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| + // blink::UnderlyingSourceBase. |
| + const controller = stream[readableStreamController]; |
| + if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) { |
| + // The stream is created with an external controller (i.e. made in |
| + // Blink). |
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource]; |
| + callFunction(underlyingSource.notifyLockAcquired, underlyingSource); |
| + } |
| + |
| + reader[readableStreamReaderOwnerReadableStream] = stream; |
| + stream[readableStreamReader] = reader; |
| + |
| + switch (stream[readableStreamState]) { |
| + case STATE_READABLE: |
| + reader[readableStreamReaderClosedPromise] = v8.createPromise(); |
| + break; |
| + case STATE_CLOSED: |
| + reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined); |
| + break; |
| + case STATE_ERRORED: |
| + reader[readableStreamReaderClosedPromise] = |
| + Promise_reject(stream[readableStreamStoredError]); |
| + break; |
| + } |
| + } |
| + |
| + function ReadableStreamReaderGenericRelease(reader) { |
| + // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| + // blink::UnderlyingSourceBase. |
| + const controller = reader[readableStreamReaderOwnerReadableStream][readableStreamController]; |
| + if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) { |
| + // The stream is created with an external controller (i.e. made in |
| + // Blink). |
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource]; |
| + callFunction(underlyingSource.notifyLockReleased, underlyingSource); |
| + } |
| + |
| + if (reader[readableStreamReaderOwnerReadableStream][readableStreamState] === STATE_READABLE) { |
| + v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError(errReleasedReaderClosedPromise)); |
| + } else { |
| + reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(errReleasedReaderClosedPromise)); |
| + } |
| + |
| + reader[readableStreamReaderOwnerReadableStream][readableStreamReader] = |
| + undefined; |
| + reader[readableStreamReaderOwnerReadableStream] = undefined; |
| } |
| - function ReadFromReadableStreamReader(reader) { |
| + function ReadableStreamDefaultReaderRead(reader) { |
| const stream = reader[readableStreamReaderOwnerReadableStream]; |
| stream[readableStreamBits] |= DISTURBED; |
| @@ -545,83 +645,63 @@ |
| return Promise_reject(stream[readableStreamStoredError]); |
| } |
| - const queue = stream[readableStreamQueue]; |
| - if (queue.length > 0) { |
| - const chunk = DequeueValue(stream); |
| - |
| - if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) { |
| - FinishClosingReadableStream(stream); |
| - } else { |
| - RequestReadableStreamPull(stream); |
| - } |
| - |
| - return Promise_resolve(CreateIterResultObject(chunk, false)); |
| - } else { |
| - const readRequest = v8.createPromise(); |
| - |
| - reader[readableStreamReaderReadRequests].push(readRequest); |
| - RequestReadableStreamPull(stream); |
| - return readRequest; |
| - } |
| + return ReadableStreamDefaultControllerPull(stream[readableStreamController]); |
| } |
| - function RequestReadableStreamPull(stream) { |
| - const shouldPull = ShouldReadableStreamPull(stream); |
| + function ReadableStreamDefaultControllerCallPullIfNeeded(controller) { |
| + const shouldPull = ReadableStreamDefaultControllerShouldPull(controller); |
| if (shouldPull === false) { |
| return undefined; |
| } |
| - if (stream[readableStreamBits] & PULLING) { |
| - stream[readableStreamBits] |= PULL_AGAIN; |
| + if (controller[readableStreamDefaultControllerBits] & PULLING) { |
| + controller[readableStreamDefaultControllerBits] |= PULL_AGAIN; |
| return undefined; |
| } |
| - stream[readableStreamBits] |= PULLING; |
| + controller[readableStreamDefaultControllerBits] |= PULLING; |
| - const underlyingSource = stream[readableStreamUnderlyingSource]; |
| - const controller = stream[readableStreamController]; |
| + const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource]; |
| const pullPromise = PromiseCallOrNoop( |
| underlyingSource, 'pull', controller, 'underlyingSource.pull'); |
| thenPromise(pullPromise, |
| () => { |
| - stream[readableStreamBits] &= ~PULLING; |
| + controller[readableStreamDefaultControllerBits] &= ~PULLING; |
| - if (stream[readableStreamBits] & PULL_AGAIN) { |
| - stream[readableStreamBits] &= ~PULL_AGAIN; |
| - return RequestReadableStreamPull(stream); |
| + if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) { |
| + controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN; |
| + return ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| } |
| }, |
| e => { |
| - if (stream[readableStreamState] === STATE_READABLE) { |
| - return ErrorReadableStream(stream, e); |
| + if (controller[readableStreamDefaultControllerControlledReadableStream][readableStreamState] === STATE_READABLE) { |
| + return ReadableStreamDefaultControllerError(controller, e); |
| } |
| }); |
| } |
| - function ShouldReadableStreamPull(stream) { |
| + function ReadableStreamDefaultControllerShouldPull(controller) { |
| + const stream = controller[readableStreamDefaultControllerControlledReadableStream]; |
| + |
| const state = stream[readableStreamState]; |
| if (state === STATE_CLOSED || state === STATE_ERRORED) { |
| return false; |
| } |
| - if (stream[readableStreamBits] & CLOSE_REQUESTED) { |
| + if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| return false; |
| } |
| - if (!(stream[readableStreamBits] & STARTED)) { |
| + if (!(controller[readableStreamDefaultControllerBits] & STARTED)) { |
| return false; |
| } |
| - if (IsReadableStreamLocked(stream) === true) { |
| - const reader = stream[readableStreamReader]; |
| - const readRequests = reader[readableStreamReaderReadRequests]; |
| - if (readRequests.length > 0) { |
| - return true; |
| - } |
| + if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) { |
| + return true; |
| } |
| - const desiredSize = GetReadableStreamDesiredSize(stream); |
| + const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller); |
| if (desiredSize > 0) { |
| return true; |
| } |
| @@ -629,13 +709,19 @@ |
| return false; |
| } |
| + function ReadableStreamGetNumReadRequests(stream) { |
| + const reader = stream[readableStreamReader]; |
| + const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| + return readRequests.length; |
| + } |
| + |
| // Potential future optimization: use class instances for the underlying |
| // sources, so that we don't re-create |
| // closures every time. |
| // TODO(domenic): shouldClone argument from spec not supported yet |
| - function TeeReadableStream(stream) { |
| - const reader = AcquireReadableStreamReader(stream); |
| + function ReadableStreamTee(stream) { |
| + const reader = AcquireReadableStreamDefaultReader(stream); |
| let closedOrErrored = false; |
| let canceled1 = false; |
| @@ -644,9 +730,12 @@ |
| let reason2; |
| let promise = v8.createPromise(); |
| - const branch1 = new ReadableStream({pull, cancel: cancel1}); |
| + const branch1Stream = new ReadableStream({pull, cancel: cancel1}); |
| - const branch2 = new ReadableStream({pull, cancel: cancel2}); |
| + const branch2Stream = new ReadableStream({pull, cancel: cancel2}); |
| + |
| + const branch1 = branch1Stream[readableStreamController]; |
| + const branch2 = branch2Stream[readableStreamController]; |
| thenPromise( |
| reader[readableStreamReaderClosedPromise], undefined, function(r) { |
| @@ -654,23 +743,26 @@ |
| return; |
| } |
| - ErrorReadableStream(branch1, r); |
| - ErrorReadableStream(branch2, r); |
| + ReadableStreamDefaultControllerError(branch1, r); |
| + ReadableStreamDefaultControllerError(branch2, r); |
| closedOrErrored = true; |
| }); |
| - return [branch1, branch2]; |
| - |
| + return [branch1Stream, branch2Stream]; |
| function pull() { |
| return thenPromise( |
| - ReadFromReadableStreamReader(reader), function(result) { |
| + ReadableStreamDefaultReaderRead(reader), function(result) { |
| const value = result.value; |
| const done = result.done; |
| if (done === true && closedOrErrored === false) { |
| - CloseReadableStream(branch1); |
| - CloseReadableStream(branch2); |
| + if (canceled1 === false) { |
| + ReadableStreamDefaultControllerClose(branch1); |
| + } |
| + if (canceled2 === false) { |
| + ReadableStreamDefaultControllerClose(branch2); |
| + } |
| closedOrErrored = true; |
| } |
| @@ -679,11 +771,11 @@ |
| } |
| if (canceled1 === false) { |
| - EnqueueInReadableStream(branch1, value); |
| + ReadableStreamDefaultControllerEnqueue(branch1, value); |
| } |
| if (canceled2 === false) { |
| - EnqueueInReadableStream(branch2, value); |
| + ReadableStreamDefaultControllerEnqueue(branch2, value); |
| } |
| }); |
| } |
| @@ -694,7 +786,7 @@ |
| if (canceled2 === true) { |
| const compositeReason = [reason1, reason2]; |
| - const cancelResult = CancelReadableStream(stream, compositeReason); |
| + const cancelResult = ReadableStreamCancel(stream, compositeReason); |
| v8.resolvePromise(promise, cancelResult); |
| } |
| @@ -707,7 +799,7 @@ |
| if (canceled1 === true) { |
| const compositeReason = [reason1, reason2]; |
| - const cancelResult = CancelReadableStream(stream, compositeReason); |
| + const cancelResult = ReadableStreamCancel(stream, compositeReason); |
| v8.resolvePromise(promise, cancelResult); |
| } |
| @@ -721,23 +813,23 @@ |
| // can modify the queue size alongside. |
| // |
| - function DequeueValue(stream) { |
| - const result = stream[readableStreamQueue].shift(); |
| - stream[readableStreamQueueSize] -= result.size; |
| + function DequeueValue(controller) { |
| + const result = controller[readableStreamDefaultControllerQueue].shift(); |
| + controller[readableStreamDefaultControllerQueueSize] -= result.size; |
| return result.value; |
| } |
| - function EnqueueValueWithSize(stream, value, size) { |
| + function EnqueueValueWithSize(controller, value, size) { |
| size = Number(size); |
| if (Number_isNaN(size) || size === +Infinity || size < 0) { |
| throw new RangeError(errInvalidSize); |
| } |
| - stream[readableStreamQueueSize] += size; |
| - stream[readableStreamQueue].push({value, size}); |
| + controller[readableStreamDefaultControllerQueueSize] += size; |
| + controller[readableStreamDefaultControllerQueue].push({value, size}); |
| } |
| - function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } |
| + function GetTotalQueueSize(controller) { return controller[readableStreamDefaultControllerQueueSize]; } |
| // |
| // Other helpers |
| @@ -787,7 +879,7 @@ |
| } |
| if (typeof method !== 'function') { |
| - return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); |
| + return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameForError))); |
| } |
| try { |
| @@ -815,7 +907,7 @@ |
| // Exports to Blink |
| // |
| - binding.AcquireReadableStreamReader = AcquireReadableStreamReader; |
| + binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReader; |
| binding.IsReadableStream = IsReadableStream; |
| binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; |
| binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed; |
| @@ -823,13 +915,13 @@ |
| binding.IsReadableStreamReadable = IsReadableStreamReadable; |
| binding.IsReadableStreamClosed = IsReadableStreamClosed; |
| binding.IsReadableStreamErrored = IsReadableStreamErrored; |
| - binding.IsReadableStreamReader = IsReadableStreamReader; |
| - binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; |
| + binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader; |
| + binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead; |
| - binding.CloseReadableStream = CloseReadableStream; |
| - binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; |
| - binding.EnqueueInReadableStream = EnqueueInReadableStream; |
| - binding.ErrorReadableStream = ErrorReadableStream; |
| + binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultControllerClose; |
| + binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultControllerGetDesiredSize; |
| + binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControllerEnqueue; |
| + binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultControllerError; |
| binding.createReadableStreamWithExternalController = |
| (underlyingSource, strategy) => { |