Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(116)

Side by Side Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2453713003: Implementation of WritableStream (Closed)
Patch Set: Add missing return to promise_test Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/WebKit/LayoutTests/webexposed/global-interface-listing-shared-worker-expected.txt ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Implementation of WritableStream for Blink. See
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the
7 // standard, except where required for performance or integration with Blink. In
8 // particular, classes, methods and abstract operations are implemented in the
9 // same order as in the standard, to simplify side-by-side reading.
10
11 (function(global, binding, v8) {
12 'use strict';
13
14 // Private symbols. These correspond to the internal slots in the standard.
15 // "[[X]]" in the standard is spelt _X in this implementation.
16 const _state = v8.createPrivateSymbol('[[state]]');
17 const _storedError = v8.createPrivateSymbol('[[storedError]]');
18 const _writer = v8.createPrivateSymbol('[[writer]]');
19 const _writableStreamController =
20 v8.createPrivateSymbol('[[writableStreamController]]');
21 const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]');
22 const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]');
23 const _ownerWritableStream =
24 v8.createPrivateSymbol('[[ownerWritableStream]]');
25 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]');
26 const _controlledWritableStream =
27 v8.createPrivateSymbol('[[controlledWritableStream]]');
28 const _queue = v8.createPrivateSymbol('[[queue]]');
29 const _queueSize = v8.createPrivateSymbol('[[queueSize]]');
30 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
31 const _strategySize = v8.createPrivateSymbol('[[strategySize]]');
32 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
33
34 // _defaultControllerFlags combines WritableStreamDefaultController's internal
35 // slots [[started]] and [[writing]] into a single bitmask for efficiency.
36 const _defaultControllerFlags =
37 v8.createPrivateSymbol('[[defaultControllerFlags]]');
38 const FLAG_STARTED = 0b1;
39 const FLAG_WRITING = 0b10;
40
41 // For efficiency, WritableStream [[state]] contains numeric values.
42 const WRITABLE = 0;
43 const CLOSING = 1;
44 const CLOSED = 2;
45 const ERRORED = 3;
46
47 // Javascript functions. It is important to use these copies, as the ones on
48 // the global object may have been overwritten. See "V8 Extras Design Doc",
49 // section "Security Considerations".
50 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r
51 const undefined = global.undefined;
52
53 const defineProperty = global.Object.defineProperty;
54 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
55
56 const Function_call = v8.uncurryThis(global.Function.prototype.call);
57 const Function_apply = v8.uncurryThis(global.Function.prototype.apply);
58
59 const TypeError = global.TypeError;
60 const RangeError = global.RangeError;
61
62 const Boolean = global.Boolean;
63 const Number = global.Number;
64 const Number_isNaN = Number.isNaN;
65 const Number_isFinite = Number.isFinite;
66
67 const Promise = global.Promise;
68 const thenPromise = v8.uncurryThis(Promise.prototype.then);
69 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
70 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
71
72 // User-visible strings.
73 // TODO(ricea): Share strings with ReadableStream that are identical in both.
74 const errIllegalInvocation = 'Illegal invocation';
75 const errIllegalConstructor = 'Illegal constructor';
76 const errInvalidType = 'Invalid type is specified';
77 const errAbortLockedStream = 'Cannot abort a writable stream that is locked t o a writer';
78 const errStreamAborted = 'The stream has been aborted';
79 const errWriterLockReleasedPrefix = 'This writable stream writer has been rele ased and cannot be ';
80 const errCloseCloseRequestedStream =
81 'Cannot close a writable stream that has already been requested to be clos ed';
82 const errWriteCloseRequestedStream =
83 'Cannot write to a writable stream that is due to be closed';
84 const templateErrorCannotActionOnStateStream =
85 (action, state) => `Cannot ${action} a ${state} writable stream`;
86 const errReleasedWriterClosedPromise =
87 'This writable stream writer has been released and cannot be used to monit or the stream\'s state';
88 const templateErrorIsNotAFunction = f => `${f} is not a function`;
89 const errSizeNotAFunction =
90 'A queuing strategy\'s size property must be a function';
91 const errInvalidHWM =
92 'A queuing strategy\'s highWaterMark property must be a non-negative, non- NaN number';
93 const errInvalidSize =
94 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
95
96 // These verbs are used after errWriterLockReleasedPrefix
97 const verbUsedToGetTheDesiredSize = 'used to get the desiredSize';
98 const verbAborted = 'aborted';
99 const verbClosed = 'closed';
100 const verbWrittenTo = 'written to';
101
102 // Utility functions (not from the standard).
103 function createWriterLockReleasedError(verb) {
104 return new TypeError(errWriterLockReleasedPrefix + verb);
105 }
106
107 const stateNames = {[CLOSED]: 'closed', [ERRORED]: 'errored'};
108 function createCannotActionOnStateStreamError(action, state) {
109 TEMP_ASSERT(stateNames[state] !== undefined,
110 `name for state ${state} exists in stateNames`);
111 return new TypeError(
112 templateErrorCannotActionOnStateStream(action, stateNames[state]));
113 }
114
115 function setDefaultControllerFlag(controller, flag, value) {
116 let flags = controller[_defaultControllerFlags];
117 if (value) {
118 flags = flags | flag;
119 } else {
120 flags = flags & ~flag;
121 }
122 controller[_defaultControllerFlags] = flags;
123 }
124
125 function getDefaultControllerStartedFlag(controller) {
126 return Boolean(controller[_defaultControllerFlags] & FLAG_STARTED);
127 }
128
129 function setDefaultControllerStartedFlag(controller, value) {
130 setDefaultControllerFlag(controller, FLAG_STARTED, value);
131 }
132
133 function getDefaultControllerWritingFlag(controller) {
134 return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING);
135 }
136
137 function setDefaultControllerWritingFlag(controller, value) {
138 setDefaultControllerFlag(controller, FLAG_WRITING, value);
139 }
140
141 function rejectPromises(array, e) {
142 // array is an InternalPackedArray so forEach won't work.
143 for (let i = 0; i < array.length; ++i) {
144 v8.rejectPromise(array[i], e);
145 }
146 }
147
148 // https://tc39.github.io/ecma262/#sec-ispropertykey
149 // TODO(ricea): Remove this when the asserts using it are removed.
150 function IsPropertyKey(argument) {
151 return typeof argument === 'string' || typeof argument === 'symbol';
152 }
153
154 // TODO(ricea): Remove all asserts once the implementation has stabilised.
155 function TEMP_ASSERT(predicate, message) {
156 if (predicate) {
157 return;
158 }
159 v8.log(`Assertion failed: ${message}\n`);
160 v8.logStackTrace();
161 class WritableStreamInternalError {
162 }
163 throw new WritableStreamInternalError();
164 }
165
166 class WritableStream {
167 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
168 this[_state] = WRITABLE;
169 this[_storedError] = undefined;
170 this[_writer] = undefined;
171 this[_writableStreamController] = undefined;
172 this[_writeRequests] = new v8.InternalPackedArray();
173 const type = underlyingSink.type;
174 if (type !== undefined) {
175 throw new RangeError(errInvalidType);
176 }
177 this[_writableStreamController] =
178 new WritableStreamDefaultController(this, underlyingSink, size,
179 highWaterMark);
180 }
181
182 get locked() {
183 if (!IsWritableStream(this)) {
184 throw new TypeError(errIllegalInvocation);
185 }
186 return IsWritableStreamLocked(this);
187 }
188
189 abort(reason) {
190 if (!IsWritableStream(this)) {
191 return Promise_reject(new TypeError(errIllegalInvocation));
192 }
193 if (IsWritableStreamLocked(this)) {
194 return Promise_reject(new TypeError(errAbortLockedStream));
195 }
196 return WritableStreamAbort(this, reason);
197 }
198
199 getWriter() {
200 if (!IsWritableStream(this)) {
201 throw new TypeError(errIllegalInvocation);
202 }
203 return AcquireWritableStreamDefaultWriter(this);
204 }
205 }
206
207 // General Writable Stream Abstract Operations
208
209 function AcquireWritableStreamDefaultWriter(stream) {
210 return new WritableStreamDefaultWriter(stream);
211 }
212
213 function IsWritableStream(x) {
214 return hasOwnProperty(x, _writableStreamController);
215 }
216
217 function IsWritableStreamLocked(stream) {
218 TEMP_ASSERT(IsWritableStream(stream),
219 '! IsWritableStream(stream) is true.');
220 return stream[_writer] !== undefined;
221 }
222
223 function WritableStreamAbort(stream, reason) {
224 const state = stream[_state];
225 if (state === CLOSED) {
226 return Promise_resolve(undefined);
227 }
228 if (state === ERRORED) {
229 return Promise_reject(stream[_storedError]);
230 }
231 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
232 'state is "writable" or "closing".');
233 const error = new TypeError(errStreamAborted);
234 WritableStreamError(stream, error);
235 return WritableStreamDefaultControllerAbort(
236 stream[_writableStreamController], reason);
237 }
238
239 // Writable Stream Abstract Operations Used by Controllers
240
241 function WritableStreamAddWriteRequest(stream) {
242 TEMP_ASSERT(IsWritableStreamLocked(stream),
243 '! IsWritableStreamLocked(writer) is true.');
244 TEMP_ASSERT(stream[_state] === WRITABLE,
245 'stream.[[state]] is "writable".');
246 const promise = v8.createPromise();
247 stream[_writeRequests].push(promise);
248 return promise;
249 }
250
251 function WritableStreamError(stream, e) {
252 const state = stream[_state];
253 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
254 'state is "writable" or "closing".');
255 rejectPromises(stream[_writeRequests], e);
256 stream[_writeRequests] = new v8.InternalPackedArray();
257 const writer = stream[_writer];
258 if (writer !== undefined) {
259 v8.rejectPromise(writer[_closedPromise], e);
260 if (state === WRITABLE &&
261 WritableStreamDefaultControllerGetBackpressure(
262 stream[_writableStreamController])) {
263 v8.rejectPromise(writer[_readyPromise], e);
264 } else {
265 writer[_readyPromise] = Promise_reject(e);
266 }
267 }
268 stream[_state] = ERRORED;
269 stream[_storedError] = e;
270 }
271
272 function WritableStreamFinishClose(stream) {
273 TEMP_ASSERT(stream[_state] === CLOSING,
274 'stream.[[state]] is "closing".');
275 TEMP_ASSERT(stream[_writer] !== undefined,
276 'stream.[[writer]] is not undefined.');
277 stream[_state] = CLOSED;
278 v8.resolvePromise(stream[_writer][_closedPromise], undefined);
279 }
280
281 function WritableStreamFulfillWriteRequest(stream) {
282 TEMP_ASSERT(stream[_writeRequests].length !== 0,
283 'stream.[[writeRequests]] is not empty.');
284 const writeRequest = stream[_writeRequests].shift();
285 v8.resolvePromise(writeRequest, undefined);
286 }
287
288 function WritableStreamUpdateBackpressure(stream, backpressure) {
289 TEMP_ASSERT(stream[_state] === WRITABLE,
290 'stream.[[state]] is "writable".');
291 const writer = stream[_writer];
292 if (writer === undefined) {
293 return;
294 }
295 if (backpressure) {
296 writer[_readyPromise] = v8.createPromise();
297 } else {
298 TEMP_ASSERT(backpressure === false,
299 'backpressure is false.');
300 v8.resolvePromise(writer[_readyPromise], undefined);
301 }
302 }
303
304 class WritableStreamDefaultWriter {
305 constructor(stream) {
306 if (!IsWritableStream(stream)) {
307 throw new TypeError(errIllegalConstructor);
308 }
309 if (IsWritableStreamLocked(stream)) {
310 throw new TypeError(errIllegalConstructor);
311 }
312 this[_ownerWritableStream] = stream;
313 stream[_writer] = this;
314 const state = stream[_state];
315 if (state === WRITABLE || state === CLOSING) {
316 this[_closedPromise] = v8.createPromise();
317 } else if (state === CLOSED) {
318 this[_closedPromise] = Promise_resolve(undefined);
319 } else {
320 TEMP_ASSERT(state === ERRORED,
321 'state is "errored".');
322 this[_closedPromise] = Promise_reject(stream[_storedError]);
323 }
324 if (state === WRITABLE &&
325 WritableStreamDefaultControllerGetBackpressure(
326 stream[_writableStreamController])) {
327 this[_readyPromise] = v8.createPromise();
328 } else {
329 this[_readyPromise] = Promise_resolve(undefined);
330 }
331 }
332
333 get closed() {
334 if (!IsWritableStreamDefaultWriter(this)) {
335 return Promise_reject(new TypeError(errIllegalInvocation));
336 }
337 return this[_closedPromise];
338 }
339
340 get desiredSize() {
341 if (!IsWritableStreamDefaultWriter(this)) {
342 throw new TypeError(errIllegalInvocation);
343 }
344 if (this[_ownerWritableStream] === undefined) {
345 throw createWriterLockReleasedError(verbUsedToGetTheDesiredSize);
346 }
347 return WritableStreamDefaultWriterGetDesiredSize(this);
348 }
349
350 get ready() {
351 if (!IsWritableStreamDefaultWriter(this)) {
352 return Promise_reject(new TypeError(errIllegalInvocation));
353 }
354 return this[_readyPromise];
355 }
356
357 abort(reason) {
358 if (!IsWritableStreamDefaultWriter(this)) {
359 return Promise_reject(new TypeError(errIllegalInvocation));
360 }
361 if (this[_ownerWritableStream] === undefined) {
362 return Promise_reject(createWriterLockReleasedError(verbAborted));
363 }
364 return WritableStreamDefaultWriterAbort(this, reason);
365 }
366
367 close() {
368 if (!IsWritableStreamDefaultWriter(this)) {
369 return Promise_reject(new TypeError(errIllegalInvocation));
370 }
371 const stream = this[_ownerWritableStream];
372 if (stream === undefined) {
373 return Promise_reject(createWriterLockReleasedError(verbClosed));
374 }
375 if (stream[_state] === CLOSING) {
376 return Promise_reject(new TypeError(errCloseCloseRequestedStream));
377 }
378 return WritableStreamDefaultWriterClose(this);
379 }
380
381 releaseLock() {
382 if (!IsWritableStreamDefaultWriter(this)) {
383 throw new TypeError(errIllegalInvocation);
384 }
385 const stream = this[_ownerWritableStream];
386 if (stream === undefined) {
387 return;
388 }
389 TEMP_ASSERT(stream[_writer] !== undefined,
390 'stream.[[writer]] is not undefined.');
391 WritableStreamDefaultWriterRelease(this);
392 }
393
394 write(chunk) {
395 if (!IsWritableStreamDefaultWriter(this)) {
396 return Promise_reject(new TypeError(errIllegalInvocation));
397 }
398 const stream = this[_ownerWritableStream];
399 if (stream === undefined) {
400 return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
401 }
402 if (stream[_state] === CLOSING) {
403 return Promise_reject(new TypeError(errWriteCloseRequestedStream));
404 }
405 return WritableStreamDefaultWriterWrite(this, chunk);
406 }
407 }
408
409 // Writable Stream Writer Abstract Operations
410
411 function IsWritableStreamDefaultWriter(x) {
412 return hasOwnProperty(x, _ownerWritableStream);
413 }
414
415 function WritableStreamDefaultWriterAbort(writer, reason) {
416 const stream = writer[_ownerWritableStream];
417 TEMP_ASSERT(stream !== undefined,
418 'stream is not undefined.');
419 return WritableStreamAbort(stream, reason);
420 }
421
422 function WritableStreamDefaultWriterClose(writer) {
423 const stream = writer[_ownerWritableStream];
424 TEMP_ASSERT(stream !== undefined,
425 'stream is not undefined.');
426 const state = stream[_state];
427 if (state === CLOSED || state === ERRORED) {
428 return Promise_reject(
429 createCannotActionOnStateStreamError('close', state));
430 }
431 TEMP_ASSERT(state === WRITABLE,
432 'state is "writable".');
433 const promise = WritableStreamAddWriteRequest(stream);
434 if (WritableStreamDefaultControllerGetBackpressure(
435 stream[_writableStreamController])) {
436 v8.resolvePromise(writer[_readyPromise], undefined);
437 }
438 stream[_state] = CLOSING;
439 WritableStreamDefaultControllerClose(stream[_writableStreamController]);
440 return promise;
441 }
442
443 function WritableStreamDefaultWriterGetDesiredSize(writer) {
444 const stream = writer[_ownerWritableStream];
445 const state = stream[_state];
446 if (state === ERRORED) {
447 return null;
448 }
449 if (state === CLOSED) {
450 return 0;
451 }
452 return WritableStreamDefaultControllerGetDesiredSize(
453 stream[_writableStreamController]);
454 }
455
456 function WritableStreamDefaultWriterRelease(writer) {
457 const stream = writer[_ownerWritableStream];
458 TEMP_ASSERT(stream !== undefined,
459 'stream is not undefined.');
460 TEMP_ASSERT(stream[_writer] === writer,
461 'stream.[[writer]] is writer.');
462 const releasedError = new TypeError(errReleasedWriterClosedPromise);
463 const state = stream[_state];
464 if (state === WRITABLE || state === CLOSING) {
465 v8.rejectPromise(writer[_closedPromise], releasedError);
466 } else {
467 writer[_closedPromise] = Promise_reject(releasedError);
468 }
469 if (state === WRITABLE &&
470 WritableStreamDefaultControllerGetBackpressure(
471 stream[_writableStreamController])) {
472 v8.rejectPromise(writer[_readyPromise], releasedError);
473 } else {
474 writer[_readyPromise] = Promise_reject(releasedError);
475 }
476 stream[_writer] = undefined;
477 writer[_ownerWritableStream] = undefined;
478 }
479
480 function WritableStreamDefaultWriterWrite(writer, chunk) {
481 const stream = writer[_ownerWritableStream];
482 TEMP_ASSERT(stream !== undefined,
483 'stream is not undefined.');
484 const state = stream[_state];
485 if (state === CLOSED || state === ERRORED) {
486 return Promise_reject(
487 createCannotActionOnStateStreamError('write to', state));
488 }
489 TEMP_ASSERT(state === WRITABLE,
490 'state is "writable".');
491 const promise = WritableStreamAddWriteRequest(stream);
492 WritableStreamDefaultControllerWrite(stream[_writableStreamController],
493 chunk);
494 return promise;
495 }
496
497 class WritableStreamDefaultController {
498 constructor(stream, underlyingSink, size, highWaterMark) {
499 if (!IsWritableStream(stream)) {
500 throw new TypeError(errIllegalConstructor);
501 }
502 if (stream[_controlledWritableStream] !== undefined) {
503 throw new TypeError(errIllegalConstructor);
504 }
505 this[_controlledWritableStream] = stream;
506 this[_underlyingSink] = underlyingSink;
507 this[_queue] = new v8.InternalPackedArray();
508 this[_queueSize] = 0;
509 this[_defaultControllerFlags] = 0;
510 const normalizedStrategy =
511 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
512 this[_strategySize] = normalizedStrategy.size;
513 this[_strategyHWM] = normalizedStrategy.highWaterMark;
514 const backpressure = WritableStreamDefaultControllerGetBackpressure(this);
515 if (backpressure) {
516 WritableStreamUpdateBackpressure(stream, backpressure);
517 }
518 const controller = this;
519 const startResult = InvokeOrNoop(underlyingSink, 'start', [this]);
520 const onFulfilled = () => {
521 setDefaultControllerStartedFlag(controller, true);
522 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
523 };
524 const onRejected = r => {
525 WritableStreamDefaultControllerErrorIfNeeded(controller, r);
526 };
527 thenPromise(Promise_resolve(startResult), onFulfilled, onRejected);
528 }
529
530 error(e) {
531 if (!IsWritableStreamDefaultController(this)) {
532 throw new TypeError(errIllegalInvocation);
533 }
534 const state = this[_controlledWritableStream][_state];
535 if (state === CLOSED || state === ERRORED) {
536 throw createCannotActionOnStateStreamError('error', state);
537 }
538 WritableStreamDefaultControllerError(this, e);
539 }
540 }
541
542 // Writable Stream Default Controller Abstract Operations
543
544 function IsWritableStreamDefaultController(x) {
545 return hasOwnProperty(x, _underlyingSink);
546 }
547
548 function WritableStreamDefaultControllerAbort(controller, reason) {
549 controller[_queue] = v8.InternalPackedArray();
550 controller[_queueSize] = 0;
551 const sinkAbortPromise =
552 PromiseInvokeOrFallbackOrNoop(controller[_underlyingSink],
553 'abort', [reason], 'close', [controller]);
554 return thenPromise(sinkAbortPromise, () => undefined);
555 }
556
557 function WritableStreamDefaultControllerClose(controller) {
558 EnqueueValueWithSizeForController(controller, 'close', 0);
559 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
560 }
561
562 function WritableStreamDefaultControllerGetDesiredSize(controller) {
563 const queueSize = GetTotalQueueSizeForController(controller);
564 return controller[_strategyHWM] - queueSize;
565 }
566
567 function WritableStreamDefaultControllerWrite(controller, chunk) {
568 const stream = controller[_controlledWritableStream];
569 TEMP_ASSERT(stream[_state] === WRITABLE,
570 'stream.[[state]] is "writable".');
571 let chunkSize = 1;
572 if (controller[_strategySize] !== undefined) {
573 try {
574 chunkSize = Function_call(controller[_strategySize], undefined, chunk);
575 } catch (e) {
576 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
577 return Promise_reject(e);
578 }
579 }
580 const writeRecord = {chunk};
581 const lastBackpressure =
582 WritableStreamDefaultControllerGetBackpressure(controller);
583 try {
584 const enqueueResult =
585 EnqueueValueWithSizeForController(controller, writeRecord, chunkSize);
586 } catch (e) {
587 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
588 return Promise_reject(e);
589 }
590 if (stream[_state] === WRITABLE) {
591 const backpressure =
592 WritableStreamDefaultControllerGetBackpressure(controller);
593 if (lastBackpressure !== backpressure) {
594 WritableStreamUpdateBackpressure(stream, backpressure);
595 }
596 }
597 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
598 }
599
600 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
601 const state = controller[_controlledWritableStream][_state];
602 if (state === CLOSED || state === ERRORED) {
603 return;
604 }
605 if (!getDefaultControllerStartedFlag(controller)) {
606 return;
607 }
608 if (getDefaultControllerWritingFlag(controller)) {
609 return;
610 }
611 if (controller[_queue].length === 0) {
612 return;
613 }
614 const writeRecord = PeekQueueValue(controller[_queue]);
615 if (writeRecord === 'close') {
616 WritableStreamDefaultControllerProcessClose(controller);
617 } else {
618 WritableStreamDefaultControllerProcessWrite(controller,
619 writeRecord.chunk);
620 }
621 }
622
623 function WritableStreamDefaultControllerErrorIfNeeded(controller, e) {
624 const state = controller[_controlledWritableStream][_state];
625 if (state === WRITABLE || state === CLOSING) {
626 WritableStreamDefaultControllerError(controller, e);
627 }
628 }
629
630 function WritableStreamDefaultControllerProcessClose(controller) {
631 const stream = controller[_controlledWritableStream];
632 TEMP_ASSERT(stream[_state] === CLOSING,
633 'stream.[[state]] is "closing".');
634 DequeueValueForController(controller);
635 TEMP_ASSERT(controller[_queue].length === 0,
636 'controller.[[queue]] is empty.');
637 const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
638 'close', [controller]);
639 thenPromise(sinkClosePromise,
640 () => {
641 if (stream[_state] !== CLOSING) {
642 return;
643 }
644 WritableStreamFulfillWriteRequest(stream);
645 WritableStreamFinishClose(stream);
646 },
647 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
648 );
649 }
650
651 function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
652 setDefaultControllerWritingFlag(controller, true);
653 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
654 'write', [chunk, controller]);
655 thenPromise(
656 sinkWritePromise,
657 () => {
658 const stream = controller[_controlledWritableStream];
659 const state = stream[_state];
660 if (state === ERRORED || state === CLOSED) {
661 return;
662 }
663 setDefaultControllerWritingFlag(controller, false);
664 WritableStreamFulfillWriteRequest(stream);
665 const lastBackpressure =
666 WritableStreamDefaultControllerGetBackpressure(controller);
667 DequeueValueForController(controller);
668 if (state !== CLOSING) {
669 const backpressure =
670 WritableStreamDefaultControllerGetBackpressure(controller);
671 if (lastBackpressure !== backpressure) {
672 WritableStreamUpdateBackpressure(
673 controller[_controlledWritableStream], backpressure);
674 }
675 }
676 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
677 },
678 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
679 );
680 }
681
682 function WritableStreamDefaultControllerGetBackpressure(controller) {
683 const desiredSize =
684 WritableStreamDefaultControllerGetDesiredSize(controller);
685 return desiredSize <= 0;
686 }
687
688 function WritableStreamDefaultControllerError(controller, e) {
689 const stream = controller[_controlledWritableStream];
690 const state = stream[_state];
691 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
692 'stream.[[state]] is "writable" or "closing".');
693 WritableStreamError(stream, e);
694 controller[_queue] = new v8.InternalPackedArray();
695 controller[_queueSize] = 0;
696 }
697
698 // Queue-with-Sizes Operations
699 //
700 // These differ from the versions in the standard: they take a controller
701 // argument in order to cache the total queue size. This is necessary to avoid
702 // O(N^2) behaviour.
703 //
704 // TODO(ricea): Share these operations with ReadableStream.js.
705 function DequeueValueForController(controller) {
706 TEMP_ASSERT(controller[_queue].length !== 0,
707 'queue is not empty.');
708 const result = controller[_queue].shift();
709 controller[_queueSize] -= result.size;
710 return result.value;
711 }
712
713 function EnqueueValueWithSizeForController(controller, value, size) {
714 size = Number(size);
715 if (!IsFiniteNonNegativeNumber(size)) {
716 throw new RangeError(errInvalidSize);
717 }
718
719 controller[_queueSize] += size;
720 controller[_queue].push({value, size});
721 }
722
723 function GetTotalQueueSizeForController(controller) {
724 return controller[_queueSize];
725 }
726
727 function PeekQueueValue(queue) {
728 TEMP_ASSERT(queue.length !== 0,
729 'queue is not empty.');
730 return queue[0].value;
731 }
732
733 // Miscellaneous Operations
734
735 // This differs from "CallOrNoop" in the ReadableStream implementation in
736 // that it takes the arguments as an array, so that multiple arguments can be
737 // passed.
738 //
739 // TODO(ricea): Consolidate with ReadableStream implementation.
740 function InvokeOrNoop(O, P, args) {
741 TEMP_ASSERT(IsPropertyKey(P),
742 'P is a valid property key.');
743 if (args === undefined) {
744 args = [];
745 }
746 const method = O[P];
747 if (method === undefined) {
748 return undefined;
749 }
750 if (typeof method !== 'function') {
751 throw new TypeError(templateErrorIsNotAFunction(P));
752 }
753 return Function_apply(method, O, args);
754 }
755
756 function IsFiniteNonNegativeNumber(v) {
757 return Number_isFinite(v) && v >= 0;
758 }
759
760 function PromiseInvokeOrFallbackOrNoop(O, P1, args1, P2, args2) {
761 TEMP_ASSERT(IsPropertyKey(P1),
762 'P1 is a valid property key.');
763 TEMP_ASSERT(IsPropertyKey(P2),
764 'P2 is a valid property key.');
765 try {
766 const method = O[P1];
767 if (method === undefined) {
768 return PromiseInvokeOrNoop(O, P2, args2);
769 }
770 if (typeof method !== 'function') {
771 return Promise_reject(new TypeError(templateErrorIsNotAFunction(P1)));
772 }
773 return Promise_resolve(Function_apply(method, O, args1));
774 } catch (e) {
775 return Promise_reject(e);
776 }
777 }
778
779 function PromiseInvokeOrNoop(O, P, args) {
780 try {
781 return Promise_resolve(InvokeOrNoop(O, P, args));
782 } catch (e) {
783 return Promise_reject(e);
784 }
785 }
786
787 // TODO(ricea): Share this operation with ReadableStream.js.
788 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
789 if (size !== undefined && typeof size !== 'function') {
790 throw new TypeError(errSizeNotAFunction);
791 }
792
793 highWaterMark = Number(highWaterMark);
794 if (Number_isNaN(highWaterMark)) {
795 throw new TypeError(errInvalidHWM);
796 }
797 if (highWaterMark < 0) {
798 throw new RangeError(errInvalidHWM);
799 }
800
801 return {size, highWaterMark};
802 }
803
804 //
805 // Additions to the global object
806 //
807
808 defineProperty(global, 'WritableStream', {
809 value: WritableStream,
810 enumerable: false,
811 configurable: true,
812 writable: true
813 });
814
815 // TODO(ricea): Exports to Blink
816 });
OLDNEW
« no previous file with comments | « third_party/WebKit/LayoutTests/webexposed/global-interface-listing-shared-worker-expected.txt ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698