Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(49)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Rebase and move includes to .gn Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 const PULLING = 0b100; 42 const PULLING = 0b100;
43 const PULL_AGAIN = 0b1000; 43 const PULL_AGAIN = 0b1000;
44 const EXTERNALLY_CONTROLLED = 0b10000; 44 const EXTERNALLY_CONTROLLED = 0b10000;
45 45
46 const undefined = global.undefined; 46 const undefined = global.undefined;
47 const Infinity = global.Infinity; 47 const Infinity = global.Infinity;
48 48
49 const defineProperty = global.Object.defineProperty; 49 const defineProperty = global.Object.defineProperty;
50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
51 const callFunction = v8.uncurryThis(global.Function.prototype.call); 51 const callFunction = v8.uncurryThis(global.Function.prototype.call);
52 const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
52 53
53 const TypeError = global.TypeError; 54 const TypeError = global.TypeError;
54 const RangeError = global.RangeError; 55 const RangeError = global.RangeError;
55 56
56 const Number = global.Number; 57 const Number = global.Number;
57 const Number_isNaN = Number.isNaN; 58 const Number_isNaN = Number.isNaN;
58 const Number_isFinite = Number.isFinite; 59 const Number_isFinite = Number.isFinite;
59 60
60 const Promise = global.Promise; 61 const Promise = global.Promise;
61 const thenPromise = v8.uncurryThis(Promise.prototype.then); 62 const thenPromise = v8.uncurryThis(Promise.prototype.then);
(...skipping 24 matching lines...) Expand all
86 'ReadableStreamReader constructor argument is not a readable stream'; 87 'ReadableStreamReader constructor argument is not a readable stream';
87 const errReaderConstructorStreamAlreadyLocked = 88 const errReaderConstructorStreamAlreadyLocked =
88 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 89 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
89 const errReleaseReaderWithPendingRead = 90 const errReleaseReaderWithPendingRead =
90 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 91 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
91 const errReleasedReaderClosedPromise = 92 const errReleasedReaderClosedPromise =
92 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 93 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
93 94
94 const errTmplMustBeFunctionOrUndefined = name => 95 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 96 `${name} must be a function or undefined`;
97 const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
98 const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
99 const errDestinationStreamClosed = 'Destination stream closed';
96 100
97 class ReadableStream { 101 class ReadableStream {
98 constructor() { 102 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 103 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 104 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 105 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 106 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 107 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 108 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 109 if (highWaterMark === undefined) {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 178
175 tee() { 179 tee() {
176 if (IsReadableStream(this) === false) { 180 if (IsReadableStream(this) === false) {
177 throw new TypeError(streamErrors.illegalInvocation); 181 throw new TypeError(streamErrors.illegalInvocation);
178 } 182 }
179 183
180 return ReadableStreamTee(this); 184 return ReadableStreamTee(this);
181 } 185 }
182 } 186 }
183 187
188 // TODO(ricea): Move this into the class definition once it ships.
189 function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
190 this.pipeTo(writable, options);
191 return readable;
192 }
193
194 // TODO(ricea): Move this into the class definition once it ships.
195 function ReadableStream_prototype_pipeTo(
196 dest, {preventClose, preventAbort, preventCancel} = {}) {
197 if (!IsReadableStream(this)) {
198 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
199 }
200
201 if (!binding.IsWritableStream(dest)) {
202 // TODO(ricea): Think about having a better error message.
203 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
204 }
205
206 preventClose = Boolean(preventClose);
207 preventAbort = Boolean(preventAbort);
208 preventCancel = Boolean(preventCancel);
209
210 const readable = this;
211 if (IsReadableStreamLocked(readable)) {
212 return Promise_reject(new TypeError(errCannotPipeLockedStream));
213 }
214
215 if (binding.IsWritableStreamLocked(dest)) {
216 return Promise_reject(new TypeError(errCannotPipeToALockedStream));
217 }
218
219 const reader = AcquireReadableStreamDefaultReader(readable);
220 const writer = binding.AcquireWritableStreamDefaultWriter(dest);
221 let shuttingDown = false;
222 const promise = v8.createPromise();
223 let reading = false;
224
225 if (checkInitialState()) {
226 // Need to detect closing and error when we are not reading.
227 thenPromise(reader[_closedPromise], onReaderClosed, readableError);
228 // Need to detect error when we are not writing.
229 thenPromise(
230 binding.getWritableStreamDefaultWriterClosedPromise(writer),
231 undefined, writableError);
232 pump();
233 }
234
235 // Checks the state of the streams and executes the shutdown handlers if
236 // necessary. Returns true if piping can continue.
237 function checkInitialState() {
238 const state = ReadableStreamGetState(readable);
239
240 // Both streams can be errored or closed. To perform the right action the
241 // order of the checks must match the standard.
242 if (state === STATE_ERRORED) {
243 readableError(readable[_storedError]);
244 return false;
245 }
246
247 if (binding.isWritableStreamErrored(dest)) {
248 writableError(binding.getWritableStreamStoredError(dest));
249 return false;
250 }
251
252 if (state === STATE_CLOSED) {
253 readableClosed();
254 return false;
255 }
256
257 if (binding.isWritableStreamClosingOrClosed(dest)) {
258 writableStartedClosed();
259 return false;
260 }
261
262 return true;
263 }
264
265 function pump() {
266 if (shuttingDown) {
267 return;
268 }
269 const desiredSize =
270 binding.WritableStreamDefaultWriterGetDesiredSize(writer);
271 if (desiredSize === null) {
272 writableError(binding.getWritableStreamStoredError(dest));
273 }
274 if (desiredSize <= 0) {
275 thenPromise(
276 binding.getWritableStreamDefaultWriterReadyPromise(writer), pump,
277 writableError);
278 return;
279 }
280 reading = true;
281 // TODO(ricea): Delay reads heuristically when desiredSize is low.
282 thenPromise(
283 ReadableStreamDefaultReaderRead(reader), readFulfilled, readRejected);
284 }
285
286 function readFulfilled({value, done}) {
287 reading = false;
288 if (shuttingDown) {
289 return;
290 }
291 if (done) {
292 readableClosed();
293 return;
294 }
295 const write = binding.WritableStreamDefaultWriterWrite(writer, value);
296 thenPromise(write, undefined, writableError);
297 pump();
298 }
299
300 function readRejected() {
301 reading = false;
302 readableError(readable[_storedError]);
303 }
304
305 // If read() is in progress, then wait for it to tell us that the stream is
306 // closed so that we write all the data before shutdown.
307 function onReaderClosed() {
308 if (!reading) {
309 readableClosed();
310 }
311 }
312
313 // These steps are from "Errors must be propagated forward" in the
314 // standard.
315 function readableError(error) {
316 if (!preventAbort) {
317 shutdownWithAction(
318 binding.WritableStreamAbort, [dest, error], error, true);
319 } else {
320 shutdown(error, true);
321 }
322 }
323
324 // These steps are from "Errors must be propagated backward".
325 function writableError(error) {
326 if (!preventCancel) {
327 shutdownWithAction(
328 ReadableStreamCancel, [readable, error], error, true);
329 } else {
330 shutdown(error, true);
331 }
332 }
333
334 // These steps are from "Closing must be propagated forward".
335 function readableClosed() {
336 if (!preventClose) {
337 shutdownWithAction(
338 binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
339 [writer]);
340 } else {
341 shutdown();
342 }
343 }
344
345 // These steps are from "Closing must be propagated backward".
346 function writableStartedClosed() {
347 const destClosed = new TypeError(errDestinationStreamClosed);
348 if (!preventCancel) {
349 shutdownWithAction(
350 ReadableStreamCancel, [readable, destClosed], destClosed, true);
351 } else {
352 shutdown(destClosed, true);
353 }
354 }
355
356 function shutdownWithAction(
357 action, args, originalError = undefined, errorGiven = false) {
358 if (shuttingDown) {
359 return;
360 }
361 shuttingDown = true;
362 const p = applyFunction(action, undefined, args);
363 thenPromise(
364 p, () => finalize(originalError, errorGiven),
365 newError => finalize(newError, true));
366 }
367
368 function shutdown(error = undefined, errorGiven = false) {
369 if (shuttingDown) {
370 return;
371 }
372 shuttingDown = true;
373 finalize(error, errorGiven);
374 }
375
376 function finalize(error, errorGiven) {
377 binding.WritableStreamDefaultWriterRelease(writer);
378 ReadableStreamReaderGenericRelease(reader);
379 if (errorGiven) {
380 v8.rejectPromise(promise, error);
381 } else {
382 v8.resolvePromise(promise, undefined);
383 }
384 }
385
386 return promise;
387 }
388
184 class ReadableStreamDefaultController { 389 class ReadableStreamDefaultController {
185 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { 390 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
186 if (IsReadableStream(stream) === false) { 391 if (IsReadableStream(stream) === false) {
187 throw new TypeError(streamErrors.illegalConstructor); 392 throw new TypeError(streamErrors.illegalConstructor);
188 } 393 }
189 394
190 if (stream[_controller] !== undefined) { 395 if (stream[_controller] !== undefined) {
191 throw new TypeError(streamErrors.illegalConstructor); 396 throw new TypeError(streamErrors.illegalConstructor);
192 } 397 }
193 398
(...skipping 772 matching lines...) Expand 10 before | Expand all | Expand 10 after
966 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; 1171 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
967 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; 1172 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
968 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; 1173 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
969 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; 1174 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
970 1175
971 binding.createReadableStreamWithExternalController = 1176 binding.createReadableStreamWithExternalController =
972 (underlyingSource, strategy) => { 1177 (underlyingSource, strategy) => {
973 return new ReadableStream( 1178 return new ReadableStream(
974 underlyingSource, strategy, createWithExternalControllerSentinel); 1179 underlyingSource, strategy, createWithExternalControllerSentinel);
975 }; 1180 };
1181
1182 // Temporary exports while pipeTo() and pipeThrough() are behind flags
1183 binding.ReadableStream_prototype_pipeThrough =
1184 ReadableStream_prototype_pipeThrough;
1185 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
976 }); 1186 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698