Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2561443004: Implementation of ReadableStream pipeTo and pipeThrough (Closed)
Patch Set: Stop waiting for writes to terminate at shutdown Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 const PULLING = 0b100; 42 const PULLING = 0b100;
43 const PULL_AGAIN = 0b1000; 43 const PULL_AGAIN = 0b1000;
44 const EXTERNALLY_CONTROLLED = 0b10000; 44 const EXTERNALLY_CONTROLLED = 0b10000;
45 45
46 const undefined = global.undefined; 46 const undefined = global.undefined;
47 const Infinity = global.Infinity; 47 const Infinity = global.Infinity;
48 48
49 const defineProperty = global.Object.defineProperty; 49 const defineProperty = global.Object.defineProperty;
50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
51 const callFunction = v8.uncurryThis(global.Function.prototype.call); 51 const callFunction = v8.uncurryThis(global.Function.prototype.call);
52 const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
52 53
53 const TypeError = global.TypeError; 54 const TypeError = global.TypeError;
54 const RangeError = global.RangeError; 55 const RangeError = global.RangeError;
55 56
56 const Number = global.Number; 57 const Number = global.Number;
57 const Number_isNaN = Number.isNaN; 58 const Number_isNaN = Number.isNaN;
58 const Number_isFinite = Number.isFinite; 59 const Number_isFinite = Number.isFinite;
59 60
60 const Promise = global.Promise; 61 const Promise = global.Promise;
61 const thenPromise = v8.uncurryThis(Promise.prototype.then); 62 const thenPromise = v8.uncurryThis(Promise.prototype.then);
(...skipping 24 matching lines...) Expand all
86 'ReadableStreamReader constructor argument is not a readable stream'; 87 'ReadableStreamReader constructor argument is not a readable stream';
87 const errReaderConstructorStreamAlreadyLocked = 88 const errReaderConstructorStreamAlreadyLocked =
88 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 89 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
89 const errReleaseReaderWithPendingRead = 90 const errReleaseReaderWithPendingRead =
90 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 91 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
91 const errReleasedReaderClosedPromise = 92 const errReleasedReaderClosedPromise =
92 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 93 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
93 94
94 const errTmplMustBeFunctionOrUndefined = name => 95 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 96 `${name} must be a function or undefined`;
97 const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
98 const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
99 const errDestinationStreamClosed = 'Destination stream closed';
96 100
97 class ReadableStream { 101 class ReadableStream {
98 constructor() { 102 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 103 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 104 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 105 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 106 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 107 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 108 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 109 if (highWaterMark === undefined) {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 178
175 tee() { 179 tee() {
176 if (IsReadableStream(this) === false) { 180 if (IsReadableStream(this) === false) {
177 throw new TypeError(streamErrors.illegalInvocation); 181 throw new TypeError(streamErrors.illegalInvocation);
178 } 182 }
179 183
180 return ReadableStreamTee(this); 184 return ReadableStreamTee(this);
181 } 185 }
182 } 186 }
183 187
188 // TODO(ricea): Move this into the class definition once it ships.
189 function ReadableStream_prototype_pipeThrough({writable, readable}, options) {
190 this.pipeTo(writable, options);
191 return readable;
192 }
193
194 // TODO(ricea): Move this into the class definition once it ships.
195 function ReadableStream_prototype_pipeTo(
196 dest, { preventClose, preventAbort, preventCancel } = {}) {
197 if (!IsReadableStream(this)) {
198 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
199 }
200
201 if (!binding.IsWritableStream(dest)) {
202 // TODO(ricea): Think about having a better error message.
203 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
204 }
205
206 preventClose = Boolean(preventClose);
207 preventAbort = Boolean(preventAbort);
208 preventCancel = Boolean(preventCancel);
209
210 const readable = this;
211 if (IsReadableStreamLocked(readable)) {
212 return Promise_reject(new TypeError(errCannotPipeLockedStream));
213 }
214
215 if (binding.IsWritableStreamLocked(dest)) {
216 return Promise_reject(new TypeError(errCannotPipeToALockedStream));
217 }
218
219 const reader = AcquireReadableStreamDefaultReader(readable);
220 const writer = binding.AcquireWritableStreamDefaultWriter(dest);
221 let shuttingDown = false;
222 const promise = v8.createPromise();
223 let reading = false;
224
225 if (initialStateOk()) {
226 // Need to detect closing and error when we are not reading.
227 thenPromise(reader[_closedPromise], readableClosed, readableError);
228 // Need to detect error when we are not writing.
229 thenPromise(binding.WritableStreamDefaultWriterClosedPromise(writer),
230 undefined, writableError);
231 pump();
232 }
233
234 function initialStateOk() {
domenic 2017/01/18 23:51:17 It might make sense to pull some of these out to t
Adam Rice 2017/01/19 14:02:47 This function calls state-changing functions, whic
235 const state = ReadableStreamGetState(readable);
236 if (state === STATE_ERRORED) {
237 readableError();
238 return false;
239 }
240
241 if (binding.IsWritableStreamErrored(dest)) {
domenic 2017/01/18 23:51:17 Maybe since these are not abstract operation names
Adam Rice 2017/01/19 14:02:47 It's not great for the getters, because I don't wa
242 writableError();
243 return false;
244 }
245
246 if (state === STATE_CLOSED) {
247 readableClosed();
248 return false;
249 }
250
251 if (binding.IsWritableStreamClosingOrClosed(dest)) {
252 writableStartedClosed();
253 return false;
254 }
255
256 return true;
257 }
258
259 function pump() {
260 if (shuttingDown) {
261 return;
262 }
263 const desiredSize =
264 binding.WritableStreamDefaultWriterGetDesiredSize(writer);
265 if (desiredSize === null) {
266 writableError();
267 }
268 if (desiredSize <= 0) {
269 thenPromise(binding.WritableStreamDefaultWriterReadyPromise(writer),
270 pump, writableError);
271 return;
272 }
273 reading = true;
274 // TODO(ricea): Delay reads heuristically when desiredSize is low.
275 thenPromise(ReadableStreamDefaultReaderRead(reader), readResolved,
276 readRejected);
277 }
278
279 function readResolved({value, done}) {
domenic 2017/01/18 23:51:17 Nit: "fulfilled" is more correct than "resolved" h
Adam Rice 2017/01/19 14:02:47 I hadn't seen that document before. It's cleared u
280 reading = false;
281 if (shuttingDown) {
282 return;
283 }
284 if (done) {
285 readableClosed();
286 return;
287 }
288 const write = binding.WritableStreamDefaultWriterWrite(writer, value);
289 thenPromise(write, undefined, writableError);
290 pump();
291 }
292
293 function readRejected() {
294 reading = false;
295 readableError();
296 }
297
298 function readableError() {
299 if (!preventAbort) {
300 shutdownWithAction(binding.WritableStreamAbort,
301 [dest, readable[_storedError]],
302 readable[_storedError]);
303 } else {
304 shutdown(readable[_storedError]);
305 }
306 }
307
308 function writableError() {
309 const storedError = binding.WritableStreamStoredError(dest);
310 if (!preventCancel) {
311 shutdownWithAction(ReadableStreamCancel, [readable,
312 storedError], storedError);
313 } else {
314 shutdown(storedError);
315 }
316 }
317
318 function readableClosed() {
319 if (reading) {
320 // Handle the close status from the read() method rather than the
321 // [[closedPromise]].
322 return;
323 }
324 if (!preventClose) {
325 shutdownWithAction(
326 binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
327 [writer]);
328 } else {
329 shutdown();
330 }
331 }
332
333 function writableStartedClosed() {
334 const destClosed = new TypeError(errDestinationStreamClosed);
335 if (!preventCancel) {
336 shutdownWithAction(ReadableStreamCancel, [readable,
337 destClosed], destClosed);
338 } else {
339 shutdown(destClosed);
340 }
341 }
342
343 function shutdownWithAction(action, args, originalError = undefined) {
344 if (shuttingDown) {
345 return;
346 }
347 shuttingDown = true;
348 const p = applyFunction(action, undefined, args);
349 thenPromise(p,
350 () => finalize(originalError),
351 newError => finalize(newError));
352 }
353
354 function shutdown(error = undefined) {
355 if (shuttingDown) {
356 return;
357 }
358 shuttingDown = true;
359 finalize(error);
360 }
361
362 function finalize(error) {
363 binding.WritableStreamDefaultWriterRelease(writer);
364 ReadableStreamReaderGenericRelease(reader);
365 if (error !== undefined) {
domenic 2017/01/18 23:51:17 This seems like it will not work correctly when th
Adam Rice 2017/01/19 14:02:47 Yep. I went with passing booleans around as well.
366 v8.rejectPromise(promise, error);
367 } else {
368 v8.resolvePromise(promise, undefined);
369 }
370 }
371
372 return promise;
373 }
374
184 class ReadableStreamDefaultController { 375 class ReadableStreamDefaultController {
185 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { 376 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
186 if (IsReadableStream(stream) === false) { 377 if (IsReadableStream(stream) === false) {
187 throw new TypeError(streamErrors.illegalConstructor); 378 throw new TypeError(streamErrors.illegalConstructor);
188 } 379 }
189 380
190 if (stream[_controller] !== undefined) { 381 if (stream[_controller] !== undefined) {
191 throw new TypeError(streamErrors.illegalConstructor); 382 throw new TypeError(streamErrors.illegalConstructor);
192 } 383 }
193 384
(...skipping 710 matching lines...) Expand 10 before | Expand all | Expand 10 after
904 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; 1095 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; 1096 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; 1097 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; 1098 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
908 1099
909 binding.createReadableStreamWithExternalController = 1100 binding.createReadableStreamWithExternalController =
910 (underlyingSource, strategy) => { 1101 (underlyingSource, strategy) => {
911 return new ReadableStream( 1102 return new ReadableStream(
912 underlyingSource, strategy, createWithExternalControllerSentinel); 1103 underlyingSource, strategy, createWithExternalControllerSentinel);
913 }; 1104 };
1105
1106 // Temporary exports while pipeTo() and pipeThrough() are behind flags
1107 binding.ReadableStream_prototype_pipeThrough =
1108 ReadableStream_prototype_pipeThrough;
1109 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
914 }); 1110 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698