| Index: third_party/WebKit/Source/core/streams/ReadableStream.js
|
| diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js
|
| index b8d33caf451f90ce1511a4da69284ec61148c0c1..50b377508ab23d4d770dbc6b049f72a2bef276e2 100644
|
| --- a/third_party/WebKit/Source/core/streams/ReadableStream.js
|
| +++ b/third_party/WebKit/Source/core/streams/ReadableStream.js
|
| @@ -49,6 +49,7 @@
|
| const defineProperty = global.Object.defineProperty;
|
| const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
|
| const callFunction = v8.uncurryThis(global.Function.prototype.call);
|
| + const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
|
|
|
| const TypeError = global.TypeError;
|
| const RangeError = global.RangeError;
|
| @@ -93,6 +94,9 @@
|
|
|
| const errTmplMustBeFunctionOrUndefined = name =>
|
| `${name} must be a function or undefined`;
|
| + const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
|
| + const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
|
| + const errDestinationStreamClosed = 'Destination stream closed';
|
|
|
| class ReadableStream {
|
| constructor() {
|
| @@ -181,6 +185,207 @@
|
| }
|
| }
|
|
|
| + // TODO(ricea): Move this into the class definition once it ships.
|
| + function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
|
| + this.pipeTo(writable, options);
|
| + return readable;
|
| + }
|
| +
|
| + // TODO(ricea): Move this into the class definition once it ships.
|
| + function ReadableStream_prototype_pipeTo(
|
| + dest, {preventClose, preventAbort, preventCancel} = {}) {
|
| + if (!IsReadableStream(this)) {
|
| + return Promise_reject(new TypeError(streamErrors.illegalInvocation));
|
| + }
|
| +
|
| + if (!binding.IsWritableStream(dest)) {
|
| + // TODO(ricea): Think about having a better error message.
|
| + return Promise_reject(new TypeError(streamErrors.illegalInvocation));
|
| + }
|
| +
|
| + preventClose = Boolean(preventClose);
|
| + preventAbort = Boolean(preventAbort);
|
| + preventCancel = Boolean(preventCancel);
|
| +
|
| + const readable = this;
|
| + if (IsReadableStreamLocked(readable)) {
|
| + return Promise_reject(new TypeError(errCannotPipeLockedStream));
|
| + }
|
| +
|
| + if (binding.IsWritableStreamLocked(dest)) {
|
| + return Promise_reject(new TypeError(errCannotPipeToALockedStream));
|
| + }
|
| +
|
| + const reader = AcquireReadableStreamDefaultReader(readable);
|
| + const writer = binding.AcquireWritableStreamDefaultWriter(dest);
|
| + let shuttingDown = false;
|
| + const promise = v8.createPromise();
|
| + let reading = false;
|
| +
|
| + if (checkInitialState()) {
|
| + // Need to detect closing and error when we are not reading.
|
| + thenPromise(reader[_closedPromise], onReaderClosed, readableError);
|
| + // Need to detect error when we are not writing.
|
| + thenPromise(
|
| + binding.getWritableStreamDefaultWriterClosedPromise(writer),
|
| + undefined, writableError);
|
| + pump();
|
| + }
|
| +
|
| + // Checks the state of the streams and executes the shutdown handlers if
|
| + // necessary. Returns true if piping can continue.
|
| + function checkInitialState() {
|
| + const state = ReadableStreamGetState(readable);
|
| +
|
| + // Both streams can be errored or closed. To perform the right action the
|
| + // order of the checks must match the standard.
|
| + if (state === STATE_ERRORED) {
|
| + readableError(readable[_storedError]);
|
| + return false;
|
| + }
|
| +
|
| + if (binding.isWritableStreamErrored(dest)) {
|
| + writableError(binding.getWritableStreamStoredError(dest));
|
| + return false;
|
| + }
|
| +
|
| + if (state === STATE_CLOSED) {
|
| + readableClosed();
|
| + return false;
|
| + }
|
| +
|
| + if (binding.isWritableStreamClosingOrClosed(dest)) {
|
| + writableStartedClosed();
|
| + return false;
|
| + }
|
| +
|
| + return true;
|
| + }
|
| +
|
| + function pump() {
|
| + if (shuttingDown) {
|
| + return;
|
| + }
|
| + const desiredSize =
|
| + binding.WritableStreamDefaultWriterGetDesiredSize(writer);
|
| + if (desiredSize === null) {
|
| + writableError(binding.getWritableStreamStoredError(dest));
|
| + }
|
| + if (desiredSize <= 0) {
|
| + thenPromise(
|
| + binding.getWritableStreamDefaultWriterReadyPromise(writer), pump,
|
| + writableError);
|
| + return;
|
| + }
|
| + reading = true;
|
| + // TODO(ricea): Delay reads heuristically when desiredSize is low.
|
| + thenPromise(
|
| + ReadableStreamDefaultReaderRead(reader), readFulfilled, readRejected);
|
| + }
|
| +
|
| + function readFulfilled({value, done}) {
|
| + reading = false;
|
| + if (shuttingDown) {
|
| + return;
|
| + }
|
| + if (done) {
|
| + readableClosed();
|
| + return;
|
| + }
|
| + const write = binding.WritableStreamDefaultWriterWrite(writer, value);
|
| + thenPromise(write, undefined, writableError);
|
| + pump();
|
| + }
|
| +
|
| + function readRejected() {
|
| + reading = false;
|
| + readableError(readable[_storedError]);
|
| + }
|
| +
|
| + // If read() is in progress, then wait for it to tell us that the stream is
|
| + // closed so that we write all the data before shutdown.
|
| + function onReaderClosed() {
|
| + if (!reading) {
|
| + readableClosed();
|
| + }
|
| + }
|
| +
|
| + // These steps are from "Errors must be propagated forward" in the
|
| + // standard.
|
| + function readableError(error) {
|
| + if (!preventAbort) {
|
| + shutdownWithAction(
|
| + binding.WritableStreamAbort, [dest, error], error, true);
|
| + } else {
|
| + shutdown(error, true);
|
| + }
|
| + }
|
| +
|
| + // These steps are from "Errors must be propagated backward".
|
| + function writableError(error) {
|
| + if (!preventCancel) {
|
| + shutdownWithAction(
|
| + ReadableStreamCancel, [readable, error], error, true);
|
| + } else {
|
| + shutdown(error, true);
|
| + }
|
| + }
|
| +
|
| + // These steps are from "Closing must be propagated forward".
|
| + function readableClosed() {
|
| + if (!preventClose) {
|
| + shutdownWithAction(
|
| + binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
|
| + [writer]);
|
| + } else {
|
| + shutdown();
|
| + }
|
| + }
|
| +
|
| + // These steps are from "Closing must be propagated backward".
|
| + function writableStartedClosed() {
|
| + const destClosed = new TypeError(errDestinationStreamClosed);
|
| + if (!preventCancel) {
|
| + shutdownWithAction(
|
| + ReadableStreamCancel, [readable, destClosed], destClosed, true);
|
| + } else {
|
| + shutdown(destClosed, true);
|
| + }
|
| + }
|
| +
|
| + function shutdownWithAction(
|
| + action, args, originalError = undefined, errorGiven = false) {
|
| + if (shuttingDown) {
|
| + return;
|
| + }
|
| + shuttingDown = true;
|
| + const p = applyFunction(action, undefined, args);
|
| + thenPromise(
|
| + p, () => finalize(originalError, errorGiven),
|
| + newError => finalize(newError, true));
|
| + }
|
| +
|
| + function shutdown(error = undefined, errorGiven = false) {
|
| + if (shuttingDown) {
|
| + return;
|
| + }
|
| + shuttingDown = true;
|
| + finalize(error, errorGiven);
|
| + }
|
| +
|
| + function finalize(error, errorGiven) {
|
| + binding.WritableStreamDefaultWriterRelease(writer);
|
| + ReadableStreamReaderGenericRelease(reader);
|
| + if (errorGiven) {
|
| + v8.rejectPromise(promise, error);
|
| + } else {
|
| + v8.resolvePromise(promise, undefined);
|
| + }
|
| + }
|
| +
|
| + return promise;
|
| + }
|
| +
|
| class ReadableStreamDefaultController {
|
| constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
|
| if (IsReadableStream(stream) === false) {
|
| @@ -973,4 +1178,9 @@
|
| return new ReadableStream(
|
| underlyingSource, strategy, createWithExternalControllerSentinel);
|
| };
|
| +
|
| + // Temporary exports while pipeTo() and pipeThrough() are behind flags
|
| + binding.ReadableStream_prototype_pipeThrough =
|
| + ReadableStream_prototype_pipeThrough;
|
| + binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
|
| });
|
|
|