Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/WritableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/WritableStream.js b/third_party/WebKit/Source/core/streams/WritableStream.js |
| index dcffef65ddf835bf99a0d9b47d8901e9d544265b..d8fe2b685ced133d6719c67d2fbd596e63cc4a5e 100644 |
| --- a/third_party/WebKit/Source/core/streams/WritableStream.js |
| +++ b/third_party/WebKit/Source/core/streams/WritableStream.js |
| @@ -13,6 +13,9 @@ |
| // Private symbols. These correspond to the internal slots in the standard. |
| // "[[X]]" in the standard is spelt _X in this implementation. |
| + const _pendingWriteRequest = v8.createPrivateSymbol('[[pendingWriteRequest]]'); |
| + const _pendingCloseRequest = v8.createPrivateSymbol('[[pendingCloseRequest]]'); |
| + const _pendingAbortRequest = v8.createPrivateSymbol('[[pendingAbortRequest]]'); |
| const _state = v8.createPrivateSymbol('[[state]]'); |
| const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
| const _writer = v8.createPrivateSymbol('[[writer]]'); |
| @@ -32,11 +35,13 @@ |
| const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); |
| // _defaultControllerFlags combines WritableStreamDefaultController's internal |
| - // slots [[started]] and [[writing]] into a single bitmask for efficiency. |
| + // slots [[started]], [[writing]], and [[inClose]] into a single bitmask for |
| + // efficiency. |
| const _defaultControllerFlags = |
| v8.createPrivateSymbol('[[defaultControllerFlags]]'); |
| const FLAG_STARTED = 0b1; |
| const FLAG_WRITING = 0b10; |
| + const FLAG_INCLOSE = 0b100; |
| // For efficiency, WritableStream [[state]] contains numeric values. |
| const WRITABLE = 0; |
| @@ -53,7 +58,6 @@ |
| const defineProperty = global.Object.defineProperty; |
| const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| - const Function_call = v8.uncurryThis(global.Function.prototype.call); |
| const Function_apply = v8.uncurryThis(global.Function.prototype.apply); |
| const TypeError = global.TypeError; |
| @@ -129,6 +133,14 @@ |
| setDefaultControllerFlag(controller, FLAG_WRITING, value); |
| } |
| + function getDefaultControllerInCloseFlag(controller) { |
| + return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); |
| + } |
| + |
| + function setDefaultControllerInCloseFlag(controller, value) { |
| + setDefaultControllerFlag(controller, FLAG_INCLOSE, value); |
| + } |
| + |
| function rejectPromises(array, e) { |
| // array is an InternalPackedArray so forEach won't work. |
| for (let i = 0; i < array.length; ++i) { |
| @@ -149,9 +161,12 @@ |
| } |
| v8.log(`Assertion failed: ${message}\n`); |
| v8.logStackTrace(); |
| - class WritableStreamInternalError { |
| + class WritableStreamInternalError extends Error { |
| + constructor(message) { |
| + super(message); |
| + } |
| } |
| - throw new WritableStreamInternalError(); |
| + throw new WritableStreamInternalError(message); |
| } |
| class WritableStream { |
| @@ -160,6 +175,9 @@ |
| this[_storedError] = undefined; |
| this[_writer] = undefined; |
| this[_writableStreamController] = undefined; |
| + this[_pendingWriteRequest] = undefined; |
| + this[_pendingCloseRequest] = undefined; |
| + this[_pendingAbortRequest] = undefined; |
| this[_writeRequests] = new v8.InternalPackedArray(); |
| const type = underlyingSink.type; |
| if (type !== undefined) { |
| @@ -223,8 +241,25 @@ |
| 'state is "writable" or "closing".'); |
| const error = new TypeError(errStreamAborted); |
| WritableStreamError(stream, error); |
| - return WritableStreamDefaultControllerAbort( |
| - stream[_writableStreamController], reason); |
| + |
| + const controller = stream[_writableStreamController]; |
| + TEMP_ASSERT(controller !== undefined, |
| + 'controller is not undefined'); |
| + |
| + const isWriting = getDefaultControllerWritingFlag(controller); |
| + if (isWriting || getDefaultControllerInCloseFlag(controller)) { |
| + const promise = v8.createPromise(); |
| + stream[_pendingAbortRequest] = promise; |
| + |
| + if (isWriting) { |
| + return thenPromise(promise, () => { |
| + return WritableStreamDefaultControllerAbort(controller, reason); |
| + }); |
| + } |
| + return promise; |
| + } |
| + |
| + return WritableStreamDefaultControllerAbort(controller, reason); |
| } |
| // Writable Stream Abstract Operations Used by Controllers |
| @@ -240,40 +275,74 @@ |
| } |
| function WritableStreamError(stream, e) { |
| - const state = stream[_state]; |
| - TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
| - 'state is "writable" or "closing".'); |
| - rejectPromises(stream[_writeRequests], e); |
| - stream[_writeRequests] = new v8.InternalPackedArray(); |
| + const oldState = stream[_state]; |
| + TEMP_ASSERT(oldState === WRITABLE || oldState === CLOSING, |
| + 'oldState is "writable" or "closing".'); |
| + |
| + stream[_state] = ERRORED; |
| + stream[_storedError] = e; |
| + |
| + const controller = stream[_writableStreamController]; |
| + if (controller === undefined || |
|
tyoshino (SeeGerritForStatus)
2017/01/06 05:47:19
wh
tyoshino (SeeGerritForStatus)
2017/01/06 05:47:49
Sorry, it's a mistake. please ignore.
|
| + (!getDefaultControllerWritingFlag(controller) && |
| + !getDefaultControllerInCloseFlag(controller))) { |
| + WritableStreamRejectPromisesInReactionToError(stream); |
| + } |
| + |
| const writer = stream[_writer]; |
| if (writer !== undefined) { |
| - v8.rejectPromise(writer[_closedPromise], e); |
| - if (state === WRITABLE && |
| - WritableStreamDefaultControllerGetBackpressure( |
| - stream[_writableStreamController])) { |
| + if (oldState === WRITABLE && |
| + WritableStreamDefaultControllerGetBackpressure(controller) === true) { |
| v8.rejectPromise(writer[_readyPromise], e); |
| } else { |
| writer[_readyPromise] = Promise_reject(e); |
| } |
| + v8.markPromiseAsHandled(writer[_readyPromise]); |
| } |
| - stream[_state] = ERRORED; |
| - stream[_storedError] = e; |
| } |
| function WritableStreamFinishClose(stream) { |
| - TEMP_ASSERT(stream[_state] === CLOSING, |
| - 'stream.[[state]] is "closing".'); |
| - TEMP_ASSERT(stream[_writer] !== undefined, |
| - 'stream.[[writer]] is not undefined.'); |
| - stream[_state] = CLOSED; |
| - v8.resolvePromise(stream[_writer][_closedPromise], undefined); |
| + const state = stream[_state]; |
| + TEMP_ASSERT(state === CLOSING || state === ERRORED, |
| + 'state is "closing" or "errored"'); |
| + |
| + if (state === CLOSING) { |
| + v8.resolvePromise(stream[_writer][_closedPromise], undefined); |
| + stream[_state] = CLOSED; |
| + } else { |
| + TEMP_ASSERT(state === ERRORED, 'state is "errored"'); |
| + v8.rejectPromise(stream[_writer][_closedPromise], stream[_storedError]); |
| + v8.markPromiseAsHandled(stream[_writer][_closedPromise]); |
| + } |
| + |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + v8.resolvePromise(stream[_pendingAbortRequest], undefined); |
| + stream[_pendingAbortRequest] = undefined; |
| + } |
| } |
| - function WritableStreamFulfillWriteRequest(stream) { |
| - TEMP_ASSERT(stream[_writeRequests].length !== 0, |
| - 'stream.[[writeRequests]] is not empty.'); |
| - const writeRequest = stream[_writeRequests].shift(); |
| - v8.resolvePromise(writeRequest, undefined); |
| + function WritableStreamRejectPromisesInReactionToError(stream) { |
| + TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); |
| + TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
| + 'stream.[[pendingWriteRequest]] is undefined'); |
| + |
| + const storedError = stream[_storedError]; |
| + rejectPromises(stream[_writeRequests], storedError); |
| + stream[_writeRequests] = new v8.InternalPackedArray(); |
| + |
| + if (stream[_pendingCloseRequest] !== undefined) { |
| + TEMP_ASSERT( |
| + getDefaultControllerInCloseFlag(stream[_writableStreamController]) === |
|
tyoshino (SeeGerritForStatus)
2017/01/06 05:47:19
use 4 space indent after line-wrapping right after
|
| + false, 'stream.[[writableStreamController]].[[inClose]] === false'); |
| + v8.rejectPromise(stream[_pendingCloseRequest], storedError); |
| + stream[_pendingCloseRequest] = undefined; |
| + } |
| + |
| + const writer = stream[_writer]; |
| + if (writer !== undefined) { |
| + v8.rejectPromise(writer[_closedPromise], storedError); |
| + v8.markPromiseAsHandled(writer[_closedPromise]); |
| + } |
| } |
| function WritableStreamUpdateBackpressure(stream, backpressure) { |
| @@ -311,6 +380,7 @@ |
| TEMP_ASSERT(state === ERRORED, |
| 'state is "errored".'); |
| this[_closedPromise] = Promise_reject(stream[_storedError]); |
| + v8.markPromiseAsHandled(this[_closedPromise]); |
| } |
| if (state === WRITABLE && |
| WritableStreamDefaultControllerGetBackpressure( |
| @@ -421,14 +491,14 @@ |
| } |
| TEMP_ASSERT(state === WRITABLE, |
| 'state is "writable".'); |
| - const promise = WritableStreamAddWriteRequest(stream); |
| + stream[_pendingCloseRequest] = v8.createPromise(); |
| if (WritableStreamDefaultControllerGetBackpressure( |
| stream[_writableStreamController])) { |
| v8.resolvePromise(writer[_readyPromise], undefined); |
| } |
| stream[_state] = CLOSING; |
| WritableStreamDefaultControllerClose(stream[_writableStreamController]); |
| - return promise; |
| + return stream[_pendingCloseRequest]; |
| } |
| function WritableStreamDefaultWriterGetDesiredSize(writer) { |
| @@ -452,11 +522,14 @@ |
| 'stream.[[writer]] is writer.'); |
| const releasedError = new TypeError(errReleasedWriterClosedPromise); |
| const state = stream[_state]; |
| - if (state === WRITABLE || state === CLOSING) { |
| + if (state === WRITABLE || state === CLOSING || |
| + stream[_pendingAbortRequest] !== undefined) { |
| v8.rejectPromise(writer[_closedPromise], releasedError); |
| } else { |
| writer[_closedPromise] = Promise_reject(releasedError); |
| } |
| + v8.markPromiseAsHandled(writer[_closedPromise]); |
| + |
| if (state === WRITABLE && |
| WritableStreamDefaultControllerGetBackpressure( |
| stream[_writableStreamController])) { |
| @@ -464,6 +537,8 @@ |
| } else { |
| writer[_readyPromise] = Promise_reject(releasedError); |
| } |
| + v8.markPromiseAsHandled(writer[_readyPromise]); |
| + |
| stream[_writer] = undefined; |
| writer[_ownerWritableStream] = undefined; |
| } |
| @@ -560,23 +635,23 @@ |
| TEMP_ASSERT(stream[_state] === WRITABLE, |
| 'stream.[[state]] is "writable".'); |
| let chunkSize = 1; |
| - if (controller[_strategySize] !== undefined) { |
| + const strategySize = controller[_strategySize]; |
| + if (strategySize !== undefined) { |
| try { |
| - chunkSize = Function_call(controller[_strategySize], undefined, chunk); |
| + chunkSize = strategySize(chunk); |
| } catch (e) { |
| WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
| - return Promise_reject(e); |
| + return; |
| } |
| } |
| const writeRecord = {chunk}; |
| const lastBackpressure = |
| WritableStreamDefaultControllerGetBackpressure(controller); |
| try { |
| - const enqueueResult = |
| - EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); |
| + EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); |
| } catch (e) { |
| WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
| - return Promise_reject(e); |
| + return; |
| } |
| if (stream[_state] === WRITABLE) { |
| const backpressure = |
| @@ -625,37 +700,87 @@ |
| DequeueValueForController(controller); |
| TEMP_ASSERT(controller[_queue].length === 0, |
| 'controller.[[queue]] is empty.'); |
| + setDefaultControllerInCloseFlag(controller, true); |
| const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
| 'close', [controller]); |
|
tyoshino (SeeGerritForStatus)
2017/01/06 05:47:19
let's adjust indentation using this opportunity (o
|
| thenPromise(sinkClosePromise, |
| () => { |
| - if (stream[_state] !== CLOSING) { |
| + TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
| + === true, |
| + 'controller.[[inClose]] is true'); |
| + setDefaultControllerInCloseFlag(controller, false); |
| + |
| + if (stream[_state] !== CLOSING && |
| + stream[_state] !== ERRORED) { |
| return; |
| } |
| - WritableStreamFulfillWriteRequest(stream); |
| + |
| + TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); |
| + v8.resolvePromise(stream[_pendingCloseRequest], undefined); |
| + stream[_pendingCloseRequest] = undefined; |
| + |
| WritableStreamFinishClose(stream); |
| }, |
| - r => WritableStreamDefaultControllerErrorIfNeeded(controller, r) |
| + r => { |
| + TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
| + === true, |
| + 'controller.[[inClose]] is true'); |
| + setDefaultControllerInCloseFlag(controller, false); |
| + |
| + TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); |
| + v8.rejectPromise(stream[_pendingCloseRequest], r); |
| + stream[_pendingCloseRequest] = undefined; |
| + |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + v8.rejectPromise(stream[_pendingAbortRequest], r); |
| + stream[_pendingAbortRequest] = undefined; |
| + } |
| + |
| + WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
| + } |
| ); |
| } |
| function WritableStreamDefaultControllerProcessWrite(controller, chunk) { |
| setDefaultControllerWritingFlag(controller, true); |
| + |
| + const stream = controller[_controlledWritableStream]; |
| + |
| + TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
| + 'stream.[[pendingWriteRequest]] is undefined'); |
| + TEMP_ASSERT(stream[_writeRequests].length > 0, |
| + 'stream.[[writeRequests]] is not empty'); |
| + stream[_pendingWriteRequest] = stream[_writeRequests].shift(); |
| + |
| const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
| 'write', [chunk, controller]); |
| thenPromise( |
| sinkWritePromise, |
| () => { |
| - const stream = controller[_controlledWritableStream]; |
| + TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
| + 'controller.[[writing]] is true'); |
| + setDefaultControllerWritingFlag(controller, false); |
| + |
| + TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
| + 'stream.[[pendingWriteRequest]] is not undefined'); |
| + v8.resolvePromise(stream[_pendingWriteRequest], undefined); |
| + stream[_pendingWriteRequest] = undefined; |
| + |
| const state = stream[_state]; |
| - if (state === ERRORED || state === CLOSED) { |
| + if (state === ERRORED) { |
| + WritableStreamRejectPromisesInReactionToError(stream); |
| + |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + v8.resolvePromise(stream[_pendingAbortRequest], undefined); |
| + stream[_pendingAbortRequest] = undefined; |
| + } |
| return; |
| } |
| - setDefaultControllerWritingFlag(controller, false); |
| - WritableStreamFulfillWriteRequest(stream); |
| + |
| const lastBackpressure = |
| WritableStreamDefaultControllerGetBackpressure(controller); |
| DequeueValueForController(controller); |
| + |
| if (state !== CLOSING) { |
| const backpressure = |
| WritableStreamDefaultControllerGetBackpressure(controller); |
| @@ -664,10 +789,32 @@ |
| controller[_controlledWritableStream], backpressure); |
| } |
| } |
| + |
| WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| }, |
| - r => WritableStreamDefaultControllerErrorIfNeeded(controller, r) |
| - ); |
| + r => { |
| + TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
| + 'controller.[[writing]] is true'); |
| + setDefaultControllerWritingFlag(controller, false); |
| + |
| + TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
| + 'stream.[[pendingWriteRequest]] is not undefined'); |
| + v8.rejectPromise(stream[_pendingWriteRequest], r); |
| + stream[_pendingWriteRequest] = undefined; |
| + |
| + if (stream[_state] === ERRORED) { |
| + stream[_storedError] = r; |
| + WritableStreamRejectPromisesInReactionToError(stream); |
| + } |
| + |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + v8.rejectPromise(stream[_pendingAbortRequest], r); |
| + stream[_pendingAbortRequest] = undefined; |
| + } |
| + |
| + WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
| + } |
| + ); |
| } |
| function WritableStreamDefaultControllerGetBackpressure(controller) { |