Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/ReadableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| index 1586b0e61500db517bc361c1b456f8867c0db6a7..5b4dbce02d1316cbdbfffa7d3e22c9b2a6fdb7c0 100644 |
| --- a/third_party/WebKit/Source/core/streams/ReadableStream.js |
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| @@ -49,6 +49,7 @@ |
| const defineProperty = global.Object.defineProperty; |
| const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| + const applyFunction = v8.uncurryThis(global.Function.prototype.apply); |
| const TypeError = global.TypeError; |
| const RangeError = global.RangeError; |
| @@ -93,6 +94,9 @@ |
| const errTmplMustBeFunctionOrUndefined = name => |
| `${name} must be a function or undefined`; |
| + const errCannotPipeLockedStream = 'Cannot pipe a locked stream'; |
| + const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream'; |
| + const errDestinationStreamClosed = 'Destination stream closed'; |
| class ReadableStream { |
| constructor() { |
| @@ -181,6 +185,194 @@ |
| } |
| } |
| + // TODO(ricea): Move this into the class definition once it ships. |
| + function ReadableStream_prototype_pipeThrough({writable, readable}, options) { |
| + this.pipeTo(writable, options); |
| + return readable; |
| + } |
| + |
| + // TODO(ricea): Move this into the class definition once it ships. |
| + function ReadableStream_prototype_pipeTo( |
| + dest, { preventClose, preventAbort, preventCancel } = {}) { |
| + if (!IsReadableStream(this)) { |
| + return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| + } |
| + |
| + if (!binding.IsWritableStream(dest)) { |
| + // TODO(ricea): Think about having a better error message. |
| + return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| + } |
| + |
| + preventClose = Boolean(preventClose); |
| + preventAbort = Boolean(preventAbort); |
| + preventCancel = Boolean(preventCancel); |
| + |
| + const readable = this; |
| + if (IsReadableStreamLocked(readable)) { |
| + return Promise_reject(new TypeError(errCannotPipeLockedStream)); |
| + } |
| + |
| + if (binding.IsWritableStreamLocked(dest)) { |
| + return Promise_reject(new TypeError(errCannotPipeToALockedStream)); |
| + } |
| + |
| + const reader = AcquireReadableStreamDefaultReader(readable); |
| + const writer = binding.AcquireWritableStreamDefaultWriter(dest); |
| + let shuttingDown = false; |
| + const promise = v8.createPromise(); |
| + let reading = false; |
| + |
| + if (initialStateOk()) { |
| + // Need to detect closing and error when we are not reading. |
| + thenPromise(reader[_closedPromise], readableClosed, readableError); |
| + // Need to detect error when we are not writing. |
| + thenPromise(binding.getWritableStreamDefaultWriterClosedPromise(writer), |
| + undefined, writableError); |
| + pump(); |
| + } |
| + |
| + function initialStateOk() { |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
This function name sounds like checking states and
Adam Rice
2017/01/23 13:24:30
Done.
|
| + const state = ReadableStreamGetState(readable); |
| + if (state === STATE_ERRORED) { |
| + readableError(); |
| + return false; |
| + } |
| + |
| + if (binding.isWritableStreamErrored(dest)) { |
| + writableError(); |
| + return false; |
| + } |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
are these error check blocks for optimization?
Adam Rice
2017/01/23 13:24:30
They are to perform the checks in the correct orde
tyoshino (SeeGerritForStatus)
2017/01/24 09:52:33
Oh, thanks for the pointer. Sorry for being unawar
|
| + |
| + if (state === STATE_CLOSED) { |
| + readableClosed(); |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
how about calling factoring out the latter half of
Adam Rice
2017/01/23 13:24:30
The first check in readableClosed() didn't belong
|
| + return false; |
| + } |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
Is this an optimization or to have pipeTo finish s
Adam Rice
2017/01/23 13:24:30
It's purely to perform the checks in the correct o
|
| + |
| + if (binding.isWritableStreamClosingOrClosed(dest)) { |
| + writableStartedClosed(); |
| + return false; |
| + } |
| + |
| + return true; |
| + } |
| + |
| + function pump() { |
| + if (shuttingDown) { |
| + return; |
| + } |
| + const desiredSize = |
| + binding.WritableStreamDefaultWriterGetDesiredSize(writer); |
| + if (desiredSize === null) { |
| + writableError(); |
| + } |
| + if (desiredSize <= 0) { |
| + thenPromise(binding.getWritableStreamDefaultWriterReadyPromise(writer), |
| + pump, writableError); |
| + return; |
| + } |
| + reading = true; |
| + // TODO(ricea): Delay reads heuristically when desiredSize is low. |
| + thenPromise(ReadableStreamDefaultReaderRead(reader), readFulfilled, |
| + readRejected); |
| + } |
| + |
| + function readFulfilled({value, done}) { |
| + reading = false; |
| + if (shuttingDown) { |
| + return; |
| + } |
| + if (done) { |
| + readableClosed(); |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
how about calling factoring out the latter half of
Adam Rice
2017/01/23 13:24:30
I removed the first half instead, to make the oper
|
| + return; |
| + } |
| + const write = binding.WritableStreamDefaultWriterWrite(writer, value); |
| + thenPromise(write, undefined, writableError); |
| + pump(); |
| + } |
| + |
| + function readRejected() { |
| + reading = false; |
| + readableError(); |
| + } |
| + |
| + function readableError() { |
| + if (!preventAbort) { |
| + shutdownWithAction(binding.WritableStreamAbort, |
| + [dest, readable[_storedError]], |
| + readable[_storedError], true); |
| + } else { |
| + shutdown(readable[_storedError], true); |
| + } |
| + } |
| + |
| + function writableError() { |
| + const storedError = binding.getWritableStreamStoredError(dest); |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
can't we use the rejection value? or intentionally
Adam Rice
2017/01/23 13:24:30
I'm not sure about this, because now we're looking
|
| + if (!preventCancel) { |
| + shutdownWithAction(ReadableStreamCancel, [readable, storedError], |
| + storedError, true); |
| + } else { |
| + shutdown(storedError, true); |
| + } |
| + } |
| + |
| + function readableClosed() { |
| + if (reading) { |
| + // Handle the close status from the read() method rather than the |
| + // [[closedPromise]]. |
| + return; |
| + } |
| + if (!preventClose) { |
| + shutdownWithAction( |
| + binding.WritableStreamDefaultWriterCloseWithErrorPropagation, |
| + [writer]); |
| + } else { |
| + shutdown(); |
| + } |
| + } |
| + |
| + function writableStartedClosed() { |
| + const destClosed = new TypeError(errDestinationStreamClosed); |
| + if (!preventCancel) { |
| + shutdownWithAction(ReadableStreamCancel, [readable, destClosed], |
| + destClosed, true); |
| + } else { |
| + shutdown(destClosed, true); |
| + } |
| + } |
| + |
| + function shutdownWithAction(action, args, originalError = undefined, |
| + errorGiven = false) { |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
default argument is used for better correspondence
Adam Rice
2017/01/23 13:24:30
Exactly. This worked better before Domenic pointed
|
| + if (shuttingDown) { |
| + return; |
| + } |
| + shuttingDown = true; |
| + const p = applyFunction(action, undefined, args); |
| + thenPromise(p, |
| + () => finalize(originalError, errorGiven), |
| + newError => finalize(newError, true)); |
| + } |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
ah, we need to update the reference implementation
Adam Rice
2017/01/23 13:24:30
Yes. I discussed this with Domenic but it looks li
|
| + |
| + function shutdown(error = undefined, errorGiven = false) { |
| + if (shuttingDown) { |
| + return; |
| + } |
| + shuttingDown = true; |
| + finalize(error, errorGiven); |
| + } |
| + |
| + function finalize(error, errorGiven) { |
| + binding.WritableStreamDefaultWriterRelease(writer); |
| + ReadableStreamReaderGenericRelease(reader); |
| + if (errorGiven) { |
| + v8.rejectPromise(promise, error); |
| + } else { |
| + v8.resolvePromise(promise, undefined); |
| + } |
| + } |
| + |
| + return promise; |
| + } |
| + |
| class ReadableStreamDefaultController { |
| constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) { |
| if (IsReadableStream(stream) === false) { |
| @@ -911,4 +1103,9 @@ |
| return new ReadableStream( |
| underlyingSource, strategy, createWithExternalControllerSentinel); |
| }; |
| + |
| + // Temporary exports while pipeTo() and pipeThrough() are behind flags |
| + binding.ReadableStream_prototype_pipeThrough = |
| + ReadableStream_prototype_pipeThrough; |
| + binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; |
| }); |