Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(802)

Unified Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 1902673003: Reflect recent spec changes to V8 Extra ReadableStream impl (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fixed build Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/core/streams/ReadableStream.js
diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js
index 644af5008f268f77c1451710fb000ffdbdcca3fc..c02dce5727a1ce6b946ca710ddf5d64c70ba7e83 100644
--- a/third_party/WebKit/Source/core/streams/ReadableStream.js
+++ b/third_party/WebKit/Source/core/streams/ReadableStream.js
@@ -5,42 +5,53 @@
(function(global, binding, v8) {
'use strict';
- const readableStreamController = v8.createPrivateSymbol('[[controller]]');
- const readableStreamQueue = v8.createPrivateSymbol('[[queue]]');
- const readableStreamQueueSize =
- v8.createPrivateSymbol('[[queue]] total size');
const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
- const readableStreamState = v8.createPrivateSymbol('[[state]]');
const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
- const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]');
- const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
- const readableStreamUnderlyingSource =
- v8.createPrivateSymbol('[[underlyingSource]]');
-
- const readableStreamControllerControlledReadableStream =
- v8.createPrivateSymbol('[[controlledReadableStream]]');
+ const readableStreamController = v8.createPrivateSymbol('[[controller]]');
const readableStreamReaderClosedPromise =
v8.createPrivateSymbol('[[closedPromise]]');
const readableStreamReaderOwnerReadableStream =
v8.createPrivateSymbol('[[ownerReadableStream]]');
- const readableStreamReaderReadRequests =
+
+ const readableStreamDefaultReaderReadRequests =
v8.createPrivateSymbol('[[readRequests]]');
const createWithExternalControllerSentinel =
v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
+ const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]');
+ const DISTURBED = 0b1;
+ // The 2nd and 3rd bit are for [[state]].
+ const STATE_MASK = 0b110;
+ const STATE_BITS_OFFSET = 1;
const STATE_READABLE = 0;
const STATE_CLOSED = 1;
const STATE_ERRORED = 2;
- const readableStreamBits = v8.createPrivateSymbol(
- 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]], [[disturbed]]');
+ const readableStreamDefaultControllerUnderlyingSource =
+ v8.createPrivateSymbol('[[underlyingSource]]');
+ const readableStreamDefaultControllerControlledReadableStream =
+ v8.createPrivateSymbol('[[controlledReadableStream]]');
+ const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]]');
+ const readableStreamDefaultControllerQueueSize =
+ v8.createPrivateSymbol('[[queue]] total size');
+ const readableStreamDefaultControllerStrategySize =
+ v8.createPrivateSymbol('[[strategySize]]');
+ const readableStreamDefaultControllerStrategyHWM =
+ v8.createPrivateSymbol('[[strategyHWM]]');
+
+ const readableStreamDefaultControllerBits = v8.createPrivateSymbol(
+ 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]]');
const STARTED = 0b1;
const CLOSE_REQUESTED = 0b10;
const PULLING = 0b100;
const PULL_AGAIN = 0b1000;
- const DISTURBED = 0b10000;
+ const EXTERNALLY_CONTROLLED = 0b10000;
+
+ const readableStreamControllerCancel =
+ v8.createPrivateSymbol('[[InternalCancel]]');
+ const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]');
const undefined = global.undefined;
const Infinity = global.Infinity;
@@ -65,7 +76,7 @@
const errIllegalConstructor = 'Illegal constructor';
const errCancelLockedStream =
'Cannot cancel a readable stream that is locked to a reader';
- const errEnqueueInCloseRequestedStream =
+ const errEnqueueCloseRequestedStream =
'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed';
const errCancelReleasedReader =
'This readable stream reader has been released and cannot be used to cancel its previous owner stream';
@@ -73,10 +84,15 @@
'This readable stream reader has been released and cannot be used to read from its previous owner stream';
const errCloseCloseRequestedStream =
'Cannot close a readable stream that has already been requested to be closed';
+ const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable stream';
+ const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readable stream';
+ const errCloseClosedStream = 'Cannot close a closed readable stream';
const errCloseErroredStream = 'Cannot close an errored readable stream';
const errErrorClosedStream = 'Cannot error a close readable stream';
const errErrorErroredStream =
'Cannot error a readable stream that is already errored';
+ const errGetReaderNotByteStream = 'This readable stream does not support BYOB readers';
+ const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or "byob"';
const errReaderConstructorBadArgument =
'ReadableStreamReader constructor argument is not a readable stream';
const errReaderConstructorStreamAlreadyLocked =
@@ -106,50 +122,28 @@
highWaterMark = 1;
}
- const normalizedStrategy =
- ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
-
- this[readableStreamUnderlyingSource] = underlyingSource;
-
- this[readableStreamQueue] = new v8.InternalPackedArray();
- this[readableStreamQueueSize] = 0;
-
- this[readableStreamState] = STATE_READABLE;
this[readableStreamBits] = 0b0;
+ ReadableStreamSetState(this, STATE_READABLE);
this[readableStreamReader] = undefined;
this[readableStreamStoredError] = undefined;
- this[readableStreamStrategySize] = normalizedStrategy.size;
- this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
-
// Avoid allocating the controller if the stream is going to be controlled
// externally (i.e. from C++) anyway. All calls to underlyingSource
// methods will disregard their controller argument in such situations
// (but see below).
- const isControlledExternally =
- arguments[2] === createWithExternalControllerSentinel;
- const controller =
- isControlledExternally ? null : new ReadableStreamController(this);
- this[readableStreamController] = controller;
+ this[readableStreamController] = undefined;
- // We need to pass ourself to the underlyingSource start method for
- // externally-controlled streams. We use the now-useless controller
- // argument to do so.
- const argToStart = isControlledExternally ? this : controller;
+ const type = underlyingSource.type;
+ const typeString = String(type);
+ if (typeString === 'bytes') {
+ throw new RangeError('bytes type is not yet implemented');
+ } else if (type !== undefined) {
+ throw new RangeError('Invalid type is specified');
+ }
- const startResult = CallOrNoop(
- underlyingSource, 'start', argToStart, 'underlyingSource.start');
- thenPromise(Promise_resolve(startResult),
- () => {
- this[readableStreamBits] |= STARTED;
- RequestReadableStreamPull(this);
- },
- r => {
- if (this[readableStreamState] === STATE_READABLE) {
- return ErrorReadableStream(this, r);
- }
- });
+ this[readableStreamController] =
+ new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark, arguments[2] === createWithExternalControllerSentinel);
}
get locked() {
@@ -169,15 +163,27 @@
return Promise_reject(new TypeError(errCancelLockedStream));
}
- return CancelReadableStream(this, reason);
+ return ReadableStreamCancel(this, reason);
}
- getReader() {
+ getReader({ mode } = {}) {
if (IsReadableStream(this) === false) {
throw new TypeError(errIllegalInvocation);
}
- return AcquireReadableStreamReader(this);
+ if (mode === 'byob') {
+ if (IsReadableByteStreamDefaultController(this[readableStreamController]) === false) {
+ throw new TypeError(errGetReaderNotByteStream);
+ }
+
+ return AcquireReadableStreamBYOBReader(this);
+ }
+
+ if (mode === undefined) {
+ return AcquireReadableStreamDefaultReader(this);
+ }
+
+ throw new RangeError(errGetReaderBadMode);
}
tee() {
@@ -185,12 +191,12 @@
throw new TypeError(errIllegalInvocation);
}
- return TeeReadableStream(this);
+ return ReadableStreamTee(this);
}
}
- class ReadableStreamController {
- constructor(stream) {
+ class ReadableStreamDefaultController {
+ constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
if (IsReadableStream(stream) === false) {
throw new TypeError(errIllegalConstructor);
}
@@ -199,75 +205,145 @@
throw new TypeError(errIllegalConstructor);
}
- this[readableStreamControllerControlledReadableStream] = stream;
+ this[readableStreamDefaultControllerControlledReadableStream] = stream;
+
+ this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource;
+
+ this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
+ this[readableStreamDefaultControllerQueueSize] = 0;
+
+ this[readableStreamDefaultControllerBits] = 0b0;
+ if (isExternallyControlled === true) {
+ this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
+ }
+
+ const normalizedStrategy =
+ ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
+ this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.size;
+ this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.highWaterMark;
+
+ const controller = this;
+
+ const startResult = CallOrNoop(
+ underlyingSource, 'start', this, 'underlyingSource.start');
+ thenPromise(Promise_resolve(startResult),
+ () => {
+ controller[readableStreamDefaultControllerBits] |= STARTED;
+ ReadableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ r => {
+ if (ReadableStreamGetState(stream) === STATE_READABLE) {
+ ReadableStreamDefaultControllerError(controller, r);
+ }
+ });
}
get desiredSize() {
- if (IsReadableStreamController(this) === false) {
+ if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(errIllegalInvocation);
}
- return GetReadableStreamDesiredSize(
- this[readableStreamControllerControlledReadableStream]);
+ return ReadableStreamDefaultControllerGetDesiredSize(this);
}
close() {
- if (IsReadableStreamController(this) === false) {
+ if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(errIllegalInvocation);
}
- const stream = this[readableStreamControllerControlledReadableStream];
+ const stream = this[readableStreamDefaultControllerControlledReadableStream];
- if (stream[readableStreamBits] & CLOSE_REQUESTED) {
+ if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
throw new TypeError(errCloseCloseRequestedStream);
}
- if (stream[readableStreamState] === STATE_ERRORED) {
+
+ const state = ReadableStreamGetState(stream);
+ if (state === STATE_ERRORED) {
throw new TypeError(errCloseErroredStream);
}
+ if (state === STATE_CLOSED) {
+ throw new TypeError(errCloseClosedStream);
+ }
- return CloseReadableStream(stream);
+ return ReadableStreamDefaultControllerClose(this);
}
enqueue(chunk) {
- if (IsReadableStreamController(this) === false) {
+ if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(errIllegalInvocation);
}
- const stream = this[readableStreamControllerControlledReadableStream];
+ const stream = this[readableStreamDefaultControllerControlledReadableStream];
- if (stream[readableStreamState] === STATE_ERRORED) {
- throw stream[readableStreamStoredError];
+ if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
+ throw new TypeError(errEnqueueCloseRequestedStream);
}
- if (stream[readableStreamBits] & CLOSE_REQUESTED) {
- throw new TypeError(errEnqueueInCloseRequestedStream);
+ const state = ReadableStreamGetState(stream);
+ if (state === STATE_ERRORED) {
+ throw new TypeError(errEnqueueErroredStream);
+ }
+ if (state === STATE_CLOSED) {
+ throw new TypeError(errEnqueueClosedStream);
}
- return EnqueueInReadableStream(stream, chunk);
+ return ReadableStreamDefaultControllerEnqueue(this, chunk);
}
error(e) {
- if (IsReadableStreamController(this) === false) {
+ if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(errIllegalInvocation);
}
- const stream = this[readableStreamControllerControlledReadableStream];
+ const stream = this[readableStreamDefaultControllerControlledReadableStream];
- const state = stream[readableStreamState];
- if (state !== STATE_READABLE) {
- if (state === STATE_ERRORED) {
- throw new TypeError(errErrorErroredStream);
- }
- if (state === STATE_CLOSED) {
- throw new TypeError(errErrorClosedStream);
- }
+ const state = ReadableStreamGetState(stream);
+ if (state === STATE_ERRORED) {
+ throw new TypeError(errErrorErroredStream);
+ }
+ if (state === STATE_CLOSED) {
+ throw new TypeError(errErrorClosedStream);
+ }
+
+ return ReadableStreamDefaultControllerError(this, e);
+ }
+ }
+
+ function ReadableStreamDefaultControllerCancel(controller, reason) {
+ controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
+
+ const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
+ return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
+ }
+
+ function ReadableStreamDefaultControllerPull(controller) {
+ const stream = controller[readableStreamDefaultControllerControlledReadableStream];
+
+ if (controller[readableStreamDefaultControllerQueue].length > 0) {
+ const chunk = DequeueValue(controller);
+
+ if ((controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) &&
+ controller[readableStreamDefaultControllerQueue].length === 0) {
+ ReadableStreamClose(stream);
+ } else {
+ ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
- return ErrorReadableStream(stream, e);
+ return Promise_resolve(CreateIterResultObject(chunk, false));
}
+
+ const pendingPromise = ReadableStreamAddReadRequest(stream);
+ ReadableStreamDefaultControllerCallPullIfNeeded(controller);
+ return pendingPromise;
+ }
+
+ function ReadableStreamAddReadRequest(stream) {
+ const promise = v8.createPromise();
+ stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(promise);
+ return promise;
}
- class ReadableStreamReader {
+ class ReadableStreamDefaultReader {
constructor(stream) {
if (IsReadableStream(stream) === false) {
throw new TypeError(errReaderConstructorBadArgument);
@@ -276,36 +352,13 @@
throw new TypeError(errReaderConstructorStreamAlreadyLocked);
}
- // TODO(yhirano): Remove this when we don't need hasPendingActivity in
- // blink::UnderlyingSourceBase.
- if (stream[readableStreamController] === null) {
- // The stream is created with an external controller (i.e. made in
- // Blink).
- const underlyingSource = stream[readableStreamUnderlyingSource];
- callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
- }
-
- this[readableStreamReaderOwnerReadableStream] = stream;
- stream[readableStreamReader] = this;
-
- this[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
+ ReadableStreamReaderGenericInitialize(this, stream);
- switch (stream[readableStreamState]) {
- case STATE_READABLE:
- this[readableStreamReaderClosedPromise] = v8.createPromise();
- break;
- case STATE_CLOSED:
- this[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
- break;
- case STATE_ERRORED:
- this[readableStreamReaderClosedPromise] =
- Promise_reject(stream[readableStreamStoredError]);
- break;
- }
+ this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray();
}
get closed() {
- if (IsReadableStreamReader(this) === false) {
+ if (IsReadableStreamDefaultReader(this) === false) {
return Promise_reject(new TypeError(errIllegalInvocation));
}
@@ -313,7 +366,7 @@
}
cancel(reason) {
- if (IsReadableStreamReader(this) === false) {
+ if (IsReadableStreamDefaultReader(this) === false) {
return Promise_reject(new TypeError(errIllegalInvocation));
}
@@ -322,11 +375,11 @@
return Promise_reject(new TypeError(errCancelReleasedReader));
}
- return CancelReadableStream(stream, reason);
+ return ReadableStreamReaderGenericCancel(this, reason);
}
read() {
- if (IsReadableStreamReader(this) === false) {
+ if (IsReadableStreamDefaultReader(this) === false) {
return Promise_reject(new TypeError(errIllegalInvocation));
}
@@ -334,11 +387,11 @@
return Promise_reject(new TypeError(errReadReleasedReader));
}
- return ReadFromReadableStreamReader(this);
+ return ReadableStreamDefaultReaderRead(this);
}
releaseLock() {
- if (IsReadableStreamReader(this) === false) {
+ if (IsReadableStreamDefaultReader(this) === false) {
throw new TypeError(errIllegalInvocation);
}
@@ -347,45 +400,30 @@
return undefined;
}
- if (this[readableStreamReaderReadRequests].length > 0) {
+ if (this[readableStreamDefaultReaderReadRequests].length > 0) {
throw new TypeError(errReleaseReaderWithPendingRead);
}
- // TODO(yhirano): Remove this when we don't need hasPendingActivity in
- // blink::UnderlyingSourceBase.
- if (stream[readableStreamController] === null) {
- // The stream is created with an external controller (i.e. made in
- // Blink).
- const underlyingSource = stream[readableStreamUnderlyingSource];
- callFunction(underlyingSource.notifyLockReleased, underlyingSource);
- }
-
- if (stream[readableStreamState] === STATE_READABLE) {
- v8.rejectPromise(this[readableStreamReaderClosedPromise],
- new TypeError(errReleasedReaderClosedPromise));
- } else {
- this[readableStreamReaderClosedPromise] =
- Promise_reject(new TypeError(errReleasedReaderClosedPromise));
- }
-
- this[readableStreamReaderOwnerReadableStream][readableStreamReader] =
- undefined;
- this[readableStreamReaderOwnerReadableStream] = undefined;
+ ReadableStreamReaderGenericRelease(this);
}
}
+ function ReadableStreamReaderGenericCancel(reader, reason) {
+ return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason);
+ }
+
//
// Readable stream abstract operations
//
- function AcquireReadableStreamReader(stream) {
- return new ReadableStreamReader(stream);
+ function AcquireReadableStreamDefaultReader(stream) {
+ return new ReadableStreamDefaultReader(stream);
}
- function CancelReadableStream(stream, reason) {
+ function ReadableStreamCancel(stream, reason) {
stream[readableStreamBits] |= DISTURBED;
- const state = stream[readableStreamState];
+ const state = ReadableStreamGetState(stream);
if (state === STATE_CLOSED) {
return Promise_resolve(undefined);
}
@@ -393,112 +431,126 @@
return Promise_reject(stream[readableStreamStoredError]);
}
- stream[readableStreamQueue] = new v8.InternalPackedArray();
- FinishClosingReadableStream(stream);
+ ReadableStreamClose(stream);
- const underlyingSource = stream[readableStreamUnderlyingSource];
- const sourceCancelPromise = PromiseCallOrNoop(
- underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
+ const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[readableStreamController], reason);
return thenPromise(sourceCancelPromise, () => undefined);
}
- function CloseReadableStream(stream) {
- if (stream[readableStreamState] === STATE_CLOSED) {
- return undefined;
- }
+ function ReadableStreamDefaultControllerClose(controller) {
+ const stream = controller[readableStreamDefaultControllerControlledReadableStream];
- stream[readableStreamBits] |= CLOSE_REQUESTED;
+ controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
- if (stream[readableStreamQueue].length === 0) {
- return FinishClosingReadableStream(stream);
+ if (controller[readableStreamDefaultControllerQueue].length === 0) {
+ ReadableStreamClose(stream);
}
}
- function EnqueueInReadableStream(stream, chunk) {
- if (stream[readableStreamState] === STATE_CLOSED) {
- return undefined;
- }
+ function ReadableStreamFulfillReadRequest(stream, chunk, done) {
+ const reader = stream[readableStreamReader];
+
+ const readRequest =
+ stream[readableStreamReader][readableStreamDefaultReaderReadRequests]
+ .shift();
+ v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done));
+ }
+
+ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
+ const stream = controller[readableStreamDefaultControllerControlledReadableStream];
- if (IsReadableStreamLocked(stream) === true &&
- stream[readableStreamReader][readableStreamReaderReadRequests].length >
- 0) {
- const readRequest =
- stream[readableStreamReader][readableStreamReaderReadRequests]
- .shift();
- v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false));
+ if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
+ ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize = 1;
- const strategySize = stream[readableStreamStrategySize];
+ const strategySize = controller[readableStreamDefaultControllerStrategySize];
if (strategySize !== undefined) {
try {
chunkSize = strategySize(chunk);
} catch (chunkSizeE) {
- if (stream[readableStreamState] === STATE_READABLE) {
- ErrorReadableStream(stream, chunkSizeE);
+ if (ReadableStreamGetState(stream) === STATE_READABLE) {
+ ReadableStreamDefaultControllerError(controller, chunkSizeE);
}
throw chunkSizeE;
}
}
try {
- EnqueueValueWithSize(stream, chunk, chunkSize);
+ EnqueueValueWithSize(controller, chunk, chunkSize);
} catch (enqueueE) {
- if (stream[readableStreamState] === STATE_READABLE) {
- ErrorReadableStream(stream, enqueueE);
+ if (ReadableStreamGetState(stream) === STATE_READABLE) {
+ ReadableStreamDefaultControllerError(controller, enqueueE);
}
throw enqueueE;
}
}
- RequestReadableStreamPull(stream);
+ ReadableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+
+ function ReadableStreamGetState(stream) {
+ return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
+ }
+
+ function ReadableStreamSetState(stream, state) {
+ stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) |
+ (state << STATE_BITS_OFFSET);
+ }
+
+ function ReadableStreamDefaultControllerError(controller, e) {
+ controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
+ const stream = controller[readableStreamDefaultControllerControlledReadableStream];
+ ReadableStreamError(stream, e);
}
- function ErrorReadableStream(stream, e) {
- stream[readableStreamQueue] = new v8.InternalPackedArray();
+ function ReadableStreamError(stream, e) {
stream[readableStreamStoredError] = e;
- stream[readableStreamState] = STATE_ERRORED;
+ ReadableStreamSetState(stream, STATE_ERRORED);
const reader = stream[readableStreamReader];
if (reader === undefined) {
return undefined;
}
- const readRequests = reader[readableStreamReaderReadRequests];
- for (let i = 0; i < readRequests.length; ++i) {
- v8.rejectPromise(readRequests[i], e);
+ if (IsReadableStreamDefaultReader(reader) === true) {
+ const readRequests = reader[readableStreamDefaultReaderReadRequests];
+ for (let i = 0; i < readRequests.length; i++) {
+ v8.rejectPromise(readRequests[i], e);
+ }
+ reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray();
}
- reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
v8.rejectPromise(reader[readableStreamReaderClosedPromise], e);
}
- function FinishClosingReadableStream(stream) {
- stream[readableStreamState] = STATE_CLOSED;
+ function ReadableStreamClose(stream) {
+ ReadableStreamSetState(stream, STATE_CLOSED);
const reader = stream[readableStreamReader];
if (reader === undefined) {
return undefined;
}
-
- const readRequests = reader[readableStreamReaderReadRequests];
- for (let i = 0; i < readRequests.length; ++i) {
- v8.resolvePromise(
- readRequests[i], CreateIterResultObject(undefined, true));
+ if (IsReadableStreamDefaultReader(reader) === true) {
+ const readRequests = reader[readableStreamDefaultReaderReadRequests];
+ for (let i = 0; i < readRequests.length; i++) {
+ v8.resolvePromise(
+ readRequests[i], CreateIterResultObject(undefined, true));
+ }
+ reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray();
}
- reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined);
}
- function GetReadableStreamDesiredSize(stream) {
- const queueSize = GetTotalQueueSize(stream);
- return stream[readableStreamStrategyHWM] - queueSize;
+ function ReadableStreamDefaultControllerGetDesiredSize(controller) {
+ const queueSize = GetTotalQueueSize(controller);
+ return controller[readableStreamDefaultControllerStrategyHWM] - queueSize;
}
function IsReadableStream(x) {
- return hasOwnProperty(x, readableStreamUnderlyingSource);
+ return hasOwnProperty(x, readableStreamController);
}
function IsReadableStreamDisturbed(stream) {
@@ -509,115 +561,145 @@
return stream[readableStreamReader] !== undefined;
}
- function IsReadableStreamController(x) {
- return hasOwnProperty(x, readableStreamControllerControlledReadableStream);
+ function IsReadableStreamDefaultController(x) {
+ return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableStream);
+ }
+
+ function IsReadableStreamDefaultReader(x) {
+ return hasOwnProperty(x, readableStreamDefaultReaderReadRequests);
}
function IsReadableStreamReadable(stream) {
- return stream[readableStreamState] === STATE_READABLE;
+ return ReadableStreamGetState(stream) === STATE_READABLE;
}
function IsReadableStreamClosed(stream) {
- return stream[readableStreamState] === STATE_CLOSED;
+ return ReadableStreamGetState(stream) === STATE_CLOSED;
}
function IsReadableStreamErrored(stream) {
- return stream[readableStreamState] === STATE_ERRORED;
+ return ReadableStreamGetState(stream) === STATE_ERRORED;
+ }
+
+ function ReadableStreamReaderGenericInitialize(reader, stream) {
+ // TODO(yhirano): Remove this when we don't need hasPendingActivity in
+ // blink::UnderlyingSourceBase.
+ const controller = stream[readableStreamController];
+ if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
+ // The stream is created with an external controller (i.e. made in
+ // Blink).
+ const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
+ callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
+ }
+
+ reader[readableStreamReaderOwnerReadableStream] = stream;
+ stream[readableStreamReader] = reader;
+
+ switch (ReadableStreamGetState(stream)) {
+ case STATE_READABLE:
+ reader[readableStreamReaderClosedPromise] = v8.createPromise();
+ break;
+ case STATE_CLOSED:
+ reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
+ break;
+ case STATE_ERRORED:
+ reader[readableStreamReaderClosedPromise] =
+ Promise_reject(stream[readableStreamStoredError]);
+ break;
+ }
}
- function IsReadableStreamReader(x) {
- return hasOwnProperty(x, readableStreamReaderOwnerReadableStream);
+ function ReadableStreamReaderGenericRelease(reader) {
+ // TODO(yhirano): Remove this when we don't need hasPendingActivity in
+ // blink::UnderlyingSourceBase.
+ const controller = reader[readableStreamReaderOwnerReadableStream][readableStreamController];
+ if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
+ // The stream is created with an external controller (i.e. made in
+ // Blink).
+ const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
+ callFunction(underlyingSource.notifyLockReleased, underlyingSource);
+ }
+
+ if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream]) === STATE_READABLE) {
+ v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError(errReleasedReaderClosedPromise));
+ } else {
+ reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(errReleasedReaderClosedPromise));
+ }
+
+ reader[readableStreamReaderOwnerReadableStream][readableStreamReader] =
+ undefined;
+ reader[readableStreamReaderOwnerReadableStream] = undefined;
}
- function ReadFromReadableStreamReader(reader) {
+ function ReadableStreamDefaultReaderRead(reader) {
const stream = reader[readableStreamReaderOwnerReadableStream];
stream[readableStreamBits] |= DISTURBED;
- if (stream[readableStreamState] === STATE_CLOSED) {
+ if (ReadableStreamGetState(stream) === STATE_CLOSED) {
return Promise_resolve(CreateIterResultObject(undefined, true));
}
- if (stream[readableStreamState] === STATE_ERRORED) {
+ if (ReadableStreamGetState(stream) === STATE_ERRORED) {
return Promise_reject(stream[readableStreamStoredError]);
}
- const queue = stream[readableStreamQueue];
- if (queue.length > 0) {
- const chunk = DequeueValue(stream);
-
- if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) {
- FinishClosingReadableStream(stream);
- } else {
- RequestReadableStreamPull(stream);
- }
-
- return Promise_resolve(CreateIterResultObject(chunk, false));
- } else {
- const readRequest = v8.createPromise();
-
- reader[readableStreamReaderReadRequests].push(readRequest);
- RequestReadableStreamPull(stream);
- return readRequest;
- }
+ return ReadableStreamDefaultControllerPull(stream[readableStreamController]);
}
- function RequestReadableStreamPull(stream) {
- const shouldPull = ShouldReadableStreamPull(stream);
+ function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
+ const shouldPull = ReadableStreamDefaultControllerShouldCallPull(controller);
if (shouldPull === false) {
return undefined;
}
- if (stream[readableStreamBits] & PULLING) {
- stream[readableStreamBits] |= PULL_AGAIN;
+ if (controller[readableStreamDefaultControllerBits] & PULLING) {
+ controller[readableStreamDefaultControllerBits] |= PULL_AGAIN;
return undefined;
}
- stream[readableStreamBits] |= PULLING;
+ controller[readableStreamDefaultControllerBits] |= PULLING;
- const underlyingSource = stream[readableStreamUnderlyingSource];
- const controller = stream[readableStreamController];
+ const underlyingSource = controller[readableStreamDefaultControllerUnderlyingSource];
const pullPromise = PromiseCallOrNoop(
underlyingSource, 'pull', controller, 'underlyingSource.pull');
thenPromise(pullPromise,
() => {
- stream[readableStreamBits] &= ~PULLING;
+ controller[readableStreamDefaultControllerBits] &= ~PULLING;
- if (stream[readableStreamBits] & PULL_AGAIN) {
- stream[readableStreamBits] &= ~PULL_AGAIN;
- return RequestReadableStreamPull(stream);
+ if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) {
+ controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
+ ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
},
e => {
- if (stream[readableStreamState] === STATE_READABLE) {
- return ErrorReadableStream(stream, e);
+ if (ReadableStreamGetState(controller[readableStreamDefaultControllerControlledReadableStream]) === STATE_READABLE) {
+ ReadableStreamDefaultControllerError(controller, e);
}
});
}
- function ShouldReadableStreamPull(stream) {
- const state = stream[readableStreamState];
+ function ReadableStreamDefaultControllerShouldCallPull(controller) {
+ const stream = controller[readableStreamDefaultControllerControlledReadableStream];
+
+ const state = ReadableStreamGetState(stream);
if (state === STATE_CLOSED || state === STATE_ERRORED) {
return false;
}
- if (stream[readableStreamBits] & CLOSE_REQUESTED) {
+ if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return false;
}
- if (!(stream[readableStreamBits] & STARTED)) {
+ if (!(controller[readableStreamDefaultControllerBits] & STARTED)) {
return false;
}
- if (IsReadableStreamLocked(stream) === true) {
- const reader = stream[readableStreamReader];
- const readRequests = reader[readableStreamReaderReadRequests];
- if (readRequests.length > 0) {
- return true;
- }
+ if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
+ return true;
}
- const desiredSize = GetReadableStreamDesiredSize(stream);
+ const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller);
if (desiredSize > 0) {
return true;
}
@@ -625,13 +707,19 @@
return false;
}
+ function ReadableStreamGetNumReadRequests(stream) {
+ const reader = stream[readableStreamReader];
+ const readRequests = reader[readableStreamDefaultReaderReadRequests];
+ return readRequests.length;
+ }
+
// Potential future optimization: use class instances for the underlying
// sources, so that we don't re-create
// closures every time.
// TODO(domenic): shouldClone argument from spec not supported yet
- function TeeReadableStream(stream) {
- const reader = AcquireReadableStreamReader(stream);
+ function ReadableStreamTee(stream) {
+ const reader = AcquireReadableStreamDefaultReader(stream);
let closedOrErrored = false;
let canceled1 = false;
@@ -640,9 +728,12 @@
let reason2;
let promise = v8.createPromise();
- const branch1 = new ReadableStream({pull, cancel: cancel1});
+ const branch1Stream = new ReadableStream({pull, cancel: cancel1});
+
+ const branch2Stream = new ReadableStream({pull, cancel: cancel2});
- const branch2 = new ReadableStream({pull, cancel: cancel2});
+ const branch1 = branch1Stream[readableStreamController];
+ const branch2 = branch2Stream[readableStreamController];
thenPromise(
reader[readableStreamReaderClosedPromise], undefined, function(r) {
@@ -650,23 +741,26 @@
return;
}
- ErrorReadableStream(branch1, r);
- ErrorReadableStream(branch2, r);
+ ReadableStreamDefaultControllerError(branch1, r);
+ ReadableStreamDefaultControllerError(branch2, r);
closedOrErrored = true;
});
- return [branch1, branch2];
-
+ return [branch1Stream, branch2Stream];
function pull() {
return thenPromise(
- ReadFromReadableStreamReader(reader), function(result) {
+ ReadableStreamDefaultReaderRead(reader), function(result) {
const value = result.value;
const done = result.done;
if (done === true && closedOrErrored === false) {
- CloseReadableStream(branch1);
- CloseReadableStream(branch2);
+ if (canceled1 === false) {
+ ReadableStreamDefaultControllerClose(branch1);
+ }
+ if (canceled2 === false) {
+ ReadableStreamDefaultControllerClose(branch2);
+ }
closedOrErrored = true;
}
@@ -675,11 +769,11 @@
}
if (canceled1 === false) {
- EnqueueInReadableStream(branch1, value);
+ ReadableStreamDefaultControllerEnqueue(branch1, value);
}
if (canceled2 === false) {
- EnqueueInReadableStream(branch2, value);
+ ReadableStreamDefaultControllerEnqueue(branch2, value);
}
});
}
@@ -690,7 +784,7 @@
if (canceled2 === true) {
const compositeReason = [reason1, reason2];
- const cancelResult = CancelReadableStream(stream, compositeReason);
+ const cancelResult = ReadableStreamCancel(stream, compositeReason);
v8.resolvePromise(promise, cancelResult);
}
@@ -703,7 +797,7 @@
if (canceled1 === true) {
const compositeReason = [reason1, reason2];
- const cancelResult = CancelReadableStream(stream, compositeReason);
+ const cancelResult = ReadableStreamCancel(stream, compositeReason);
v8.resolvePromise(promise, cancelResult);
}
@@ -717,23 +811,23 @@
// can modify the queue size alongside.
//
- function DequeueValue(stream) {
- const result = stream[readableStreamQueue].shift();
- stream[readableStreamQueueSize] -= result.size;
+ function DequeueValue(controller) {
+ const result = controller[readableStreamDefaultControllerQueue].shift();
+ controller[readableStreamDefaultControllerQueueSize] -= result.size;
return result.value;
}
- function EnqueueValueWithSize(stream, value, size) {
+ function EnqueueValueWithSize(controller, value, size) {
size = Number(size);
if (Number_isNaN(size) || size === +Infinity || size < 0) {
throw new RangeError(errInvalidSize);
}
- stream[readableStreamQueueSize] += size;
- stream[readableStreamQueue].push({value, size});
+ controller[readableStreamDefaultControllerQueueSize] += size;
+ controller[readableStreamDefaultControllerQueue].push({value, size});
}
- function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; }
+ function GetTotalQueueSize(controller) { return controller[readableStreamDefaultControllerQueueSize]; }
//
// Other helpers
@@ -783,7 +877,7 @@
}
if (typeof method !== 'function') {
- return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError));
+ return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameForError)));
}
try {
@@ -811,20 +905,20 @@
// Exports to Blink
//
- binding.AcquireReadableStreamReader = AcquireReadableStreamReader;
+ binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReader;
binding.IsReadableStream = IsReadableStream;
binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
binding.IsReadableStreamLocked = IsReadableStreamLocked;
binding.IsReadableStreamReadable = IsReadableStreamReadable;
binding.IsReadableStreamClosed = IsReadableStreamClosed;
binding.IsReadableStreamErrored = IsReadableStreamErrored;
- binding.IsReadableStreamReader = IsReadableStreamReader;
- binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader;
+ binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader;
+ binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead;
- binding.CloseReadableStream = CloseReadableStream;
- binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize;
- binding.EnqueueInReadableStream = EnqueueInReadableStream;
- binding.ErrorReadableStream = ErrorReadableStream;
+ binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultControllerClose;
+ binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultControllerGetDesiredSize;
+ binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControllerEnqueue;
+ binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultControllerError;
binding.createReadableStreamWithExternalController =
(underlyingSource, strategy) => {

Powered by Google App Engine
This is Rietveld 408576698