Index: third_party/WebKit/Source/core/streams/WritableStream.js |
diff --git a/third_party/WebKit/Source/core/streams/WritableStream.js b/third_party/WebKit/Source/core/streams/WritableStream.js |
index dcffef65ddf835bf99a0d9b47d8901e9d544265b..e2f88482db6098225beaa3d1fe5b36f4faf66035 100644 |
--- a/third_party/WebKit/Source/core/streams/WritableStream.js |
+++ b/third_party/WebKit/Source/core/streams/WritableStream.js |
@@ -13,6 +13,9 @@ |
// Private symbols. These correspond to the internal slots in the standard. |
// "[[X]]" in the standard is spelt _X in this implementation. |
+ const _pendingWriteRequest = v8.createPrivateSymbol('[[pendingWriteRequest]]'); |
+ const _pendingCloseRequest = v8.createPrivateSymbol('[[pendingCloseRequest]]'); |
+ const _pendingAbortRequest = v8.createPrivateSymbol('[[pendingAbortRequest]]'); |
const _state = v8.createPrivateSymbol('[[state]]'); |
const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
const _writer = v8.createPrivateSymbol('[[writer]]'); |
@@ -32,11 +35,13 @@ |
const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); |
// _defaultControllerFlags combines WritableStreamDefaultController's internal |
- // slots [[started]] and [[writing]] into a single bitmask for efficiency. |
+ // slots [[started]], [[writing]], and [[inClose]] into a single bitmask for |
+ // efficiency. |
const _defaultControllerFlags = |
v8.createPrivateSymbol('[[defaultControllerFlags]]'); |
const FLAG_STARTED = 0b1; |
const FLAG_WRITING = 0b10; |
+ const FLAG_INCLOSE = 0b100; |
// For efficiency, WritableStream [[state]] contains numeric values. |
const WRITABLE = 0; |
@@ -53,7 +58,6 @@ |
const defineProperty = global.Object.defineProperty; |
const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
- const Function_call = v8.uncurryThis(global.Function.prototype.call); |
const Function_apply = v8.uncurryThis(global.Function.prototype.apply); |
const TypeError = global.TypeError; |
@@ -129,6 +133,14 @@ |
setDefaultControllerFlag(controller, FLAG_WRITING, value); |
} |
+ function getDefaultControllerInCloseFlag(controller) { |
+ return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); |
+ } |
+ |
+ function setDefaultControllerInCloseFlag(controller, value) { |
+ setDefaultControllerFlag(controller, FLAG_INCLOSE, value); |
+ } |
+ |
function rejectPromises(array, e) { |
// array is an InternalPackedArray so forEach won't work. |
for (let i = 0; i < array.length; ++i) { |
@@ -149,9 +161,12 @@ |
} |
v8.log(`Assertion failed: ${message}\n`); |
v8.logStackTrace(); |
- class WritableStreamInternalError { |
+ class WritableStreamInternalError extends Error { |
+ constructor(message) { |
+ super(message); |
+ } |
} |
- throw new WritableStreamInternalError(); |
+ throw new WritableStreamInternalError(message); |
} |
class WritableStream { |
@@ -160,6 +175,9 @@ |
this[_storedError] = undefined; |
this[_writer] = undefined; |
this[_writableStreamController] = undefined; |
+ this[_pendingWriteRequest] = undefined; |
+ this[_pendingCloseRequest] = undefined; |
+ this[_pendingAbortRequest] = undefined; |
this[_writeRequests] = new v8.InternalPackedArray(); |
const type = underlyingSink.type; |
if (type !== undefined) { |
@@ -223,8 +241,25 @@ |
'state is "writable" or "closing".'); |
const error = new TypeError(errStreamAborted); |
WritableStreamError(stream, error); |
- return WritableStreamDefaultControllerAbort( |
- stream[_writableStreamController], reason); |
+ |
+ const controller = stream[_writableStreamController]; |
+ TEMP_ASSERT(controller !== undefined, |
+ 'controller is not undefined'); |
+ |
+ const isWriting = getDefaultControllerWritingFlag(controller); |
+ if (isWriting || getDefaultControllerInCloseFlag(controller)) { |
+ const promise = v8.createPromise(); |
+ stream[_pendingAbortRequest] = promise; |
+ |
+ if (isWriting) { |
+ return thenPromise(promise, () => { |
+ return WritableStreamDefaultControllerAbort(controller, reason); |
+ }); |
+ } |
+ return promise; |
+ } |
+ |
+ return WritableStreamDefaultControllerAbort(controller, reason); |
} |
// Writable Stream Abstract Operations Used by Controllers |
@@ -240,40 +275,74 @@ |
} |
function WritableStreamError(stream, e) { |
- const state = stream[_state]; |
- TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
- 'state is "writable" or "closing".'); |
- rejectPromises(stream[_writeRequests], e); |
- stream[_writeRequests] = new v8.InternalPackedArray(); |
+ const oldState = stream[_state]; |
+ TEMP_ASSERT(oldState === WRITABLE || oldState === CLOSING, |
+ 'oldState is "writable" or "closing".'); |
+ |
+ stream[_state] = ERRORED; |
+ stream[_storedError] = e; |
+ |
+ const controller = stream[_writableStreamController]; |
+ if (controller === undefined || |
+ (!getDefaultControllerWritingFlag(controller) && |
+ !getDefaultControllerInCloseFlag(controller))) { |
+ WritableStreamRejectPromisesInReactionToError(stream); |
+ } |
+ |
const writer = stream[_writer]; |
if (writer !== undefined) { |
- v8.rejectPromise(writer[_closedPromise], e); |
- if (state === WRITABLE && |
- WritableStreamDefaultControllerGetBackpressure( |
- stream[_writableStreamController])) { |
+ if (oldState === WRITABLE && |
+ WritableStreamDefaultControllerGetBackpressure(controller) === true) { |
v8.rejectPromise(writer[_readyPromise], e); |
} else { |
writer[_readyPromise] = Promise_reject(e); |
} |
+ v8.markPromiseAsHandled(writer[_readyPromise]); |
} |
- stream[_state] = ERRORED; |
- stream[_storedError] = e; |
} |
function WritableStreamFinishClose(stream) { |
- TEMP_ASSERT(stream[_state] === CLOSING, |
- 'stream.[[state]] is "closing".'); |
- TEMP_ASSERT(stream[_writer] !== undefined, |
- 'stream.[[writer]] is not undefined.'); |
- stream[_state] = CLOSED; |
- v8.resolvePromise(stream[_writer][_closedPromise], undefined); |
+ const state = stream[_state]; |
+ TEMP_ASSERT(state === CLOSING || state === ERRORED, |
+ 'state is "closing" or "errored"'); |
+ |
+ if (state === CLOSING) { |
+ v8.resolvePromise(stream[_writer][_closedPromise], undefined); |
+ stream[_state] = CLOSED; |
+ } else { |
+ TEMP_ASSERT(state === ERRORED, 'state is "errored"'); |
+ v8.rejectPromise(stream[_writer][_closedPromise], stream[_storedError]); |
+ v8.markPromiseAsHandled(stream[_writer][_closedPromise]); |
+ } |
+ |
+ if (stream[_pendingAbortRequest] !== undefined) { |
+ v8.resolvePromise(stream[_pendingAbortRequest], undefined); |
+ stream[_pendingAbortRequest] = undefined; |
+ } |
} |
- function WritableStreamFulfillWriteRequest(stream) { |
- TEMP_ASSERT(stream[_writeRequests].length !== 0, |
- 'stream.[[writeRequests]] is not empty.'); |
- const writeRequest = stream[_writeRequests].shift(); |
- v8.resolvePromise(writeRequest, undefined); |
+ function WritableStreamRejectPromisesInReactionToError(stream) { |
+ TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); |
+ TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
+ 'stream.[[pendingWriteRequest]] is undefined'); |
+ |
+ const storedError = stream[_storedError]; |
+ rejectPromises(stream[_writeRequests], storedError); |
+ stream[_writeRequests] = new v8.InternalPackedArray(); |
+ |
+ if (stream[_pendingCloseRequest] !== undefined) { |
+ TEMP_ASSERT( |
+ getDefaultControllerInCloseFlag(stream[_writableStreamController]) === |
+ false, 'stream.[[writableStreamController]].[[inClose]] === false'); |
+ v8.rejectPromise(stream[_pendingCloseRequest], storedError); |
+ stream[_pendingCloseRequest] = undefined; |
+ } |
+ |
+ const writer = stream[_writer]; |
+ if (writer !== undefined) { |
+ v8.rejectPromise(writer[_closedPromise], storedError); |
+ v8.markPromiseAsHandled(writer[_closedPromise]); |
+ } |
} |
function WritableStreamUpdateBackpressure(stream, backpressure) { |
@@ -311,6 +380,7 @@ |
TEMP_ASSERT(state === ERRORED, |
'state is "errored".'); |
this[_closedPromise] = Promise_reject(stream[_storedError]); |
+ v8.markPromiseAsHandled(this[_closedPromise]); |
} |
if (state === WRITABLE && |
WritableStreamDefaultControllerGetBackpressure( |
@@ -421,14 +491,14 @@ |
} |
TEMP_ASSERT(state === WRITABLE, |
'state is "writable".'); |
- const promise = WritableStreamAddWriteRequest(stream); |
+ stream[_pendingCloseRequest] = v8.createPromise(); |
if (WritableStreamDefaultControllerGetBackpressure( |
stream[_writableStreamController])) { |
v8.resolvePromise(writer[_readyPromise], undefined); |
} |
stream[_state] = CLOSING; |
WritableStreamDefaultControllerClose(stream[_writableStreamController]); |
- return promise; |
+ return stream[_pendingCloseRequest]; |
} |
function WritableStreamDefaultWriterGetDesiredSize(writer) { |
@@ -452,11 +522,14 @@ |
'stream.[[writer]] is writer.'); |
const releasedError = new TypeError(errReleasedWriterClosedPromise); |
const state = stream[_state]; |
- if (state === WRITABLE || state === CLOSING) { |
+ if (state === WRITABLE || state === CLOSING || |
+ stream[_pendingAbortRequest] !== undefined) { |
v8.rejectPromise(writer[_closedPromise], releasedError); |
} else { |
writer[_closedPromise] = Promise_reject(releasedError); |
} |
+ v8.markPromiseAsHandled(writer[_closedPromise]); |
+ |
if (state === WRITABLE && |
WritableStreamDefaultControllerGetBackpressure( |
stream[_writableStreamController])) { |
@@ -464,6 +537,8 @@ |
} else { |
writer[_readyPromise] = Promise_reject(releasedError); |
} |
+ v8.markPromiseAsHandled(writer[_readyPromise]); |
+ |
stream[_writer] = undefined; |
writer[_ownerWritableStream] = undefined; |
} |
@@ -560,23 +635,23 @@ |
TEMP_ASSERT(stream[_state] === WRITABLE, |
'stream.[[state]] is "writable".'); |
let chunkSize = 1; |
- if (controller[_strategySize] !== undefined) { |
+ const strategySize = controller[_strategySize]; |
+ if (strategySize !== undefined) { |
try { |
- chunkSize = Function_call(controller[_strategySize], undefined, chunk); |
+ chunkSize = strategySize(chunk); |
} catch (e) { |
WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
- return Promise_reject(e); |
+ return; |
} |
} |
const writeRecord = {chunk}; |
const lastBackpressure = |
WritableStreamDefaultControllerGetBackpressure(controller); |
try { |
- const enqueueResult = |
- EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); |
+ EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); |
} catch (e) { |
WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
- return Promise_reject(e); |
+ return; |
} |
if (stream[_state] === WRITABLE) { |
const backpressure = |
@@ -625,37 +700,87 @@ |
DequeueValueForController(controller); |
TEMP_ASSERT(controller[_queue].length === 0, |
'controller.[[queue]] is empty.'); |
+ setDefaultControllerInCloseFlag(controller, true); |
const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
- 'close', [controller]); |
+ 'close', [controller]); |
thenPromise(sinkClosePromise, |
() => { |
- if (stream[_state] !== CLOSING) { |
+ TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
+ === true, |
+ 'controller.[[inClose]] is true'); |
+ setDefaultControllerInCloseFlag(controller, false); |
+ |
+ if (stream[_state] !== CLOSING && |
+ stream[_state] !== ERRORED) { |
return; |
} |
- WritableStreamFulfillWriteRequest(stream); |
+ |
+ TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); |
+ v8.resolvePromise(stream[_pendingCloseRequest], undefined); |
+ stream[_pendingCloseRequest] = undefined; |
+ |
WritableStreamFinishClose(stream); |
}, |
- r => WritableStreamDefaultControllerErrorIfNeeded(controller, r) |
+ r => { |
+ TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
+ === true, |
+ 'controller.[[inClose]] is true'); |
+ setDefaultControllerInCloseFlag(controller, false); |
+ |
+ TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); |
+ v8.rejectPromise(stream[_pendingCloseRequest], r); |
+ stream[_pendingCloseRequest] = undefined; |
+ |
+ if (stream[_pendingAbortRequest] !== undefined) { |
+ v8.rejectPromise(stream[_pendingAbortRequest], r); |
+ stream[_pendingAbortRequest] = undefined; |
+ } |
+ |
+ WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
+ } |
); |
} |
function WritableStreamDefaultControllerProcessWrite(controller, chunk) { |
setDefaultControllerWritingFlag(controller, true); |
+ |
+ const stream = controller[_controlledWritableStream]; |
+ |
+ TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
+ 'stream.[[pendingWriteRequest]] is undefined'); |
+ TEMP_ASSERT(stream[_writeRequests].length > 0, |
+ 'stream.[[writeRequests]] is not empty'); |
+ stream[_pendingWriteRequest] = stream[_writeRequests].shift(); |
+ |
const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
'write', [chunk, controller]); |
thenPromise( |
sinkWritePromise, |
() => { |
- const stream = controller[_controlledWritableStream]; |
+ TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
+ 'controller.[[writing]] is true'); |
+ setDefaultControllerWritingFlag(controller, false); |
+ |
+ TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
+ 'stream.[[pendingWriteRequest]] is not undefined'); |
+ v8.resolvePromise(stream[_pendingWriteRequest], undefined); |
+ stream[_pendingWriteRequest] = undefined; |
+ |
const state = stream[_state]; |
- if (state === ERRORED || state === CLOSED) { |
+ if (state === ERRORED) { |
+ WritableStreamRejectPromisesInReactionToError(stream); |
+ |
+ if (stream[_pendingAbortRequest] !== undefined) { |
+ v8.resolvePromise(stream[_pendingAbortRequest], undefined); |
+ stream[_pendingAbortRequest] = undefined; |
+ } |
return; |
} |
- setDefaultControllerWritingFlag(controller, false); |
- WritableStreamFulfillWriteRequest(stream); |
+ |
const lastBackpressure = |
WritableStreamDefaultControllerGetBackpressure(controller); |
DequeueValueForController(controller); |
+ |
if (state !== CLOSING) { |
const backpressure = |
WritableStreamDefaultControllerGetBackpressure(controller); |
@@ -664,10 +789,32 @@ |
controller[_controlledWritableStream], backpressure); |
} |
} |
+ |
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
}, |
- r => WritableStreamDefaultControllerErrorIfNeeded(controller, r) |
- ); |
+ r => { |
+ TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
+ 'controller.[[writing]] is true'); |
+ setDefaultControllerWritingFlag(controller, false); |
+ |
+ TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
+ 'stream.[[pendingWriteRequest]] is not undefined'); |
+ v8.rejectPromise(stream[_pendingWriteRequest], r); |
+ stream[_pendingWriteRequest] = undefined; |
+ |
+ if (stream[_state] === ERRORED) { |
+ stream[_storedError] = r; |
+ WritableStreamRejectPromisesInReactionToError(stream); |
+ } |
+ |
+ if (stream[_pendingAbortRequest] !== undefined) { |
+ v8.rejectPromise(stream[_pendingAbortRequest], r); |
+ stream[_pendingAbortRequest] = undefined; |
+ } |
+ |
+ WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
+ } |
+ ); |
} |
function WritableStreamDefaultControllerGetBackpressure(controller) { |