Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Side by Side Diff: Source/platform/streams/ReadableStream.js

Issue 924713002: [WIP] ReadableStream V8 extension (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: More complete implementation Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/platform/streams/ReadableStream.cpp ('k') | Source/platform/streams/WebStreams.cpp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 (function (global) {
2 'use strict';
3
4 native function SET_PRIVATE();
5 native function GET_PRIVATE();
6 native function HAS_PRIVATE();
7
8 const kQueueSize = Symbol('Queue-with-sizes queue size');
9 const kPromise = Symbol('promise');
10 const kResolve = Symbol('resolve corresponding promise');
11 const kReject = Symbol('reject corresponding promise');
12
13 // TODO(domenic): come up with a better way of getting at intrinsics
14 const Number = global.Number;
15 const TypeError = global.TypeError;
16 const RangeError = global.RangeError;
17 const Promise = global.Promise;
18
19 const Number_isNaN = Number.isNaN;
20 const Promise_resolve = Promise.resolve.bind(Promise);
21 const Promise_reject = Promise.reject.bind(Promise);
22
23 const uncurryThis = Function.prototype.bind.bind(Function.prototype.call);
24 const applyFunction = uncurryThis(Function.prototype.apply);
25 const thenPromise = uncurryThis(Promise.prototype.then);
26 const shiftArray = uncurryThis(Array.prototype.shift);
27 const pushArray = uncurryThis(Array.prototype.push);
28
29 // TODO(domenic): need to censor Function.prototype.toString for these; use V8 API presumably
30 class ReadableStream {
31 constructor(underlyingSource, strategy) {
32 if (underlyingSource === undefined) {
33 underlyingSource = {};
34 }
35 if (strategy === undefined) {
36 strategy = {};
37 }
38 const size = strategy.size;
39 let highWaterMark = strategy.highWaterMark;
40 if (highWaterMark === undefined) {
41 highWaterMark = 1;
42 }
43
44 const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
45
46 SET_PRIVATE(this, 'ReadableStream#underlyingSource', underlyingSourc e);
47
48 // TODO(domenic) use a real queue data structure
49 const queue = [];
50 queue[kQueueSize] = 0;
51 SET_PRIVATE(this, 'ReadableStream#queue', queue);
52
53 // TODO(domenic) consolidate booleans into a bit field?
54 // TODO(domenic) use integers for state? (or put in bit field?)
55 SET_PRIVATE(this, 'ReadableStream#state', 'readable');
56 SET_PRIVATE(this, 'ReadableStream#started', false);
57 SET_PRIVATE(this, 'ReadableStream#closeRequested', false);
58 SET_PRIVATE(this, 'ReadableStream#pulling', false);
59 SET_PRIVATE(this, 'ReadableStream#pullAgain', false);
60 SET_PRIVATE(this, 'ReadableStream#reader', undefined);
61
62 SET_PRIVATE(this, 'ReadableStream#storedError', undefined);
63 SET_PRIVATE(this, 'ReadableStream#strategySize', normalizedStrategy. size);
64 SET_PRIVATE(this, 'ReadableStream#stratgyHWM', normalizedStrategy.hi ghWaterMark);
65 SET_PRIVATE(this, 'ReadableStream#stratgyHWM', normalizedStrategy.hi ghWaterMark);
66
67 const controller = new ReadableStreamController(this);
68 SET_PRIVATE(this, 'ReadableStream#controller', controller);
69
70 const that = this;
71 const startResult = InvokeOrNoop(underlyingSource, 'start', [control ler]);
72 thenPromise(Promise_resolve(startResult),
73 function() {
74 SET_PRIVATE(that, 'ReadableStream#started', true);
75 RequestReadableStreamPull(that);
76 },
77 function (r) {
78 if (GET_PRIVATE(that, 'ReadableStream#state' === 'readable') ) {
79 return ErrorReadableStream(that, r);
80 }
81 }
82 );
83 }
84
85 cancel(reason) {
86 if (IsReadableStream(this) === false) {
87 return Promise_reject(new TypeError(
88 'ReadableStream.prototype.cancel can only be used on a Reada bleStream'));
89 }
90
91 if (IsReadableStreamLocked(this) === true) {
92 return Promise_reject(new TypeError(
93 'Cannot cancel a stream that already has a reader'));
94 }
95
96 return CancelReadableStream(this, reason);
97 }
98
99 getReader() {
100 if (IsReadableStream(this) === false) {
101 throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
102 }
103
104 return AcquireReadableStreamReader(this);
105 }
106 }
107
108 class ReadableStreamController {
109 constructor(stream) {
110 if (IsReadableStream(stream) === false) {
111 throw new TypeError('ReadableStreamController can only be constr ucted with a ReadableStream instance');
112 }
113
114 if (GET_PRIVATE(stream, 'ReadableStream#controller') !== undefined) {
115 throw new TypeError(
116 'ReadableStreamController instances can only be created by t he ReadableStream constructor');
117 }
118
119 SET_PRIVATE(this, 'ReadableStreamController#controlledReadableStream ', stream);
120 }
121
122 get desiredSize() {
123 if (IsReadableStreamController(this) === false) {
124 throw new TypeError(
125 'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController');
126 }
127
128 return GetReadableStreamDesiredSize(GET_PRIVATE(this, 'ReadableStrea mController#controlledReadableStream'));
129 }
130
131 close() {
132 if (IsReadableStreamController(this) === false) {
133 throw new TypeError(
134 'ReadableStreamController.prototype.close can only be used o n a ReadableStreamController');
135 }
136
137 const stream = GET_PRIVATE(this, 'ReadableStreamController#controlle dReadableStream');
138
139 if (GET_PRIVATE(stream, 'ReadableStream#closeRequested') === true) {
140 throw new TypeError('The stream has already been closed; do not close it again!');
141 }
142 if (GET_PRIVATE(stream, 'ReadableStream#state') === 'errored') {
143 throw new TypeError('The stream is in an errored state and canno t be closed');
144 }
145
146 return CloseReadableStream(stream);
147 }
148
149 enqueue(chunk) {
150 if (IsReadableStreamController(this) === false) {
151 throw new TypeError(
152 'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
153 }
154
155 const stream = GET_PRIVATE(this, 'ReadableStreamController#controlle dReadableStream');
156
157 if (GET_PRIVATE(stream, 'ReadableStream#state') === 'errored') {
158 throw GET_PRIVATE(stream, 'ReadableStream#storedError');
159 }
160
161 if (GET_PRIVATE(stream, 'ReadableStream#closeRequested') === true) {
162 throw new TypeError('stream is closed or draining');
163 }
164
165 return EnqueueInReadableStream(stream, chunk);
166 }
167
168 error(e) {
169 if (IsReadableStreamController(this) === false) {
170 throw new TypeError(
171 'ReadableStreamController.prototype.error can only be used o n a ReadableStreamController');
172 }
173
174 const stream = GET_PRIVATE(this, 'ReadableStreamController#controlle dReadableStream');
175
176 const state = GET_PRIVATE(stream, 'ReadableStream#state');
177 if (state !== 'readable') {
178 throw new TypeError(`The stream is ${state} and so cannot be err ored`);
179 }
180
181 return ErrorReadableStream(stream, e);
182 }
183 }
184
185 class ReadableStreamReader {
186 constructor(stream) {
187 if (IsReadableStream(stream) === false) {
188 throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance');
189 }
190 if (IsReadableStreamLocked(stream) === true) {
191 throw new TypeError('This stream has already been locked for exclu sive reading by another reader');
192 }
193
194 SET_PRIVATE(stream, 'ReadableStream#reader', this);
195 SET_PRIVATE(this, 'ReadableStreamReader#ownerReadableStream', stream );
196
197 // TODO(domenic): use integers for state>
198 SET_PRIVATE(this, 'ReadableStreamReader#state', 'readable');
199 SET_PRIVATE(this, 'ReadableStreamReader#storedError', undefined);
200
201 // TODO(domenic): use a real queue data structure
202 SET_PRIVATE(this, 'ReadableStreamReader#readRequests', []);
203
204 // TODO(domenic): use faster means of creating/resolving/rejecting p romises
205 const that = this;
206 SET_PRIVATE(this, 'ReadableStreamReader#closedPromise', new Promise( function (resolve, reject) {
207 SET_PRIVATE(that, 'ReadableStreamReader#closedPromise_resolve', re solve);
208 SET_PRIVATE(that, 'ReadableStreamReader#closedPromise_reject', rej ect);
209 }));
210
211 const streamState = GET_PRIVATE(stream, 'ReadableStream#state');
212 if (streamState === 'closed' || streamState === 'errored') {
213 ReleaseReadableStreamReader(this);
214 }
215 }
216
217 get closed() {
218 if (IsReadableStreamReader(this) === false) {
219 return Promise_reject(
220 new TypeError('ReadableStreamReader.prototype.closed can onl y be used on a ReadableStreamReader'));
221 }
222
223 return GET_PRIVATE(this, 'ReadableStreamReader#closedPromise');
224 }
225
226 cancel(reason) {
227 if (IsReadableStreamReader(this) === false) {
228 return Promise_reject(
229 new TypeError('ReadableStreamReader.prototype.cancel can onl y be used on a ReadableStreamReader'));
230 }
231
232 const state = GET_PRIVATE(this, 'ReadableStreamReader#state');
233 if (state === 'closed') {
234 return Promise_resolve(undefined);
235 }
236
237 if (state === 'errored') {
238 return Promise_reject(GET_PRIVATE(this, 'ReadableStreamReader#st oredError'));
239 }
240
241 return CancelReadableStream(GET_PRIVATE(this, 'ReadableStreamReader# ownerReadableStream'), reason);
242 }
243
244 read() {
245 if (IsReadableStreamReader(this) === false) {
246 return Promise_reject(
247 new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
248 }
249
250 return ReadFromReadableStreamReader(this);
251 }
252
253 releaseLock() {
254 if (IsReadableStreamReader(this) === false) {
255 throw new TypeError(
256 'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader');
257 }
258
259 if (GET_PRIVATE(this, 'ReadableStreamReader#ownerReadableStream') == = undefined) {
260 return undefined;
261 }
262
263 // TODO(domenic): is getting array lengths safe? I don't think so.
264 // Might become moot if we have a better data structure.
265 if (GET_PRIVATE(this, 'ReadableStreamReader#readRequests').length > 0) {
266 throw new TypeError(
267 'Tried to release a reader lock when that reader has pending read() calls un-settled');
268 }
269
270 return ReleaseReadableStreamReader(this);
271 }
272 }
273
274 // Readable stream abstract operations
275
276 function AcquireReadableStreamReader(stream) {
277 return new ReadableStreamReader(stream);
278 }
279
280 function CancelReadableStream(stream, reason) {
281 const state = GET_PRIVATE(stream, 'ReadableStream#state');
282 if (state === 'closed') {
283 return Promise_resolve(undefined);
284 }
285 if (state === 'errored') {
286 return Promise_reject(GET_PRIVATE(stream, 'ReadableStream#storedErro r'));
287 }
288
289 SET_PRIVATE(stream, 'ReadableStream#queue', []);
290 FinishClosingReadableStream(stream);
291
292 const underlyingSource = GET_PRIVATE(stream, 'ReadableStream#underlyingS ource');
293 const sourceCancelPromise = PromiseInvokeOrNoop(underlyingSource, 'cance l', [reason]);
294 return thenPromise(sourceCancelPromise, function() { return undefined; } );
295 }
296
297 function CloseReadableStream(stream) {
298 if (GET_PRIVATE(stream, 'ReadableStream#state') === 'closed') {
299 return undefined;
300 }
301
302 SET_PRIVATE(stream, 'ReadableStream#closeRequested', true);
303
304 if (GET_PRIVATE(stream, 'ReadableStream#queue').length === 0) {
305 return FinishClosingReadableStream(stream);
306 }
307 }
308
309
310 function EnqueueInReadableStream(stream, chunk) {
311 if (GET_PRIVATE(stream, 'ReadableStream#state') === 'closed') {
312 return undefined;
313 }
314
315 if (IsReadableStreamLocked(stream) === true &&
316 GET_PRIVATE(GET_PRIVATE(stream, 'ReadableStream#reader'), 'ReadableS treamReader#readRequests').length > 0) {
317 const readRequest = shiftArray(readRequests);
318 readRequest[resolve](CreateIterResultObject(chunk, false));
319 } else {
320 let chunkSize = 1;
321
322 const strategySize = GET_PRIVATE(stream, 'ReadableStream#strategySiz e');
323 if (strategySize !== undefined) {
324 try {
325 chunkSize = strategySize(chunk);
326 } catch (chunkSizeE) {
327 ErrorReadableStream(stream, chunkSizeE);
328 throw chunkSizeE;
329 }
330 }
331
332 try {
333 EnqueueValueWithSize(GET_PRIVATE(stream, 'ReadableStream#queue') , chunk, chunkSize);
334 } catch (enqueueE) {
335 ErrorReadableStream(stream, enqueueE);
336 throw enqueueE;
337 }
338 }
339
340 RequestReadableStreamPull(stream);
341 }
342
343 function ErrorReadableStream(stream, e) {
344 SET_PRIVATE(stream, 'ReadableStream#queue', []);
345 SET_PRIVATE(stream, 'ReadableStream#storedError', e);
346 SET_PRIVATE(stream, 'ReadableStream#state', 'errored');
347
348 if (IsReadableStreamLocked(stream) === true) {
349 return ReleaseReadableStreamReader(GET_PRIVATE(stream, 'ReadableStre am#reader'));
350 }
351 }
352
353 function FinishClosingReadableStream(stream) {
354 SET_PRIVATE(stream, 'ReadableStream#state', 'closed');
355
356 if (IsReadableStreamLocked(stream) === true) {
357 return ReleaseReadableStreamReader(GET_PRIVATE(stream, 'ReadableStre am#reader'));
358 }
359 }
360
361 function GetReadableStreamDesiredSize(stream) {
362 const queueSize = GetTotalQueueSize(GET_PRIVATE(stream, 'ReadableStream# queue'));
363 return GET_PRIVATE(stream, 'ReadableStream#strategyHWM') - queueSize;
364 }
365
366 function IsReadableStream(x) {
367 // TODO(domenic): is it safe to allow this to be called on non-objects?
368
369 return HAS_PRIVATE(x, 'ReadableStream#underlyingSource');
370 }
371
372 function IsReadableStreamLocked(stream) {
373 return GET_PRIVATE(stream, 'ReadableStream#reader') !== undefined;
374 }
375
376 function IsReadableStreamController(x) {
377 return HAS_PRIVATE(x, 'ReadableStreamController#controlledReadableStream ');
378 }
379
380 function IsReadableStreamReader(x) {
381 return HAS_PRIVATE(x, 'ReadableStreamReader#ownerReadableStream');
382 }
383
384 function ReadFromReadableStreamReader(reader) {
385 const state = GET_PRIVATE(reader, 'ReadableStreamReader#state');
386 if (state === 'closed') {
387 return Promise_resolve(CreateIterResultObject(undefined, true));
388 }
389
390 if (state === 'errored') {
391 return Promise_reject(GET_PRIVATE(reader, 'ReadableStreamReader#stor edError'));
392 }
393
394 const ownerReadableStream = GET_PRIVATE(reader, 'ReadableStreamReader#ow nerReadableStream');
395 const queue = GET_PRIVATE(ownerReadableStream, 'ReadableStream#queue');
396 if (queue.length > 0) {
397 const chunk = DequeueValue(queue);
398
399 if (GET_PRIVATE(ownerReadableStream, 'ReadableStream#closeRequested' ) === true && queue.length === 0) {
400 FinishClosingReadableStream(ownerReadableStream);
401 } else {
402 RequestReadableStreamPull(ownerReadableStream);
403 }
404
405 return Promise_resolve(CreateIterResultObject(chunk, false));
406 } else {
407 const readRequest = {};
408 readRequest[kPromise] = new Promise(function (resolve, reject) {
409 readRequest[kResolve] = resolve;
410 readRequest[kReject] = reject;
411 });
412
413 pushArray(GET_PRIVATE(reader, 'ReadableStreamReader#readRequests'), readRequest);
414 RequestReadableStreamPull(ownerReadableStream);
415 return readRequest[kPromise];
416 }
417 }
418
419 function ReleaseReadableStreamReader(reader) {
420 const ownerReadableStream = GET_PRIVATE(reader, 'ReadableStreamReader#ow nerReadableStream');
421 if (GET_PRIVATE(ownerReadableStream, 'ReadableStream#state') === 'errore d') {
422 SET_PRIVATE(reader, 'ReadableStreamReader#state', 'errored');
423
424 const e = GET_PRIVATE(ownerReadableStream, 'ReadableStream#storedErr or');
425 SET_PRIVATE(reader, 'ReadableStreamReader#storedError', e);
426 GET_PRIVATE(reader, 'ReadableStreamReader#closedPromise_reject')(e);
427
428 for (const readRequest of GET_PRIVATE(reader, 'ReadableStreamReader# readRequests')) {
429 readRequest[kReject](e);
430 }
431 } else {
432 SET_PRIVATE(reader, 'ReadableStreamReader#state', 'closed');
433 GET_PRIVATE(reader, 'ReadableStreamReader#closedPromise_resolve')(un defined);
434
435 for (const readRequest of GET_PRIVATE(reader, 'ReadableStreamReader# readRequests')) {
436 readRequest[kResolve](CreateIterResultObject(undefined, true));
437 }
438 }
439
440 SET_PRIVATE(reader, 'ReadableStreamReader#readRequests', []);
441 SET_PRIVATE(ownerReadableStream, 'ReadableStream#reader', undefined);
442 SET_PRIVATE(reader, 'ReadableStreamReader#ownerReadableStream', undefine d);
443 }
444
445 function RequestReadableStreamPull(stream) {
446 const shouldPull = ShouldReadableStreamPull(stream);
447 if (shouldPull === false) {
448 return undefined;
449 }
450
451 if (GET_PRIVATE(stream, 'ReadableStream#pulling') === true) {
452 SET_PRIVATE(stream, 'ReadableStream#pullAgain', true);
453 return undefined;
454 }
455
456 SET_PRIVATE(stream, 'ReadableStream#pulling', true);
457
458 const underlyingSource = GET_PRIVATE(stream, 'ReadableStream#underlyingS ource');
459 const controller = GET_PRIVATE(stream, 'ReadableStream#controller');
460 const pullPromise = PromiseInvokeOrNoop(underlyingSource, 'pull', [contr oller]);
461
462 thenPromise(pullPromise,
463 function () {
464 SET_PRIVATE(stream, 'ReadableStream#pulling', false);
465
466 if (GET_PRIVATE(stream, 'ReadableStream#pullAgain') === true) {
467 SET_PRIVATE(stream, 'ReadableStream#pullAgain', false);
468 return RequestReadableStreamPull(stream);
469 }
470 },
471 function (e) {
472 if (GET_PRIVATE(stream, 'ReadableStream#state') === 'readable') {
473 return ErrorReadableStream(stream, e);
474 }
475 }
476 );
477 }
478
479 function ShouldReadableStreamPull(stream) {
480 const state = GET_PRIVATE(stream, 'ReadableStream#state');
481 if (state === 'closed' || state === 'errored') {
482 return false;
483 }
484
485 if (GET_PRIVATE(stream, 'ReadableStream#closeRequested') === true) {
486 return false;
487 }
488
489 if (GET_PRIVATE(stream, 'ReadableStream#started') === false) {
490 return false;
491 }
492
493 if (IsReadableStreamLocked(stream) === true) {
494 const reader = GET_PRIVATE(stream, 'ReadableStream#reader');
495 const readRequests = GET_PRIVATE(reader, 'ReadableStreamReader#readR equests');
496 if (readRequests.length > 0) {
497 return true;
498 }
499 }
500
501 const desiredSize = GetReadableStreamDesiredSize(stream);
502 if (desiredSize < 0) {
503 return true;
504 }
505
506 return false;
507 }
508
509 // TODO TeeReadableStream
510
511
512
513 //
514 // Queue-with-sizes
515 //
516
517 // TODO(domenic): manipulating arrays seems fraught with peril in general; e .g. if someone defines getters/setters
518 // on the prototype chain, we can no longer shift and push.
519
520 function DequeueValue(queue) {
521 return shiftArray(queue).value;
522 }
523
524 function EnqueueValueWithSize(queue, value, size) {
525 size = Number(size);
526 if (Number_isNaN(size) || size === +Infinity || size === -Infinity) {
527 throw new RangeError('size must be a finite, non-NaN number.');
528 }
529
530 // TODO(domenic): is adding numbers safe? Overridden valueOf could ruin our day.
531 queue[kQueueSize] += size;
532
533 pushArray(queue, { value, size });
534 }
535
536 function GetTotalQueueSize(queue) {
537 return queue[kQueueSize];
538 }
539
540 //
541 // Other helpers
542 //
543
544 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
545 if (size !== undefined && typeof size !== 'function') {
546 throw new TypeError('size property of a queuing strategy must be a f unction');
547 }
548
549 highWaterMark = Number(highWaterMark);
550 if (Number_isNaN(highWaterMark)) {
551 throw new TypeError('highWaterMark property of a queuing strategy mu st be convertible to a non-NaN number');
552 }
553 if (highWaterMark < 0) {
554 throw new RangeError('highWaterMark property of a queuing strategy m ust be nonnegative');
555 }
556
557 return { size, highWaterMark };
558 }
559
560 function InvokeOrNoop(O, P, args) {
561 const method = O[P];
562 if (method === undefined) {
563 return undefined;
564 }
565 return applyFunction(method, O, args);
566 }
567
568
569 function PromiseInvokeOrNoop(O, P, args) {
570 let method;
571 try {
572 method = O[P];
573 } catch (methodE) {
574 return Promise_reject(methodE);
575 }
576
577 if (method === undefined) {
578 return Promise_resolve(undefined);
579 }
580
581 try {
582 return Promise_resolve(applyFunction(method, O, args));
583 } catch (e) {
584 return Promise_reject(e);
585 }
586 }
587
588 function CreateIterResultObject(value, done) {
589 return { value, done };
590 }
591
592
593 //
594 // Exports
595 //
596
597 Object.defineProperty(global, 'ReadableStream', {
598 enumerable: false,
599 writable: true,
600 configurable: true,
601 value: ReadableStream
602 });
603 }(this));
OLDNEW
« no previous file with comments | « Source/platform/streams/ReadableStream.cpp ('k') | Source/platform/streams/WebStreams.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698