Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1107)

Side by Side Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2453713003: Implementation of WritableStream (Closed)
Patch Set: Many fixes from tyoshino review Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/WebKit/LayoutTests/webexposed/global-interface-listing-shared-worker-expected.txt ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Implementation of WritableStream for Blink. See
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the
7 // standard, except where required for performance or integration with Blink. In
8 // particular, classes, methods and abstract operations are implemented in the
9 // same order as in the standard, to simplify side-by-side reading.
10
11 (function(global, binding, v8) {
12 'use strict';
13
14 // Private symbols. These correspond to the internal slots in the standard.
15 // "[[X]]" in the standard is spelt _X in this implementation.
domenic 2016/11/04 17:32:17 This is nicer than ReadableStream.js. TODO/follow-
Adam Rice 2016/11/05 00:49:22 Filed http://crbug.com/662530
16 const _state = v8.createPrivateSymbol('[[state]]');
17 const _storedError = v8.createPrivateSymbol('[[storedError]]');
18 const _writer = v8.createPrivateSymbol('[[writer]]');
19 const _writableStreamController =
20 v8.createPrivateSymbol('[[writableStreamController]]');
21 const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]');
22 const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]');
23 const _ownerWritableStream =
24 v8.createPrivateSymbol('[[ownerWritableStream]]');
25 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]');
26 const _controlledWritableStream =
27 v8.createPrivateSymbol('[[controlledWritableStream]]');
28 const _queue = v8.createPrivateSymbol('[[queue]]');
29 const _queueSize = v8.createPrivateSymbol('[[queueSize]]');
30 const _started = v8.createPrivateSymbol('[[started]]');
31 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
32 const _strategySize = v8.createPrivateSymbol('[[strategySize]]');
33 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
34 const _writing = v8.createPrivateSymbol('[[writing]]');
domenic 2016/11/04 17:32:17 This seems unused given the flags. Same for _start
Adam Rice 2016/11/05 00:49:22 Ah. I was careful not to add these symbols, and th
35
36 // _defaultControllerFlags combines WritableStreamDefaultController's internal
37 // slots [[started]] and [[writing]] into a single bitmask for efficiency.
38 const _defaultControllerFlags =
39 v8.createPrivateSymbol('[[defaultControllerFlags]]');
40 const FLAG_STARTED = 0b1;
41 const FLAG_WRITING = 0b10;
42
43 // For efficiency, WritableStream [[state]] contains numeric values.
44 const WRITABLE = 0;
45 const CLOSING = 1;
46 const CLOSED = 2;
47 const ERRORED = 3;
48
49 // Javascript functions. It is important to use these copies, as the ones on
50 // the global object may have been overwritten. See "V8 Extras Design Doc",
51 // section "Security Considerations".
52 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r
53 const undefined = global.undefined;
54
55 const defineProperty = global.Object.defineProperty;
56 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
57
58 const Function_call = v8.uncurryThis(global.Function.prototype.call);
59 const Function_apply = v8.uncurryThis(global.Function.prototype.apply);
60
61 const TypeError = global.TypeError;
62 const RangeError = global.RangeError;
63
64 const Boolean = global.Boolean;
65 const Number = global.Number;
66 const Number_isNaN = Number.isNaN;
67 const Number_isFinite = Number.isFinite;
68
69 const Promise = global.Promise;
70 const thenPromise = v8.uncurryThis(Promise.prototype.then);
71 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
72 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
73
74 // User-visible strings. Many of these should be shared with ReadableStream.
domenic 2016/11/04 17:32:17 Add a TODO or follow-up issue? Or do the refactori
Adam Rice 2016/11/05 00:49:22 It is sufficient just to export the symbols from s
75 const errIllegalInvocation = 'Illegal invocation';
76 const errIllegalConstructor = 'Illegal constructor';
77 const errInvalidType = 'Invalid type is specified';
78 const errAbortLockedStream = 'Cannot abort a writable stream that is locked t o a writer';
79 const errStreamAborted = 'The stream has been aborted';
80 const errWriterLockReleasedPrefix = 'This writable stream writer has been rele ased and cannot be ';
81 const errCloseCloseRequestedStream =
82 'Cannot close a writable stream that has already been requested to be clos ed';
83 const errWriteCloseRequestedStream =
84 'Cannot write to a writable stream that is due to be closed';
85 const templateErrorCannotActionOnStateStream =
86 (action, state) => `Cannot ${action} a ${state} writable stream`;
87 const errReleasedWriterClosedPromise =
88 'This writable stream writer has been released and cannot be used to monit or the stream\'s state';
89 const templateErrorIsNotAFunction = f => `${f} is not a function`;
90 const errSizeNotAFunction =
91 'A queuing strategy\'s size property must be a function';
92 const errInvalidHWM =
93 'A queuing strategy\'s highWaterMark property must be a non-negative, non- NaN number';
94 const errInvalidSize =
95 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
96
97 // These verbs are used after errWriterLockReleasedPrefix
98 const verbUsedToGetTheDesiredSize = 'used to get the desiredSize';
99 const verbAborted = 'aborted';
100 const verbClosed = 'closed';
101 const verbWrittenTo = 'written to';
102
103 // Utility functions (not from the standard).
104 function createWriterLockReleasedError(verb) {
105 return new TypeError(errWriterLockReleasedPrefix + verb);
106 }
107
108 const stateNames = {CLOSED: 'closed', ERRORED: 'errored'};
109 function createCannotActionOnStateStreamError(action, state) {
110 TEMP_ASSERT(stateNames[state] !== undefined,
111 'state name exists in stateNames');
112 return new TypeError(
113 templateErrorCannotActionOnStateStream(action, stateNames[state]));
114 }
115
116 function setDefaultControllerFlag(controller, flag, value) {
117 let flags = controller[_defaultControllerFlags];
118 if (value) {
119 flags = flags | flag;
120 } else {
121 flags = flags & ~flag;
122 }
123 controller[_defaultControllerFlags] = flags;
124 }
125
126 function getDefaultControllerStartedFlag(controller) {
127 return Boolean(controller[_defaultControllerFlags] & FLAG_STARTED);
128 }
129
130 function setDefaultControllerStartedFlag(controller, value) {
131 setDefaultControllerFlag(controller, FLAG_STARTED, value);
132 }
133
134 function getDefaultControllerWritingFlag(controller) {
135 return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING);
136 }
137
138 function setDefaultControllerWritingFlag(controller, value) {
139 setDefaultControllerFlag(controller, FLAG_WRITING, value);
140 }
141
142 function rejectPromises(array, e) {
143 // array is an InternalPackedArray so forEach won't work.
144 for (let i = 0; i < array.length; ++i) {
145 v8.rejectPromise(array[i], e);
146 }
147 }
148
149 // https://tc39.github.io/ecma262/#sec-ispropertykey
150 function IsPropertyKey(argument) {
domenic 2016/11/04 17:32:17 Also add a TODO to remove this since it's only use
Adam Rice 2016/11/05 00:49:22 Done.
151 return typeof argument === 'string' || typeof argument === 'symbol';
152 }
153
154 // TODO(ricea): Remove all asserts once the implementation has stabilised.
155 function TEMP_ASSERT(predicate, message) {
domenic 2016/11/04 17:32:17 I wonder if we should consider adding something to
Adam Rice 2016/11/05 00:49:22 I would like that. I have filed issue 662542.
156 if (predicate) {
157 return;
158 }
159 v8.log(`Assertion failed: ${message}\n`);
160 v8.logStackTrace();
161 class WritableStreamInternalError {
162 }
163 throw new WritableStreamInternalError();
164 }
165
166 class WritableStream {
167 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
168 this[_state] = WRITABLE;
169 this[_storedError] = undefined;
170 this[_writer] = undefined;
171 this[_writableStreamController] = undefined;
172 this[_writeRequests] = new v8.InternalPackedArray();
173 const type = underlyingSink.type;
174 if (type !== undefined) {
175 throw new RangeError(errInvalidType);
176 }
177 this[_writableStreamController] =
178 new WritableStreamDefaultController(this, underlyingSink, size,
179 highWaterMark);
180 }
181
182 get locked() {
183 if (!IsWritableStream(this)) {
184 throw new TypeError(errIllegalInvocation);
185 }
186 return IsWritableStreamLocked(this);
187 }
188
189 abort(reason) {
190 if (!IsWritableStream(this)) {
191 return Promise_reject(new TypeError(errIllegalInvocation));
192 }
193 if (IsWritableStreamLocked(this)) {
194 return Promise_reject(new TypeError(errAbortLockedStream));
195 }
196 return WritableStreamAbort(this, reason);
197 }
198
199 getWriter() {
200 if (!IsWritableStream(this)) {
201 throw new TypeError(errIllegalInvocation);
202 }
203 return AcquireWritableStreamDefaultWriter(this);
204 }
205 }
206
207 // General Writable Stream Abstract Operations
208
209 function AcquireWritableStreamDefaultWriter(stream) {
210 return new WritableStreamDefaultWriter(stream);
211 }
212
213 function IsWritableStream(x) {
214 return hasOwnProperty(x, _writableStreamController);
215 }
216
217 function IsWritableStreamLocked(stream) {
218 TEMP_ASSERT(IsWritableStream(stream),
219 '! IsWritableStream(stream) is true.');
220 return stream[_writer] !== undefined;
221 }
222
223 function WritableStreamAbort(stream, reason) {
224 const state = stream[_state];
225 if (state === CLOSED) {
226 return Promise_resolve(undefined);
227 }
228 if (state === ERRORED) {
229 return Promise_reject(stream[_storedError]);
230 }
231 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
232 'state is "writable" or "closing".');
233 const error = new TypeError(errStreamAborted);
234 WritableStreamError(stream, error);
235 return WritableStreamDefaultControllerAbort(
236 stream[_writableStreamController], reason);
237 }
238
239 // Writable Stream Abstract Operations Used by Controllers
240
241 function WritableStreamAddWriteRequest(stream) {
242 TEMP_ASSERT(IsWritableStreamLocked(stream),
243 '! IsWritableStreamLocked(writer) is true.');
244 TEMP_ASSERT(stream[_state] === WRITABLE,
245 'stream.[[state]] is "writable".');
246 const promise = v8.createPromise();
247 stream[_writeRequests].push(promise);
248 return promise;
249 }
250
251 function WritableStreamError(stream, e) {
252 const state = stream[_state];
253 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
254 'state is "writable" or "closing".');
255 rejectPromises(stream[_writeRequests], e);
256 stream[_writeRequests] = new v8.InternalPackedArray();
257 const writer = stream[_writer];
258 if (writer !== undefined) {
259 v8.rejectPromise(writer[_closedPromise], e);
260 if (state === WRITABLE &&
261 WritableStreamDefaultControllerGetBackpressure(
262 stream[_writableStreamController])) {
domenic 2016/11/04 17:32:17 I assume these slightly-questionable formatting ch
Adam Rice 2016/11/05 00:49:22 Nope, it's all on me. I actually think this is the
263 v8.rejectPromise(writer[_readyPromise], e);
264 } else {
265 writer[_readyPromise] = Promise_reject(e);
266 }
267 }
268 stream[_state] = ERRORED;
269 stream[_storedError] = e;
270 }
271
272 function WritableStreamFinishClose(stream) {
273 TEMP_ASSERT(stream[_state] === CLOSING,
274 'stream.[[state]] is "closing".');
275 TEMP_ASSERT(stream[_writer] !== undefined,
276 'stream.[[writer]] is not undefined.');
277 stream[_state] = CLOSED;
278 v8.resolvePromise(stream[_writer][_closedPromise], undefined);
279 }
280
281 function WritableStreamFulfillWriteRequest(stream) {
282 TEMP_ASSERT(stream[_writeRequests].length !== 0,
283 'stream.[[writeRequests]] is not empty.');
284 const writeRequest = stream[_writeRequests].shift();
285 v8.resolvePromise(writeRequest, undefined);
286 }
287
288 function WritableStreamUpdateBackpressure(stream, backpressure) {
289 TEMP_ASSERT(stream[_state] === WRITABLE,
290 'stream.[[state]] is "writable".');
291 const writer = stream[_writer];
292 if (writer === undefined) {
293 return;
294 }
295 if (backpressure) {
296 writer[_readyPromise] = v8.createPromise();
297 } else {
298 TEMP_ASSERT(backpressure === false,
299 'backpressure is false.');
300 v8.resolvePromise(writer[_readyPromise], undefined);
301 }
302 }
303
304 class WritableStreamDefaultWriter {
305 constructor(stream) {
306 if (!IsWritableStream(stream)) {
307 throw new TypeError(errIllegalConstructor);
308 }
309 if (IsWritableStreamLocked(stream)) {
310 throw new TypeError(errIllegalConstructor);
311 }
312 this[_ownerWritableStream] = stream;
313 stream[_writer] = this;
314 const state = stream[_state];
315 if (state === WRITABLE || state === CLOSING) {
316 this[_closedPromise] = v8.createPromise();
317 } else if (state === CLOSED) {
318 this[_closedPromise] = Promise_resolve(undefined);
319 } else {
320 TEMP_ASSERT(state === ERRORED,
321 'state is "errored".');
322 this[_closedPromise] = Promise_reject(stream[_storedError]);
323 }
324 if (state === WRITABLE &&
325 WritableStreamDefaultControllerGetBackpressure(
326 stream[_writableStreamController])) {
327 this[_readyPromise] = v8.createPromise();
328 } else {
329 this[_readyPromise] = Promise_resolve(undefined);
330 }
331 }
332
333 get closed() {
334 if (!IsWritableStreamDefaultWriter(this)) {
335 return Promise_reject(new TypeError(errIllegalInvocation));
336 }
337 return this[_closedPromise];
338 }
339
340 get desiredSize() {
341 if (!IsWritableStreamDefaultWriter(this)) {
342 throw new TypeError(errIllegalInvocation);
343 }
344 if (this[_ownerWritableStream] === undefined) {
345 throw createWriterLockReleasedError(verbUsedToGetTheDesiredSize);
346 }
347 return WritableStreamDefaultWriterGetDesiredSize(this);
348 }
349
350 get ready() {
351 if (!IsWritableStreamDefaultWriter(this)) {
352 return Promise_reject(new TypeError(errIllegalInvocation));
353 }
354 return this[_readyPromise];
355 }
356
357 abort(reason) {
358 if (!IsWritableStreamDefaultWriter(this)) {
359 return Promise_reject(new TypeError(errIllegalInvocation));
360 }
361 if (this[_ownerWritableStream] === undefined) {
362 return Promise_reject(createWriterLockReleasedError(verbAborted));
363 }
364 return WritableStreamDefaultWriterAbort(this, reason);
365 }
366
367 close() {
368 if (!IsWritableStreamDefaultWriter(this)) {
369 return Promise_reject(new TypeError(errIllegalInvocation));
370 }
371 const stream = this[_ownerWritableStream];
372 if (stream === undefined) {
373 return Promise_reject(createWriterLockReleasedError(verbClosed));
374 }
375 if (stream[_state] === CLOSING) {
376 return Promise_reject(new TypeError(errCloseCloseRequestedStream));
377 }
378 return WritableStreamDefaultWriterClose(this);
379 }
380
381 releaseLock() {
382 if (!IsWritableStreamDefaultWriter(this)) {
383 throw new TypeError(errIllegalInvocation);
384 }
385 const stream = this[_ownerWritableStream];
386 if (stream === undefined) {
387 return;
388 }
389 TEMP_ASSERT(stream[_writer] !== undefined,
390 'stream.[[writer]] is not undefined.');
391 WritableStreamDefaultWriterRelease(this);
392 }
393
394 write(chunk) {
395 if (!IsWritableStreamDefaultWriter(this)) {
396 return Promise_reject(new TypeError(errIllegalInvocation));
397 }
398 const stream = this[_ownerWritableStream];
399 if (stream === undefined) {
400 return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
401 }
402 if (stream[_state] === CLOSING) {
403 return Promise_reject(new TypeError(errWriteCloseRequestedStream));
404 }
405 return WritableStreamDefaultWriterWrite(this, chunk);
406 }
407 }
408
409 // Writable Stream Writer Abstract Operations
410
411 function IsWritableStreamDefaultWriter(x) {
412 return hasOwnProperty(x, _ownerWritableStream);
413 }
414
415 function WritableStreamDefaultWriterAbort(writer, reason) {
416 const stream = writer[_ownerWritableStream];
417 TEMP_ASSERT(stream !== undefined,
418 'stream is not undefined.');
419 return WritableStreamAbort(stream, reason);
420 }
421
422 function WritableStreamDefaultWriterClose(writer) {
423 const stream = writer[_ownerWritableStream];
424 TEMP_ASSERT(stream !== undefined,
425 'stream is not undefined.');
426 const state = stream[_state];
427 if (state === CLOSED || state === ERRORED) {
428 return Promise_reject(
429 createCannotActionOnStateStreamError('close', state));
430 }
431 TEMP_ASSERT(state === WRITABLE,
432 'state is "writable".');
433 const promise = WritableStreamAddWriteRequest(stream);
434 if (WritableStreamDefaultControllerGetBackpressure(
435 stream[_writableStreamController])) {
436 v8.resolvePromise(writer[_readyPromise], undefined);
437 }
438 stream[_state] = CLOSING;
439 WritableStreamDefaultControllerClose(stream[_writableStreamController]);
440 return promise;
441 }
442
443 function WritableStreamDefaultWriterGetDesiredSize(writer) {
444 const stream = writer[_ownerWritableStream];
445 const state = stream[_state];
446 if (state === ERRORED) {
447 return null;
448 }
449 if (state === CLOSED) {
450 return 0;
451 }
452 return WritableStreamDefaultControllerGetDesiredSize(
453 stream[_writableStreamController]);
454 }
455
456 function WritableStreamDefaultWriterRelease(writer) {
457 const stream = writer[_ownerWritableStream];
458 TEMP_ASSERT(stream !== undefined,
459 'stream is not undefined.');
460 TEMP_ASSERT(stream[_writer] === writer,
461 'stream.[[writer]] is writer.');
462 const releasedError = new TypeError(errReleasedWriterClosedPromise);
463 const state = stream[_state];
464 if (state === WRITABLE || state === CLOSING) {
465 v8.rejectPromise(writer[_closedPromise], releasedError);
466 } else {
467 writer[_closedPromise] = Promise_reject(releasedError);
468 }
469 if (state === WRITABLE &&
470 WritableStreamDefaultControllerGetBackpressure(
471 stream[_writableStreamController])) {
472 v8.rejectPromise(writer[_readyPromise], releasedError);
473 } else {
474 writer[_readyPromise] = Promise_reject(releasedError);
475 }
476 stream[_writer] = undefined;
477 writer[_ownerWritableStream] = undefined;
478 }
479
480 function WritableStreamDefaultWriterWrite(writer, chunk) {
481 const stream = writer[_ownerWritableStream];
482 TEMP_ASSERT(stream !== undefined,
483 'stream is not undefined.');
484 const state = stream[_state];
485 if (state === CLOSED || state === ERRORED) {
486 return Promise_reject(
487 createCannotActionOnStateStreamError('write to', state));
488 }
489 TEMP_ASSERT(state === WRITABLE,
490 'state is "writable".');
491 const promise = WritableStreamAddWriteRequest(stream);
492 WritableStreamDefaultControllerWrite(stream[_writableStreamController],
493 chunk);
494 return promise;
495 }
496
497 class WritableStreamDefaultController {
498 constructor(stream, underlyingSink, size, highWaterMark) {
499 if (!IsWritableStream(stream)) {
500 throw new TypeError(errIllegalConstructor);
501 }
502 if (stream[_controlledWritableStream] !== undefined) {
503 throw new TypeError(errIllegalConstructor);
504 }
505 this[_controlledWritableStream] = stream;
506 this[_underlyingSink] = underlyingSink;
507 this[_queue] = new v8.InternalPackedArray();
508 this[_queueSize] = 0;
509 this[_defaultControllerFlags] = 0;
510 const normalizedStrategy =
511 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
512 this[_strategySize] = normalizedStrategy.size;
513 this[_strategyHWM] = normalizedStrategy.highWaterMark;
514 const backpressure = WritableStreamDefaultControllerGetBackpressure(this);
515 if (backpressure) {
516 WritableStreamUpdateBackpressure(stream, backpressure);
517 }
518 const controller = this;
519 const startResult = InvokeOrNoop(underlyingSink, 'start', [this]);
520 Promise_resolve(startResult)
521 .then(
domenic 2016/11/04 17:32:17 Ahah! I found one!! Use thenPromise instead of .th
Adam Rice 2016/11/05 00:49:22 You got me. Fixed.
522 () => {
523 setDefaultControllerStartedFlag(controller, true);
524 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
525 },
526 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r));
527 }
528
529 error(e) {
530 if (!IsWritableStreamDefaultController(this)) {
531 throw new TypeError(errIllegalInvocation);
532 }
533 const state = this[_controlledWritableStream][_state];
534 if (state === CLOSED || state === ERRORED) {
535 throw createCannotActionOnStateStreamError('error', state);
536 }
537 WritableStreamDefaultControllerError(this, e);
538 }
539 }
540
541 // Writable Stream Default Controller Abstract Operations
542
543 function IsWritableStreamDefaultController(x) {
544 return hasOwnProperty(x, _underlyingSink);
545 }
546
547 function WritableStreamDefaultControllerAbort(controller, reason) {
548 controller[_queue] = v8.InternalPackedArray();
549 controller[_queueSize] = 0;
550 const sinkAbortPromise =
551 PromiseInvokeOrFallbackOrNoop(controller[_underlyingSink],
552 'abort', [reason], 'close', [controller]);
553 return thenPromise(sinkAbortPromise, () => undefined);
554 }
555
556 function WritableStreamDefaultControllerClose(controller) {
557 EnqueueValueWithSizeForController(controller, 'close', 0);
558 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
559 }
560
561 function WritableStreamDefaultControllerGetDesiredSize(controller) {
562 const queueSize = GetTotalQueueSizeForController(controller);
563 return controller[_strategyHWM] - queueSize;
564 }
565
566 function WritableStreamDefaultControllerWrite(controller, chunk) {
567 const stream = controller[_controlledWritableStream];
568 TEMP_ASSERT(stream[_state] === WRITABLE,
569 'stream.[[state]] is "writable".');
570 let chunkSize = 1;
571 if (controller[_strategySize] !== undefined) {
572 try {
573 chunkSize = Function_call(controller[_strategySize], undefined, chunk);
574 } catch (e) {
575 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
576 return Promise_reject(e);
577 }
578 }
579 const writeRecord = {chunk};
580 const lastBackpressure =
581 WritableStreamDefaultControllerGetBackpressure(controller);
582 try {
583 const enqueueResult =
584 EnqueueValueWithSizeForController(controller, writeRecord, chunkSize);
585 } catch (e) {
586 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
587 return Promise_reject(e);
588 }
589 if (stream[_state] === WRITABLE) {
590 const backpressure =
591 WritableStreamDefaultControllerGetBackpressure(controller);
592 if (lastBackpressure !== backpressure) {
593 WritableStreamUpdateBackpressure(stream, backpressure);
594 }
595 }
596 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
597 }
598
599 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
600 const state = controller[_controlledWritableStream][_state];
601 if (state === CLOSED || state === ERRORED) {
602 return;
603 }
604 if (!getDefaultControllerStartedFlag(controller)) {
605 return;
606 }
607 if (getDefaultControllerWritingFlag(controller)) {
608 return;
609 }
610 if (controller[_queue].length === 0) {
611 return;
612 }
613 const writeRecord = PeekQueueValue(controller[_queue]);
614 if (writeRecord === 'close') {
615 WritableStreamDefaultControllerProcessClose(controller);
616 } else {
617 WritableStreamDefaultControllerProcessWrite(controller,
618 writeRecord.chunk);
619 }
620 }
621
622 function WritableStreamDefaultControllerErrorIfNeeded(controller, e) {
623 const state = controller[_controlledWritableStream][_state];
624 if (state === WRITABLE || state === CLOSING) {
625 WritableStreamDefaultControllerError(controller, e);
626 }
627 }
628
629 function WritableStreamDefaultControllerProcessClose(controller) {
630 const stream = controller[_controlledWritableStream];
631 TEMP_ASSERT(stream[_state] === CLOSING,
632 'stream.[[state]] is "closing".');
633 DequeueValueForController(controller);
634 TEMP_ASSERT(controller[_queue].length === 0,
635 'controller.[[queue]] is empty.');
636 const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
637 'close', [controller]);
638 thenPromise(sinkClosePromise,
639 () => {
640 if (stream[_state] !== CLOSING) {
641 return;
642 }
643 WritableStreamFulfillWriteRequest(stream);
644 WritableStreamFinishClose(stream);
645 },
646 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
647 );
648 }
649
650 function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
651 setDefaultControllerWritingFlag(controller, true);
652 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
653 'write', [chunk, controller]);
654 thenPromise(
655 sinkWritePromise,
656 () => {
657 const stream = controller[_controlledWritableStream];
658 const state = stream[_state];
659 if (state === ERRORED || state === CLOSED) {
660 return;
661 }
662 setDefaultControllerWritingFlag(controller, false);
663 WritableStreamFulfillWriteRequest(stream);
664 const lastBackpressure =
665 WritableStreamDefaultControllerGetBackpressure(controller);
666 DequeueValueForController(controller);
667 if (state !== CLOSING) {
668 const backpressure =
669 WritableStreamDefaultControllerGetBackpressure(controller);
670 if (lastBackpressure !== backpressure) {
671 WritableStreamUpdateBackpressure(
672 controller[_controlledWritableStream], backpressure);
673 }
674 }
675 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
676 },
677 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
678 );
679 }
680
681 function WritableStreamDefaultControllerGetBackpressure(controller) {
682 const desiredSize =
683 WritableStreamDefaultControllerGetDesiredSize(controller);
684 return desiredSize <= 0;
685 }
686
687 function WritableStreamDefaultControllerError(controller, e) {
688 const stream = controller[_controlledWritableStream];
689 const state = stream[_state];
690 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
691 'stream.[[state]] is "writable" or "closing".');
692 WritableStreamError(stream, e);
693 controller[_queue] = new v8.InternalPackedArray();
694 controller[_queueSize] = 0;
695 }
696
697 // Queue-with-Sizes Operations
698 //
699 // These differ from the versions in the standard: they take a controller
700 // argument in order to cache the total queue size. This is necessary to avoid
701 // O(N^2) behaviour.
702 function DequeueValueForController(controller) {
domenic 2016/11/04 17:32:17 IMO we really should abstract these out into a sha
Adam Rice 2016/11/05 00:49:22 Yes. I added a TODO.
703 TEMP_ASSERT(controller[_queue].length !== 0,
704 'queue is not empty.');
705 const result = controller[_queue].shift();
706 controller[_queueSize] -= result.size;
707 return result.value;
708 }
709
710 function EnqueueValueWithSizeForController(controller, value, size) {
711 size = Number(size);
712 if (!IsFiniteNonNegativeNumber(size)) {
713 throw new RangeError(errInvalidSize);
714 }
715
716 controller[_queueSize] += size;
717 controller[_queue].push({value, size});
718 }
719
720 function GetTotalQueueSizeForController(controller) {
721 return controller[_queueSize];
722 }
723
724 function PeekQueueValue(queue) {
725 TEMP_ASSERT(queue.length !== 0,
726 'queue is not empty.');
727 return queue[0].value;
728 }
729
730 // Miscellaneous Operations
731
732 // This differs from "CallOrNoop" in the ReadableStream implementation in
733 // that it takes the arguments as an array, so that multiple arguments can be
734 // passed.
735 function InvokeOrNoop(O, P, args) {
domenic 2016/11/04 17:32:17 Again it would be good to consolidate. Right now t
Adam Rice 2016/11/05 00:49:22 I don't know whether there's an additional cost fo
736 TEMP_ASSERT(IsPropertyKey(P),
737 'P is a valid property key.');
738 if (args === undefined) {
739 args = [];
740 }
741 const method = O[P];
742 if (method === undefined) {
743 return undefined;
744 }
745 if (typeof method !== 'function') {
746 throw new TypeError(templateErrorIsNotAFunction(P));
747 }
748 return Function_apply(method, O, args);
749 }
750
751 function IsFiniteNonNegativeNumber(v) {
752 return Number_isFinite(v) && v >= 0;
753 }
754
755 function PromiseInvokeOrFallbackOrNoop(O, P1, args1, P2, args2) {
756 TEMP_ASSERT(IsPropertyKey(P1),
757 'P1 is a valid property key.');
758 TEMP_ASSERT(IsPropertyKey(P2),
759 'P2 is a valid property key.');
760 try {
761 const method = O[P1];
762 if (method === undefined) {
763 return PromiseInvokeOrNoop(O, P2, args2);
764 }
765 if (typeof method !== 'function') {
766 return Promise_reject(new TypeError(templateErrorIsNotAFunction(P1)));
767 }
768 return Promise_resolve(Function_apply(method, O, args1));
769 } catch (e) {
770 return Promise_reject(e);
771 }
772 }
773
774 function PromiseInvokeOrNoop(O, P, args) {
775 try {
776 return Promise_resolve(InvokeOrNoop(O, P, args));
777 } catch (e) {
778 return Promise_reject(e);
779 }
780 }
781
782 // This is identical to the version in ReadableStream.js and should be shared.
783 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
784 if (size !== undefined && typeof size !== 'function') {
785 throw new TypeError(errSizeNotAFunction);
786 }
787
788 highWaterMark = Number(highWaterMark);
789 if (Number_isNaN(highWaterMark)) {
790 throw new TypeError(errInvalidHWM);
791 }
792 if (highWaterMark < 0) {
793 throw new RangeError(errInvalidHWM);
794 }
795
796 return {size, highWaterMark};
797 }
798
799 //
800 // Additions to the global object
801 //
802
803 defineProperty(global, 'WritableStream', {
804 value: WritableStream,
805 enumerable: false,
806 configurable: true,
807 writable: true
808 });
809
810 // TODO(ricea): Exports to Blink
811 });
OLDNEW
« no previous file with comments | « third_party/WebKit/LayoutTests/webexposed/global-interface-listing-shared-worker-expected.txt ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698