Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(727)

Unified Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2596883002: Update writable streams for the latest spec changes (Closed)
Patch Set: Adjust indents Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/WebKit/LayoutTests/http/tests/streams/writable-streams/general.js ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/WebKit/Source/core/streams/WritableStream.js
diff --git a/third_party/WebKit/Source/core/streams/WritableStream.js b/third_party/WebKit/Source/core/streams/WritableStream.js
index dcffef65ddf835bf99a0d9b47d8901e9d544265b..e2f88482db6098225beaa3d1fe5b36f4faf66035 100644
--- a/third_party/WebKit/Source/core/streams/WritableStream.js
+++ b/third_party/WebKit/Source/core/streams/WritableStream.js
@@ -13,6 +13,9 @@
// Private symbols. These correspond to the internal slots in the standard.
// "[[X]]" in the standard is spelt _X in this implementation.
+ const _pendingWriteRequest = v8.createPrivateSymbol('[[pendingWriteRequest]]');
+ const _pendingCloseRequest = v8.createPrivateSymbol('[[pendingCloseRequest]]');
+ const _pendingAbortRequest = v8.createPrivateSymbol('[[pendingAbortRequest]]');
const _state = v8.createPrivateSymbol('[[state]]');
const _storedError = v8.createPrivateSymbol('[[storedError]]');
const _writer = v8.createPrivateSymbol('[[writer]]');
@@ -32,11 +35,13 @@
const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
// _defaultControllerFlags combines WritableStreamDefaultController's internal
- // slots [[started]] and [[writing]] into a single bitmask for efficiency.
+ // slots [[started]], [[writing]], and [[inClose]] into a single bitmask for
+ // efficiency.
const _defaultControllerFlags =
v8.createPrivateSymbol('[[defaultControllerFlags]]');
const FLAG_STARTED = 0b1;
const FLAG_WRITING = 0b10;
+ const FLAG_INCLOSE = 0b100;
// For efficiency, WritableStream [[state]] contains numeric values.
const WRITABLE = 0;
@@ -53,7 +58,6 @@
const defineProperty = global.Object.defineProperty;
const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
- const Function_call = v8.uncurryThis(global.Function.prototype.call);
const Function_apply = v8.uncurryThis(global.Function.prototype.apply);
const TypeError = global.TypeError;
@@ -129,6 +133,14 @@
setDefaultControllerFlag(controller, FLAG_WRITING, value);
}
+ function getDefaultControllerInCloseFlag(controller) {
+ return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE);
+ }
+
+ function setDefaultControllerInCloseFlag(controller, value) {
+ setDefaultControllerFlag(controller, FLAG_INCLOSE, value);
+ }
+
function rejectPromises(array, e) {
// array is an InternalPackedArray so forEach won't work.
for (let i = 0; i < array.length; ++i) {
@@ -149,9 +161,12 @@
}
v8.log(`Assertion failed: ${message}\n`);
v8.logStackTrace();
- class WritableStreamInternalError {
+ class WritableStreamInternalError extends Error {
+ constructor(message) {
+ super(message);
+ }
}
- throw new WritableStreamInternalError();
+ throw new WritableStreamInternalError(message);
}
class WritableStream {
@@ -160,6 +175,9 @@
this[_storedError] = undefined;
this[_writer] = undefined;
this[_writableStreamController] = undefined;
+ this[_pendingWriteRequest] = undefined;
+ this[_pendingCloseRequest] = undefined;
+ this[_pendingAbortRequest] = undefined;
this[_writeRequests] = new v8.InternalPackedArray();
const type = underlyingSink.type;
if (type !== undefined) {
@@ -223,8 +241,25 @@
'state is "writable" or "closing".');
const error = new TypeError(errStreamAborted);
WritableStreamError(stream, error);
- return WritableStreamDefaultControllerAbort(
- stream[_writableStreamController], reason);
+
+ const controller = stream[_writableStreamController];
+ TEMP_ASSERT(controller !== undefined,
+ 'controller is not undefined');
+
+ const isWriting = getDefaultControllerWritingFlag(controller);
+ if (isWriting || getDefaultControllerInCloseFlag(controller)) {
+ const promise = v8.createPromise();
+ stream[_pendingAbortRequest] = promise;
+
+ if (isWriting) {
+ return thenPromise(promise, () => {
+ return WritableStreamDefaultControllerAbort(controller, reason);
+ });
+ }
+ return promise;
+ }
+
+ return WritableStreamDefaultControllerAbort(controller, reason);
}
// Writable Stream Abstract Operations Used by Controllers
@@ -240,40 +275,74 @@
}
function WritableStreamError(stream, e) {
- const state = stream[_state];
- TEMP_ASSERT(state === WRITABLE || state === CLOSING,
- 'state is "writable" or "closing".');
- rejectPromises(stream[_writeRequests], e);
- stream[_writeRequests] = new v8.InternalPackedArray();
+ const oldState = stream[_state];
+ TEMP_ASSERT(oldState === WRITABLE || oldState === CLOSING,
+ 'oldState is "writable" or "closing".');
+
+ stream[_state] = ERRORED;
+ stream[_storedError] = e;
+
+ const controller = stream[_writableStreamController];
+ if (controller === undefined ||
+ (!getDefaultControllerWritingFlag(controller) &&
+ !getDefaultControllerInCloseFlag(controller))) {
+ WritableStreamRejectPromisesInReactionToError(stream);
+ }
+
const writer = stream[_writer];
if (writer !== undefined) {
- v8.rejectPromise(writer[_closedPromise], e);
- if (state === WRITABLE &&
- WritableStreamDefaultControllerGetBackpressure(
- stream[_writableStreamController])) {
+ if (oldState === WRITABLE &&
+ WritableStreamDefaultControllerGetBackpressure(controller) === true) {
v8.rejectPromise(writer[_readyPromise], e);
} else {
writer[_readyPromise] = Promise_reject(e);
}
+ v8.markPromiseAsHandled(writer[_readyPromise]);
}
- stream[_state] = ERRORED;
- stream[_storedError] = e;
}
function WritableStreamFinishClose(stream) {
- TEMP_ASSERT(stream[_state] === CLOSING,
- 'stream.[[state]] is "closing".');
- TEMP_ASSERT(stream[_writer] !== undefined,
- 'stream.[[writer]] is not undefined.');
- stream[_state] = CLOSED;
- v8.resolvePromise(stream[_writer][_closedPromise], undefined);
+ const state = stream[_state];
+ TEMP_ASSERT(state === CLOSING || state === ERRORED,
+ 'state is "closing" or "errored"');
+
+ if (state === CLOSING) {
+ v8.resolvePromise(stream[_writer][_closedPromise], undefined);
+ stream[_state] = CLOSED;
+ } else {
+ TEMP_ASSERT(state === ERRORED, 'state is "errored"');
+ v8.rejectPromise(stream[_writer][_closedPromise], stream[_storedError]);
+ v8.markPromiseAsHandled(stream[_writer][_closedPromise]);
+ }
+
+ if (stream[_pendingAbortRequest] !== undefined) {
+ v8.resolvePromise(stream[_pendingAbortRequest], undefined);
+ stream[_pendingAbortRequest] = undefined;
+ }
}
- function WritableStreamFulfillWriteRequest(stream) {
- TEMP_ASSERT(stream[_writeRequests].length !== 0,
- 'stream.[[writeRequests]] is not empty.');
- const writeRequest = stream[_writeRequests].shift();
- v8.resolvePromise(writeRequest, undefined);
+ function WritableStreamRejectPromisesInReactionToError(stream) {
+ TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"');
+ TEMP_ASSERT(stream[_pendingWriteRequest] === undefined,
+ 'stream.[[pendingWriteRequest]] is undefined');
+
+ const storedError = stream[_storedError];
+ rejectPromises(stream[_writeRequests], storedError);
+ stream[_writeRequests] = new v8.InternalPackedArray();
+
+ if (stream[_pendingCloseRequest] !== undefined) {
+ TEMP_ASSERT(
+ getDefaultControllerInCloseFlag(stream[_writableStreamController]) ===
+ false, 'stream.[[writableStreamController]].[[inClose]] === false');
+ v8.rejectPromise(stream[_pendingCloseRequest], storedError);
+ stream[_pendingCloseRequest] = undefined;
+ }
+
+ const writer = stream[_writer];
+ if (writer !== undefined) {
+ v8.rejectPromise(writer[_closedPromise], storedError);
+ v8.markPromiseAsHandled(writer[_closedPromise]);
+ }
}
function WritableStreamUpdateBackpressure(stream, backpressure) {
@@ -311,6 +380,7 @@
TEMP_ASSERT(state === ERRORED,
'state is "errored".');
this[_closedPromise] = Promise_reject(stream[_storedError]);
+ v8.markPromiseAsHandled(this[_closedPromise]);
}
if (state === WRITABLE &&
WritableStreamDefaultControllerGetBackpressure(
@@ -421,14 +491,14 @@
}
TEMP_ASSERT(state === WRITABLE,
'state is "writable".');
- const promise = WritableStreamAddWriteRequest(stream);
+ stream[_pendingCloseRequest] = v8.createPromise();
if (WritableStreamDefaultControllerGetBackpressure(
stream[_writableStreamController])) {
v8.resolvePromise(writer[_readyPromise], undefined);
}
stream[_state] = CLOSING;
WritableStreamDefaultControllerClose(stream[_writableStreamController]);
- return promise;
+ return stream[_pendingCloseRequest];
}
function WritableStreamDefaultWriterGetDesiredSize(writer) {
@@ -452,11 +522,14 @@
'stream.[[writer]] is writer.');
const releasedError = new TypeError(errReleasedWriterClosedPromise);
const state = stream[_state];
- if (state === WRITABLE || state === CLOSING) {
+ if (state === WRITABLE || state === CLOSING ||
+ stream[_pendingAbortRequest] !== undefined) {
v8.rejectPromise(writer[_closedPromise], releasedError);
} else {
writer[_closedPromise] = Promise_reject(releasedError);
}
+ v8.markPromiseAsHandled(writer[_closedPromise]);
+
if (state === WRITABLE &&
WritableStreamDefaultControllerGetBackpressure(
stream[_writableStreamController])) {
@@ -464,6 +537,8 @@
} else {
writer[_readyPromise] = Promise_reject(releasedError);
}
+ v8.markPromiseAsHandled(writer[_readyPromise]);
+
stream[_writer] = undefined;
writer[_ownerWritableStream] = undefined;
}
@@ -560,23 +635,23 @@
TEMP_ASSERT(stream[_state] === WRITABLE,
'stream.[[state]] is "writable".');
let chunkSize = 1;
- if (controller[_strategySize] !== undefined) {
+ const strategySize = controller[_strategySize];
+ if (strategySize !== undefined) {
try {
- chunkSize = Function_call(controller[_strategySize], undefined, chunk);
+ chunkSize = strategySize(chunk);
} catch (e) {
WritableStreamDefaultControllerErrorIfNeeded(controller, e);
- return Promise_reject(e);
+ return;
}
}
const writeRecord = {chunk};
const lastBackpressure =
WritableStreamDefaultControllerGetBackpressure(controller);
try {
- const enqueueResult =
- EnqueueValueWithSizeForController(controller, writeRecord, chunkSize);
+ EnqueueValueWithSizeForController(controller, writeRecord, chunkSize);
} catch (e) {
WritableStreamDefaultControllerErrorIfNeeded(controller, e);
- return Promise_reject(e);
+ return;
}
if (stream[_state] === WRITABLE) {
const backpressure =
@@ -625,37 +700,87 @@
DequeueValueForController(controller);
TEMP_ASSERT(controller[_queue].length === 0,
'controller.[[queue]] is empty.');
+ setDefaultControllerInCloseFlag(controller, true);
const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
- 'close', [controller]);
+ 'close', [controller]);
thenPromise(sinkClosePromise,
() => {
- if (stream[_state] !== CLOSING) {
+ TEMP_ASSERT(getDefaultControllerInCloseFlag(controller)
+ === true,
+ 'controller.[[inClose]] is true');
+ setDefaultControllerInCloseFlag(controller, false);
+
+ if (stream[_state] !== CLOSING &&
+ stream[_state] !== ERRORED) {
return;
}
- WritableStreamFulfillWriteRequest(stream);
+
+ TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined);
+ v8.resolvePromise(stream[_pendingCloseRequest], undefined);
+ stream[_pendingCloseRequest] = undefined;
+
WritableStreamFinishClose(stream);
},
- r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
+ r => {
+ TEMP_ASSERT(getDefaultControllerInCloseFlag(controller)
+ === true,
+ 'controller.[[inClose]] is true');
+ setDefaultControllerInCloseFlag(controller, false);
+
+ TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined);
+ v8.rejectPromise(stream[_pendingCloseRequest], r);
+ stream[_pendingCloseRequest] = undefined;
+
+ if (stream[_pendingAbortRequest] !== undefined) {
+ v8.rejectPromise(stream[_pendingAbortRequest], r);
+ stream[_pendingAbortRequest] = undefined;
+ }
+
+ WritableStreamDefaultControllerErrorIfNeeded(controller, r);
+ }
);
}
function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
setDefaultControllerWritingFlag(controller, true);
+
+ const stream = controller[_controlledWritableStream];
+
+ TEMP_ASSERT(stream[_pendingWriteRequest] === undefined,
+ 'stream.[[pendingWriteRequest]] is undefined');
+ TEMP_ASSERT(stream[_writeRequests].length > 0,
+ 'stream.[[writeRequests]] is not empty');
+ stream[_pendingWriteRequest] = stream[_writeRequests].shift();
+
const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
'write', [chunk, controller]);
thenPromise(
sinkWritePromise,
() => {
- const stream = controller[_controlledWritableStream];
+ TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true,
+ 'controller.[[writing]] is true');
+ setDefaultControllerWritingFlag(controller, false);
+
+ TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined,
+ 'stream.[[pendingWriteRequest]] is not undefined');
+ v8.resolvePromise(stream[_pendingWriteRequest], undefined);
+ stream[_pendingWriteRequest] = undefined;
+
const state = stream[_state];
- if (state === ERRORED || state === CLOSED) {
+ if (state === ERRORED) {
+ WritableStreamRejectPromisesInReactionToError(stream);
+
+ if (stream[_pendingAbortRequest] !== undefined) {
+ v8.resolvePromise(stream[_pendingAbortRequest], undefined);
+ stream[_pendingAbortRequest] = undefined;
+ }
return;
}
- setDefaultControllerWritingFlag(controller, false);
- WritableStreamFulfillWriteRequest(stream);
+
const lastBackpressure =
WritableStreamDefaultControllerGetBackpressure(controller);
DequeueValueForController(controller);
+
if (state !== CLOSING) {
const backpressure =
WritableStreamDefaultControllerGetBackpressure(controller);
@@ -664,10 +789,32 @@
controller[_controlledWritableStream], backpressure);
}
}
+
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
},
- r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
- );
+ r => {
+ TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true,
+ 'controller.[[writing]] is true');
+ setDefaultControllerWritingFlag(controller, false);
+
+ TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined,
+ 'stream.[[pendingWriteRequest]] is not undefined');
+ v8.rejectPromise(stream[_pendingWriteRequest], r);
+ stream[_pendingWriteRequest] = undefined;
+
+ if (stream[_state] === ERRORED) {
+ stream[_storedError] = r;
+ WritableStreamRejectPromisesInReactionToError(stream);
+ }
+
+ if (stream[_pendingAbortRequest] !== undefined) {
+ v8.rejectPromise(stream[_pendingAbortRequest], r);
+ stream[_pendingAbortRequest] = undefined;
+ }
+
+ WritableStreamDefaultControllerErrorIfNeeded(controller, r);
+ }
+ );
}
function WritableStreamDefaultControllerGetBackpressure(controller) {
« no previous file with comments | « third_party/WebKit/LayoutTests/http/tests/streams/writable-streams/general.js ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698