Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Unified Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Rebase and move includes to .gn Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/core/streams/ReadableStream.js
diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js
index b8d33caf451f90ce1511a4da69284ec61148c0c1..50b377508ab23d4d770dbc6b049f72a2bef276e2 100644
--- a/third_party/WebKit/Source/core/streams/ReadableStream.js
+++ b/third_party/WebKit/Source/core/streams/ReadableStream.js
@@ -49,6 +49,7 @@
const defineProperty = global.Object.defineProperty;
const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
const callFunction = v8.uncurryThis(global.Function.prototype.call);
+ const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
const TypeError = global.TypeError;
const RangeError = global.RangeError;
@@ -93,6 +94,9 @@
const errTmplMustBeFunctionOrUndefined = name =>
`${name} must be a function or undefined`;
+ const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
+ const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
+ const errDestinationStreamClosed = 'Destination stream closed';
class ReadableStream {
constructor() {
@@ -181,6 +185,207 @@
}
}
+ // TODO(ricea): Move this into the class definition once it ships.
+ function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
+ this.pipeTo(writable, options);
+ return readable;
+ }
+
+ // TODO(ricea): Move this into the class definition once it ships.
+ function ReadableStream_prototype_pipeTo(
+ dest, {preventClose, preventAbort, preventCancel} = {}) {
+ if (!IsReadableStream(this)) {
+ return Promise_reject(new TypeError(streamErrors.illegalInvocation));
+ }
+
+ if (!binding.IsWritableStream(dest)) {
+ // TODO(ricea): Think about having a better error message.
+ return Promise_reject(new TypeError(streamErrors.illegalInvocation));
+ }
+
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+
+ const readable = this;
+ if (IsReadableStreamLocked(readable)) {
+ return Promise_reject(new TypeError(errCannotPipeLockedStream));
+ }
+
+ if (binding.IsWritableStreamLocked(dest)) {
+ return Promise_reject(new TypeError(errCannotPipeToALockedStream));
+ }
+
+ const reader = AcquireReadableStreamDefaultReader(readable);
+ const writer = binding.AcquireWritableStreamDefaultWriter(dest);
+ let shuttingDown = false;
+ const promise = v8.createPromise();
+ let reading = false;
+
+ if (checkInitialState()) {
+ // Need to detect closing and error when we are not reading.
+ thenPromise(reader[_closedPromise], onReaderClosed, readableError);
+ // Need to detect error when we are not writing.
+ thenPromise(
+ binding.getWritableStreamDefaultWriterClosedPromise(writer),
+ undefined, writableError);
+ pump();
+ }
+
+ // Checks the state of the streams and executes the shutdown handlers if
+ // necessary. Returns true if piping can continue.
+ function checkInitialState() {
+ const state = ReadableStreamGetState(readable);
+
+ // Both streams can be errored or closed. To perform the right action the
+ // order of the checks must match the standard.
+ if (state === STATE_ERRORED) {
+ readableError(readable[_storedError]);
+ return false;
+ }
+
+ if (binding.isWritableStreamErrored(dest)) {
+ writableError(binding.getWritableStreamStoredError(dest));
+ return false;
+ }
+
+ if (state === STATE_CLOSED) {
+ readableClosed();
+ return false;
+ }
+
+ if (binding.isWritableStreamClosingOrClosed(dest)) {
+ writableStartedClosed();
+ return false;
+ }
+
+ return true;
+ }
+
+ function pump() {
+ if (shuttingDown) {
+ return;
+ }
+ const desiredSize =
+ binding.WritableStreamDefaultWriterGetDesiredSize(writer);
+ if (desiredSize === null) {
+ writableError(binding.getWritableStreamStoredError(dest));
+ }
+ if (desiredSize <= 0) {
+ thenPromise(
+ binding.getWritableStreamDefaultWriterReadyPromise(writer), pump,
+ writableError);
+ return;
+ }
+ reading = true;
+ // TODO(ricea): Delay reads heuristically when desiredSize is low.
+ thenPromise(
+ ReadableStreamDefaultReaderRead(reader), readFulfilled, readRejected);
+ }
+
+ function readFulfilled({value, done}) {
+ reading = false;
+ if (shuttingDown) {
+ return;
+ }
+ if (done) {
+ readableClosed();
+ return;
+ }
+ const write = binding.WritableStreamDefaultWriterWrite(writer, value);
+ thenPromise(write, undefined, writableError);
+ pump();
+ }
+
+ function readRejected() {
+ reading = false;
+ readableError(readable[_storedError]);
+ }
+
+ // If read() is in progress, then wait for it to tell us that the stream is
+ // closed so that we write all the data before shutdown.
+ function onReaderClosed() {
+ if (!reading) {
+ readableClosed();
+ }
+ }
+
+ // These steps are from "Errors must be propagated forward" in the
+ // standard.
+ function readableError(error) {
+ if (!preventAbort) {
+ shutdownWithAction(
+ binding.WritableStreamAbort, [dest, error], error, true);
+ } else {
+ shutdown(error, true);
+ }
+ }
+
+ // These steps are from "Errors must be propagated backward".
+ function writableError(error) {
+ if (!preventCancel) {
+ shutdownWithAction(
+ ReadableStreamCancel, [readable, error], error, true);
+ } else {
+ shutdown(error, true);
+ }
+ }
+
+ // These steps are from "Closing must be propagated forward".
+ function readableClosed() {
+ if (!preventClose) {
+ shutdownWithAction(
+ binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
+ [writer]);
+ } else {
+ shutdown();
+ }
+ }
+
+ // These steps are from "Closing must be propagated backward".
+ function writableStartedClosed() {
+ const destClosed = new TypeError(errDestinationStreamClosed);
+ if (!preventCancel) {
+ shutdownWithAction(
+ ReadableStreamCancel, [readable, destClosed], destClosed, true);
+ } else {
+ shutdown(destClosed, true);
+ }
+ }
+
+ function shutdownWithAction(
+ action, args, originalError = undefined, errorGiven = false) {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ const p = applyFunction(action, undefined, args);
+ thenPromise(
+ p, () => finalize(originalError, errorGiven),
+ newError => finalize(newError, true));
+ }
+
+ function shutdown(error = undefined, errorGiven = false) {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ finalize(error, errorGiven);
+ }
+
+ function finalize(error, errorGiven) {
+ binding.WritableStreamDefaultWriterRelease(writer);
+ ReadableStreamReaderGenericRelease(reader);
+ if (errorGiven) {
+ v8.rejectPromise(promise, error);
+ } else {
+ v8.resolvePromise(promise, undefined);
+ }
+ }
+
+ return promise;
+ }
+
class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
if (IsReadableStream(stream) === false) {
@@ -973,4 +1178,9 @@
return new ReadableStream(
underlyingSource, strategy, createWithExternalControllerSentinel);
};
+
+ // Temporary exports while pipeTo() and pipeThrough() are behind flags
+ binding.ReadableStream_prototype_pipeThrough =
+ ReadableStream_prototype_pipeThrough;
+ binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
});

Powered by Google App Engine
This is Rietveld 408576698