OLD | NEW |
---|---|
(Empty) | |
1 (function(global, binding, v8) { | |
yhirano
2015/10/21 03:45:27
LICENSE
| |
2 'use strict'; | |
3 | |
4 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); | |
5 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); | |
6 const readableStreamQueueSize = | |
7 v8.createPrivateSymbol('[[queue]] total size'); | |
8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); | |
9 const readableStreamState = v8.createPrivateSymbol('[[state]]'); | |
10 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); | |
11 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); | |
12 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); | |
13 const readableStreamUnderlyingSource = | |
14 v8.createPrivateSymbol('[[underlyingSource]]'); | |
15 | |
16 const readableStreamControllerControlledReadableStream = | |
17 v8.createPrivateSymbol('[[controlledReadableStream]]'); | |
18 | |
19 const readableStreamReaderClosedPromise = | |
20 v8.createPrivateSymbol('[[closedPromise]]'); | |
21 const readableStreamReaderOwnerReadableStream = | |
22 v8.createPrivateSymbol('[[ownerReadableStream]]'); | |
23 const readableStreamReaderReadRequests = | |
24 v8.createPrivateSymbol('[[readRequests]]'); | |
25 | |
26 const STATE_READABLE = 0; | |
27 const STATE_CLOSED = 1; | |
28 const STATE_ERRORED = 2; | |
29 | |
30 const readableStreamBits = v8.createPrivateSymbol( | |
31 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] , [[disturbed]]'); | |
32 const STARTED = 0b1; | |
33 const CLOSE_REQUESTED = 0b10; | |
34 const PULLING = 0b100; | |
35 const PULL_AGAIN = 0b1000; | |
36 const DISTURBED = 0b10000; | |
37 | |
38 const undefined = global.undefined; | |
39 const Infinity = global.Infinity; | |
40 | |
41 const defineProperty = global.Object.defineProperty; | |
42 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | |
43 const callFunction = v8.uncurryThis(global.Function.prototype.call); | |
44 | |
45 const TypeError = global.TypeError; | |
46 const RangeError = global.RangeError; | |
47 | |
48 const Number = global.Number; | |
49 const Number_isNaN = Number.isNaN; | |
50 const Number_isFinite = Number.isFinite; | |
51 | |
52 const Promise = global.Promise; | |
53 const thenPromise = v8.uncurryThis(Promise.prototype.then); | |
54 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); | |
55 const Promise_reject = v8.simpleBind(Promise.reject, Promise); | |
56 | |
57 const errIllegalInvocation = 'Illegal invocation'; | |
58 const errIllegalConstructor = 'Illegal constructor'; | |
59 const errCancelLockedStream = | |
60 'Cannot cancel a readable stream that is locked to a reader'; | |
61 const errEnqueueInCloseRequestedStream = | |
62 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; | |
63 const errCancelReleasedReader = | |
64 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; | |
65 const errReadReleasedReader = | |
66 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; | |
67 const errCloseCloseRequestedStream = | |
68 'Cannot close a readable stream that has already been requested to be clos ed'; | |
69 const errCloseErroredStream = 'Cannot close an errored readable stream'; | |
70 const errErrorClosedStream = 'Cannot error a close readable stream'; | |
71 const errErrorErroredStream = | |
72 'Cannot error a readable stream that is already errored'; | |
73 const errReaderConstructorBadArgument = | |
74 'ReadableStreamReader constructor argument is not a readable stream'; | |
75 const errReaderConstructorStreamAlreadyLocked = | |
76 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; | |
77 const errReleaseReaderWithPendingRead = | |
78 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; | |
79 const errReleasedReaderClosedPromise = | |
80 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; | |
81 const errInvalidSize = | |
82 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; | |
83 const errSizeNotAFunction = | |
84 'A queuing strategy\'s size property must be a function'; | |
85 const errInvalidHWM = | |
86 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; | |
87 const errTmplMustBeFunctionOrUndefined = name => | |
88 `${name} must be a function or undefined`; | |
89 | |
90 class ReadableStream { | |
91 constructor() { | |
92 // TODO(domenic): when V8 gets default parameters and destructuring, all | |
93 // this can be cleaned up. | |
94 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; | |
95 const strategy = arguments[1] === undefined ? {} : arguments[1]; | |
96 const size = strategy.size; | |
97 let highWaterMark = strategy.highWaterMark; | |
98 if (highWaterMark === undefined) { | |
99 highWaterMark = 1; | |
100 } | |
101 | |
102 const normalizedStrategy = | |
103 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | |
104 | |
105 this[readableStreamUnderlyingSource] = underlyingSource; | |
106 | |
107 this[readableStreamQueue] = new v8.InternalPackedArray(); | |
108 this[readableStreamQueueSize] = 0; | |
109 | |
110 this[readableStreamState] = STATE_READABLE; | |
111 this[readableStreamBits] = 0b0; | |
112 this[readableStreamReader] = undefined; | |
113 this[readableStreamStoredError] = undefined; | |
114 | |
115 this[readableStreamStrategySize] = normalizedStrategy.size; | |
116 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; | |
117 | |
118 const controller = new ReadableStreamController(this); | |
119 this[readableStreamController] = controller; | |
120 | |
121 const startResult = CallOrNoop( | |
122 underlyingSource, 'start', controller, 'underlyingSource.start'); | |
123 thenPromise(Promise_resolve(startResult), | |
124 () => { | |
125 this[readableStreamBits] |= STARTED; | |
126 RequestReadableStreamPull(this); | |
127 }, | |
128 r => { | |
129 if (this[readableStreamState] === STATE_READABLE) { | |
130 return ErrorReadableStream(this, r); | |
131 } | |
132 }); | |
133 } | |
134 | |
135 get locked() { | |
136 if (IsReadableStream(this) === false) { | |
137 throw new TypeError(errIllegalInvocation); | |
138 } | |
139 | |
140 return IsReadableStreamLocked(this); | |
141 } | |
142 | |
143 cancel(reason) { | |
144 if (IsReadableStream(this) === false) { | |
145 return Promise_reject(new TypeError(errIllegalInvocation)); | |
146 } | |
147 | |
148 if (IsReadableStreamLocked(this) === true) { | |
149 return Promise_reject(new TypeError(errCancelLockedStream)); | |
150 } | |
151 | |
152 return CancelReadableStream(this, reason); | |
153 } | |
154 | |
155 getReader() { | |
156 if (IsReadableStream(this) === false) { | |
157 throw new TypeError(errIllegalInvocation); | |
158 } | |
159 | |
160 return AcquireReadableStreamReader(this); | |
161 } | |
162 | |
163 tee() { | |
164 if (IsReadableStream(this) === false) { | |
165 throw new TypeError(errIllegalInvocation); | |
166 } | |
167 | |
168 return TeeReadableStream(this); | |
169 } | |
170 } | |
171 | |
172 class ReadableStreamController { | |
173 constructor(stream) { | |
174 if (IsReadableStream(stream) === false) { | |
175 throw new TypeError(errIllegalConstructor); | |
176 } | |
177 | |
178 if (stream[readableStreamController] !== undefined) { | |
179 throw new TypeError(errIllegalConstructor); | |
180 } | |
181 | |
182 this[readableStreamControllerControlledReadableStream] = stream; | |
183 } | |
184 | |
185 get desiredSize() { | |
186 if (IsReadableStreamController(this) === false) { | |
187 throw new TypeError(errIllegalInvocation); | |
188 } | |
189 | |
190 return GetReadableStreamDesiredSize( | |
191 this[readableStreamControllerControlledReadableStream]); | |
192 } | |
193 | |
194 close() { | |
195 if (IsReadableStreamController(this) === false) { | |
196 throw new TypeError(errIllegalInvocation); | |
197 } | |
198 | |
199 const stream = this[readableStreamControllerControlledReadableStream]; | |
200 | |
201 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | |
202 throw new TypeError(errCloseCloseRequestedStream); | |
203 } | |
204 if (stream[readableStreamState] === STATE_ERRORED) { | |
205 throw new TypeError(errCloseErroredStream); | |
206 } | |
207 | |
208 return CloseReadableStream(stream); | |
209 } | |
210 | |
211 enqueue(chunk) { | |
212 if (IsReadableStreamController(this) === false) { | |
213 throw new TypeError(errIllegalInvocation); | |
214 } | |
215 | |
216 const stream = this[readableStreamControllerControlledReadableStream]; | |
217 | |
218 if (stream[readableStreamState] === STATE_ERRORED) { | |
219 throw stream[readableStreamStoredError]; | |
220 } | |
221 | |
222 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | |
223 throw new TypeError(errEnqueueInCloseRequestedStream); | |
224 } | |
225 | |
226 return EnqueueInReadableStream(stream, chunk); | |
227 } | |
228 | |
229 error(e) { | |
230 if (IsReadableStreamController(this) === false) { | |
231 throw new TypeError(errIllegalInvocation); | |
232 } | |
233 | |
234 const stream = this[readableStreamControllerControlledReadableStream]; | |
235 | |
236 const state = stream[readableStreamState]; | |
237 if (state !== STATE_READABLE) { | |
238 if (state === STATE_ERRORED) { | |
239 throw new TypeError(errErrorErroredStream); | |
240 } | |
241 if (state === STATE_CLOSED) { | |
242 throw new TypeError(errErrorClosedStream); | |
243 } | |
244 } | |
245 | |
246 return ErrorReadableStream(stream, e); | |
247 } | |
248 } | |
249 | |
250 class ReadableStreamReader { | |
251 constructor(stream) { | |
252 if (IsReadableStream(stream) === false) { | |
253 throw new TypeError(errReaderConstructorBadArgument); | |
254 } | |
255 if (IsReadableStreamLocked(stream) === true) { | |
256 throw new TypeError(errReaderConstructorStreamAlreadyLocked); | |
257 } | |
258 | |
259 this[readableStreamReaderOwnerReadableStream] = stream; | |
260 stream[readableStreamReader] = this; | |
261 | |
262 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
263 | |
264 switch (stream[readableStreamState]) { | |
265 case STATE_READABLE: | |
266 this[readableStreamReaderClosedPromise] = v8.createPromise(); | |
267 break; | |
268 case STATE_CLOSED: | |
269 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); | |
270 break; | |
271 case STATE_ERRORED: | |
272 this[readableStreamReaderClosedPromise] = | |
273 Promise_reject(stream[readableStreamStoredError]); | |
274 break; | |
275 } | |
276 } | |
277 | |
278 get closed() { | |
279 if (IsReadableStreamReader(this) === false) { | |
280 return Promise_reject(new TypeError(errIllegalInvocation)); | |
281 } | |
282 | |
283 return this[readableStreamReaderClosedPromise]; | |
284 } | |
285 | |
286 cancel(reason) { | |
287 if (IsReadableStreamReader(this) === false) { | |
288 return Promise_reject(new TypeError(errIllegalInvocation)); | |
289 } | |
290 | |
291 const stream = this[readableStreamReaderOwnerReadableStream]; | |
292 if (stream === undefined) { | |
293 return Promise_reject(new TypeError(errCancelReleasedReader)); | |
294 } | |
295 | |
296 return CancelReadableStream(stream, reason); | |
297 } | |
298 | |
299 read() { | |
300 if (IsReadableStreamReader(this) === false) { | |
301 return Promise_reject(new TypeError(errIllegalInvocation)); | |
302 } | |
303 | |
304 if (this[readableStreamReaderOwnerReadableStream] === undefined) { | |
305 return Promise_reject(new TypeError(errReadReleasedReader)); | |
306 } | |
307 | |
308 return ReadFromReadableStreamReader(this); | |
309 } | |
310 | |
311 releaseLock() { | |
312 if (IsReadableStreamReader(this) === false) { | |
313 throw new TypeError(errIllegalInvocation); | |
314 } | |
315 | |
316 const stream = this[readableStreamReaderOwnerReadableStream]; | |
317 if (stream === undefined) { | |
318 return undefined; | |
319 } | |
320 | |
321 if (this[readableStreamReaderReadRequests].length > 0) { | |
322 throw new TypeError(errReleaseReaderWithPendingRead); | |
323 } | |
324 | |
325 if (stream[readableStreamState] === STATE_READABLE) { | |
326 v8.rejectPromise(this[readableStreamReaderClosedPromise], | |
327 new TypeError(errReleasedReaderClosedPromise)); | |
328 } else { | |
329 this[readableStreamReaderClosedPromise] = Promise_reject(new TypeError( | |
330 errReleasedReaderClosedPromise)); | |
331 } | |
332 | |
333 this[readableStreamReaderOwnerReadableStream][readableStreamReader] = | |
334 undefined; | |
335 this[readableStreamReaderOwnerReadableStream] = undefined; | |
336 } | |
337 } | |
338 | |
339 // | |
340 // Readable stream abstract operations | |
341 // | |
342 | |
343 function AcquireReadableStreamReader(stream) { | |
344 return new ReadableStreamReader(stream); | |
345 } | |
346 | |
347 function CancelReadableStream(stream, reason) { | |
348 stream[readableStreamBits] |= DISTURBED; | |
349 | |
350 const state = stream[readableStreamState]; | |
351 if (state === STATE_CLOSED) { | |
352 return Promise_resolve(undefined); | |
353 } | |
354 if (state === STATE_ERRORED) { | |
355 return Promise_reject(stream[readableStreamStoredError]); | |
356 } | |
357 | |
358 stream[readableStreamQueue] = new v8.InternalPackedArray(); | |
359 FinishClosingReadableStream(stream); | |
360 | |
361 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
362 const sourceCancelPromise = PromiseCallOrNoop( | |
363 underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); | |
364 return thenPromise(sourceCancelPromise, () => undefined); | |
365 } | |
366 | |
367 function CloseReadableStream(stream) { | |
368 if (stream[readableStreamState] === STATE_CLOSED) { | |
369 return undefined; | |
370 } | |
371 | |
372 stream[readableStreamBits] |= CLOSE_REQUESTED; | |
373 | |
374 if (stream[readableStreamQueue].length === 0) { | |
375 return FinishClosingReadableStream(stream); | |
376 } | |
377 } | |
378 | |
379 function EnqueueInReadableStream(stream, chunk) { | |
380 if (stream[readableStreamState] === STATE_CLOSED) { | |
381 return undefined; | |
382 } | |
383 | |
384 if (IsReadableStreamLocked(stream) === true && | |
385 stream[readableStreamReader][readableStreamReaderReadRequests].length > | |
386 0) { | |
387 const readRequest = | |
388 stream[readableStreamReader][readableStreamReaderReadRequests] | |
389 .shift(); | |
390 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); | |
391 } else { | |
392 let chunkSize = 1; | |
393 | |
394 const strategySize = stream[readableStreamStrategySize]; | |
395 if (strategySize !== undefined) { | |
396 try { | |
397 chunkSize = strategySize(chunk); | |
398 } catch (chunkSizeE) { | |
399 ErrorReadableStream(stream, chunkSizeE); | |
400 throw chunkSizeE; | |
401 } | |
402 } | |
403 | |
404 try { | |
405 EnqueueValueWithSize(stream, chunk, chunkSize); | |
406 } catch (enqueueE) { | |
407 ErrorReadableStream(stream, enqueueE); | |
408 throw enqueueE; | |
409 } | |
410 } | |
411 | |
412 RequestReadableStreamPull(stream); | |
413 } | |
414 | |
415 function ErrorReadableStream(stream, e) { | |
416 stream[readableStreamQueue] = new v8.InternalPackedArray(); | |
417 stream[readableStreamStoredError] = e; | |
418 stream[readableStreamState] = STATE_ERRORED; | |
419 | |
420 const reader = stream[readableStreamReader]; | |
421 if (reader === undefined) { | |
422 return undefined; | |
423 } | |
424 | |
425 const readRequests = reader[readableStreamReaderReadRequests]; | |
426 for (let i = 0; i < readRequests.length; ++i) { | |
427 v8.rejectPromise(readRequests[i], e); | |
428 } | |
429 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
430 | |
431 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); | |
432 } | |
433 | |
434 function FinishClosingReadableStream(stream) { | |
435 stream[readableStreamState] = STATE_CLOSED; | |
436 | |
437 const reader = stream[readableStreamReader]; | |
438 if (reader === undefined) { | |
439 return undefined; | |
440 } | |
441 | |
442 | |
443 const readRequests = reader[readableStreamReaderReadRequests]; | |
444 for (let i = 0; i < readRequests.length; ++i) { | |
445 v8.resolvePromise( | |
446 readRequests[i], CreateIterResultObject(undefined, true)); | |
447 } | |
448 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
449 | |
450 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); | |
451 } | |
452 | |
453 function GetReadableStreamDesiredSize(stream) { | |
454 const queueSize = GetTotalQueueSize(stream); | |
455 return stream[readableStreamStrategyHWM] - queueSize; | |
456 } | |
457 | |
458 function IsReadableStream(x) { | |
459 return hasOwnProperty(x, readableStreamUnderlyingSource); | |
460 } | |
461 | |
462 function IsReadableStreamDisturbed(stream) { | |
463 return stream[readableStreamBits] & DISTURBED; | |
464 } | |
465 | |
466 function IsReadableStreamLocked(stream) { | |
467 return stream[readableStreamReader] !== undefined; | |
468 } | |
469 | |
470 function IsReadableStreamController(x) { | |
471 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); | |
472 } | |
473 | |
474 function IsReadableStreamReader(x) { | |
475 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); | |
476 } | |
477 | |
478 function ReadFromReadableStreamReader(reader) { | |
479 const stream = reader[readableStreamReaderOwnerReadableStream]; | |
480 stream[readableStreamBits] |= DISTURBED; | |
481 | |
482 if (stream[readableStreamState] === STATE_CLOSED) { | |
483 return Promise_resolve(CreateIterResultObject(undefined, true)); | |
484 } | |
485 | |
486 if (stream[readableStreamState] === STATE_ERRORED) { | |
487 return Promise_reject(stream[readableStreamStoredError]); | |
488 } | |
489 | |
490 const queue = stream[readableStreamQueue]; | |
491 if (queue.length > 0) { | |
492 const chunk = DequeueValue(stream); | |
493 | |
494 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) { | |
495 FinishClosingReadableStream(stream); | |
496 } else { | |
497 RequestReadableStreamPull(stream); | |
498 } | |
499 | |
500 return Promise_resolve(CreateIterResultObject(chunk, false)); | |
501 } else { | |
502 const readRequest = v8.createPromise(); | |
503 | |
504 reader[readableStreamReaderReadRequests].push(readRequest); | |
505 RequestReadableStreamPull(stream); | |
506 return readRequest; | |
507 } | |
508 } | |
509 | |
510 function RequestReadableStreamPull(stream) { | |
511 const shouldPull = ShouldReadableStreamPull(stream); | |
512 if (shouldPull === false) { | |
513 return undefined; | |
514 } | |
515 | |
516 if (stream[readableStreamBits] & PULLING) { | |
517 stream[readableStreamBits] |= PULL_AGAIN; | |
518 return undefined; | |
519 } | |
520 | |
521 stream[readableStreamBits] |= PULLING; | |
522 | |
523 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
524 const controller = stream[readableStreamController]; | |
525 const pullPromise = PromiseCallOrNoop( | |
526 underlyingSource, 'pull', controller, 'underlyingSource.pull'); | |
527 | |
528 thenPromise(pullPromise, | |
529 () => { | |
530 stream[readableStreamBits] &= ~PULLING; | |
531 | |
532 if (stream[readableStreamBits] & PULL_AGAIN) { | |
533 stream[readableStreamBits] &= ~PULL_AGAIN; | |
534 return RequestReadableStreamPull(stream); | |
535 } | |
536 }, | |
537 e => { | |
538 if (stream[readableStreamState] === STATE_READABLE) { | |
539 return ErrorReadableStream(stream, e); | |
540 } | |
541 }); | |
542 } | |
543 | |
544 function ShouldReadableStreamPull(stream) { | |
545 const state = stream[readableStreamState]; | |
546 if (state === STATE_CLOSED || state === STATE_ERRORED) { | |
547 return false; | |
548 } | |
549 | |
550 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | |
551 return false; | |
552 } | |
553 | |
554 if (!(stream[readableStreamBits] & STARTED)) { | |
555 return false; | |
556 } | |
557 | |
558 if (IsReadableStreamLocked(stream) === true) { | |
559 const reader = stream[readableStreamReader]; | |
560 const readRequests = reader[readableStreamReaderReadRequests]; | |
561 if (readRequests.length > 0) { | |
562 return true; | |
563 } | |
564 } | |
565 | |
566 const desiredSize = GetReadableStreamDesiredSize(stream); | |
567 if (desiredSize > 0) { | |
568 return true; | |
569 } | |
570 | |
571 return false; | |
572 } | |
573 | |
574 // Potential future optimization: use class instances for the underlying | |
575 // sources, so that we don't re-create | |
576 // closures every time. | |
577 | |
578 // TODO(domenic): shouldClone argument from spec not supported yet | |
579 function TeeReadableStream(stream) { | |
580 const reader = AcquireReadableStreamReader(stream); | |
581 | |
582 let closedOrErrored = false; | |
583 let canceled1 = false; | |
584 let canceled2 = false; | |
585 let reason1; | |
586 let reason2; | |
587 let promise = v8.createPromise(); | |
588 | |
589 const branch1 = new ReadableStream({pull, cancel: cancel1}); | |
590 | |
591 const branch2 = new ReadableStream({pull, cancel: cancel2}); | |
592 | |
593 thenPromise( | |
594 reader[readableStreamReaderClosedPromise], undefined, function(r) { | |
595 if (closedOrErrored === true) { | |
596 return; | |
597 } | |
598 | |
599 ErrorReadableStream(branch1, r); | |
600 ErrorReadableStream(branch2, r); | |
601 closedOrErrored = true; | |
602 }); | |
603 | |
604 return [branch1, branch2]; | |
605 | |
606 | |
607 function pull() { | |
608 return thenPromise( | |
609 ReadFromReadableStreamReader(reader), function(result) { | |
610 const value = result.value; | |
611 const done = result.done; | |
612 | |
613 if (done === true && closedOrErrored === false) { | |
614 CloseReadableStream(branch1); | |
615 CloseReadableStream(branch2); | |
616 closedOrErrored = true; | |
617 } | |
618 | |
619 if (closedOrErrored === true) { | |
620 return; | |
621 } | |
622 | |
623 if (canceled1 === false) { | |
624 EnqueueInReadableStream(branch1, value); | |
625 } | |
626 | |
627 if (canceled2 === false) { | |
628 EnqueueInReadableStream(branch2, value); | |
629 } | |
630 }); | |
631 } | |
632 | |
633 function cancel1(reason) { | |
634 canceled1 = true; | |
635 reason1 = reason; | |
636 | |
637 if (canceled2 === true) { | |
638 const compositeReason = [reason1, reason2]; | |
639 const cancelResult = CancelReadableStream(stream, compositeReason); | |
640 v8.resolvePromise(promise, cancelResult); | |
641 } | |
642 | |
643 return promise; | |
644 } | |
645 | |
646 function cancel2(reason) { | |
647 canceled2 = true; | |
648 reason2 = reason; | |
649 | |
650 if (canceled1 === true) { | |
651 const compositeReason = [reason1, reason2]; | |
652 const cancelResult = CancelReadableStream(stream, compositeReason); | |
653 v8.resolvePromise(promise, cancelResult); | |
654 } | |
655 | |
656 return promise; | |
657 } | |
658 } | |
659 | |
660 // | |
661 // Queue-with-sizes | |
662 // Modified from taking the queue (as in the spec) to taking the stream, so we | |
663 // can modify the queue size alongside. | |
664 // | |
665 | |
666 function DequeueValue(stream) { | |
667 const result = stream[readableStreamQueue].shift(); | |
668 stream[readableStreamQueueSize] -= result.size; | |
669 return result.value; | |
670 } | |
671 | |
672 function EnqueueValueWithSize(stream, value, size) { | |
673 size = Number(size); | |
674 if (Number_isNaN(size) || size === +Infinity || size < 0) { | |
675 throw new RangeError(errInvalidSize); | |
676 } | |
677 | |
678 stream[readableStreamQueueSize] += size; | |
679 stream[readableStreamQueue].push({value, size}); | |
680 } | |
681 | |
682 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } | |
683 | |
684 // | |
685 // Other helpers | |
686 // | |
687 | |
688 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { | |
689 if (size !== undefined && typeof size !== 'function') { | |
690 throw new TypeError(errSizeNotAFunction); | |
691 } | |
692 | |
693 highWaterMark = Number(highWaterMark); | |
694 if (Number_isNaN(highWaterMark)) { | |
695 throw new TypeError(errInvalidHWM); | |
696 } | |
697 if (highWaterMark < 0) { | |
698 throw new RangeError(errInvalidHWM); | |
699 } | |
700 | |
701 return {size, highWaterMark}; | |
702 } | |
703 | |
704 // Modified from InvokeOrNoop in spec | |
705 function CallOrNoop(O, P, arg, nameForError) { | |
706 const method = O[P]; | |
707 if (method === undefined) { | |
708 return undefined; | |
709 } | |
710 if (typeof method !== 'function') { | |
711 throw new TypeError(errTmplMustBeFunctionOrUndefined(nameForError)); | |
712 } | |
713 | |
714 return callFunction(method, O, arg); | |
715 } | |
716 | |
717 | |
718 // Modified from PromiseInvokeOrNoop in spec | |
719 function PromiseCallOrNoop(O, P, arg, nameForError) { | |
720 let method; | |
721 try { | |
722 method = O[P]; | |
723 } catch (methodE) { | |
724 return Promise_reject(methodE); | |
725 } | |
726 | |
727 if (method === undefined) { | |
728 return Promise_resolve(undefined); | |
729 } | |
730 | |
731 if (typeof method !== 'function') { | |
732 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); | |
733 } | |
734 | |
735 try { | |
736 return Promise_resolve(callFunction(method, O, arg)); | |
737 } catch (e) { | |
738 return Promise_reject(e); | |
739 } | |
740 } | |
741 | |
742 function CreateIterResultObject(value, done) { return {value, done}; } | |
743 | |
744 | |
745 // | |
746 // Additions to the global | |
747 // | |
748 | |
749 defineProperty(global, 'ReadableStream', { | |
750 value: ReadableStream, | |
751 enumerable: false, | |
752 configurable: true, | |
753 writable: true | |
754 }); | |
755 | |
756 // | |
757 // Exports to Blink | |
758 // | |
759 | |
760 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; | |
761 }); | |
OLD | NEW |