Chromium Code Reviews| Index: third_party/WebKit/Source/core/streams/ReadableStream.js |
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| index 1586b0e61500db517bc361c1b456f8867c0db6a7..58f94b9a8105dcb47d60fa3e192b02ffb3cd27c3 100644 |
| --- a/third_party/WebKit/Source/core/streams/ReadableStream.js |
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js |
| @@ -49,6 +49,7 @@ |
| const defineProperty = global.Object.defineProperty; |
| const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| + const applyFunction = v8.uncurryThis(global.Function.prototype.apply); |
| const TypeError = global.TypeError; |
| const RangeError = global.RangeError; |
| @@ -93,6 +94,9 @@ |
| const errTmplMustBeFunctionOrUndefined = name => |
| `${name} must be a function or undefined`; |
| + const errCannotPipeLockedStream = 'Cannot pipe a locked stream'; |
| + const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream'; |
| + const errDestinationStreamClosed = 'Destination stream closed'; |
| class ReadableStream { |
| constructor() { |
| @@ -181,6 +185,193 @@ |
| } |
| } |
| + // TODO(ricea): Move this into the class definition once it ships. |
| + function ReadableStream_prototype_pipeThrough({writable, readable}, options) { |
| + this.pipeTo(writable, options); |
| + return readable; |
| + } |
| + |
| + // TODO(ricea): Move this into the class definition once it ships. |
| + function ReadableStream_prototype_pipeTo( |
| + dest, { preventClose, preventAbort, preventCancel } = {}) { |
| + if (!IsReadableStream(this)) { |
| + return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| + } |
| + |
| + if (!binding.IsWritableStream(dest)) { |
| + // TODO(ricea): Think about having a better error message. |
| + return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| + } |
| + |
| + preventClose = Boolean(preventClose); |
| + preventAbort = Boolean(preventAbort); |
| + preventCancel = Boolean(preventCancel); |
| + |
| + const readable = this; |
| + if (IsReadableStreamLocked(readable)) { |
| + return Promise_reject(new TypeError(errCannotPipeLockedStream)); |
| + } |
| + |
| + if (binding.IsWritableStreamLocked(dest)) { |
| + return Promise_reject(new TypeError(errCannotPipeToALockedStream)); |
| + } |
| + |
| + const reader = AcquireReadableStreamDefaultReader(readable); |
| + const writer = binding.AcquireWritableStreamDefaultWriter(dest); |
| + let shuttingDown = false; |
| + const promise = v8.createPromise(); |
| + let reading = false; |
| + |
| + if (initialStateOk()) { |
| + // Need to detect closing and error when we are not reading. |
| + thenPromise(reader[_closedPromise], readableClosed, readableError); |
| + // Need to detect error when we are not writing. |
| + thenPromise(binding.WritableStreamDefaultWriterClosedPromise(writer), |
| + undefined, writableError); |
| + pump(); |
| + } |
| + |
| + function initialStateOk() { |
|
domenic
2017/01/18 23:51:17
It might make sense to pull some of these out to t
Adam Rice
2017/01/19 14:02:47
This function calls state-changing functions, whic
|
| + const state = ReadableStreamGetState(readable); |
| + if (state === STATE_ERRORED) { |
| + readableError(); |
| + return false; |
| + } |
| + |
| + if (binding.IsWritableStreamErrored(dest)) { |
|
domenic
2017/01/18 23:51:17
Maybe since these are not abstract operation names
Adam Rice
2017/01/19 14:02:47
It's not great for the getters, because I don't wa
|
| + writableError(); |
| + return false; |
| + } |
| + |
| + if (state === STATE_CLOSED) { |
| + readableClosed(); |
| + return false; |
| + } |
| + |
| + if (binding.IsWritableStreamClosingOrClosed(dest)) { |
| + writableStartedClosed(); |
| + return false; |
| + } |
| + |
| + return true; |
| + } |
| + |
| + function pump() { |
| + if (shuttingDown) { |
| + return; |
| + } |
| + const desiredSize = |
| + binding.WritableStreamDefaultWriterGetDesiredSize(writer); |
| + if (desiredSize === null) { |
| + writableError(); |
| + } |
| + if (desiredSize <= 0) { |
| + thenPromise(binding.WritableStreamDefaultWriterReadyPromise(writer), |
| + pump, writableError); |
| + return; |
| + } |
| + reading = true; |
| + // TODO(ricea): Delay reads heuristically when desiredSize is low. |
| + thenPromise(ReadableStreamDefaultReaderRead(reader), readResolved, |
| + readRejected); |
| + } |
| + |
| + function readResolved({value, done}) { |
|
domenic
2017/01/18 23:51:17
Nit: "fulfilled" is more correct than "resolved" h
Adam Rice
2017/01/19 14:02:47
I hadn't seen that document before. It's cleared u
|
| + reading = false; |
| + if (shuttingDown) { |
| + return; |
| + } |
| + if (done) { |
| + readableClosed(); |
| + return; |
| + } |
| + const write = binding.WritableStreamDefaultWriterWrite(writer, value); |
| + thenPromise(write, undefined, writableError); |
| + pump(); |
| + } |
| + |
| + function readRejected() { |
| + reading = false; |
| + readableError(); |
| + } |
| + |
| + function readableError() { |
| + if (!preventAbort) { |
| + shutdownWithAction(binding.WritableStreamAbort, |
| + [dest, readable[_storedError]], |
| + readable[_storedError]); |
| + } else { |
| + shutdown(readable[_storedError]); |
| + } |
| + } |
| + |
| + function writableError() { |
| + const storedError = binding.WritableStreamStoredError(dest); |
| + if (!preventCancel) { |
| + shutdownWithAction(ReadableStreamCancel, [readable, |
| + storedError], storedError); |
| + } else { |
| + shutdown(storedError); |
| + } |
| + } |
| + |
| + function readableClosed() { |
| + if (reading) { |
| + // Handle the close status from the read() method rather than the |
| + // [[closedPromise]]. |
| + return; |
| + } |
| + if (!preventClose) { |
| + shutdownWithAction( |
| + binding.WritableStreamDefaultWriterCloseWithErrorPropagation, |
| + [writer]); |
| + } else { |
| + shutdown(); |
| + } |
| + } |
| + |
| + function writableStartedClosed() { |
| + const destClosed = new TypeError(errDestinationStreamClosed); |
| + if (!preventCancel) { |
| + shutdownWithAction(ReadableStreamCancel, [readable, |
| + destClosed], destClosed); |
| + } else { |
| + shutdown(destClosed); |
| + } |
| + } |
| + |
| + function shutdownWithAction(action, args, originalError = undefined) { |
| + if (shuttingDown) { |
| + return; |
| + } |
| + shuttingDown = true; |
| + const p = applyFunction(action, undefined, args); |
| + thenPromise(p, |
| + () => finalize(originalError), |
| + newError => finalize(newError)); |
| + } |
| + |
| + function shutdown(error = undefined) { |
| + if (shuttingDown) { |
| + return; |
| + } |
| + shuttingDown = true; |
| + finalize(error); |
| + } |
| + |
| + function finalize(error) { |
| + binding.WritableStreamDefaultWriterRelease(writer); |
| + ReadableStreamReaderGenericRelease(reader); |
| + if (error !== undefined) { |
|
domenic
2017/01/18 23:51:17
This seems like it will not work correctly when th
Adam Rice
2017/01/19 14:02:47
Yep. I went with passing booleans around as well.
|
| + v8.rejectPromise(promise, error); |
| + } else { |
| + v8.resolvePromise(promise, undefined); |
| + } |
| + } |
| + |
| + return promise; |
| + } |
| + |
| class ReadableStreamDefaultController { |
| constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) { |
| if (IsReadableStream(stream) === false) { |
| @@ -911,4 +1102,9 @@ |
| return new ReadableStream( |
| underlyingSource, strategy, createWithExternalControllerSentinel); |
| }; |
| + |
| + // Temporary exports while pipeTo() and pipeThrough() are behind flags |
| + binding.ReadableStream_prototype_pipeThrough = |
| + ReadableStream_prototype_pipeThrough; |
| + binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; |
| }); |