Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(746)

Unified Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Stop waiting for writes to terminate at shutdown Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/core/streams/ReadableStream.js
diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js
index 1586b0e61500db517bc361c1b456f8867c0db6a7..58f94b9a8105dcb47d60fa3e192b02ffb3cd27c3 100644
--- a/third_party/WebKit/Source/core/streams/ReadableStream.js
+++ b/third_party/WebKit/Source/core/streams/ReadableStream.js
@@ -49,6 +49,7 @@
const defineProperty = global.Object.defineProperty;
const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
const callFunction = v8.uncurryThis(global.Function.prototype.call);
+ const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
const TypeError = global.TypeError;
const RangeError = global.RangeError;
@@ -93,6 +94,9 @@
const errTmplMustBeFunctionOrUndefined = name =>
`${name} must be a function or undefined`;
+ const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
+ const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
+ const errDestinationStreamClosed = 'Destination stream closed';
class ReadableStream {
constructor() {
@@ -181,6 +185,193 @@
}
}
+ // TODO(ricea): Move this into the class definition once it ships.
+ function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
+ this.pipeTo(writable, options);
+ return readable;
+ }
+
+ // TODO(ricea): Move this into the class definition once it ships.
+ function ReadableStream_prototype_pipeTo(
+ dest, { preventClose, preventAbort, preventCancel } = {}) {
+ if (!IsReadableStream(this)) {
+ return Promise_reject(new TypeError(streamErrors.illegalInvocation));
+ }
+
+ if (!binding.IsWritableStream(dest)) {
+ // TODO(ricea): Think about having a better error message.
+ return Promise_reject(new TypeError(streamErrors.illegalInvocation));
+ }
+
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+
+ const readable = this;
+ if (IsReadableStreamLocked(readable)) {
+ return Promise_reject(new TypeError(errCannotPipeLockedStream));
+ }
+
+ if (binding.IsWritableStreamLocked(dest)) {
+ return Promise_reject(new TypeError(errCannotPipeToALockedStream));
+ }
+
+ const reader = AcquireReadableStreamDefaultReader(readable);
+ const writer = binding.AcquireWritableStreamDefaultWriter(dest);
+ let shuttingDown = false;
+ const promise = v8.createPromise();
+ let reading = false;
+
+ if (initialStateOk()) {
+ // Need to detect closing and error when we are not reading.
+ thenPromise(reader[_closedPromise], readableClosed, readableError);
+ // Need to detect error when we are not writing.
+ thenPromise(binding.WritableStreamDefaultWriterClosedPromise(writer),
+ undefined, writableError);
+ pump();
+ }
+
+ function initialStateOk() {
domenic 2017/01/18 23:51:17 It might make sense to pull some of these out to t
Adam Rice 2017/01/19 14:02:47 This function calls state-changing functions, whic
+ const state = ReadableStreamGetState(readable);
+ if (state === STATE_ERRORED) {
+ readableError();
+ return false;
+ }
+
+ if (binding.IsWritableStreamErrored(dest)) {
domenic 2017/01/18 23:51:17 Maybe since these are not abstract operation names
Adam Rice 2017/01/19 14:02:47 It's not great for the getters, because I don't wa
+ writableError();
+ return false;
+ }
+
+ if (state === STATE_CLOSED) {
+ readableClosed();
+ return false;
+ }
+
+ if (binding.IsWritableStreamClosingOrClosed(dest)) {
+ writableStartedClosed();
+ return false;
+ }
+
+ return true;
+ }
+
+ function pump() {
+ if (shuttingDown) {
+ return;
+ }
+ const desiredSize =
+ binding.WritableStreamDefaultWriterGetDesiredSize(writer);
+ if (desiredSize === null) {
+ writableError();
+ }
+ if (desiredSize <= 0) {
+ thenPromise(binding.WritableStreamDefaultWriterReadyPromise(writer),
+ pump, writableError);
+ return;
+ }
+ reading = true;
+ // TODO(ricea): Delay reads heuristically when desiredSize is low.
+ thenPromise(ReadableStreamDefaultReaderRead(reader), readResolved,
+ readRejected);
+ }
+
+ function readResolved({value, done}) {
domenic 2017/01/18 23:51:17 Nit: "fulfilled" is more correct than "resolved" h
Adam Rice 2017/01/19 14:02:47 I hadn't seen that document before. It's cleared u
+ reading = false;
+ if (shuttingDown) {
+ return;
+ }
+ if (done) {
+ readableClosed();
+ return;
+ }
+ const write = binding.WritableStreamDefaultWriterWrite(writer, value);
+ thenPromise(write, undefined, writableError);
+ pump();
+ }
+
+ function readRejected() {
+ reading = false;
+ readableError();
+ }
+
+ function readableError() {
+ if (!preventAbort) {
+ shutdownWithAction(binding.WritableStreamAbort,
+ [dest, readable[_storedError]],
+ readable[_storedError]);
+ } else {
+ shutdown(readable[_storedError]);
+ }
+ }
+
+ function writableError() {
+ const storedError = binding.WritableStreamStoredError(dest);
+ if (!preventCancel) {
+ shutdownWithAction(ReadableStreamCancel, [readable,
+ storedError], storedError);
+ } else {
+ shutdown(storedError);
+ }
+ }
+
+ function readableClosed() {
+ if (reading) {
+ // Handle the close status from the read() method rather than the
+ // [[closedPromise]].
+ return;
+ }
+ if (!preventClose) {
+ shutdownWithAction(
+ binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
+ [writer]);
+ } else {
+ shutdown();
+ }
+ }
+
+ function writableStartedClosed() {
+ const destClosed = new TypeError(errDestinationStreamClosed);
+ if (!preventCancel) {
+ shutdownWithAction(ReadableStreamCancel, [readable,
+ destClosed], destClosed);
+ } else {
+ shutdown(destClosed);
+ }
+ }
+
+ function shutdownWithAction(action, args, originalError = undefined) {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ const p = applyFunction(action, undefined, args);
+ thenPromise(p,
+ () => finalize(originalError),
+ newError => finalize(newError));
+ }
+
+ function shutdown(error = undefined) {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ finalize(error);
+ }
+
+ function finalize(error) {
+ binding.WritableStreamDefaultWriterRelease(writer);
+ ReadableStreamReaderGenericRelease(reader);
+ if (error !== undefined) {
domenic 2017/01/18 23:51:17 This seems like it will not work correctly when th
Adam Rice 2017/01/19 14:02:47 Yep. I went with passing booleans around as well.
+ v8.rejectPromise(promise, error);
+ } else {
+ v8.resolvePromise(promise, undefined);
+ }
+ }
+
+ return promise;
+ }
+
class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
if (IsReadableStream(stream) === false) {
@@ -911,4 +1102,9 @@
return new ReadableStream(
underlyingSource, strategy, createWithExternalControllerSentinel);
};
+
+ // Temporary exports while pipeTo() and pipeThrough() are behind flags
+ binding.ReadableStream_prototype_pipeThrough =
+ ReadableStream_prototype_pipeThrough;
+ binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
});

Powered by Google App Engine
This is Rietveld 408576698