Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Apply changes from tyoshino review Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 const PULLING = 0b100; 42 const PULLING = 0b100;
43 const PULL_AGAIN = 0b1000; 43 const PULL_AGAIN = 0b1000;
44 const EXTERNALLY_CONTROLLED = 0b10000; 44 const EXTERNALLY_CONTROLLED = 0b10000;
45 45
46 const undefined = global.undefined; 46 const undefined = global.undefined;
47 const Infinity = global.Infinity; 47 const Infinity = global.Infinity;
48 48
49 const defineProperty = global.Object.defineProperty; 49 const defineProperty = global.Object.defineProperty;
50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
51 const callFunction = v8.uncurryThis(global.Function.prototype.call); 51 const callFunction = v8.uncurryThis(global.Function.prototype.call);
52 const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
52 53
53 const TypeError = global.TypeError; 54 const TypeError = global.TypeError;
54 const RangeError = global.RangeError; 55 const RangeError = global.RangeError;
55 56
56 const Number = global.Number; 57 const Number = global.Number;
57 const Number_isNaN = Number.isNaN; 58 const Number_isNaN = Number.isNaN;
58 const Number_isFinite = Number.isFinite; 59 const Number_isFinite = Number.isFinite;
59 60
60 const Promise = global.Promise; 61 const Promise = global.Promise;
61 const thenPromise = v8.uncurryThis(Promise.prototype.then); 62 const thenPromise = v8.uncurryThis(Promise.prototype.then);
(...skipping 24 matching lines...) Expand all
86 'ReadableStreamReader constructor argument is not a readable stream'; 87 'ReadableStreamReader constructor argument is not a readable stream';
87 const errReaderConstructorStreamAlreadyLocked = 88 const errReaderConstructorStreamAlreadyLocked =
88 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 89 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
89 const errReleaseReaderWithPendingRead = 90 const errReleaseReaderWithPendingRead =
90 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 91 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
91 const errReleasedReaderClosedPromise = 92 const errReleasedReaderClosedPromise =
92 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 93 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
93 94
94 const errTmplMustBeFunctionOrUndefined = name => 95 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 96 `${name} must be a function or undefined`;
97 const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
98 const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
99 const errDestinationStreamClosed = 'Destination stream closed';
96 100
97 class ReadableStream { 101 class ReadableStream {
98 constructor() { 102 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 103 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 104 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 105 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 106 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 107 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 108 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 109 if (highWaterMark === undefined) {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 178
175 tee() { 179 tee() {
176 if (IsReadableStream(this) === false) { 180 if (IsReadableStream(this) === false) {
177 throw new TypeError(streamErrors.illegalInvocation); 181 throw new TypeError(streamErrors.illegalInvocation);
178 } 182 }
179 183
180 return ReadableStreamTee(this); 184 return ReadableStreamTee(this);
181 } 185 }
182 } 186 }
183 187
188 // TODO(ricea): Move this into the class definition once it ships.
189 function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
190 this.pipeTo(writable, options);
191 return readable;
192 }
193
194 // TODO(ricea): Move this into the class definition once it ships.
195 function ReadableStream_prototype_pipeTo(
196 dest, {preventClose, preventAbort, preventCancel} = {}) {
197 if (!IsReadableStream(this)) {
198 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
199 }
200
201 if (!binding.IsWritableStream(dest)) {
202 // TODO(ricea): Think about having a better error message.
203 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
204 }
205
206 preventClose = Boolean(preventClose);
207 preventAbort = Boolean(preventAbort);
208 preventCancel = Boolean(preventCancel);
209
210 const readable = this;
211 if (IsReadableStreamLocked(readable)) {
212 return Promise_reject(new TypeError(errCannotPipeLockedStream));
213 }
214
215 if (binding.IsWritableStreamLocked(dest)) {
216 return Promise_reject(new TypeError(errCannotPipeToALockedStream));
217 }
218
219 const reader = AcquireReadableStreamDefaultReader(readable);
220 const writer = binding.AcquireWritableStreamDefaultWriter(dest);
221 let shuttingDown = false;
222 const promise = v8.createPromise();
223 let reading = false;
224
225 if (checkInitialState()) {
226 // Need to detect closing and error when we are not reading.
227 thenPromise(reader[_closedPromise], onReaderClosed, readableError);
228 // Need to detect error when we are not writing.
229 thenPromise(
230 binding.getWritableStreamDefaultWriterClosedPromise(writer),
231 undefined, writableError);
232 pump();
233 }
234
235 // Checks the state of the streams and executes the shutdown handlers if
236 // necessary. Returns true if piping can continue.
237 function checkInitialState() {
238 const state = ReadableStreamGetState(readable);
239 if (state === STATE_ERRORED) {
240 readableError(readable[_storedError]);
241 return false;
242 }
243
244 if (binding.isWritableStreamErrored(dest)) {
245 writableError(binding.getWritableStreamStoredError(dest));
246 return false;
247 }
248
249 if (state === STATE_CLOSED) {
250 readableClosed();
251 return false;
252 }
253
254 if (binding.isWritableStreamClosingOrClosed(dest)) {
255 writableStartedClosed();
256 return false;
257 }
258
259 return true;
260 }
261
262 function pump() {
263 if (shuttingDown) {
264 return;
265 }
266 const desiredSize =
267 binding.WritableStreamDefaultWriterGetDesiredSize(writer);
268 if (desiredSize === null) {
269 writableError(binding.getWritableStreamStoredError(dest));
270 }
271 if (desiredSize <= 0) {
272 thenPromise(
273 binding.getWritableStreamDefaultWriterReadyPromise(writer), pump,
274 writableError);
275 return;
276 }
277 reading = true;
278 // TODO(ricea): Delay reads heuristically when desiredSize is low.
279 thenPromise(
280 ReadableStreamDefaultReaderRead(reader), readFulfilled, readRejected);
281 }
282
283 function readFulfilled({value, done}) {
284 reading = false;
285 if (shuttingDown) {
286 return;
287 }
288 if (done) {
289 readableClosed();
290 return;
291 }
292 const write = binding.WritableStreamDefaultWriterWrite(writer, value);
293 thenPromise(write, undefined, writableError);
294 pump();
295 }
296
297 function readRejected() {
298 reading = false;
299 readableError(readable[_storedError]);
300 }
301
302 // If read() is in progress, then wait for it to tell us that the stream is
303 // closed so that we write all the data before shutdown.
304 function onReaderClosed() {
305 if (!reading) {
306 readableClosed();
307 }
308 }
309
310 // These steps are from "Errors must be propagated forward" in the
311 // standard.
312 function readableError(error) {
313 if (!preventAbort) {
314 shutdownWithAction(
315 binding.WritableStreamAbort, [dest, error], error, true);
316 } else {
317 shutdown(error, true);
318 }
319 }
320
321 // These steps are from "Errors must be propagated backward".
322 function writableError(error) {
323 if (!preventCancel) {
324 shutdownWithAction(
325 ReadableStreamCancel, [readable, error], error, true);
326 } else {
327 shutdown(error, true);
328 }
329 }
330
331 // These steps are from "Closing must be propagated forward".
332 function readableClosed() {
333 if (!preventClose) {
334 shutdownWithAction(
335 binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
336 [writer]);
337 } else {
338 shutdown();
339 }
340 }
341
342 // These steps are from "Closing must be propagated backward".
343 function writableStartedClosed() {
344 const destClosed = new TypeError(errDestinationStreamClosed);
345 if (!preventCancel) {
346 shutdownWithAction(
347 ReadableStreamCancel, [readable, destClosed], destClosed, true);
348 } else {
349 shutdown(destClosed, true);
350 }
351 }
352
353 function shutdownWithAction(
354 action, args, originalError = undefined, errorGiven = false) {
355 if (shuttingDown) {
356 return;
357 }
358 shuttingDown = true;
359 const p = applyFunction(action, undefined, args);
360 thenPromise(
361 p, () => finalize(originalError, errorGiven),
362 newError => finalize(newError, true));
363 }
364
365 function shutdown(error = undefined, errorGiven = false) {
366 if (shuttingDown) {
367 return;
368 }
369 shuttingDown = true;
370 finalize(error, errorGiven);
371 }
372
373 function finalize(error, errorGiven) {
374 binding.WritableStreamDefaultWriterRelease(writer);
375 ReadableStreamReaderGenericRelease(reader);
376 if (errorGiven) {
377 v8.rejectPromise(promise, error);
378 } else {
379 v8.resolvePromise(promise, undefined);
380 }
381 }
382
383 return promise;
384 }
385
184 class ReadableStreamDefaultController { 386 class ReadableStreamDefaultController {
185 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { 387 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
186 if (IsReadableStream(stream) === false) { 388 if (IsReadableStream(stream) === false) {
187 throw new TypeError(streamErrors.illegalConstructor); 389 throw new TypeError(streamErrors.illegalConstructor);
188 } 390 }
189 391
190 if (stream[_controller] !== undefined) { 392 if (stream[_controller] !== undefined) {
191 throw new TypeError(streamErrors.illegalConstructor); 393 throw new TypeError(streamErrors.illegalConstructor);
192 } 394 }
193 395
(...skipping 710 matching lines...) Expand 10 before | Expand all | Expand 10 after
904 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; 1106 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; 1107 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; 1108 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; 1109 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
908 1110
909 binding.createReadableStreamWithExternalController = 1111 binding.createReadableStreamWithExternalController =
910 (underlyingSource, strategy) => { 1112 (underlyingSource, strategy) => {
911 return new ReadableStream( 1113 return new ReadableStream(
912 underlyingSource, strategy, createWithExternalControllerSentinel); 1114 underlyingSource, strategy, createWithExternalControllerSentinel);
913 }; 1115 };
1116
1117 // Temporary exports while pipeTo() and pipeThrough() are behind flags
1118 binding.ReadableStream_prototype_pipeThrough =
1119 ReadableStream_prototype_pipeThrough;
1120 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
914 }); 1121 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698