OLD | NEW |
---|---|
(Empty) | |
1 (function(global, binding, v8) { | |
2 'use strict'; | |
3 | |
4 // TODO(domenic): factor out error messages, and reuse existing Blink or V8 | |
5 // ones? | |
6 | |
7 const readableStreamCloseRequested = | |
8 v8.createPrivateSymbol('[[closeRequested]]'); | |
9 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); | |
10 const readableStreamPullAgain = v8.createPrivateSymbol('[[pullAgain]]'); | |
11 const readableStreamPulling = v8.createPrivateSymbol('[[pulling]]'); | |
12 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); | |
13 const readableStreamQueueSize = | |
14 v8.createPrivateSymbol('[[queue]] total size'); | |
15 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); | |
16 const readableStreamStarted = v8.createPrivateSymbol('[[started]]'); | |
17 const readableStreamState = v8.createPrivateSymbol('[[state]]'); | |
18 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); | |
19 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); | |
20 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); | |
21 const readableStreamUnderlyingSource = | |
22 v8.createPrivateSymbol('[[underlyingSource]]'); | |
23 const readableStreamDisturbed = v8.createPrivateSymbol('[[disturbed]]'); | |
24 | |
25 const readableStreamControllerControlledReadableStream = | |
26 v8.createPrivateSymbol('[[controlledReadableStream]]'); | |
27 | |
28 const readableStreamReaderClosedPromise = | |
29 v8.createPrivateSymbol('[[closedPromise]]'); | |
30 const readableStreamReaderOwnerReadableStream = | |
31 v8.createPrivateSymbol('[[ownerReadableStream]]'); | |
32 const readableStreamReaderReadRequests = | |
33 v8.createPrivateSymbol('[[readRequests]]'); | |
34 | |
35 const STATE_READABLE = 0; | |
36 const STATE_CLOSED = 1; | |
37 const STATE_ERRORED = 2; | |
38 | |
39 const undefined = global.undefined; | |
40 const Infinity = global.Infinity; | |
41 | |
42 const defineProperty = global.Object.defineProperty; | |
43 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | |
44 const callFunction = v8.uncurryThis(global.Function.prototype.call); | |
45 | |
46 const TypeError = global.TypeError; | |
47 const RangeError = global.RangeError; | |
48 | |
49 const Number = global.Number; | |
50 const Number_isNaN = Number.isNaN; | |
51 const Number_isFinite = Number.isFinite; | |
52 | |
53 const Promise = global.Promise; | |
54 const thenPromise = v8.uncurryThis(Promise.prototype.then); | |
55 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); | |
56 const Promise_reject = v8.simpleBind(Promise.reject, Promise); | |
57 | |
58 class ReadableStream { | |
59 constructor() { | |
60 // TODO(domenic): when V8 gets default parameters and destructuring, all | |
61 // this can be cleaned up. | |
62 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; | |
63 const strategy = arguments[1] === undefined ? {} : arguments[1]; | |
64 const size = strategy.size; | |
65 let highWaterMark = strategy.highWaterMark; | |
66 if (highWaterMark === undefined) { | |
67 highWaterMark = 1; | |
68 } | |
69 | |
70 const normalizedStrategy = | |
71 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | |
72 | |
73 this[readableStreamUnderlyingSource] = underlyingSource; | |
74 | |
75 this[readableStreamQueue] = new v8.InternalPackedArray(); | |
76 this[readableStreamQueueSize] = 0; | |
77 | |
78 // TODO(domenic) consolidate booleans into a bit field? | |
79 // TODO(domenic) use integers for state? (or put in bit field?) | |
80 this[readableStreamState] = STATE_READABLE; | |
81 this[readableStreamStarted] = false; | |
82 this[readableStreamCloseRequested] = false; | |
83 this[readableStreamPulling] = false; | |
84 this[readableStreamPullAgain] = false; | |
85 this[readableStreamReader] = undefined; | |
86 this[readableStreamStoredError] = undefined; | |
87 | |
88 this[readableStreamDisturbed] = false; | |
89 | |
90 this[readableStreamStrategySize] = normalizedStrategy.size; | |
91 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; | |
92 | |
93 const controller = new ReadableStreamController(this); | |
94 this[readableStreamController] = controller; | |
95 | |
96 const that = this; | |
97 const startResult = CallOrNoop( | |
98 underlyingSource, 'start', controller, 'underlyingSource.start'); | |
99 thenPromise(Promise_resolve(startResult), | |
100 function() { | |
101 that[readableStreamStarted] = true; | |
102 RequestReadableStreamPull(that); | |
103 }, | |
104 function(r) { | |
105 if (that[readableStreamState] === STATE_READABLE) { | |
106 return ErrorReadableStream(that, r); | |
107 } | |
108 }); | |
109 } | |
110 | |
111 get locked() { | |
112 if (IsReadableStream(this) === false) { | |
113 throw new TypeError( | |
114 'ReadableStream.prototype.locked can only be used on a ReadableStrea m'); | |
115 } | |
116 | |
117 return IsReadableStreamLocked(this); | |
118 } | |
119 | |
120 cancel(reason) { | |
121 if (IsReadableStream(this) === false) { | |
122 return Promise_reject(new TypeError( | |
123 'ReadableStream.prototype.cancel can only be used on a ReadableStrea m')); | |
124 } | |
125 | |
126 if (IsReadableStreamLocked(this) === true) { | |
127 return Promise_reject( | |
128 new TypeError('Cannot cancel a stream that already has a reader')); | |
129 } | |
130 | |
131 return CancelReadableStream(this, reason); | |
132 } | |
133 | |
134 getReader() { | |
135 if (IsReadableStream(this) === false) { | |
136 throw new TypeError( | |
137 'ReadableStream.prototype.getReader can only be used on a ReadableSt ream'); | |
138 } | |
139 | |
140 return AcquireReadableStreamReader(this); | |
141 } | |
142 | |
143 tee() { | |
144 if (IsReadableStream(this) === false) { | |
145 throw new TypeError( | |
146 'ReadableStream.prototype.tee can only be used on a ReadableStream') ; | |
147 } | |
148 | |
149 return TeeReadableStream(this); | |
150 } | |
151 } | |
152 | |
153 class ReadableStreamController { | |
154 constructor(stream) { | |
155 if (IsReadableStream(stream) === false) { | |
156 throw new TypeError( | |
157 'ReadableStreamController can only be constructed with a ReadableStr eam instance'); | |
158 } | |
159 | |
160 if (stream[readableStreamController] !== undefined) { | |
161 throw new TypeError( | |
162 'ReadableStreamController instances can only be created by the Reada bleStream constructor'); | |
163 } | |
164 | |
165 this[readableStreamControllerControlledReadableStream] = stream; | |
166 } | |
167 | |
168 get desiredSize() { | |
169 if (IsReadableStreamController(this) === false) { | |
170 throw new TypeError( | |
171 'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController'); | |
172 } | |
173 | |
174 return GetReadableStreamDesiredSize( | |
175 this[readableStreamControllerControlledReadableStream]); | |
176 } | |
177 | |
178 close() { | |
179 if (IsReadableStreamController(this) === false) { | |
180 throw new TypeError( | |
181 'ReadableStreamController.prototype.close can only be used on a Read ableStreamController'); | |
182 } | |
183 | |
184 const stream = this[readableStreamControllerControlledReadableStream]; | |
185 | |
186 if (stream[readableStreamCloseRequested] === true) { | |
187 throw new TypeError( | |
188 'The stream has already been closed; do not close it again!'); | |
189 } | |
190 if (stream[readableStreamState] === STATE_ERRORED) { | |
191 throw new TypeError( | |
192 'The stream is in an errored state and cannot be closed'); | |
193 } | |
194 | |
195 return CloseReadableStream(stream); | |
196 } | |
197 | |
198 enqueue(chunk) { | |
199 if (IsReadableStreamController(this) === false) { | |
200 throw new TypeError( | |
201 'ReadableStreamController.prototype.enqueue can only be used on a Re adableStreamController'); | |
202 } | |
203 | |
204 const stream = this[readableStreamControllerControlledReadableStream]; | |
205 | |
206 if (stream[readableStreamState] === STATE_ERRORED) { | |
207 throw stream[readableStreamStoredError]; | |
208 } | |
209 | |
210 if (stream[readableStreamCloseRequested] === true) { | |
211 throw new TypeError('stream is closed or draining'); | |
212 } | |
213 | |
214 return EnqueueInReadableStream(stream, chunk); | |
215 } | |
216 | |
217 error(e) { | |
218 if (IsReadableStreamController(this) === false) { | |
219 throw new TypeError( | |
220 'ReadableStreamController.prototype.error can only be used on a Read ableStreamController'); | |
221 } | |
222 | |
223 const stream = this[readableStreamControllerControlledReadableStream]; | |
224 | |
225 const state = stream[readableStreamState]; | |
226 if (state !== STATE_READABLE) { | |
227 throw new TypeError(`The stream is ${state} and so cannot be errored`); | |
228 } | |
229 | |
230 return ErrorReadableStream(stream, e); | |
231 } | |
232 } | |
233 | |
234 class ReadableStreamReader { | |
235 constructor(stream) { | |
236 if (IsReadableStream(stream) === false) { | |
237 throw new TypeError( | |
238 'ReadableStreamReader can only be constructed with a ReadableStream instance'); | |
239 } | |
240 if (IsReadableStreamLocked(stream) === true) { | |
241 throw new TypeError( | |
242 'This stream has already been locked for exclusive reading by anothe r reader'); | |
243 } | |
244 | |
245 this[readableStreamReaderOwnerReadableStream] = stream; | |
246 stream[readableStreamReader] = this; | |
247 | |
248 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
249 | |
250 switch (stream[readableStreamState]) { | |
251 case STATE_READABLE: | |
252 this[readableStreamReaderClosedPromise] = v8.createPromise(); | |
253 break; | |
254 case STATE_CLOSED: | |
255 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); | |
256 break; | |
257 case STATE_ERRORED: | |
258 this[readableStreamReaderClosedPromise] = | |
259 Promise_reject(stream[readableStreamStoredError]); | |
260 break; | |
261 } | |
262 } | |
263 | |
264 get closed() { | |
265 if (IsReadableStreamReader(this) === false) { | |
266 return Promise_reject(new TypeError( | |
267 'ReadableStreamReader.prototype.closed can only be used on a Readabl eStreamReader')); | |
268 } | |
269 | |
270 return this[readableStreamReaderClosedPromise]; | |
271 } | |
272 | |
273 cancel(reason) { | |
274 if (IsReadableStreamReader(this) === false) { | |
275 return Promise_reject(new TypeError( | |
276 'ReadableStreamReader.prototype.cancel can only be used on a Readabl eStreamReader')); | |
277 } | |
278 | |
279 const stream = this[readableStreamReaderOwnerReadableStream]; | |
280 if (stream === undefined) { | |
281 return Promise_reject( | |
282 new TypeError('Cannot cancel a stream using a released reader')); | |
283 } | |
284 | |
285 return CancelReadableStream(stream, reason); | |
286 } | |
287 | |
288 read() { | |
289 if (IsReadableStreamReader(this) === false) { | |
290 return Promise_reject(new TypeError( | |
291 'ReadableStreamReader.prototype.read can only be used on a ReadableS treamReader')); | |
292 } | |
293 | |
294 if (this[readableStreamReaderOwnerReadableStream] === undefined) { | |
295 return Promise_reject( | |
296 new TypeError('Cannot read from a released reader')); | |
297 } | |
298 | |
299 return ReadFromReadableStreamReader(this); | |
300 } | |
301 | |
302 releaseLock() { | |
303 if (IsReadableStreamReader(this) === false) { | |
304 throw new TypeError( | |
305 'ReadableStreamReader.prototype.releaseLock can only be used on a Re adableStreamReader'); | |
306 } | |
307 | |
308 const stream = this[readableStreamReaderOwnerReadableStream]; | |
309 if (stream === undefined) { | |
310 return undefined; | |
311 } | |
312 | |
313 if (this[readableStreamReaderReadRequests].length > 0) { | |
314 throw new TypeError( | |
315 'Tried to release a reader lock when that reader has pending read() calls un-settled'); | |
316 } | |
317 | |
318 if (stream[readableStreamState] === STATE_READABLE) { | |
319 v8.rejectPromise( | |
320 this[readableStreamReaderClosedPromise], | |
321 new TypeError( | |
322 'Reader was released and can no longer be used to monitor the st ream\'s closedness')); | |
323 } else { | |
324 this[readableStreamReaderClosedPromise] = Promise_reject(new TypeError( | |
325 'Reader was released and can no longer be used to monitor the stream \'s closedness')); | |
326 } | |
327 | |
328 this[readableStreamReaderOwnerReadableStream][readableStreamReader] = | |
329 undefined; | |
330 this[readableStreamReaderOwnerReadableStream] = undefined; | |
331 } | |
332 } | |
333 | |
334 // | |
335 // Readable stream abstract operations | |
336 // | |
337 | |
338 function AcquireReadableStreamReader(stream) { | |
339 return new ReadableStreamReader(stream); | |
340 } | |
341 | |
342 function CancelReadableStream(stream, reason) { | |
343 stream[readableStreamDisturbed] = true; | |
344 | |
345 const state = stream[readableStreamState]; | |
346 if (state === STATE_CLOSED) { | |
347 return Promise_resolve(undefined); | |
348 } | |
349 if (state === STATE_ERRORED) { | |
350 return Promise_reject(stream[readableStreamStoredError]); | |
351 } | |
352 | |
353 stream[readableStreamQueue] = new v8.InternalPackedArray(); | |
354 FinishClosingReadableStream(stream); | |
355 | |
356 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
357 const sourceCancelPromise = PromiseCallOrNoop( | |
358 underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); | |
359 return thenPromise(sourceCancelPromise, function() { return undefined; }); | |
360 } | |
361 | |
362 function CloseReadableStream(stream) { | |
363 if (stream[readableStreamState] === STATE_CLOSED) { | |
364 return undefined; | |
365 } | |
366 | |
367 stream[readableStreamCloseRequested] = true; | |
368 | |
369 if (stream[readableStreamQueue].length === 0) { | |
370 return FinishClosingReadableStream(stream); | |
371 } | |
372 } | |
373 | |
374 function EnqueueInReadableStream(stream, chunk) { | |
375 if (stream[readableStreamState] === STATE_CLOSED) { | |
376 return undefined; | |
377 } | |
378 | |
379 if (IsReadableStreamLocked(stream) === true && | |
380 stream[readableStreamReader][readableStreamReaderReadRequests].length > | |
381 0) { | |
382 const readRequest = | |
383 stream[readableStreamReader][readableStreamReaderReadRequests] | |
384 .shift(); | |
385 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); | |
386 } else { | |
387 let chunkSize = 1; | |
388 | |
389 const strategySize = stream[readableStreamStrategySize]; | |
390 if (strategySize !== undefined) { | |
391 try { | |
392 chunkSize = strategySize(chunk); | |
393 } catch (chunkSizeE) { | |
394 ErrorReadableStream(stream, chunkSizeE); | |
395 throw chunkSizeE; | |
396 } | |
397 } | |
398 | |
399 try { | |
400 EnqueueValueWithSize(stream, chunk, chunkSize); | |
401 } catch (enqueueE) { | |
402 ErrorReadableStream(stream, enqueueE); | |
403 throw enqueueE; | |
404 } | |
405 } | |
406 | |
407 RequestReadableStreamPull(stream); | |
408 } | |
409 | |
410 function ErrorReadableStream(stream, e) { | |
411 stream[readableStreamQueue] = new v8.InternalPackedArray(); | |
412 stream[readableStreamStoredError] = e; | |
413 stream[readableStreamState] = STATE_ERRORED; | |
414 | |
415 const reader = stream[readableStreamReader]; | |
416 if (reader === undefined) { | |
417 return undefined; | |
418 } | |
419 | |
420 const readRequests = reader[readableStreamReaderReadRequests]; | |
421 for (let i = 0; i < readRequests.length; ++i) { | |
422 v8.rejectPromise(readRequests[i], e); | |
423 } | |
424 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
425 | |
426 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); | |
427 } | |
428 | |
429 function FinishClosingReadableStream(stream) { | |
430 stream[readableStreamState] = STATE_CLOSED; | |
431 | |
432 const reader = stream[readableStreamReader]; | |
433 if (reader === undefined) { | |
434 return undefined; | |
435 } | |
436 | |
437 | |
438 const readRequests = reader[readableStreamReaderReadRequests]; | |
439 for (let i = 0; i < readRequests.length; ++i) { | |
440 v8.resolvePromise( | |
441 readRequests[i], CreateIterResultObject(undefined, true)); | |
442 } | |
443 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
444 | |
445 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); | |
446 } | |
447 | |
448 function GetReadableStreamDesiredSize(stream) { | |
449 const queueSize = GetTotalQueueSize(stream); | |
450 return stream[readableStreamStrategyHWM] - queueSize; | |
451 } | |
452 | |
453 // TODO can we use `in` instead of hasOwnProperty? | |
yhirano
2015/10/14 10:28:49
TODO(domenic):
domenic
2015/10/14 16:25:31
Removed; I remembered why it was invalid to use `i
| |
454 function IsReadableStream(x) { | |
455 return hasOwnProperty(x, readableStreamUnderlyingSource); | |
456 } | |
457 | |
458 function IsReadableStreamDisturbed(stream) { | |
459 return stream[readableStreamDisturbed]; | |
460 } | |
461 | |
462 function IsReadableStreamLocked(stream) { | |
463 return stream[readableStreamReader] !== undefined; | |
464 } | |
465 | |
466 function IsReadableStreamController(x) { | |
467 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); | |
468 } | |
469 | |
470 function IsReadableStreamReader(x) { | |
471 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); | |
472 } | |
473 | |
474 function ReadFromReadableStreamReader(reader) { | |
475 const stream = reader[readableStreamReaderOwnerReadableStream]; | |
476 stream[readableStreamDisturbed] = true; | |
477 | |
478 if (stream[readableStreamState] === STATE_CLOSED) { | |
479 return Promise_resolve(CreateIterResultObject(undefined, true)); | |
480 } | |
481 | |
482 if (stream[readableStreamState] === STATE_ERRORED) { | |
483 return Promise_reject(stream[readableStreamStoredError]); | |
484 } | |
485 | |
486 const queue = stream[readableStreamQueue]; | |
487 if (queue.length > 0) { | |
488 const chunk = DequeueValue(stream); | |
489 | |
490 if (stream[readableStreamCloseRequested] === true && | |
491 queue.length === 0) { | |
492 FinishClosingReadableStream(stream); | |
493 } else { | |
494 RequestReadableStreamPull(stream); | |
495 } | |
496 | |
497 return Promise_resolve(CreateIterResultObject(chunk, false)); | |
498 } else { | |
499 const readRequest = v8.createPromise(); | |
500 | |
501 reader[readableStreamReaderReadRequests].push(readRequest); | |
502 RequestReadableStreamPull(stream); | |
503 return readRequest; | |
504 } | |
505 } | |
506 | |
507 function RequestReadableStreamPull(stream) { | |
508 const shouldPull = ShouldReadableStreamPull(stream); | |
509 if (shouldPull === false) { | |
510 return undefined; | |
511 } | |
512 | |
513 if (stream[readableStreamPulling] === true) { | |
514 stream[readableStreamPullAgain] = true; | |
515 return undefined; | |
516 } | |
517 | |
518 stream[readableStreamPulling] = true; | |
519 | |
520 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
521 const controller = stream[readableStreamController]; | |
522 const pullPromise = PromiseCallOrNoop( | |
523 underlyingSource, 'pull', controller, 'underlyingSource.pull'); | |
524 | |
525 thenPromise(pullPromise, | |
526 function() { | |
527 stream[readableStreamPulling] = false; | |
528 | |
529 if (stream[readableStreamPullAgain] === true) { | |
530 stream[readableStreamPullAgain] = false; | |
531 return RequestReadableStreamPull(stream); | |
532 } | |
533 }, | |
534 function(e) { | |
535 if (stream[readableStreamState] === STATE_READABLE) { | |
536 return ErrorReadableStream(stream, e); | |
537 } | |
538 }); | |
539 } | |
540 | |
541 function ShouldReadableStreamPull(stream) { | |
542 const state = stream[readableStreamState]; | |
543 if (state === STATE_CLOSED || state === STATE_ERRORED) { | |
544 return false; | |
545 } | |
546 | |
547 if (stream[readableStreamCloseRequested] === true) { | |
548 return false; | |
549 } | |
550 | |
551 if (stream[readableStreamStarted] === false) { | |
552 return false; | |
553 } | |
554 | |
555 if (IsReadableStreamLocked(stream) === true) { | |
556 const reader = stream[readableStreamReader]; | |
557 const readRequests = reader[readableStreamReaderReadRequests]; | |
558 if (readRequests.length > 0) { | |
559 return true; | |
560 } | |
561 } | |
562 | |
563 const desiredSize = GetReadableStreamDesiredSize(stream); | |
564 if (desiredSize > 0) { | |
565 return true; | |
566 } | |
567 | |
568 return false; | |
569 } | |
570 | |
571 // Potential future optimization: use class instances for the underlying | |
572 // sources, so that we don't re-create | |
573 // closures every time. | |
574 | |
575 function TeeReadableStream( | |
576 stream) { // shouldClone argument from spec not supported yet | |
yhirano
2015/10/14 10:28:49
TODO(domenic):
domenic
2015/10/14 16:25:31
done
| |
577 const reader = AcquireReadableStreamReader(stream); | |
578 | |
579 let closedOrErrored = false; | |
580 let canceled1 = false; | |
581 let canceled2 = false; | |
582 let reason1; | |
583 let reason2; | |
584 let promise = v8.createPromise(); | |
585 | |
586 const branch1 = new ReadableStream({pull, cancel: cancel1}); | |
587 | |
588 const branch2 = new ReadableStream({pull, cancel: cancel2}); | |
589 | |
590 thenPromise( | |
591 reader[readableStreamReaderClosedPromise], undefined, function(r) { | |
592 if (closedOrErrored === true) { | |
593 return; | |
594 } | |
595 | |
596 ErrorReadableStream(branch1, r); | |
597 ErrorReadableStream(branch2, r); | |
598 closedOrErrored = true; | |
599 }); | |
600 | |
601 return [branch1, branch2]; | |
602 | |
603 | |
604 function pull() { | |
605 return thenPromise( | |
606 ReadFromReadableStreamReader(reader), function(result) { | |
607 const value = result.value; | |
608 const done = result.done; | |
609 | |
610 if (done === true && closedOrErrored === false) { | |
611 CloseReadableStream(branch1); | |
612 CloseReadableStream(branch2); | |
613 closedOrErrored = true; | |
614 } | |
615 | |
616 if (closedOrErrored === true) { | |
617 return; | |
618 } | |
619 | |
620 if (canceled1 === false) { | |
621 EnqueueInReadableStream(branch1, value); | |
622 } | |
623 | |
624 if (canceled2 === false) { | |
625 EnqueueInReadableStream(branch2, value); | |
626 } | |
627 }); | |
628 } | |
629 | |
630 function cancel1(reason) { | |
631 canceled1 = true; | |
632 reason1 = reason; | |
633 | |
634 if (canceled2 === true) { | |
635 const compositeReason = [reason1, reason2]; | |
636 const cancelResult = CancelReadableStream(stream, compositeReason); | |
637 v8.resolvePromise(promise, cancelResult); | |
638 } | |
639 | |
640 return promise; | |
641 } | |
642 | |
643 function cancel2(reason) { | |
644 canceled2 = true; | |
645 reason2 = reason; | |
646 | |
647 if (canceled1 === true) { | |
648 const compositeReason = [reason1, reason2]; | |
649 const cancelResult = CancelReadableStream(stream, compositeReason); | |
650 v8.resolvePromise(promise, cancelResult); | |
651 } | |
652 | |
653 return promise; | |
654 } | |
655 } | |
656 | |
657 // | |
658 // Queue-with-sizes | |
659 // Modified from taking the queue (as in the spec) to taking the stream, so we | |
660 // can modify the queue size alongside. | |
661 // | |
662 | |
663 function DequeueValue(stream) { | |
664 const result = stream[readableStreamQueue].shift(); | |
665 stream[readableStreamQueueSize] -= result.size; | |
666 return result.value; | |
667 } | |
668 | |
669 function EnqueueValueWithSize(stream, value, size) { | |
670 size = Number(size); | |
671 if (Number_isNaN(size) || size === +Infinity || size < 0) { | |
672 throw new RangeError('size must be a finite, non-NaN, non-negative number. '); | |
673 } | |
674 | |
675 stream[readableStreamQueueSize] += size; | |
676 stream[readableStreamQueue].push({value, size}); | |
677 } | |
678 | |
679 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } | |
680 | |
681 // | |
682 // Other helpers | |
683 // | |
684 | |
685 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { | |
686 if (size !== undefined && typeof size !== 'function') { | |
687 throw new TypeError( | |
688 'size property of a queuing strategy must be a function'); | |
689 } | |
690 | |
691 highWaterMark = Number(highWaterMark); | |
692 if (Number_isNaN(highWaterMark)) { | |
693 throw new TypeError( | |
694 'highWaterMark property of a queuing strategy must be convertible to a non-NaN number'); | |
695 } | |
696 if (highWaterMark < 0) { | |
697 throw new RangeError( | |
698 'highWaterMark property of a queuing strategy must be nonnegative'); | |
699 } | |
700 | |
701 return {size, highWaterMark}; | |
702 } | |
703 | |
704 // Modified from InvokeOrNoop in spec | |
705 function CallOrNoop(O, P, arg, nameForError) { | |
706 const method = O[P]; | |
707 if (method === undefined) { | |
708 return undefined; | |
709 } | |
710 if (typeof method !== 'function') { | |
711 throw new TypeError(`${nameForError} must be a function or undefined`); | |
712 } | |
713 | |
714 return callFunction(method, O, arg); | |
715 } | |
716 | |
717 | |
718 // Modified from PromiseInvokeOrNoop in spec | |
719 function PromiseCallOrNoop(O, P, arg, nameForError) { | |
720 let method; | |
721 try { | |
722 method = O[P]; | |
723 } catch (methodE) { | |
724 return Promise_reject(methodE); | |
725 } | |
726 | |
727 if (method === undefined) { | |
728 return Promise_resolve(undefined); | |
729 } | |
730 | |
731 if (typeof method !== 'function') { | |
732 return Promise_reject(`${nameForError} must be a function or undefined`); | |
733 } | |
734 | |
735 try { | |
736 return Promise_resolve(callFunction(method, O, arg)); | |
737 } catch (e) { | |
738 return Promise_reject(e); | |
739 } | |
740 } | |
741 | |
742 function CreateIterResultObject(value, done) { return {value, done}; } | |
743 | |
744 | |
745 // | |
746 // Additions to the global | |
747 // | |
748 | |
749 defineProperty(global, 'ReadableStream', { | |
750 value: ReadableStream, | |
751 enumerable: false, | |
752 configurable: true, | |
753 writable: true | |
754 }); | |
755 | |
756 // | |
757 // Exports to Blink | |
758 // | |
759 | |
760 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; | |
761 }); | |
OLD | NEW |