Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(21)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Changes from domenic review Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 const PULLING = 0b100; 42 const PULLING = 0b100;
43 const PULL_AGAIN = 0b1000; 43 const PULL_AGAIN = 0b1000;
44 const EXTERNALLY_CONTROLLED = 0b10000; 44 const EXTERNALLY_CONTROLLED = 0b10000;
45 45
46 const undefined = global.undefined; 46 const undefined = global.undefined;
47 const Infinity = global.Infinity; 47 const Infinity = global.Infinity;
48 48
49 const defineProperty = global.Object.defineProperty; 49 const defineProperty = global.Object.defineProperty;
50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
51 const callFunction = v8.uncurryThis(global.Function.prototype.call); 51 const callFunction = v8.uncurryThis(global.Function.prototype.call);
52 const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
52 53
53 const TypeError = global.TypeError; 54 const TypeError = global.TypeError;
54 const RangeError = global.RangeError; 55 const RangeError = global.RangeError;
55 56
56 const Number = global.Number; 57 const Number = global.Number;
57 const Number_isNaN = Number.isNaN; 58 const Number_isNaN = Number.isNaN;
58 const Number_isFinite = Number.isFinite; 59 const Number_isFinite = Number.isFinite;
59 60
60 const Promise = global.Promise; 61 const Promise = global.Promise;
61 const thenPromise = v8.uncurryThis(Promise.prototype.then); 62 const thenPromise = v8.uncurryThis(Promise.prototype.then);
(...skipping 24 matching lines...) Expand all
86 'ReadableStreamReader constructor argument is not a readable stream'; 87 'ReadableStreamReader constructor argument is not a readable stream';
87 const errReaderConstructorStreamAlreadyLocked = 88 const errReaderConstructorStreamAlreadyLocked =
88 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 89 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
89 const errReleaseReaderWithPendingRead = 90 const errReleaseReaderWithPendingRead =
90 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 91 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
91 const errReleasedReaderClosedPromise = 92 const errReleasedReaderClosedPromise =
92 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 93 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
93 94
94 const errTmplMustBeFunctionOrUndefined = name => 95 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 96 `${name} must be a function or undefined`;
97 const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
98 const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
99 const errDestinationStreamClosed = 'Destination stream closed';
96 100
97 class ReadableStream { 101 class ReadableStream {
98 constructor() { 102 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 103 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 104 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 105 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 106 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 107 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 108 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 109 if (highWaterMark === undefined) {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 178
175 tee() { 179 tee() {
176 if (IsReadableStream(this) === false) { 180 if (IsReadableStream(this) === false) {
177 throw new TypeError(streamErrors.illegalInvocation); 181 throw new TypeError(streamErrors.illegalInvocation);
178 } 182 }
179 183
180 return ReadableStreamTee(this); 184 return ReadableStreamTee(this);
181 } 185 }
182 } 186 }
183 187
188 // TODO(ricea): Move this into the class definition once it ships.
189 function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
190 this.pipeTo(writable, options);
191 return readable;
192 }
193
194 // TODO(ricea): Move this into the class definition once it ships.
195 function ReadableStream_prototype_pipeTo(
196 dest, { preventClose, preventAbort, preventCancel } = {}) {
197 if (!IsReadableStream(this)) {
198 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
199 }
200
201 if (!binding.IsWritableStream(dest)) {
202 // TODO(ricea): Think about having a better error message.
203 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
204 }
205
206 preventClose = Boolean(preventClose);
207 preventAbort = Boolean(preventAbort);
208 preventCancel = Boolean(preventCancel);
209
210 const readable = this;
211 if (IsReadableStreamLocked(readable)) {
212 return Promise_reject(new TypeError(errCannotPipeLockedStream));
213 }
214
215 if (binding.IsWritableStreamLocked(dest)) {
216 return Promise_reject(new TypeError(errCannotPipeToALockedStream));
217 }
218
219 const reader = AcquireReadableStreamDefaultReader(readable);
220 const writer = binding.AcquireWritableStreamDefaultWriter(dest);
221 let shuttingDown = false;
222 const promise = v8.createPromise();
223 let reading = false;
224
225 if (initialStateOk()) {
226 // Need to detect closing and error when we are not reading.
227 thenPromise(reader[_closedPromise], readableClosed, readableError);
228 // Need to detect error when we are not writing.
229 thenPromise(binding.getWritableStreamDefaultWriterClosedPromise(writer),
230 undefined, writableError);
231 pump();
232 }
233
234 function initialStateOk() {
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 This function name sounds like checking states and
Adam Rice 2017/01/23 13:24:30 Done.
235 const state = ReadableStreamGetState(readable);
236 if (state === STATE_ERRORED) {
237 readableError();
238 return false;
239 }
240
241 if (binding.isWritableStreamErrored(dest)) {
242 writableError();
243 return false;
244 }
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 are these error check blocks for optimization?
Adam Rice 2017/01/23 13:24:30 They are to perform the checks in the correct orde
tyoshino (SeeGerritForStatus) 2017/01/24 09:52:33 Oh, thanks for the pointer. Sorry for being unawar
245
246 if (state === STATE_CLOSED) {
247 readableClosed();
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 how about calling factoring out the latter half of
Adam Rice 2017/01/23 13:24:30 The first check in readableClosed() didn't belong
248 return false;
249 }
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 Is this an optimization or to have pipeTo finish s
Adam Rice 2017/01/23 13:24:30 It's purely to perform the checks in the correct o
250
251 if (binding.isWritableStreamClosingOrClosed(dest)) {
252 writableStartedClosed();
253 return false;
254 }
255
256 return true;
257 }
258
259 function pump() {
260 if (shuttingDown) {
261 return;
262 }
263 const desiredSize =
264 binding.WritableStreamDefaultWriterGetDesiredSize(writer);
265 if (desiredSize === null) {
266 writableError();
267 }
268 if (desiredSize <= 0) {
269 thenPromise(binding.getWritableStreamDefaultWriterReadyPromise(writer),
270 pump, writableError);
271 return;
272 }
273 reading = true;
274 // TODO(ricea): Delay reads heuristically when desiredSize is low.
275 thenPromise(ReadableStreamDefaultReaderRead(reader), readFulfilled,
276 readRejected);
277 }
278
279 function readFulfilled({value, done}) {
280 reading = false;
281 if (shuttingDown) {
282 return;
283 }
284 if (done) {
285 readableClosed();
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 how about calling factoring out the latter half of
Adam Rice 2017/01/23 13:24:30 I removed the first half instead, to make the oper
286 return;
287 }
288 const write = binding.WritableStreamDefaultWriterWrite(writer, value);
289 thenPromise(write, undefined, writableError);
290 pump();
291 }
292
293 function readRejected() {
294 reading = false;
295 readableError();
296 }
297
298 function readableError() {
299 if (!preventAbort) {
300 shutdownWithAction(binding.WritableStreamAbort,
301 [dest, readable[_storedError]],
302 readable[_storedError], true);
303 } else {
304 shutdown(readable[_storedError], true);
305 }
306 }
307
308 function writableError() {
309 const storedError = binding.getWritableStreamStoredError(dest);
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 can't we use the rejection value? or intentionally
Adam Rice 2017/01/23 13:24:30 I'm not sure about this, because now we're looking
310 if (!preventCancel) {
311 shutdownWithAction(ReadableStreamCancel, [readable, storedError],
312 storedError, true);
313 } else {
314 shutdown(storedError, true);
315 }
316 }
317
318 function readableClosed() {
319 if (reading) {
320 // Handle the close status from the read() method rather than the
321 // [[closedPromise]].
322 return;
323 }
324 if (!preventClose) {
325 shutdownWithAction(
326 binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
327 [writer]);
328 } else {
329 shutdown();
330 }
331 }
332
333 function writableStartedClosed() {
334 const destClosed = new TypeError(errDestinationStreamClosed);
335 if (!preventCancel) {
336 shutdownWithAction(ReadableStreamCancel, [readable, destClosed],
337 destClosed, true);
338 } else {
339 shutdown(destClosed, true);
340 }
341 }
342
343 function shutdownWithAction(action, args, originalError = undefined,
344 errorGiven = false) {
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 default argument is used for better correspondence
Adam Rice 2017/01/23 13:24:30 Exactly. This worked better before Domenic pointed
345 if (shuttingDown) {
346 return;
347 }
348 shuttingDown = true;
349 const p = applyFunction(action, undefined, args);
350 thenPromise(p,
351 () => finalize(originalError, errorGiven),
352 newError => finalize(newError, true));
353 }
tyoshino (SeeGerritForStatus) 2017/01/23 10:42:57 ah, we need to update the reference implementation
Adam Rice 2017/01/23 13:24:30 Yes. I discussed this with Domenic but it looks li
354
355 function shutdown(error = undefined, errorGiven = false) {
356 if (shuttingDown) {
357 return;
358 }
359 shuttingDown = true;
360 finalize(error, errorGiven);
361 }
362
363 function finalize(error, errorGiven) {
364 binding.WritableStreamDefaultWriterRelease(writer);
365 ReadableStreamReaderGenericRelease(reader);
366 if (errorGiven) {
367 v8.rejectPromise(promise, error);
368 } else {
369 v8.resolvePromise(promise, undefined);
370 }
371 }
372
373 return promise;
374 }
375
184 class ReadableStreamDefaultController { 376 class ReadableStreamDefaultController {
185 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { 377 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
186 if (IsReadableStream(stream) === false) { 378 if (IsReadableStream(stream) === false) {
187 throw new TypeError(streamErrors.illegalConstructor); 379 throw new TypeError(streamErrors.illegalConstructor);
188 } 380 }
189 381
190 if (stream[_controller] !== undefined) { 382 if (stream[_controller] !== undefined) {
191 throw new TypeError(streamErrors.illegalConstructor); 383 throw new TypeError(streamErrors.illegalConstructor);
192 } 384 }
193 385
(...skipping 710 matching lines...) Expand 10 before | Expand all | Expand 10 after
904 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; 1096 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; 1097 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; 1098 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; 1099 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
908 1100
909 binding.createReadableStreamWithExternalController = 1101 binding.createReadableStreamWithExternalController =
910 (underlyingSource, strategy) => { 1102 (underlyingSource, strategy) => {
911 return new ReadableStream( 1103 return new ReadableStream(
912 underlyingSource, strategy, createWithExternalControllerSentinel); 1104 underlyingSource, strategy, createWithExternalControllerSentinel);
913 }; 1105 };
1106
1107 // Temporary exports while pipeTo() and pipeThrough() are behind flags
1108 binding.ReadableStream_prototype_pipeThrough =
1109 ReadableStream_prototype_pipeThrough;
1110 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
914 }); 1111 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698