Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(170)

Unified Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Changes from domenic review Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/core/streams/ReadableStream.js
diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js
index 1586b0e61500db517bc361c1b456f8867c0db6a7..5b4dbce02d1316cbdbfffa7d3e22c9b2a6fdb7c0 100644
--- a/third_party/WebKit/Source/core/streams/ReadableStream.js
+++ b/third_party/WebKit/Source/core/streams/ReadableStream.js
@@ -49,6 +49,7 @@
const defineProperty = global.Object.defineProperty;
const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
const callFunction = v8.uncurryThis(global.Function.prototype.call);
+ const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
const TypeError = global.TypeError;
const RangeError = global.RangeError;
@@ -93,6 +94,9 @@
const errTmplMustBeFunctionOrUndefined = name =>
`${name} must be a function or undefined`;
+ const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
+ const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
+ const errDestinationStreamClosed = 'Destination stream closed';
class ReadableStream {
constructor() {
@@ -181,6 +185,194 @@
}
}
+ // TODO(ricea): Move this into the class definition once it ships.
+ function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
+ this.pipeTo(writable, options);
+ return readable;
+ }
+
+ // TODO(ricea): Move this into the class definition once it ships.
+ function ReadableStream_prototype_pipeTo(
+ dest, { preventClose, preventAbort, preventCancel } = {}) {
+ if (!IsReadableStream(this)) {
+ return Promise_reject(new TypeError(streamErrors.illegalInvocation));
+ }
+
+ if (!binding.IsWritableStream(dest)) {
+ // TODO(ricea): Think about having a better error message.
+ return Promise_reject(new TypeError(streamErrors.illegalInvocation));
+ }
+
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+
+ const readable = this;
+ if (IsReadableStreamLocked(readable)) {
+ return Promise_reject(new TypeError(errCannotPipeLockedStream));
+ }
+
+ if (binding.IsWritableStreamLocked(dest)) {
+ return Promise_reject(new TypeError(errCannotPipeToALockedStream));
+ }
+
+ const reader = AcquireReadableStreamDefaultReader(readable);
+ const writer = binding.AcquireWritableStreamDefaultWriter(dest);
+ let shuttingDown = false;
+ const promise = v8.createPromise();
+ let reading = false;
+
+ if (initialStateOk()) {
+ // Need to detect closing and error when we are not reading.
+ thenPromise(reader[_closedPromise], readableClosed, readableError);
+ // Need to detect error when we are not writing.
+ thenPromise(binding.getWritableStreamDefaultWriterClosedPromise(writer),
+ undefined, writableError);
+ pump();
+ }
+
+ function initialStateOk() {
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 This function name sounds like checking states and
Adam Rice 2017/01/23 13:24:30 Done.
+ const state = ReadableStreamGetState(readable);
+ if (state === STATE_ERRORED) {
+ readableError();
+ return false;
+ }
+
+ if (binding.isWritableStreamErrored(dest)) {
+ writableError();
+ return false;
+ }
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 are these error check blocks for optimization?
Adam Rice 2017/01/23 13:24:30 They are to perform the checks in the correct orde
tyoshino (SeeGerritForStatus) 2017/01/24 09:52:33 Oh, thanks for the pointer. Sorry for being unawar
+
+ if (state === STATE_CLOSED) {
+ readableClosed();
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 how about calling factoring out the latter half of
Adam Rice 2017/01/23 13:24:30 The first check in readableClosed() didn't belong
+ return false;
+ }
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 Is this an optimization or to have pipeTo finish s
Adam Rice 2017/01/23 13:24:30 It's purely to perform the checks in the correct o
+
+ if (binding.isWritableStreamClosingOrClosed(dest)) {
+ writableStartedClosed();
+ return false;
+ }
+
+ return true;
+ }
+
+ function pump() {
+ if (shuttingDown) {
+ return;
+ }
+ const desiredSize =
+ binding.WritableStreamDefaultWriterGetDesiredSize(writer);
+ if (desiredSize === null) {
+ writableError();
+ }
+ if (desiredSize <= 0) {
+ thenPromise(binding.getWritableStreamDefaultWriterReadyPromise(writer),
+ pump, writableError);
+ return;
+ }
+ reading = true;
+ // TODO(ricea): Delay reads heuristically when desiredSize is low.
+ thenPromise(ReadableStreamDefaultReaderRead(reader), readFulfilled,
+ readRejected);
+ }
+
+ function readFulfilled({value, done}) {
+ reading = false;
+ if (shuttingDown) {
+ return;
+ }
+ if (done) {
+ readableClosed();
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 how about calling factoring out the latter half of
Adam Rice 2017/01/23 13:24:30 I removed the first half instead, to make the oper
+ return;
+ }
+ const write = binding.WritableStreamDefaultWriterWrite(writer, value);
+ thenPromise(write, undefined, writableError);
+ pump();
+ }
+
+ function readRejected() {
+ reading = false;
+ readableError();
+ }
+
+ function readableError() {
+ if (!preventAbort) {
+ shutdownWithAction(binding.WritableStreamAbort,
+ [dest, readable[_storedError]],
+ readable[_storedError], true);
+ } else {
+ shutdown(readable[_storedError], true);
+ }
+ }
+
+ function writableError() {
+ const storedError = binding.getWritableStreamStoredError(dest);
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 can't we use the rejection value? or intentionally
Adam Rice 2017/01/23 13:24:30 I'm not sure about this, because now we're looking
+ if (!preventCancel) {
+ shutdownWithAction(ReadableStreamCancel, [readable, storedError],
+ storedError, true);
+ } else {
+ shutdown(storedError, true);
+ }
+ }
+
+ function readableClosed() {
+ if (reading) {
+ // Handle the close status from the read() method rather than the
+ // [[closedPromise]].
+ return;
+ }
+ if (!preventClose) {
+ shutdownWithAction(
+ binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
+ [writer]);
+ } else {
+ shutdown();
+ }
+ }
+
+ function writableStartedClosed() {
+ const destClosed = new TypeError(errDestinationStreamClosed);
+ if (!preventCancel) {
+ shutdownWithAction(ReadableStreamCancel, [readable, destClosed],
+ destClosed, true);
+ } else {
+ shutdown(destClosed, true);
+ }
+ }
+
+ function shutdownWithAction(action, args, originalError = undefined,
+ errorGiven = false) {
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 default argument is used for better correspondence
Adam Rice 2017/01/23 13:24:30 Exactly. This worked better before Domenic pointed
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ const p = applyFunction(action, undefined, args);
+ thenPromise(p,
+ () => finalize(originalError, errorGiven),
+ newError => finalize(newError, true));
+ }
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 ah, we need to update the reference implementation
Adam Rice 2017/01/23 13:24:30 Yes. I discussed this with Domenic but it looks li
+
+ function shutdown(error = undefined, errorGiven = false) {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ finalize(error, errorGiven);
+ }
+
+ function finalize(error, errorGiven) {
+ binding.WritableStreamDefaultWriterRelease(writer);
+ ReadableStreamReaderGenericRelease(reader);
+ if (errorGiven) {
+ v8.rejectPromise(promise, error);
+ } else {
+ v8.resolvePromise(promise, undefined);
+ }
+ }
+
+ return promise;
+ }
+
class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
if (IsReadableStream(stream) === false) {
@@ -911,4 +1103,9 @@
return new ReadableStream(
underlyingSource, strategy, createWithExternalControllerSentinel);
};
+
+ // Temporary exports while pipeTo() and pipeThrough() are behind flags
+ binding.ReadableStream_prototype_pipeThrough =
+ ReadableStream_prototype_pipeThrough;
+ binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
});

Powered by Google App Engine
This is Rietveld 408576698