Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2214)

Unified Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2772293002: Update WritableStream to new standard version (Closed)
Patch Set: Changes from domenic@ review Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/WebKit/Source/core/streams/ReadableStream.js ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/WebKit/Source/core/streams/WritableStream.js
diff --git a/third_party/WebKit/Source/core/streams/WritableStream.js b/third_party/WebKit/Source/core/streams/WritableStream.js
index ed39672b2175b71fe753f3c65e2ad50ae6e8f459..4a4429cba4265467df633a4571b661315efbae27 100644
--- a/third_party/WebKit/Source/core/streams/WritableStream.js
+++ b/third_party/WebKit/Source/core/streams/WritableStream.js
@@ -13,14 +13,20 @@
// Private symbols. These correspond to the internal slots in the standard.
// "[[X]]" in the standard is spelt _X in this implementation.
- const _pendingWriteRequest = v8.createPrivateSymbol('[[pendingWriteRequest]]');
- const _pendingCloseRequest = v8.createPrivateSymbol('[[pendingCloseRequest]]');
- const _pendingAbortRequest = v8.createPrivateSymbol('[[pendingAbortRequest]]');
- const _state = v8.createPrivateSymbol('[[state]]');
+
+ // TODO(ricea): Optimise [[closeRequest]] and [[inFlightCloseRequest]] into a
+ // single slot + a flag to say which one is set at the moment.
+ const _closeRequest = v8.createPrivateSymbol('[[closeRequest]]');
+ const _inFlightWriteRequest = v8.createPrivateSymbol('[[inFlightWriteRequest]]');
+ const _inFlightCloseRequest = v8.createPrivateSymbol('[[inFlightCloseRequest]]');
+ const _pendingAbortRequest =
+ v8.createPrivateSymbol('[[pendingAbortRequest]]');
+ // Flags and state are combined into a single integer field for efficiency.
+ const _stateAndFlags = v8.createPrivateSymbol('[[state]] and flags');
const _storedError = v8.createPrivateSymbol('[[storedError]]');
- const _writer = v8.createPrivateSymbol('[[writer]]');
const _writableStreamController =
v8.createPrivateSymbol('[[writableStreamController]]');
+ const _writer = v8.createPrivateSymbol('[[writer]]');
const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]');
const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]');
const _ownerWritableStream =
@@ -29,25 +35,21 @@
const _controlledWritableStream =
v8.createPrivateSymbol('[[controlledWritableStream]]');
const _queue = v8.createPrivateSymbol('[[queue]]');
- const _queueSize = v8.createPrivateSymbol('[[queueSize]]');
+ const _queueTotalSize = v8.createPrivateSymbol('[[queueTotalSize]]');
+ const _started = v8.createPrivateSymbol('[[started]]');
const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
const _strategySize = v8.createPrivateSymbol('[[strategySize]]');
const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
- // _defaultControllerFlags combines WritableStreamDefaultController's internal
- // slots [[started]], [[writing]], and [[inClose]] into a single bitmask for
- // efficiency.
- const _defaultControllerFlags =
- v8.createPrivateSymbol('[[defaultControllerFlags]]');
- const FLAG_STARTED = 0b1;
- const FLAG_WRITING = 0b10;
- const FLAG_INCLOSE = 0b100;
-
- // For efficiency, WritableStream [[state]] contains numeric values.
+ // Numeric encodings of states
const WRITABLE = 0;
- const CLOSING = 1;
- const CLOSED = 2;
- const ERRORED = 3;
+ const CLOSED = 1;
+ const ERRORED = 2;
+
+ // Mask to extract or assign states to _stateAndFlags
+ const STATE_MASK = 0xF;
+
+ const BACKPRESSURE_FLAG = 0x10;
// Javascript functions. It is important to use these copies, as the ones on
// the global object may have been overwritten. See "V8 Extras Design Doc",
@@ -59,6 +61,7 @@
const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
const Function_apply = v8.uncurryThis(global.Function.prototype.apply);
+ const Function_call = v8.uncurryThis(global.Function.prototype.call);
const TypeError = global.TypeError;
const RangeError = global.RangeError;
@@ -77,6 +80,7 @@
const streamErrors = binding.streamErrors;
const errAbortLockedStream = 'Cannot abort a writable stream that is locked to a writer';
const errStreamAborted = 'The stream has been aborted';
+ const errStreamAborting = 'The stream is in the process of being aborted';
const errWriterLockReleasedPrefix = 'This writable stream writer has been released and cannot be ';
const errCloseCloseRequestedStream =
'Cannot close a writable stream that has already been requested to be closed';
@@ -107,40 +111,6 @@
templateErrorCannotActionOnStateStream(action, stateNames[state]));
}
- function setDefaultControllerFlag(controller, flag, value) {
- let flags = controller[_defaultControllerFlags];
- if (value) {
- flags = flags | flag;
- } else {
- flags = flags & ~flag;
- }
- controller[_defaultControllerFlags] = flags;
- }
-
- function getDefaultControllerStartedFlag(controller) {
- return Boolean(controller[_defaultControllerFlags] & FLAG_STARTED);
- }
-
- function setDefaultControllerStartedFlag(controller, value) {
- setDefaultControllerFlag(controller, FLAG_STARTED, value);
- }
-
- function getDefaultControllerWritingFlag(controller) {
- return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING);
- }
-
- function setDefaultControllerWritingFlag(controller, value) {
- setDefaultControllerFlag(controller, FLAG_WRITING, value);
- }
-
- function getDefaultControllerInCloseFlag(controller) {
- return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE);
- }
-
- function setDefaultControllerInCloseFlag(controller, value) {
- setDefaultControllerFlag(controller, FLAG_INCLOSE, value);
- }
-
function rejectPromises(queue, e) {
queue.forEach(promise => v8.rejectPromise(promise, e));
}
@@ -168,12 +138,13 @@
class WritableStream {
constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
- this[_state] = WRITABLE;
+ this[_stateAndFlags] = WRITABLE;
this[_storedError] = undefined;
this[_writer] = undefined;
this[_writableStreamController] = undefined;
- this[_pendingWriteRequest] = undefined;
- this[_pendingCloseRequest] = undefined;
+ this[_inFlightWriteRequest] = undefined;
+ this[_closeRequest] = undefined;
+ this[_inFlightCloseRequest] = undefined;
this[_pendingAbortRequest] = undefined;
this[_writeRequests] = new binding.SimpleQueue();
const type = underlyingSink.type;
@@ -183,6 +154,7 @@
this[_writableStreamController] =
new WritableStreamDefaultController(this, underlyingSink, size,
highWaterMark);
+ WritableStreamDefaultControllerStartSteps(this[_writableStreamController]);
}
get locked() {
@@ -227,36 +199,55 @@
}
function WritableStreamAbort(stream, reason) {
- const state = stream[_state];
+ const state = stream[_stateAndFlags] & STATE_MASK;
if (state === CLOSED) {
return Promise_resolve(undefined);
}
if (state === ERRORED) {
return Promise_reject(stream[_storedError]);
}
- TEMP_ASSERT(state === WRITABLE || state === CLOSING,
- 'state is "writable" or "closing".');
- const error = new TypeError(errStreamAborted);
- WritableStreamError(stream, error);
+ TEMP_ASSERT(state === WRITABLE,
+ 'state is "writable".');
+ const error = new TypeError(errStreamAborting);
+ if (stream[_pendingAbortRequest] !== undefined) {
+ return Promise_reject(error);
+ }
const controller = stream[_writableStreamController];
TEMP_ASSERT(controller !== undefined,
'controller is not undefined');
+ if (!WritableStreamHasOperationMarkedInFlight(stream) &&
+ controller[_started]) {
+ WritableStreamFinishAbort(stream);
+ return WritableStreamDefaultControllerAbortSteps(controller, reason);
+ }
+ const writer = stream[_writer];
+ if (writer !== undefined) {
+ WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error);
+ }
+ const promise = v8.createPromise();
+ stream[_pendingAbortRequest] = {promise, reason};
+ return promise;
+ }
- const isWriting = getDefaultControllerWritingFlag(controller);
- if (isWriting || getDefaultControllerInCloseFlag(controller)) {
- const promise = v8.createPromise();
- stream[_pendingAbortRequest] = promise;
-
- if (isWriting) {
- return thenPromise(promise, () => {
- return WritableStreamDefaultControllerAbort(controller, reason);
- });
+ function WritableStreamError(stream, error) {
+ stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORED;
+ stream[_storedError] = error;
+ WritableStreamDefaultControllerErrorSteps(stream[_writableStreamController]);
+ if (stream[_pendingAbortRequest] === undefined) {
+ const writer = stream[_writer];
+ if (writer !== undefined) {
+ WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error);
}
- return promise;
}
+ if (!WritableStreamHasOperationMarkedInFlight(stream)) {
+ WritableStreamRejectPromisesInReactionToError(stream);
+ }
+ }
- return WritableStreamDefaultControllerAbort(controller, reason);
+ function WritableStreamFinishAbort(stream) {
+ const error = new TypeError(errStreamAborted);
+ WritableStreamError(stream, error);
}
// Writable Stream Abstract Operations Used by Controllers
@@ -264,100 +255,182 @@
function WritableStreamAddWriteRequest(stream) {
TEMP_ASSERT(IsWritableStreamLocked(stream),
'! IsWritableStreamLocked(writer) is true.');
- TEMP_ASSERT(stream[_state] === WRITABLE,
+ TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
'stream.[[state]] is "writable".');
const promise = v8.createPromise();
stream[_writeRequests].push(promise);
return promise;
}
- function WritableStreamError(stream, e) {
- const oldState = stream[_state];
- TEMP_ASSERT(oldState === WRITABLE || oldState === CLOSING,
- 'oldState is "writable" or "closing".');
+ function WritableStreamFinishInFlightWrite(stream) {
+ TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined,
+ '_stream_.[[inFlightWriteRequest]] is not *undefined*.');
+ v8.resolvePromise(stream[_inFlightWriteRequest], undefined);
+ stream[_inFlightWriteRequest] = undefined;
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === ERRORED) {
+ WritableStreamFinishInFlightWriteInErroredState(stream);
+ return;
+ }
+ TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
+ WritableStreamHandleAbortRequestIfPending(stream);
+ }
- stream[_state] = ERRORED;
- stream[_storedError] = e;
+ function WritableStreamFinishInFlightWriteInErroredState(stream) {
+ WritableStreamRejectAbortRequestIfPending(stream);
+ WritableStreamRejectPromisesInReactionToError(stream);
+ }
- const controller = stream[_writableStreamController];
- if (controller === undefined ||
- (!getDefaultControllerWritingFlag(controller) &&
- !getDefaultControllerInCloseFlag(controller))) {
- WritableStreamRejectPromisesInReactionToError(stream);
+ function WritableStreamFinishInFlightWriteWithError(stream, error) {
+ TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined,
+ '_stream_.[[inFlightWriteRequest]] is not *undefined*.');
+ v8.rejectPromise(stream[_inFlightWriteRequest], error);
+ stream[_inFlightWriteRequest] = undefined;
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === ERRORED) {
+ WritableStreamFinishInFlightWriteInErroredState(stream);
+ return;
}
+ TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
+ WritableStreamError(stream, error);
+ WritableStreamRejectAbortRequestIfPending(stream);
+ }
+ function WritableStreamFinishInFlightClose(stream) {
+ TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined,
+ '_stream_.[[inFlightCloseRequest]] is not *undefined*.');
+ v8.resolvePromise(stream[_inFlightCloseRequest], undefined);
+ stream[_inFlightCloseRequest] = undefined;
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === ERRORED) {
+ WritableStreamFinishInFlightCloseInErroredState(stream);
+ return;
+ }
+ TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
+ stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | CLOSED;
const writer = stream[_writer];
if (writer !== undefined) {
- if (oldState === WRITABLE &&
- WritableStreamDefaultControllerGetBackpressure(controller) === true) {
- v8.rejectPromise(writer[_readyPromise], e);
- } else {
- writer[_readyPromise] = Promise_reject(e);
- }
- v8.markPromiseAsHandled(writer[_readyPromise]);
+ v8.resolvePromise(writer[_closedPromise], undefined);
}
+ if (stream[_pendingAbortRequest] !== undefined) {
+ v8.resolvePromise(stream[_pendingAbortRequest].promise, undefined);
+ stream[_pendingAbortRequest] = undefined;
+ }
+ }
+
+ function WritableStreamFinishInFlightCloseInErroredState(stream) {
+ WritableStreamRejectAbortRequestIfPending(stream);
+ WritableStreamRejectClosedPromiseInReactionToError(stream);
+ }
+
+ function WritableStreamFinishInFlightCloseWithError(stream, error) {
+ TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined,
+ '_stream_.[[inFlightCloseRequest]] is not *undefined*.');
+ v8.rejectPromise(stream[_inFlightCloseRequest], error);
+ stream[_inFlightCloseRequest] = undefined;
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === ERRORED) {
+ WritableStreamFinishInFlightCloseInErroredState(stream);
+ return;
+ }
+ TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
+ WritableStreamError(stream, error);
+ WritableStreamRejectAbortRequestIfPending(stream);
+ }
+
+ function WritableStreamCloseQueuedOrInFlight(stream) {
+ return stream[_closeRequest] !== undefined ||
+ stream[_inFlightCloseRequest] !== undefined;
+ }
+
+ function WritableStreamHandleAbortRequestIfPending(stream) {
+ if (stream[_pendingAbortRequest] === undefined) {
+ return;
+ }
+ WritableStreamFinishAbort(stream);
+ const abortRequest = stream[_pendingAbortRequest];
+ stream[_pendingAbortRequest] = undefined;
+ const promise =
+ WritableStreamDefaultControllerAbortSteps(stream[_writableStreamController],
+ abortRequest.reason);
+ thenPromise(promise,
+ result => v8.resolvePromise(abortRequest.promise, result),
+ reason => v8.rejectPromise(abortRequest.promise, reason));
+ }
+
+ function WritableStreamHasOperationMarkedInFlight(stream) {
+ return stream[_inFlightWriteRequest] !== undefined ||
+ stream[_inFlightCloseRequest] !== undefined;
+ }
+
+ function WritableStreamMarkCloseRequestInFlight(stream) {
+ TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
+ '_stream_.[[inFlightCloseRequest]] is *undefined*.');
+ TEMP_ASSERT(stream[_closeRequest] !== undefined,
+ '_stream_.[[closeRequest]] is not *undefined*.');
+ stream[_inFlightCloseRequest] = stream[_closeRequest];
+ stream[_closeRequest] = undefined;
}
- function WritableStreamFinishClose(stream) {
- const state = stream[_state];
- TEMP_ASSERT(state === CLOSING || state === ERRORED,
- 'state is "closing" or "errored"');
+ function WritableStreamMarkFirstWriteRequestInFlight(stream) {
+ TEMP_ASSERT(stream[_inFlightWriteRequest] === undefined,
+ '_stream_.[[inFlightWriteRequest]] is *undefined*.');
+ TEMP_ASSERT(stream[_writeRequests].length !== 0,
+ '_stream_.[[writeRequests]] is not empty.');
+ const writeRequest = stream[_writeRequests].shift();
+ stream[_inFlightWriteRequest] = writeRequest;
+ }
+ function WritableStreamRejectClosedPromiseInReactionToError(stream) {
const writer = stream[_writer];
- if (state === CLOSING) {
- if (writer !== undefined) {
- v8.resolvePromise(writer[_closedPromise], undefined);
- }
- stream[_state] = CLOSED;
- } else if (writer !== undefined) {
- TEMP_ASSERT(state === ERRORED, 'state is "errored"');
+ if (writer !== undefined) {
v8.rejectPromise(writer[_closedPromise], stream[_storedError]);
v8.markPromiseAsHandled(writer[_closedPromise]);
}
+ }
+ function WritableStreamRejectAbortRequestIfPending(stream) {
if (stream[_pendingAbortRequest] !== undefined) {
- v8.resolvePromise(stream[_pendingAbortRequest], undefined);
+ v8.rejectPromise(stream[_pendingAbortRequest].promise,
+ stream[_storedError]);
stream[_pendingAbortRequest] = undefined;
}
}
function WritableStreamRejectPromisesInReactionToError(stream) {
- TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"');
- TEMP_ASSERT(stream[_pendingWriteRequest] === undefined,
- 'stream.[[pendingWriteRequest]] is undefined');
-
const storedError = stream[_storedError];
rejectPromises(stream[_writeRequests], storedError);
stream[_writeRequests] = new binding.SimpleQueue();
- if (stream[_pendingCloseRequest] !== undefined) {
- TEMP_ASSERT(
- getDefaultControllerInCloseFlag(stream[_writableStreamController]) ===
- false, 'stream.[[writableStreamController]].[[inClose]] === false');
- v8.rejectPromise(stream[_pendingCloseRequest], storedError);
- stream[_pendingCloseRequest] = undefined;
+ if (stream[_closeRequest] !== undefined) {
+ TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
+ '_stream_.[[inFlightCloseRequest]] is *undefined*.');
+ v8.rejectPromise(stream[_closeRequest], storedError);
+ stream[_closeRequest] = undefined;
}
- const writer = stream[_writer];
- if (writer !== undefined) {
- v8.rejectPromise(writer[_closedPromise], storedError);
- v8.markPromiseAsHandled(writer[_closedPromise]);
- }
+ WritableStreamRejectClosedPromiseInReactionToError(stream);
}
function WritableStreamUpdateBackpressure(stream, backpressure) {
- TEMP_ASSERT(stream[_state] === WRITABLE,
+ TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
'stream.[[state]] is "writable".');
+ TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream),
+ 'WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.');
const writer = stream[_writer];
- if (writer === undefined) {
- return;
+ if (writer !== undefined &&
+ backpressure !== Boolean(stream[_stateAndFlags] & BACKPRESSURE_FLAG)) {
+ if (backpressure) {
+ writer[_readyPromise] = v8.createPromise();
+ } else {
+ TEMP_ASSERT(!backpressure, '_backpressure_ is *false*.');
+ v8.resolvePromise(writer[_readyPromise], undefined);
+ }
}
if (backpressure) {
- writer[_readyPromise] = v8.createPromise();
+ stream[_stateAndFlags] |= BACKPRESSURE_FLAG;
} else {
- TEMP_ASSERT(backpressure === false,
- 'backpressure is false.');
- v8.resolvePromise(writer[_readyPromise], undefined);
+ stream[_stateAndFlags] &= ~BACKPRESSURE_FLAG;
}
}
@@ -366,13 +439,14 @@
function isWritableStreamErrored(stream) {
TEMP_ASSERT(
IsWritableStream(stream), '! IsWritableStream(stream) is true.');
- return stream[_state] === ERRORED;
+ return (stream[_stateAndFlags] & STATE_MASK) === ERRORED;
}
function isWritableStreamClosingOrClosed(stream) {
TEMP_ASSERT(
IsWritableStream(stream), '! IsWritableStream(stream) is true.');
- return stream[_state] === CLOSING || stream[_state] === CLOSED;
+ return WritableStreamCloseQueuedOrInFlight(stream) ||
+ (stream[_stateAndFlags] & STATE_MASK) === CLOSED;
}
function getWritableStreamStoredError(stream) {
@@ -391,24 +465,30 @@
}
this[_ownerWritableStream] = stream;
stream[_writer] = this;
- const state = stream[_state];
- if (state === WRITABLE || state === CLOSING) {
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === WRITABLE) {
+ if (stream[_pendingAbortRequest] !== undefined) {
+ const error = new TypeError(errStreamAborting);
+ this[_readyPromise] = Promise_reject(error);
+ v8.markPromiseAsHandled(this[_readyPromise]);
+ } else if (!WritableStreamCloseQueuedOrInFlight(stream) &&
+ stream[_stateAndFlags] & BACKPRESSURE_FLAG) {
+ this[_readyPromise] = v8.createPromise();
+ } else {
+ this[_readyPromise] = Promise_resolve(undefined);
+ }
this[_closedPromise] = v8.createPromise();
} else if (state === CLOSED) {
+ this[_readyPromise] = Promise_resolve(undefined);
this[_closedPromise] = Promise_resolve(undefined);
} else {
- TEMP_ASSERT(state === ERRORED,
- 'state is "errored".');
- this[_closedPromise] = Promise_reject(stream[_storedError]);
+ TEMP_ASSERT(state === ERRORED, '_state_ is `"errored"`.');
+ const storedError = stream[_storedError];
+ this[_readyPromise] = Promise_reject(storedError);
+ v8.markPromiseAsHandled(this[_readyPromise]);
+ this[_closedPromise] = Promise_reject(storedError);
v8.markPromiseAsHandled(this[_closedPromise]);
}
- if (state === WRITABLE &&
- WritableStreamDefaultControllerGetBackpressure(
- stream[_writableStreamController])) {
- this[_readyPromise] = v8.createPromise();
- } else {
- this[_readyPromise] = Promise_resolve(undefined);
- }
}
get closed() {
@@ -453,7 +533,7 @@
if (stream === undefined) {
return Promise_reject(createWriterLockReleasedError(verbClosed));
}
- if (stream[_state] === CLOSING) {
+ if (WritableStreamCloseQueuedOrInFlight(stream)) {
return Promise_reject(new TypeError(errCloseCloseRequestedStream));
}
return WritableStreamDefaultWriterClose(this);
@@ -476,13 +556,9 @@
if (!IsWritableStreamDefaultWriter(this)) {
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
- const stream = this[_ownerWritableStream];
- if (stream === undefined) {
+ if (this[_ownerWritableStream] === undefined) {
return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
}
- if (stream[_state] === CLOSING) {
- return Promise_reject(new TypeError(errWriteCloseRequestedStream));
- }
return WritableStreamDefaultWriterWrite(this, chunk);
}
}
@@ -502,30 +578,32 @@
function WritableStreamDefaultWriterClose(writer) {
const stream = writer[_ownerWritableStream];
- TEMP_ASSERT(stream !== undefined,
- 'stream is not undefined.');
- const state = stream[_state];
+ TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
+ const state = stream[_stateAndFlags] & STATE_MASK;
if (state === CLOSED || state === ERRORED) {
return Promise_reject(
createCannotActionOnStateStreamError('close', state));
}
- TEMP_ASSERT(state === WRITABLE,
- 'state is "writable".');
- stream[_pendingCloseRequest] = v8.createPromise();
- if (WritableStreamDefaultControllerGetBackpressure(
- stream[_writableStreamController])) {
+ if (stream[_pendingAbortRequest] !== undefined) {
+ return Promise_reject(new TypeError(errStreamAborting));
+ }
+ TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
+ TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream),
+ '! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.');
+ const promise = v8.createPromise();
+ stream[_closeRequest] = promise;
+ if (stream[_stateAndFlags] & BACKPRESSURE_FLAG) {
v8.resolvePromise(writer[_readyPromise], undefined);
}
- stream[_state] = CLOSING;
WritableStreamDefaultControllerClose(stream[_writableStreamController]);
- return stream[_pendingCloseRequest];
+ return promise;
}
function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) {
const stream = writer[_ownerWritableStream];
TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
- const state = stream[_state];
- if (state === CLOSING || state === CLOSED) {
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (WritableStreamCloseQueuedOrInFlight(stream) || state === CLOSED) {
return Promise_resolve(undefined);
}
if (state === ERRORED) {
@@ -535,10 +613,20 @@
return WritableStreamDefaultWriterClose(writer);
}
+ function WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer,
+ error) {
+ if (v8.promiseState(writer[_readyPromise]) === v8.kPROMISE_PENDING) {
+ v8.rejectPromise(writer[_readyPromise], error);
+ } else {
+ writer[_readyPromise] = Promise_reject(error);
+ }
+ v8.markPromiseAsHandled(writer[_readyPromise]);
+ }
+
function WritableStreamDefaultWriterGetDesiredSize(writer) {
const stream = writer[_ownerWritableStream];
- const state = stream[_state];
- if (state === ERRORED) {
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === ERRORED || stream[_pendingAbortRequest] !== undefined) {
return null;
}
if (state === CLOSED) {
@@ -555,42 +643,46 @@
TEMP_ASSERT(stream[_writer] === writer,
'stream.[[writer]] is writer.');
const releasedError = new TypeError(errReleasedWriterClosedPromise);
- const state = stream[_state];
- if (state === WRITABLE || state === CLOSING ||
- stream[_pendingAbortRequest] !== undefined) {
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer,
+ releasedError);
+ if (state === WRITABLE ||
+ WritableStreamHasOperationMarkedInFlight(stream)) {
v8.rejectPromise(writer[_closedPromise], releasedError);
} else {
writer[_closedPromise] = Promise_reject(releasedError);
}
v8.markPromiseAsHandled(writer[_closedPromise]);
-
- if (state === WRITABLE &&
- WritableStreamDefaultControllerGetBackpressure(
- stream[_writableStreamController])) {
- v8.rejectPromise(writer[_readyPromise], releasedError);
- } else {
- writer[_readyPromise] = Promise_reject(releasedError);
- }
- v8.markPromiseAsHandled(writer[_readyPromise]);
-
stream[_writer] = undefined;
writer[_ownerWritableStream] = undefined;
}
function WritableStreamDefaultWriterWrite(writer, chunk) {
const stream = writer[_ownerWritableStream];
- TEMP_ASSERT(stream !== undefined,
- 'stream is not undefined.');
- const state = stream[_state];
- if (state === CLOSED || state === ERRORED) {
+ TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
+ const controller = stream[_writableStreamController];
+ const chunkSize =
+ WritableStreamDefaultControllerGetChunkSize(controller, chunk);
+ if (stream !== writer[_ownerWritableStream]) {
+ return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
+ }
+ const state = stream[_stateAndFlags] & STATE_MASK;
+ if (state === ERRORED) {
+ return Promise_reject(stream[_storedError]);
+ }
+ if (WritableStreamCloseQueuedOrInFlight(stream)) {
+ return Promise_reject(new TypeError(
+ templateErrorCannotActionOnStateStream('write to', 'closing')));
+ }
+ if (state === CLOSED) {
return Promise_reject(
- createCannotActionOnStateStreamError('write to', state));
+ createCannotActionOnStateStreamError('write to', CLOSED));
+ }
+ if (stream[_pendingAbortRequest] !== undefined) {
+ return Promise_reject(new TypeError(errStreamAborting));
}
- TEMP_ASSERT(state === WRITABLE,
- 'state is "writable".');
const promise = WritableStreamAddWriteRequest(stream);
- WritableStreamDefaultControllerWrite(stream[_writableStreamController],
- chunk);
+ WritableStreamDefaultControllerWrite(controller, chunk, chunkSize);
return promise;
}
@@ -620,34 +712,25 @@
}
this[_controlledWritableStream] = stream;
this[_underlyingSink] = underlyingSink;
- this[_queue] = new binding.SimpleQueue();
- this[_queueSize] = 0;
- this[_defaultControllerFlags] = 0;
+ // These are just initialised to avoid triggering the assert() in
+ // ResetQueue. They are overwritten by ResetQueue().
+ this[_queue] = undefined;
+ this[_queueTotalSize] = undefined;
+ ResetQueue(this);
+ this[_started] = false;
const normalizedStrategy =
ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
this[_strategySize] = normalizedStrategy.size;
this[_strategyHWM] = normalizedStrategy.highWaterMark;
const backpressure = WritableStreamDefaultControllerGetBackpressure(this);
- if (backpressure) {
- WritableStreamUpdateBackpressure(stream, backpressure);
- }
- const controller = this;
- const startResult = InvokeOrNoop(underlyingSink, 'start', [this]);
- const onFulfilled = () => {
- setDefaultControllerStartedFlag(controller, true);
- WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
- };
- const onRejected = r => {
- WritableStreamDefaultControllerErrorIfNeeded(controller, r);
- };
- thenPromise(Promise_resolve(startResult), onFulfilled, onRejected);
+ WritableStreamUpdateBackpressure(stream, backpressure);
}
error(e) {
if (!IsWritableStreamDefaultController(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
- const state = this[_controlledWritableStream][_state];
+ const state = this[_controlledWritableStream][_stateAndFlags] & STATE_MASK;
if (state === CLOSED || state === ERRORED) {
throw createCannotActionOnStateStreamError('error', state);
}
@@ -655,78 +738,110 @@
}
}
- // Writable Stream Default Controller Abstract Operations
+ // Writable Stream Default Controller Internal Methods
- function IsWritableStreamDefaultController(x) {
- return hasOwnProperty(x, _underlyingSink);
- }
-
- function WritableStreamDefaultControllerAbort(controller, reason) {
- controller[_queue] = new binding.SimpleQueue();
- controller[_queueSize] = 0;
+ // TODO(ricea): Virtual dispatch via V8 Private Symbols seems to be difficult
+ // or impossible, so use static dispatch for now. This will have to be fixed
+ // when adding a byte controller.
+ function WritableStreamDefaultControllerAbortSteps(controller, reason) {
const sinkAbortPromise =
PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
return thenPromise(sinkAbortPromise, () => undefined);
}
+ function WritableStreamDefaultControllerErrorSteps(controller) {
+ ResetQueue(controller);
+ }
+
+ function WritableStreamDefaultControllerStartSteps(controller) {
+ const startResult =
+ InvokeOrNoop(controller[_underlyingSink], 'start', [controller]);
+ const stream = controller[_controlledWritableStream];
+ const startPromise = Promise_resolve(startResult);
+ thenPromise(
+ startPromise,
+ () => {
+ controller[_started] = true;
+ if ((stream[_stateAndFlags] & STATE_MASK) === ERRORED) {
+ WritableStreamRejectAbortRequestIfPending(stream);
+ } else {
+ WritableStreamHandleAbortRequestIfPending(stream);
+ }
+ WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ r => {
+ TEMP_ASSERT(
+ (stream[_stateAndFlags] & STATE_MASK) === WRITABLE ||
+ (stream[_stateAndFlags] & STATE_MASK) === ERRORED,
+ '_stream_.[[state]] is `"writable"` or `"errored"`.');
+ WritableStreamDefaultControllerErrorIfNeeded(controller, r);
+ WritableStreamRejectAbortRequestIfPending(stream);
+ });
+ }
+
+ // Writable Stream Default Controller Abstract Operations
+
+ function IsWritableStreamDefaultController(x) {
+ return hasOwnProperty(x, _underlyingSink);
+ }
+
function WritableStreamDefaultControllerClose(controller) {
- EnqueueValueWithSizeForController(controller, 'close', 0);
+ EnqueueValueWithSize(controller, 'close', 0);
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
+ function WritableStreamDefaultControllerGetChunkSize(controller, chunk) {
+ const strategySize = controller[_strategySize];
+ if (strategySize === undefined) {
+ return 1;
+ }
+ let value;
+ try {
+ value = Function_call(strategySize, undefined, chunk);
+ } catch (e) {
+ WritableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return 1;
+ }
+ return value;
+ }
+
function WritableStreamDefaultControllerGetDesiredSize(controller) {
- const queueSize = GetTotalQueueSizeForController(controller);
- return controller[_strategyHWM] - queueSize;
+ return controller[_strategyHWM] - controller[_queueTotalSize];
}
- function WritableStreamDefaultControllerWrite(controller, chunk) {
- const stream = controller[_controlledWritableStream];
- TEMP_ASSERT(stream[_state] === WRITABLE,
- 'stream.[[state]] is "writable".');
- let chunkSize = 1;
- const strategySize = controller[_strategySize];
- if (strategySize !== undefined) {
- try {
- chunkSize = strategySize(chunk);
- } catch (e) {
- WritableStreamDefaultControllerErrorIfNeeded(controller, e);
- return;
- }
- }
+ function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
const writeRecord = {chunk};
- const lastBackpressure =
- WritableStreamDefaultControllerGetBackpressure(controller);
try {
- EnqueueValueWithSizeForController(controller, writeRecord, chunkSize);
+ EnqueueValueWithSize(controller, writeRecord, chunkSize);
} catch (e) {
WritableStreamDefaultControllerErrorIfNeeded(controller, e);
return;
}
- if (stream[_state] === WRITABLE) {
+ const stream = controller[_controlledWritableStream];
+ if (!WritableStreamCloseQueuedOrInFlight(stream)) {
const backpressure =
WritableStreamDefaultControllerGetBackpressure(controller);
- if (lastBackpressure !== backpressure) {
- WritableStreamUpdateBackpressure(stream, backpressure);
- }
+ WritableStreamUpdateBackpressure(stream, backpressure);
}
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
- const state = controller[_controlledWritableStream][_state];
+ const stream = controller[_controlledWritableStream];
+ const state = stream[_stateAndFlags] & STATE_MASK;
if (state === CLOSED || state === ERRORED) {
return;
}
- if (!getDefaultControllerStartedFlag(controller)) {
+ if (!controller[_started]) {
return;
}
- if (getDefaultControllerWritingFlag(controller)) {
+ if (stream[_inFlightWriteRequest] !== undefined) {
return;
}
if (controller[_queue].length === 0) {
return;
}
- const writeRecord = PeekQueueValue(controller[_queue]);
+ const writeRecord = PeekQueueValue(controller);
if (writeRecord === 'close') {
WritableStreamDefaultControllerProcessClose(controller);
} else {
@@ -735,135 +850,59 @@
}
}
- function WritableStreamDefaultControllerErrorIfNeeded(controller, e) {
- const state = controller[_controlledWritableStream][_state];
- if (state === WRITABLE || state === CLOSING) {
- WritableStreamDefaultControllerError(controller, e);
+ function WritableStreamDefaultControllerErrorIfNeeded(controller, error) {
+ const state = controller[_controlledWritableStream][_stateAndFlags] & STATE_MASK;
+ if (state === WRITABLE) {
+ WritableStreamDefaultControllerError(controller, error);
}
}
function WritableStreamDefaultControllerProcessClose(controller) {
const stream = controller[_controlledWritableStream];
- TEMP_ASSERT(stream[_state] === CLOSING,
- 'stream.[[state]] is "closing".');
- DequeueValueForController(controller);
+ WritableStreamMarkCloseRequestInFlight(stream);
+ DequeueValue(controller);
TEMP_ASSERT(controller[_queue].length === 0,
'controller.[[queue]] is empty.');
- setDefaultControllerInCloseFlag(controller, true);
const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
'close', [controller]);
- thenPromise(sinkClosePromise,
- () => {
- TEMP_ASSERT(getDefaultControllerInCloseFlag(controller)
- === true,
- 'controller.[[inClose]] is true');
- setDefaultControllerInCloseFlag(controller, false);
-
- if (stream[_state] !== CLOSING &&
- stream[_state] !== ERRORED) {
- return;
- }
-
- TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined);
- v8.resolvePromise(stream[_pendingCloseRequest], undefined);
- stream[_pendingCloseRequest] = undefined;
-
- WritableStreamFinishClose(stream);
- },
- r => {
- TEMP_ASSERT(getDefaultControllerInCloseFlag(controller)
- === true,
- 'controller.[[inClose]] is true');
- setDefaultControllerInCloseFlag(controller, false);
-
- TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined);
- v8.rejectPromise(stream[_pendingCloseRequest], r);
- stream[_pendingCloseRequest] = undefined;
-
- if (stream[_pendingAbortRequest] !== undefined) {
- v8.rejectPromise(stream[_pendingAbortRequest], r);
- stream[_pendingAbortRequest] = undefined;
- }
-
- WritableStreamDefaultControllerErrorIfNeeded(controller, r);
- }
- );
+ thenPromise(
+ sinkClosePromise,
+ () => WritableStreamFinishInFlightClose(stream),
+ reason => WritableStreamFinishInFlightCloseWithError(stream, reason)
+ );
}
function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
- setDefaultControllerWritingFlag(controller, true);
-
const stream = controller[_controlledWritableStream];
-
- TEMP_ASSERT(stream[_pendingWriteRequest] === undefined,
- 'stream.[[pendingWriteRequest]] is undefined');
- TEMP_ASSERT(stream[_writeRequests].length > 0,
- 'stream.[[writeRequests]] is not empty');
- stream[_pendingWriteRequest] = stream[_writeRequests].shift();
-
+ WritableStreamMarkFirstWriteRequestInFlight(stream);
const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
'write', [chunk, controller]);
thenPromise(
sinkWritePromise,
() => {
- TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true,
- 'controller.[[writing]] is true');
- setDefaultControllerWritingFlag(controller, false);
-
- TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined,
- 'stream.[[pendingWriteRequest]] is not undefined');
- v8.resolvePromise(stream[_pendingWriteRequest], undefined);
- stream[_pendingWriteRequest] = undefined;
-
- const state = stream[_state];
+ WritableStreamFinishInFlightWrite(stream);
+ const state = stream[_stateAndFlags] & STATE_MASK;
if (state === ERRORED) {
- WritableStreamRejectPromisesInReactionToError(stream);
-
- if (stream[_pendingAbortRequest] !== undefined) {
- v8.resolvePromise(stream[_pendingAbortRequest], undefined);
- stream[_pendingAbortRequest] = undefined;
- }
return;
}
-
- const lastBackpressure =
- WritableStreamDefaultControllerGetBackpressure(controller);
- DequeueValueForController(controller);
-
- if (state !== CLOSING) {
+ TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
+ DequeueValue(controller);
+ if (!WritableStreamCloseQueuedOrInFlight(stream)) {
const backpressure =
WritableStreamDefaultControllerGetBackpressure(controller);
- if (lastBackpressure !== backpressure) {
- WritableStreamUpdateBackpressure(
- controller[_controlledWritableStream], backpressure);
- }
+ WritableStreamUpdateBackpressure(stream, backpressure);
}
-
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
},
- r => {
- TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true,
- 'controller.[[writing]] is true');
- setDefaultControllerWritingFlag(controller, false);
-
- TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined,
- 'stream.[[pendingWriteRequest]] is not undefined');
- v8.rejectPromise(stream[_pendingWriteRequest], r);
- stream[_pendingWriteRequest] = undefined;
-
- if (stream[_state] === ERRORED) {
- stream[_storedError] = r;
- WritableStreamRejectPromisesInReactionToError(stream);
+ reason => {
+ const wasErrored = (stream[_stateAndFlags] & STATE_MASK) === ERRORED;
+ WritableStreamFinishInFlightWriteWithError(stream, reason);
+ TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORED,
+ '_stream_.[[state]] is `"errored"`.');
+ if (!wasErrored) {
+ ResetQueue(controller);
tyoshino (SeeGerritForStatus) 2017/04/12 04:21:13 In the spec, only [[queue]] is cleared here. Was t
Adam Rice 2017/04/13 05:54:40 I think I intended to update the spec, but it look
tyoshino (SeeGerritForStatus) 2017/04/13 07:38:07 OK. Thanks
}
-
- if (stream[_pendingAbortRequest] !== undefined) {
- v8.rejectPromise(stream[_pendingAbortRequest], r);
- stream[_pendingAbortRequest] = undefined;
- }
-
- WritableStreamDefaultControllerErrorIfNeeded(controller, r);
- }
- );
+ });
}
function WritableStreamDefaultControllerGetBackpressure(controller) {
@@ -872,45 +911,67 @@
return desiredSize <= 0;
}
- function WritableStreamDefaultControllerError(controller, e) {
+ function WritableStreamDefaultControllerError(controller, error) {
const stream = controller[_controlledWritableStream];
- const state = stream[_state];
- TEMP_ASSERT(state === WRITABLE || state === CLOSING,
- 'stream.[[state]] is "writable" or "closing".');
- WritableStreamError(stream, e);
- controller[_queue] = new binding.SimpleQueue();
- controller[_queueSize] = 0;
+ TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
+ '_stream_.[[state]] is `"writable"`.');
+ WritableStreamError(stream, error);
}
// Queue-with-Sizes Operations
//
// TODO(ricea): Share these operations with ReadableStream.js.
- function DequeueValueForController(controller) {
- TEMP_ASSERT(controller[_queue].length !== 0,
- 'queue is not empty.');
- const result = controller[_queue].shift();
- controller[_queueSize] -= result.size;
- return result.value;
+ function DequeueValue(container) {
+ TEMP_ASSERT(
+ hasOwnProperty(container, _queue) &&
+ hasOwnProperty(container, _queueTotalSize),
+ 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' +
+ 'slots.');
+ TEMP_ASSERT(container[_queue].length !== 0,
+ '_container_.[[queue]] is not empty.');
+ const pair = container[_queue].shift();
+ container[_queueTotalSize] -= pair.size;
+ if (container[_queueTotalSize] < 0) {
+ container[_queueTotalSize] = 0;
+ }
+ return pair.value;
}
- function EnqueueValueWithSizeForController(controller, value, size) {
+ function EnqueueValueWithSize(container, value, size) {
+ TEMP_ASSERT(
+ hasOwnProperty(container, _queue) &&
+ hasOwnProperty(container, _queueTotalSize),
+ 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' +
+ 'slots.');
size = Number(size);
if (!IsFiniteNonNegativeNumber(size)) {
throw new RangeError(streamErrors.invalidSize);
}
- controller[_queueSize] += size;
- controller[_queue].push({value, size});
+ container[_queue].push({value, size});
+ container[_queueTotalSize] += size;
}
- function GetTotalQueueSizeForController(controller) {
- return controller[_queueSize];
+ function PeekQueueValue(container) {
+ TEMP_ASSERT(
+ hasOwnProperty(container, _queue) &&
+ hasOwnProperty(container, _queueTotalSize),
+ 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' +
+ 'slots.');
+ TEMP_ASSERT(container[_queue].length !== 0,
+ '_container_.[[queue]] is not empty.');
+ const pair = container[_queue].peek();
+ return pair.value;
}
- function PeekQueueValue(queue) {
- TEMP_ASSERT(queue.length !== 0,
- 'queue is not empty.');
- return queue.peek().value;
+ function ResetQueue(container) {
+ TEMP_ASSERT(
+ hasOwnProperty(container, _queue) &&
+ hasOwnProperty(container, _queueTotalSize),
+ 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' +
+ 'slots.');
+ container[_queue] = new binding.SimpleQueue();
+ container[_queueTotalSize] = 0;
}
// Miscellaneous Operations
« no previous file with comments | « third_party/WebKit/Source/core/streams/ReadableStream.js ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698