Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/WritableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/WritableStream.js b/third_party/WebKit/Source/core/streams/WritableStream.js |
| index ed39672b2175b71fe753f3c65e2ad50ae6e8f459..4a4429cba4265467df633a4571b661315efbae27 100644 |
| --- a/third_party/WebKit/Source/core/streams/WritableStream.js |
| +++ b/third_party/WebKit/Source/core/streams/WritableStream.js |
| @@ -13,14 +13,20 @@ |
| // Private symbols. These correspond to the internal slots in the standard. |
| // "[[X]]" in the standard is spelt _X in this implementation. |
| - const _pendingWriteRequest = v8.createPrivateSymbol('[[pendingWriteRequest]]'); |
| - const _pendingCloseRequest = v8.createPrivateSymbol('[[pendingCloseRequest]]'); |
| - const _pendingAbortRequest = v8.createPrivateSymbol('[[pendingAbortRequest]]'); |
| - const _state = v8.createPrivateSymbol('[[state]]'); |
| + |
| + // TODO(ricea): Optimise [[closeRequest]] and [[inFlightCloseRequest]] into a |
| + // single slot + a flag to say which one is set at the moment. |
| + const _closeRequest = v8.createPrivateSymbol('[[closeRequest]]'); |
| + const _inFlightWriteRequest = v8.createPrivateSymbol('[[inFlightWriteRequest]]'); |
| + const _inFlightCloseRequest = v8.createPrivateSymbol('[[inFlightCloseRequest]]'); |
| + const _pendingAbortRequest = |
| + v8.createPrivateSymbol('[[pendingAbortRequest]]'); |
| + // Flags and state are combined into a single integer field for efficiency. |
| + const _stateAndFlags = v8.createPrivateSymbol('[[state]] and flags'); |
| const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
| - const _writer = v8.createPrivateSymbol('[[writer]]'); |
| const _writableStreamController = |
| v8.createPrivateSymbol('[[writableStreamController]]'); |
| + const _writer = v8.createPrivateSymbol('[[writer]]'); |
| const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]'); |
| const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]'); |
| const _ownerWritableStream = |
| @@ -29,25 +35,21 @@ |
| const _controlledWritableStream = |
| v8.createPrivateSymbol('[[controlledWritableStream]]'); |
| const _queue = v8.createPrivateSymbol('[[queue]]'); |
| - const _queueSize = v8.createPrivateSymbol('[[queueSize]]'); |
| + const _queueTotalSize = v8.createPrivateSymbol('[[queueTotalSize]]'); |
| + const _started = v8.createPrivateSymbol('[[started]]'); |
| const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); |
| const _strategySize = v8.createPrivateSymbol('[[strategySize]]'); |
| const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); |
| - // _defaultControllerFlags combines WritableStreamDefaultController's internal |
| - // slots [[started]], [[writing]], and [[inClose]] into a single bitmask for |
| - // efficiency. |
| - const _defaultControllerFlags = |
| - v8.createPrivateSymbol('[[defaultControllerFlags]]'); |
| - const FLAG_STARTED = 0b1; |
| - const FLAG_WRITING = 0b10; |
| - const FLAG_INCLOSE = 0b100; |
| - |
| - // For efficiency, WritableStream [[state]] contains numeric values. |
| + // Numeric encodings of states |
| const WRITABLE = 0; |
| - const CLOSING = 1; |
| - const CLOSED = 2; |
| - const ERRORED = 3; |
| + const CLOSED = 1; |
| + const ERRORED = 2; |
| + |
| + // Mask to extract or assign states to _stateAndFlags |
| + const STATE_MASK = 0xF; |
| + |
| + const BACKPRESSURE_FLAG = 0x10; |
| // Javascript functions. It is important to use these copies, as the ones on |
| // the global object may have been overwritten. See "V8 Extras Design Doc", |
| @@ -59,6 +61,7 @@ |
| const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| const Function_apply = v8.uncurryThis(global.Function.prototype.apply); |
| + const Function_call = v8.uncurryThis(global.Function.prototype.call); |
| const TypeError = global.TypeError; |
| const RangeError = global.RangeError; |
| @@ -77,6 +80,7 @@ |
| const streamErrors = binding.streamErrors; |
| const errAbortLockedStream = 'Cannot abort a writable stream that is locked to a writer'; |
| const errStreamAborted = 'The stream has been aborted'; |
| + const errStreamAborting = 'The stream is in the process of being aborted'; |
| const errWriterLockReleasedPrefix = 'This writable stream writer has been released and cannot be '; |
| const errCloseCloseRequestedStream = |
| 'Cannot close a writable stream that has already been requested to be closed'; |
| @@ -107,40 +111,6 @@ |
| templateErrorCannotActionOnStateStream(action, stateNames[state])); |
| } |
| - function setDefaultControllerFlag(controller, flag, value) { |
| - let flags = controller[_defaultControllerFlags]; |
| - if (value) { |
| - flags = flags | flag; |
| - } else { |
| - flags = flags & ~flag; |
| - } |
| - controller[_defaultControllerFlags] = flags; |
| - } |
| - |
| - function getDefaultControllerStartedFlag(controller) { |
| - return Boolean(controller[_defaultControllerFlags] & FLAG_STARTED); |
| - } |
| - |
| - function setDefaultControllerStartedFlag(controller, value) { |
| - setDefaultControllerFlag(controller, FLAG_STARTED, value); |
| - } |
| - |
| - function getDefaultControllerWritingFlag(controller) { |
| - return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING); |
| - } |
| - |
| - function setDefaultControllerWritingFlag(controller, value) { |
| - setDefaultControllerFlag(controller, FLAG_WRITING, value); |
| - } |
| - |
| - function getDefaultControllerInCloseFlag(controller) { |
| - return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); |
| - } |
| - |
| - function setDefaultControllerInCloseFlag(controller, value) { |
| - setDefaultControllerFlag(controller, FLAG_INCLOSE, value); |
| - } |
| - |
| function rejectPromises(queue, e) { |
| queue.forEach(promise => v8.rejectPromise(promise, e)); |
| } |
| @@ -168,12 +138,13 @@ |
| class WritableStream { |
| constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { |
| - this[_state] = WRITABLE; |
| + this[_stateAndFlags] = WRITABLE; |
| this[_storedError] = undefined; |
| this[_writer] = undefined; |
| this[_writableStreamController] = undefined; |
| - this[_pendingWriteRequest] = undefined; |
| - this[_pendingCloseRequest] = undefined; |
| + this[_inFlightWriteRequest] = undefined; |
| + this[_closeRequest] = undefined; |
| + this[_inFlightCloseRequest] = undefined; |
| this[_pendingAbortRequest] = undefined; |
| this[_writeRequests] = new binding.SimpleQueue(); |
| const type = underlyingSink.type; |
| @@ -183,6 +154,7 @@ |
| this[_writableStreamController] = |
| new WritableStreamDefaultController(this, underlyingSink, size, |
| highWaterMark); |
| + WritableStreamDefaultControllerStartSteps(this[_writableStreamController]); |
| } |
| get locked() { |
| @@ -227,36 +199,55 @@ |
| } |
| function WritableStreamAbort(stream, reason) { |
| - const state = stream[_state]; |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| if (state === CLOSED) { |
| return Promise_resolve(undefined); |
| } |
| if (state === ERRORED) { |
| return Promise_reject(stream[_storedError]); |
| } |
| - TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
| - 'state is "writable" or "closing".'); |
| - const error = new TypeError(errStreamAborted); |
| - WritableStreamError(stream, error); |
| + TEMP_ASSERT(state === WRITABLE, |
| + 'state is "writable".'); |
| + const error = new TypeError(errStreamAborting); |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + return Promise_reject(error); |
| + } |
| const controller = stream[_writableStreamController]; |
| TEMP_ASSERT(controller !== undefined, |
| 'controller is not undefined'); |
| + if (!WritableStreamHasOperationMarkedInFlight(stream) && |
| + controller[_started]) { |
| + WritableStreamFinishAbort(stream); |
| + return WritableStreamDefaultControllerAbortSteps(controller, reason); |
| + } |
| + const writer = stream[_writer]; |
| + if (writer !== undefined) { |
| + WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error); |
| + } |
| + const promise = v8.createPromise(); |
| + stream[_pendingAbortRequest] = {promise, reason}; |
| + return promise; |
| + } |
| - const isWriting = getDefaultControllerWritingFlag(controller); |
| - if (isWriting || getDefaultControllerInCloseFlag(controller)) { |
| - const promise = v8.createPromise(); |
| - stream[_pendingAbortRequest] = promise; |
| - |
| - if (isWriting) { |
| - return thenPromise(promise, () => { |
| - return WritableStreamDefaultControllerAbort(controller, reason); |
| - }); |
| + function WritableStreamError(stream, error) { |
| + stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORED; |
| + stream[_storedError] = error; |
| + WritableStreamDefaultControllerErrorSteps(stream[_writableStreamController]); |
| + if (stream[_pendingAbortRequest] === undefined) { |
| + const writer = stream[_writer]; |
| + if (writer !== undefined) { |
| + WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error); |
| } |
| - return promise; |
| } |
| + if (!WritableStreamHasOperationMarkedInFlight(stream)) { |
| + WritableStreamRejectPromisesInReactionToError(stream); |
| + } |
| + } |
| - return WritableStreamDefaultControllerAbort(controller, reason); |
| + function WritableStreamFinishAbort(stream) { |
| + const error = new TypeError(errStreamAborted); |
| + WritableStreamError(stream, error); |
| } |
| // Writable Stream Abstract Operations Used by Controllers |
| @@ -264,100 +255,182 @@ |
| function WritableStreamAddWriteRequest(stream) { |
| TEMP_ASSERT(IsWritableStreamLocked(stream), |
| '! IsWritableStreamLocked(writer) is true.'); |
| - TEMP_ASSERT(stream[_state] === WRITABLE, |
| + TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, |
| 'stream.[[state]] is "writable".'); |
| const promise = v8.createPromise(); |
| stream[_writeRequests].push(promise); |
| return promise; |
| } |
| - function WritableStreamError(stream, e) { |
| - const oldState = stream[_state]; |
| - TEMP_ASSERT(oldState === WRITABLE || oldState === CLOSING, |
| - 'oldState is "writable" or "closing".'); |
| + function WritableStreamFinishInFlightWrite(stream) { |
| + TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined, |
| + '_stream_.[[inFlightWriteRequest]] is not *undefined*.'); |
| + v8.resolvePromise(stream[_inFlightWriteRequest], undefined); |
| + stream[_inFlightWriteRequest] = undefined; |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === ERRORED) { |
| + WritableStreamFinishInFlightWriteInErroredState(stream); |
| + return; |
| + } |
| + TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); |
| + WritableStreamHandleAbortRequestIfPending(stream); |
| + } |
| - stream[_state] = ERRORED; |
| - stream[_storedError] = e; |
| + function WritableStreamFinishInFlightWriteInErroredState(stream) { |
| + WritableStreamRejectAbortRequestIfPending(stream); |
| + WritableStreamRejectPromisesInReactionToError(stream); |
| + } |
| - const controller = stream[_writableStreamController]; |
| - if (controller === undefined || |
| - (!getDefaultControllerWritingFlag(controller) && |
| - !getDefaultControllerInCloseFlag(controller))) { |
| - WritableStreamRejectPromisesInReactionToError(stream); |
| + function WritableStreamFinishInFlightWriteWithError(stream, error) { |
| + TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined, |
| + '_stream_.[[inFlightWriteRequest]] is not *undefined*.'); |
| + v8.rejectPromise(stream[_inFlightWriteRequest], error); |
| + stream[_inFlightWriteRequest] = undefined; |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === ERRORED) { |
| + WritableStreamFinishInFlightWriteInErroredState(stream); |
| + return; |
| } |
| + TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); |
| + WritableStreamError(stream, error); |
| + WritableStreamRejectAbortRequestIfPending(stream); |
| + } |
| + function WritableStreamFinishInFlightClose(stream) { |
| + TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined, |
| + '_stream_.[[inFlightCloseRequest]] is not *undefined*.'); |
| + v8.resolvePromise(stream[_inFlightCloseRequest], undefined); |
| + stream[_inFlightCloseRequest] = undefined; |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === ERRORED) { |
| + WritableStreamFinishInFlightCloseInErroredState(stream); |
| + return; |
| + } |
| + TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); |
| + stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | CLOSED; |
| const writer = stream[_writer]; |
| if (writer !== undefined) { |
| - if (oldState === WRITABLE && |
| - WritableStreamDefaultControllerGetBackpressure(controller) === true) { |
| - v8.rejectPromise(writer[_readyPromise], e); |
| - } else { |
| - writer[_readyPromise] = Promise_reject(e); |
| - } |
| - v8.markPromiseAsHandled(writer[_readyPromise]); |
| + v8.resolvePromise(writer[_closedPromise], undefined); |
| } |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + v8.resolvePromise(stream[_pendingAbortRequest].promise, undefined); |
| + stream[_pendingAbortRequest] = undefined; |
| + } |
| + } |
| + |
| + function WritableStreamFinishInFlightCloseInErroredState(stream) { |
| + WritableStreamRejectAbortRequestIfPending(stream); |
| + WritableStreamRejectClosedPromiseInReactionToError(stream); |
| + } |
| + |
| + function WritableStreamFinishInFlightCloseWithError(stream, error) { |
| + TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined, |
| + '_stream_.[[inFlightCloseRequest]] is not *undefined*.'); |
| + v8.rejectPromise(stream[_inFlightCloseRequest], error); |
| + stream[_inFlightCloseRequest] = undefined; |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === ERRORED) { |
| + WritableStreamFinishInFlightCloseInErroredState(stream); |
| + return; |
| + } |
| + TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); |
| + WritableStreamError(stream, error); |
| + WritableStreamRejectAbortRequestIfPending(stream); |
| + } |
| + |
| + function WritableStreamCloseQueuedOrInFlight(stream) { |
| + return stream[_closeRequest] !== undefined || |
| + stream[_inFlightCloseRequest] !== undefined; |
| + } |
| + |
| + function WritableStreamHandleAbortRequestIfPending(stream) { |
| + if (stream[_pendingAbortRequest] === undefined) { |
| + return; |
| + } |
| + WritableStreamFinishAbort(stream); |
| + const abortRequest = stream[_pendingAbortRequest]; |
| + stream[_pendingAbortRequest] = undefined; |
| + const promise = |
| + WritableStreamDefaultControllerAbortSteps(stream[_writableStreamController], |
| + abortRequest.reason); |
| + thenPromise(promise, |
| + result => v8.resolvePromise(abortRequest.promise, result), |
| + reason => v8.rejectPromise(abortRequest.promise, reason)); |
| + } |
| + |
| + function WritableStreamHasOperationMarkedInFlight(stream) { |
| + return stream[_inFlightWriteRequest] !== undefined || |
| + stream[_inFlightCloseRequest] !== undefined; |
| + } |
| + |
| + function WritableStreamMarkCloseRequestInFlight(stream) { |
| + TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined, |
| + '_stream_.[[inFlightCloseRequest]] is *undefined*.'); |
| + TEMP_ASSERT(stream[_closeRequest] !== undefined, |
| + '_stream_.[[closeRequest]] is not *undefined*.'); |
| + stream[_inFlightCloseRequest] = stream[_closeRequest]; |
| + stream[_closeRequest] = undefined; |
| } |
| - function WritableStreamFinishClose(stream) { |
| - const state = stream[_state]; |
| - TEMP_ASSERT(state === CLOSING || state === ERRORED, |
| - 'state is "closing" or "errored"'); |
| + function WritableStreamMarkFirstWriteRequestInFlight(stream) { |
| + TEMP_ASSERT(stream[_inFlightWriteRequest] === undefined, |
| + '_stream_.[[inFlightWriteRequest]] is *undefined*.'); |
| + TEMP_ASSERT(stream[_writeRequests].length !== 0, |
| + '_stream_.[[writeRequests]] is not empty.'); |
| + const writeRequest = stream[_writeRequests].shift(); |
| + stream[_inFlightWriteRequest] = writeRequest; |
| + } |
| + function WritableStreamRejectClosedPromiseInReactionToError(stream) { |
| const writer = stream[_writer]; |
| - if (state === CLOSING) { |
| - if (writer !== undefined) { |
| - v8.resolvePromise(writer[_closedPromise], undefined); |
| - } |
| - stream[_state] = CLOSED; |
| - } else if (writer !== undefined) { |
| - TEMP_ASSERT(state === ERRORED, 'state is "errored"'); |
| + if (writer !== undefined) { |
| v8.rejectPromise(writer[_closedPromise], stream[_storedError]); |
| v8.markPromiseAsHandled(writer[_closedPromise]); |
| } |
| + } |
| + function WritableStreamRejectAbortRequestIfPending(stream) { |
| if (stream[_pendingAbortRequest] !== undefined) { |
| - v8.resolvePromise(stream[_pendingAbortRequest], undefined); |
| + v8.rejectPromise(stream[_pendingAbortRequest].promise, |
| + stream[_storedError]); |
| stream[_pendingAbortRequest] = undefined; |
| } |
| } |
| function WritableStreamRejectPromisesInReactionToError(stream) { |
| - TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); |
| - TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
| - 'stream.[[pendingWriteRequest]] is undefined'); |
| - |
| const storedError = stream[_storedError]; |
| rejectPromises(stream[_writeRequests], storedError); |
| stream[_writeRequests] = new binding.SimpleQueue(); |
| - if (stream[_pendingCloseRequest] !== undefined) { |
| - TEMP_ASSERT( |
| - getDefaultControllerInCloseFlag(stream[_writableStreamController]) === |
| - false, 'stream.[[writableStreamController]].[[inClose]] === false'); |
| - v8.rejectPromise(stream[_pendingCloseRequest], storedError); |
| - stream[_pendingCloseRequest] = undefined; |
| + if (stream[_closeRequest] !== undefined) { |
| + TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined, |
| + '_stream_.[[inFlightCloseRequest]] is *undefined*.'); |
| + v8.rejectPromise(stream[_closeRequest], storedError); |
| + stream[_closeRequest] = undefined; |
| } |
| - const writer = stream[_writer]; |
| - if (writer !== undefined) { |
| - v8.rejectPromise(writer[_closedPromise], storedError); |
| - v8.markPromiseAsHandled(writer[_closedPromise]); |
| - } |
| + WritableStreamRejectClosedPromiseInReactionToError(stream); |
| } |
| function WritableStreamUpdateBackpressure(stream, backpressure) { |
| - TEMP_ASSERT(stream[_state] === WRITABLE, |
| + TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, |
| 'stream.[[state]] is "writable".'); |
| + TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream), |
| + 'WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.'); |
| const writer = stream[_writer]; |
| - if (writer === undefined) { |
| - return; |
| + if (writer !== undefined && |
| + backpressure !== Boolean(stream[_stateAndFlags] & BACKPRESSURE_FLAG)) { |
| + if (backpressure) { |
| + writer[_readyPromise] = v8.createPromise(); |
| + } else { |
| + TEMP_ASSERT(!backpressure, '_backpressure_ is *false*.'); |
| + v8.resolvePromise(writer[_readyPromise], undefined); |
| + } |
| } |
| if (backpressure) { |
| - writer[_readyPromise] = v8.createPromise(); |
| + stream[_stateAndFlags] |= BACKPRESSURE_FLAG; |
| } else { |
| - TEMP_ASSERT(backpressure === false, |
| - 'backpressure is false.'); |
| - v8.resolvePromise(writer[_readyPromise], undefined); |
| + stream[_stateAndFlags] &= ~BACKPRESSURE_FLAG; |
| } |
| } |
| @@ -366,13 +439,14 @@ |
| function isWritableStreamErrored(stream) { |
| TEMP_ASSERT( |
| IsWritableStream(stream), '! IsWritableStream(stream) is true.'); |
| - return stream[_state] === ERRORED; |
| + return (stream[_stateAndFlags] & STATE_MASK) === ERRORED; |
| } |
| function isWritableStreamClosingOrClosed(stream) { |
| TEMP_ASSERT( |
| IsWritableStream(stream), '! IsWritableStream(stream) is true.'); |
| - return stream[_state] === CLOSING || stream[_state] === CLOSED; |
| + return WritableStreamCloseQueuedOrInFlight(stream) || |
| + (stream[_stateAndFlags] & STATE_MASK) === CLOSED; |
| } |
| function getWritableStreamStoredError(stream) { |
| @@ -391,24 +465,30 @@ |
| } |
| this[_ownerWritableStream] = stream; |
| stream[_writer] = this; |
| - const state = stream[_state]; |
| - if (state === WRITABLE || state === CLOSING) { |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === WRITABLE) { |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + const error = new TypeError(errStreamAborting); |
| + this[_readyPromise] = Promise_reject(error); |
| + v8.markPromiseAsHandled(this[_readyPromise]); |
| + } else if (!WritableStreamCloseQueuedOrInFlight(stream) && |
| + stream[_stateAndFlags] & BACKPRESSURE_FLAG) { |
| + this[_readyPromise] = v8.createPromise(); |
| + } else { |
| + this[_readyPromise] = Promise_resolve(undefined); |
| + } |
| this[_closedPromise] = v8.createPromise(); |
| } else if (state === CLOSED) { |
| + this[_readyPromise] = Promise_resolve(undefined); |
| this[_closedPromise] = Promise_resolve(undefined); |
| } else { |
| - TEMP_ASSERT(state === ERRORED, |
| - 'state is "errored".'); |
| - this[_closedPromise] = Promise_reject(stream[_storedError]); |
| + TEMP_ASSERT(state === ERRORED, '_state_ is `"errored"`.'); |
| + const storedError = stream[_storedError]; |
| + this[_readyPromise] = Promise_reject(storedError); |
| + v8.markPromiseAsHandled(this[_readyPromise]); |
| + this[_closedPromise] = Promise_reject(storedError); |
| v8.markPromiseAsHandled(this[_closedPromise]); |
| } |
| - if (state === WRITABLE && |
| - WritableStreamDefaultControllerGetBackpressure( |
| - stream[_writableStreamController])) { |
| - this[_readyPromise] = v8.createPromise(); |
| - } else { |
| - this[_readyPromise] = Promise_resolve(undefined); |
| - } |
| } |
| get closed() { |
| @@ -453,7 +533,7 @@ |
| if (stream === undefined) { |
| return Promise_reject(createWriterLockReleasedError(verbClosed)); |
| } |
| - if (stream[_state] === CLOSING) { |
| + if (WritableStreamCloseQueuedOrInFlight(stream)) { |
| return Promise_reject(new TypeError(errCloseCloseRequestedStream)); |
| } |
| return WritableStreamDefaultWriterClose(this); |
| @@ -476,13 +556,9 @@ |
| if (!IsWritableStreamDefaultWriter(this)) { |
| return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| } |
| - const stream = this[_ownerWritableStream]; |
| - if (stream === undefined) { |
| + if (this[_ownerWritableStream] === undefined) { |
| return Promise_reject(createWriterLockReleasedError(verbWrittenTo)); |
| } |
| - if (stream[_state] === CLOSING) { |
| - return Promise_reject(new TypeError(errWriteCloseRequestedStream)); |
| - } |
| return WritableStreamDefaultWriterWrite(this, chunk); |
| } |
| } |
| @@ -502,30 +578,32 @@ |
| function WritableStreamDefaultWriterClose(writer) { |
| const stream = writer[_ownerWritableStream]; |
| - TEMP_ASSERT(stream !== undefined, |
| - 'stream is not undefined.'); |
| - const state = stream[_state]; |
| + TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| if (state === CLOSED || state === ERRORED) { |
| return Promise_reject( |
| createCannotActionOnStateStreamError('close', state)); |
| } |
| - TEMP_ASSERT(state === WRITABLE, |
| - 'state is "writable".'); |
| - stream[_pendingCloseRequest] = v8.createPromise(); |
| - if (WritableStreamDefaultControllerGetBackpressure( |
| - stream[_writableStreamController])) { |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + return Promise_reject(new TypeError(errStreamAborting)); |
| + } |
| + TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); |
| + TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream), |
| + '! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.'); |
| + const promise = v8.createPromise(); |
| + stream[_closeRequest] = promise; |
| + if (stream[_stateAndFlags] & BACKPRESSURE_FLAG) { |
| v8.resolvePromise(writer[_readyPromise], undefined); |
| } |
| - stream[_state] = CLOSING; |
| WritableStreamDefaultControllerClose(stream[_writableStreamController]); |
| - return stream[_pendingCloseRequest]; |
| + return promise; |
| } |
| function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) { |
| const stream = writer[_ownerWritableStream]; |
| TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); |
| - const state = stream[_state]; |
| - if (state === CLOSING || state === CLOSED) { |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (WritableStreamCloseQueuedOrInFlight(stream) || state === CLOSED) { |
| return Promise_resolve(undefined); |
| } |
| if (state === ERRORED) { |
| @@ -535,10 +613,20 @@ |
| return WritableStreamDefaultWriterClose(writer); |
| } |
| + function WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, |
| + error) { |
| + if (v8.promiseState(writer[_readyPromise]) === v8.kPROMISE_PENDING) { |
| + v8.rejectPromise(writer[_readyPromise], error); |
| + } else { |
| + writer[_readyPromise] = Promise_reject(error); |
| + } |
| + v8.markPromiseAsHandled(writer[_readyPromise]); |
| + } |
| + |
| function WritableStreamDefaultWriterGetDesiredSize(writer) { |
| const stream = writer[_ownerWritableStream]; |
| - const state = stream[_state]; |
| - if (state === ERRORED) { |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === ERRORED || stream[_pendingAbortRequest] !== undefined) { |
| return null; |
| } |
| if (state === CLOSED) { |
| @@ -555,42 +643,46 @@ |
| TEMP_ASSERT(stream[_writer] === writer, |
| 'stream.[[writer]] is writer.'); |
| const releasedError = new TypeError(errReleasedWriterClosedPromise); |
| - const state = stream[_state]; |
| - if (state === WRITABLE || state === CLOSING || |
| - stream[_pendingAbortRequest] !== undefined) { |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, |
| + releasedError); |
| + if (state === WRITABLE || |
| + WritableStreamHasOperationMarkedInFlight(stream)) { |
| v8.rejectPromise(writer[_closedPromise], releasedError); |
| } else { |
| writer[_closedPromise] = Promise_reject(releasedError); |
| } |
| v8.markPromiseAsHandled(writer[_closedPromise]); |
| - |
| - if (state === WRITABLE && |
| - WritableStreamDefaultControllerGetBackpressure( |
| - stream[_writableStreamController])) { |
| - v8.rejectPromise(writer[_readyPromise], releasedError); |
| - } else { |
| - writer[_readyPromise] = Promise_reject(releasedError); |
| - } |
| - v8.markPromiseAsHandled(writer[_readyPromise]); |
| - |
| stream[_writer] = undefined; |
| writer[_ownerWritableStream] = undefined; |
| } |
| function WritableStreamDefaultWriterWrite(writer, chunk) { |
| const stream = writer[_ownerWritableStream]; |
| - TEMP_ASSERT(stream !== undefined, |
| - 'stream is not undefined.'); |
| - const state = stream[_state]; |
| - if (state === CLOSED || state === ERRORED) { |
| + TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); |
| + const controller = stream[_writableStreamController]; |
| + const chunkSize = |
| + WritableStreamDefaultControllerGetChunkSize(controller, chunk); |
| + if (stream !== writer[_ownerWritableStream]) { |
| + return Promise_reject(createWriterLockReleasedError(verbWrittenTo)); |
| + } |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| + if (state === ERRORED) { |
| + return Promise_reject(stream[_storedError]); |
| + } |
| + if (WritableStreamCloseQueuedOrInFlight(stream)) { |
| + return Promise_reject(new TypeError( |
| + templateErrorCannotActionOnStateStream('write to', 'closing'))); |
| + } |
| + if (state === CLOSED) { |
| return Promise_reject( |
| - createCannotActionOnStateStreamError('write to', state)); |
| + createCannotActionOnStateStreamError('write to', CLOSED)); |
| + } |
| + if (stream[_pendingAbortRequest] !== undefined) { |
| + return Promise_reject(new TypeError(errStreamAborting)); |
| } |
| - TEMP_ASSERT(state === WRITABLE, |
| - 'state is "writable".'); |
| const promise = WritableStreamAddWriteRequest(stream); |
| - WritableStreamDefaultControllerWrite(stream[_writableStreamController], |
| - chunk); |
| + WritableStreamDefaultControllerWrite(controller, chunk, chunkSize); |
| return promise; |
| } |
| @@ -620,34 +712,25 @@ |
| } |
| this[_controlledWritableStream] = stream; |
| this[_underlyingSink] = underlyingSink; |
| - this[_queue] = new binding.SimpleQueue(); |
| - this[_queueSize] = 0; |
| - this[_defaultControllerFlags] = 0; |
| + // These are just initialised to avoid triggering the assert() in |
| + // ResetQueue. They are overwritten by ResetQueue(). |
| + this[_queue] = undefined; |
| + this[_queueTotalSize] = undefined; |
| + ResetQueue(this); |
| + this[_started] = false; |
| const normalizedStrategy = |
| ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| this[_strategySize] = normalizedStrategy.size; |
| this[_strategyHWM] = normalizedStrategy.highWaterMark; |
| const backpressure = WritableStreamDefaultControllerGetBackpressure(this); |
| - if (backpressure) { |
| - WritableStreamUpdateBackpressure(stream, backpressure); |
| - } |
| - const controller = this; |
| - const startResult = InvokeOrNoop(underlyingSink, 'start', [this]); |
| - const onFulfilled = () => { |
| - setDefaultControllerStartedFlag(controller, true); |
| - WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| - }; |
| - const onRejected = r => { |
| - WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
| - }; |
| - thenPromise(Promise_resolve(startResult), onFulfilled, onRejected); |
| + WritableStreamUpdateBackpressure(stream, backpressure); |
| } |
| error(e) { |
| if (!IsWritableStreamDefaultController(this)) { |
| throw new TypeError(streamErrors.illegalInvocation); |
| } |
| - const state = this[_controlledWritableStream][_state]; |
| + const state = this[_controlledWritableStream][_stateAndFlags] & STATE_MASK; |
| if (state === CLOSED || state === ERRORED) { |
| throw createCannotActionOnStateStreamError('error', state); |
| } |
| @@ -655,78 +738,110 @@ |
| } |
| } |
| - // Writable Stream Default Controller Abstract Operations |
| + // Writable Stream Default Controller Internal Methods |
| - function IsWritableStreamDefaultController(x) { |
| - return hasOwnProperty(x, _underlyingSink); |
| - } |
| - |
| - function WritableStreamDefaultControllerAbort(controller, reason) { |
| - controller[_queue] = new binding.SimpleQueue(); |
| - controller[_queueSize] = 0; |
| + // TODO(ricea): Virtual dispatch via V8 Private Symbols seems to be difficult |
| + // or impossible, so use static dispatch for now. This will have to be fixed |
| + // when adding a byte controller. |
| + function WritableStreamDefaultControllerAbortSteps(controller, reason) { |
| const sinkAbortPromise = |
| PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]); |
| return thenPromise(sinkAbortPromise, () => undefined); |
| } |
| + function WritableStreamDefaultControllerErrorSteps(controller) { |
| + ResetQueue(controller); |
| + } |
| + |
| + function WritableStreamDefaultControllerStartSteps(controller) { |
| + const startResult = |
| + InvokeOrNoop(controller[_underlyingSink], 'start', [controller]); |
| + const stream = controller[_controlledWritableStream]; |
| + const startPromise = Promise_resolve(startResult); |
| + thenPromise( |
| + startPromise, |
| + () => { |
| + controller[_started] = true; |
| + if ((stream[_stateAndFlags] & STATE_MASK) === ERRORED) { |
| + WritableStreamRejectAbortRequestIfPending(stream); |
| + } else { |
| + WritableStreamHandleAbortRequestIfPending(stream); |
| + } |
| + WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| + }, |
| + r => { |
| + TEMP_ASSERT( |
| + (stream[_stateAndFlags] & STATE_MASK) === WRITABLE || |
| + (stream[_stateAndFlags] & STATE_MASK) === ERRORED, |
| + '_stream_.[[state]] is `"writable"` or `"errored"`.'); |
| + WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
| + WritableStreamRejectAbortRequestIfPending(stream); |
| + }); |
| + } |
| + |
| + // Writable Stream Default Controller Abstract Operations |
| + |
| + function IsWritableStreamDefaultController(x) { |
| + return hasOwnProperty(x, _underlyingSink); |
| + } |
| + |
| function WritableStreamDefaultControllerClose(controller) { |
| - EnqueueValueWithSizeForController(controller, 'close', 0); |
| + EnqueueValueWithSize(controller, 'close', 0); |
| WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| } |
| + function WritableStreamDefaultControllerGetChunkSize(controller, chunk) { |
| + const strategySize = controller[_strategySize]; |
| + if (strategySize === undefined) { |
| + return 1; |
| + } |
| + let value; |
| + try { |
| + value = Function_call(strategySize, undefined, chunk); |
| + } catch (e) { |
| + WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
| + return 1; |
| + } |
| + return value; |
| + } |
| + |
| function WritableStreamDefaultControllerGetDesiredSize(controller) { |
| - const queueSize = GetTotalQueueSizeForController(controller); |
| - return controller[_strategyHWM] - queueSize; |
| + return controller[_strategyHWM] - controller[_queueTotalSize]; |
| } |
| - function WritableStreamDefaultControllerWrite(controller, chunk) { |
| - const stream = controller[_controlledWritableStream]; |
| - TEMP_ASSERT(stream[_state] === WRITABLE, |
| - 'stream.[[state]] is "writable".'); |
| - let chunkSize = 1; |
| - const strategySize = controller[_strategySize]; |
| - if (strategySize !== undefined) { |
| - try { |
| - chunkSize = strategySize(chunk); |
| - } catch (e) { |
| - WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
| - return; |
| - } |
| - } |
| + function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { |
| const writeRecord = {chunk}; |
| - const lastBackpressure = |
| - WritableStreamDefaultControllerGetBackpressure(controller); |
| try { |
| - EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); |
| + EnqueueValueWithSize(controller, writeRecord, chunkSize); |
| } catch (e) { |
| WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
| return; |
| } |
| - if (stream[_state] === WRITABLE) { |
| + const stream = controller[_controlledWritableStream]; |
| + if (!WritableStreamCloseQueuedOrInFlight(stream)) { |
| const backpressure = |
| WritableStreamDefaultControllerGetBackpressure(controller); |
| - if (lastBackpressure !== backpressure) { |
| - WritableStreamUpdateBackpressure(stream, backpressure); |
| - } |
| + WritableStreamUpdateBackpressure(stream, backpressure); |
| } |
| WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| } |
| function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { |
| - const state = controller[_controlledWritableStream][_state]; |
| + const stream = controller[_controlledWritableStream]; |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| if (state === CLOSED || state === ERRORED) { |
| return; |
| } |
| - if (!getDefaultControllerStartedFlag(controller)) { |
| + if (!controller[_started]) { |
| return; |
| } |
| - if (getDefaultControllerWritingFlag(controller)) { |
| + if (stream[_inFlightWriteRequest] !== undefined) { |
| return; |
| } |
| if (controller[_queue].length === 0) { |
| return; |
| } |
| - const writeRecord = PeekQueueValue(controller[_queue]); |
| + const writeRecord = PeekQueueValue(controller); |
| if (writeRecord === 'close') { |
| WritableStreamDefaultControllerProcessClose(controller); |
| } else { |
| @@ -735,135 +850,59 @@ |
| } |
| } |
| - function WritableStreamDefaultControllerErrorIfNeeded(controller, e) { |
| - const state = controller[_controlledWritableStream][_state]; |
| - if (state === WRITABLE || state === CLOSING) { |
| - WritableStreamDefaultControllerError(controller, e); |
| + function WritableStreamDefaultControllerErrorIfNeeded(controller, error) { |
| + const state = controller[_controlledWritableStream][_stateAndFlags] & STATE_MASK; |
| + if (state === WRITABLE) { |
| + WritableStreamDefaultControllerError(controller, error); |
| } |
| } |
| function WritableStreamDefaultControllerProcessClose(controller) { |
| const stream = controller[_controlledWritableStream]; |
| - TEMP_ASSERT(stream[_state] === CLOSING, |
| - 'stream.[[state]] is "closing".'); |
| - DequeueValueForController(controller); |
| + WritableStreamMarkCloseRequestInFlight(stream); |
| + DequeueValue(controller); |
| TEMP_ASSERT(controller[_queue].length === 0, |
| 'controller.[[queue]] is empty.'); |
| - setDefaultControllerInCloseFlag(controller, true); |
| const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
| 'close', [controller]); |
| - thenPromise(sinkClosePromise, |
| - () => { |
| - TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
| - === true, |
| - 'controller.[[inClose]] is true'); |
| - setDefaultControllerInCloseFlag(controller, false); |
| - |
| - if (stream[_state] !== CLOSING && |
| - stream[_state] !== ERRORED) { |
| - return; |
| - } |
| - |
| - TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); |
| - v8.resolvePromise(stream[_pendingCloseRequest], undefined); |
| - stream[_pendingCloseRequest] = undefined; |
| - |
| - WritableStreamFinishClose(stream); |
| - }, |
| - r => { |
| - TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
| - === true, |
| - 'controller.[[inClose]] is true'); |
| - setDefaultControllerInCloseFlag(controller, false); |
| - |
| - TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); |
| - v8.rejectPromise(stream[_pendingCloseRequest], r); |
| - stream[_pendingCloseRequest] = undefined; |
| - |
| - if (stream[_pendingAbortRequest] !== undefined) { |
| - v8.rejectPromise(stream[_pendingAbortRequest], r); |
| - stream[_pendingAbortRequest] = undefined; |
| - } |
| - |
| - WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
| - } |
| - ); |
| + thenPromise( |
| + sinkClosePromise, |
| + () => WritableStreamFinishInFlightClose(stream), |
| + reason => WritableStreamFinishInFlightCloseWithError(stream, reason) |
| + ); |
| } |
| function WritableStreamDefaultControllerProcessWrite(controller, chunk) { |
| - setDefaultControllerWritingFlag(controller, true); |
| - |
| const stream = controller[_controlledWritableStream]; |
| - |
| - TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
| - 'stream.[[pendingWriteRequest]] is undefined'); |
| - TEMP_ASSERT(stream[_writeRequests].length > 0, |
| - 'stream.[[writeRequests]] is not empty'); |
| - stream[_pendingWriteRequest] = stream[_writeRequests].shift(); |
| - |
| + WritableStreamMarkFirstWriteRequestInFlight(stream); |
| const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
| 'write', [chunk, controller]); |
| thenPromise( |
| sinkWritePromise, |
| () => { |
| - TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
| - 'controller.[[writing]] is true'); |
| - setDefaultControllerWritingFlag(controller, false); |
| - |
| - TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
| - 'stream.[[pendingWriteRequest]] is not undefined'); |
| - v8.resolvePromise(stream[_pendingWriteRequest], undefined); |
| - stream[_pendingWriteRequest] = undefined; |
| - |
| - const state = stream[_state]; |
| + WritableStreamFinishInFlightWrite(stream); |
| + const state = stream[_stateAndFlags] & STATE_MASK; |
| if (state === ERRORED) { |
| - WritableStreamRejectPromisesInReactionToError(stream); |
| - |
| - if (stream[_pendingAbortRequest] !== undefined) { |
| - v8.resolvePromise(stream[_pendingAbortRequest], undefined); |
| - stream[_pendingAbortRequest] = undefined; |
| - } |
| return; |
| } |
| - |
| - const lastBackpressure = |
| - WritableStreamDefaultControllerGetBackpressure(controller); |
| - DequeueValueForController(controller); |
| - |
| - if (state !== CLOSING) { |
| + TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); |
| + DequeueValue(controller); |
| + if (!WritableStreamCloseQueuedOrInFlight(stream)) { |
| const backpressure = |
| WritableStreamDefaultControllerGetBackpressure(controller); |
| - if (lastBackpressure !== backpressure) { |
| - WritableStreamUpdateBackpressure( |
| - controller[_controlledWritableStream], backpressure); |
| - } |
| + WritableStreamUpdateBackpressure(stream, backpressure); |
| } |
| - |
| WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| }, |
| - r => { |
| - TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
| - 'controller.[[writing]] is true'); |
| - setDefaultControllerWritingFlag(controller, false); |
| - |
| - TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
| - 'stream.[[pendingWriteRequest]] is not undefined'); |
| - v8.rejectPromise(stream[_pendingWriteRequest], r); |
| - stream[_pendingWriteRequest] = undefined; |
| - |
| - if (stream[_state] === ERRORED) { |
| - stream[_storedError] = r; |
| - WritableStreamRejectPromisesInReactionToError(stream); |
| + reason => { |
| + const wasErrored = (stream[_stateAndFlags] & STATE_MASK) === ERRORED; |
| + WritableStreamFinishInFlightWriteWithError(stream, reason); |
| + TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORED, |
| + '_stream_.[[state]] is `"errored"`.'); |
| + if (!wasErrored) { |
| + ResetQueue(controller); |
|
tyoshino (SeeGerritForStatus)
2017/04/12 04:21:13
In the spec, only [[queue]] is cleared here. Was t
Adam Rice
2017/04/13 05:54:40
I think I intended to update the spec, but it look
tyoshino (SeeGerritForStatus)
2017/04/13 07:38:07
OK. Thanks
|
| } |
| - |
| - if (stream[_pendingAbortRequest] !== undefined) { |
| - v8.rejectPromise(stream[_pendingAbortRequest], r); |
| - stream[_pendingAbortRequest] = undefined; |
| - } |
| - |
| - WritableStreamDefaultControllerErrorIfNeeded(controller, r); |
| - } |
| - ); |
| + }); |
| } |
| function WritableStreamDefaultControllerGetBackpressure(controller) { |
| @@ -872,45 +911,67 @@ |
| return desiredSize <= 0; |
| } |
| - function WritableStreamDefaultControllerError(controller, e) { |
| + function WritableStreamDefaultControllerError(controller, error) { |
| const stream = controller[_controlledWritableStream]; |
| - const state = stream[_state]; |
| - TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
| - 'stream.[[state]] is "writable" or "closing".'); |
| - WritableStreamError(stream, e); |
| - controller[_queue] = new binding.SimpleQueue(); |
| - controller[_queueSize] = 0; |
| + TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, |
| + '_stream_.[[state]] is `"writable"`.'); |
| + WritableStreamError(stream, error); |
| } |
| // Queue-with-Sizes Operations |
| // |
| // TODO(ricea): Share these operations with ReadableStream.js. |
| - function DequeueValueForController(controller) { |
| - TEMP_ASSERT(controller[_queue].length !== 0, |
| - 'queue is not empty.'); |
| - const result = controller[_queue].shift(); |
| - controller[_queueSize] -= result.size; |
| - return result.value; |
| + function DequeueValue(container) { |
| + TEMP_ASSERT( |
| + hasOwnProperty(container, _queue) && |
| + hasOwnProperty(container, _queueTotalSize), |
| + 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' + |
| + 'slots.'); |
| + TEMP_ASSERT(container[_queue].length !== 0, |
| + '_container_.[[queue]] is not empty.'); |
| + const pair = container[_queue].shift(); |
| + container[_queueTotalSize] -= pair.size; |
| + if (container[_queueTotalSize] < 0) { |
| + container[_queueTotalSize] = 0; |
| + } |
| + return pair.value; |
| } |
| - function EnqueueValueWithSizeForController(controller, value, size) { |
| + function EnqueueValueWithSize(container, value, size) { |
| + TEMP_ASSERT( |
| + hasOwnProperty(container, _queue) && |
| + hasOwnProperty(container, _queueTotalSize), |
| + 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' + |
| + 'slots.'); |
| size = Number(size); |
| if (!IsFiniteNonNegativeNumber(size)) { |
| throw new RangeError(streamErrors.invalidSize); |
| } |
| - controller[_queueSize] += size; |
| - controller[_queue].push({value, size}); |
| + container[_queue].push({value, size}); |
| + container[_queueTotalSize] += size; |
| } |
| - function GetTotalQueueSizeForController(controller) { |
| - return controller[_queueSize]; |
| + function PeekQueueValue(container) { |
| + TEMP_ASSERT( |
| + hasOwnProperty(container, _queue) && |
| + hasOwnProperty(container, _queueTotalSize), |
| + 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' + |
| + 'slots.'); |
| + TEMP_ASSERT(container[_queue].length !== 0, |
| + '_container_.[[queue]] is not empty.'); |
| + const pair = container[_queue].peek(); |
| + return pair.value; |
| } |
| - function PeekQueueValue(queue) { |
| - TEMP_ASSERT(queue.length !== 0, |
| - 'queue is not empty.'); |
| - return queue.peek().value; |
| + function ResetQueue(container) { |
| + TEMP_ASSERT( |
| + hasOwnProperty(container, _queue) && |
| + hasOwnProperty(container, _queueTotalSize), |
| + 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' + |
| + 'slots.'); |
| + container[_queue] = new binding.SimpleQueue(); |
| + container[_queueTotalSize] = 0; |
| } |
| // Miscellaneous Operations |