Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2453713003: Implementation of WritableStream (Closed)
Patch Set: Add global-interface-listing-service-worker Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Implementation of WritableStream for Blink. See
6 // https://streams.spec.whatwg.org/#ws The implementation closely follows the
7 // standard, except where required for performanc or integration with Blink. In
tyoshino (SeeGerritForStatus) 2016/11/01 05:54:09 typo: performance
Adam Rice 2016/11/02 11:54:42 Done.
8 // particular, classes, methods and abstract operations are implemented in the
9 // same order as in the standard, to simplify side-by-side reading.
10
11 (function(global, binding, v8) {
12 'use strict';
13
14 // Private symbols. These correspond to the internal slots in the standard.
15 // "[[X]]" in the standard is spelt _X in this implementation.
16 const _state = v8.createPrivateSymbol('[[state]]');
17 const _storedError = v8.createPrivateSymbol('[[storedError]]');
18 const _writer = v8.createPrivateSymbol('[[writer]]');
19 const _writableStreamController =
20 v8.createPrivateSymbol('[[writableStreamController]]');
21 const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]');
22 const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]');
23 const _ownerWritableStream =
24 v8.createPrivateSymbol('[[ownerWritableStream]]');
25 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]');
26 const _controlledWritableStream =
27 v8.createPrivateSymbol('[[controlledWritableStream]]');
28 const _queue = v8.createPrivateSymbol('[[queue]]');
29 const _queueSize = v8.createPrivateSymbol('[[queueSize]]');
30 const _started = v8.createPrivateSymbol('[[started]]');
31 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
32 const _strategySize = v8.createPrivateSymbol('[[strategySize]]');
33 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
34 const _writing = v8.createPrivateSymbol('[[writing]]');
35
36 // _defaultControllerFlags combines WritableStreamDefaultController's internal
37 // slots [[started]] and [[writing]] into a single bitmask for efficiency.
38 const _defaultControllerFlags =
39 v8.createPrivateSymbol('[[defaultControllerFlags]]');
40 const FLAG_STARTED = 0b1;
41 const FLAG_WRITING = 0b10;
42
43 // For efficiency, WritableStream [[state]] contains numeric values.
44 const WRITABLE = 0;
45 const CLOSING = 1;
46 const CLOSED = 2;
47 const ERRORED = 3;
48
49 // Javascript functions. It is important to use these copies, as the ones on
50 // the global object may have been overwritten. See "V8 Extras Design Doc",
51 // section "Security Considerations".
52 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r
53 const undefined = global.undefined;
54
55 const defineProperty = global.Object.defineProperty;
56 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
57
58 const Function_call = v8.uncurryThis(global.Function.prototype.call);
59 const Function_apply = v8.uncurryThis(global.Function.prototype.apply);
60
61 const TypeError = global.TypeError;
62 const RangeError = global.RangeError;
63
64 const Boolean = global.Boolean;
65 const Number = global.Number;
66 const Number_isNaN = Number.isNaN;
67 const Number_isFinite = Number.isFinite;
68
69 const Promise = global.Promise;
70 const thenPromise = v8.uncurryThis(Promise.prototype.then);
71 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
72 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
73
74 // User-visible strings. Many of these should be shared with ReadableStream.
75 const errIllegalInvocation = 'Illegal invocation';
76 const errIllegalConstructor = 'Illegal constructor';
77 const errInvalidType = 'Invalid type is specified';
78 const errAbortLockedStream = 'Cannot abort a writable stream that is locked t o a writer';
79 const errStreamAborted = 'The stream has been aborted';
80 const errDesiredSizeReleasedLock = 'Attempt to read desiredSize after releaseL ock';
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 unused?
Adam Rice 2016/11/02 11:54:42 Sorry, yes. I changed the code to use errWriterLoc
81 const errWriterLockReleasedPrefix = 'This writable stream writer has been rele ased and cannot be ';
82 const errCloseCloseRequestedStream =
83 'Cannot close a readable stream that has already been requested to be clos ed';
84 const errWriteCloseRequestedStream =
85 'Cannot write to a readable stream that has already been requested to be c losed';
86 const errClosing =
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 unused?
Adam Rice 2016/11/02 11:54:42 Sorry. Same thing as with errDesiredSizeReleasedLo
87 'Cannot release the lock on a closing stream';
88 const templateCannotActionOnStateStream =
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 errTemplate... or templateError...?
Adam Rice 2016/11/02 11:54:41 I went with templateError.
89 (action, state) => "`Cannot ${action} a ${state} writable stream`";
90 const errReleasedWriterClosedPromise =
91 'This writable stream writer has been released and cannot be used to monit or the stream\'s state';
92 const templateIsNotAFunction = f => "`${f} is not a function`";
93 const errSizeNotAFunction =
94 'A queuing strategy\'s size property must be a function';
95 const errInvalidHWM =
96 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number';
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 how about non-negative? if nonnegative would be be
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:41 use either of queueing or queuing everywhere?
Adam Rice 2016/11/02 11:54:42 The Streams Standard doesn't agree on this either.
Adam Rice 2016/11/02 11:54:42 It seems the standard spelling changed while I was
tyoshino (SeeGerritForStatus) 2016/11/04 06:04:18 Thanks for filing that!
97 const errInvalidSize =
98 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
99
100 // These verbs are used after errWriterLockReleasedPrefix
101 const verbUsedToGetTheDesiredSize = 'used to get the desiredSize';
102 const verbAborted = 'aborted';
103 const verbClosed = 'closed';
104 const verbReleasedAgain = 'released again';
105 const verbWrittenTo = 'written to';
106
107 // Utility functions (not from the standard).
108 function createWriterLockReleasedError(verb) {
109 return new TypeError(errWriterLockReleasedPrefix + verb);
110 }
111
112 const stateNames = {CLOSED: 'closed', ERRORED: 'errored'};
113 function createCannotActionOnStateStreamError(action, state) {
114 return new TypeError(templateCannotActionOnStateStream(action,
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 assert that we have an entry for state in stateNam
Adam Rice 2016/11/02 11:54:42 Done.
115 stateNames[state]));
116 }
117
118 function getDefaultControllerStartedFlag(controller) {
119 return Boolean(controller[_defaultControllerFlags] & FLAG_STARTED);
120 }
121
122 function setDefaultControllerFlag(controller, flag, value) {
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 move this to above getDefaultControllerStartedFlag
Adam Rice 2016/11/02 11:54:42 Done.
123 let flags = controller[_defaultControllerFlags];
124 flags = (flags & ~flag) | (value ? flag : 0);
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:41 how about this? if (value) { flags = flags | fl
Adam Rice 2016/11/02 11:54:42 Done, thanks.
125 controller[_defaultControllerFlags] = flags;
126 }
127
128 function setDefaultControllerStartedFlag(controller, value) {
129 setDefaultControllerFlag(controller, FLAG_STARTED, value);
130 }
131
132 function getDefaultControllerWritingFlag(controller) {
133 return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING);
134 }
135
136 function setDefaultControllerWritingFlag(controller, value) {
137 setDefaultControllerFlag(controller, FLAG_WRITING, value);
138 }
139
140 function rejectPromises(array, e) {
141 // array is an InternalPackedArray so forEach won't work.
142 for (let i = 0; i < array.length; ++i) {
143 v8.rejectPromise(array[i], e);
144 }
145 }
146
147 // https://tc39.github.io/ecma262/#sec-ispropertykey
148 function IsPropertyKey(argument) {
149 return typeof argument === 'string' || typeof argument === 'symbol';
150 }
151
152 // TODO(ricea): Remove all asserts once the implementation has stabilised.
153 function TEMP_ASSERT(predicate, message) {
154 if (!predicate) {
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 early return when predicate is true?
Adam Rice 2016/11/02 11:54:42 Done.
155 v8.log(`Assertion failed: ${message}\n`);
156 v8.logStackTrace();
157 class WritableStreamInternalError {
158 }
159 throw new WritableStreamInternalError();
160 }
161 }
162
163 function TEMP_LOG(message) {
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 unused?
Adam Rice 2016/11/02 11:54:42 Removed.
164 v8.log(message + '\n');
165 }
166
167 class WritableStream {
168 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
169 this[_state] = WRITABLE;
170 this[_storedError] = undefined;
171 this[_writer] = undefined;
172 this[_writableStreamController] = undefined;
173 this[_writeRequests] = new v8.InternalPackedArray();
174 let type = underlyingSink.type;
tyoshino (SeeGerritForStatus) 2016/11/01 05:54:09 use const where possible though "let" is the same
Adam Rice 2016/11/02 11:54:42 Sorry. I had a todo to change all the "let"s to "c
tyoshino (SeeGerritForStatus) 2016/11/04 06:04:19 ok. no problem. thanks!
175 if (type !== undefined) {
176 throw new RangeError(errInvalidType);
177 }
178 this[_writableStreamController] =
179 new WritableStreamDefaultController(this, underlyingSink, size,
180 highWaterMark);
181 }
182
183 get locked() {
184 if (!IsWritableStream(this)) {
185 throw new TypeError(errIllegalInvocation);
186 }
187 return IsWritableStreamLocked(this);
188 }
189
190 abort(reason) {
191 if (!IsWritableStream(this)) {
192 return Promise_reject(new TypeError(errIllegalInvocation));
193 }
194 if (IsWritableStreamLocked(this)) {
195 return Promise_reject(new TypeError(errAbortLockedStream));
196 }
197 return WritableStreamAbort(this, reason);
198 }
tyoshino (SeeGerritForStatus) 2016/11/01 05:54:09 blank line
Adam Rice 2016/11/02 11:54:42 Done.
199 getWriter() {
200 if (!IsWritableStream(this)) {
201 throw new TypeError(errIllegalInvocation);
202 }
203 return AcquireWritableStreamDefaultWriter(this);
204 }
205 }
206
207 // General Writable Stream Abstract Operations
tyoshino (SeeGerritForStatus) 2016/11/01 05:54:09 blank line
Adam Rice 2016/11/02 11:54:42 Done.
208 function AcquireWritableStreamDefaultWriter(stream) {
209 return new WritableStreamDefaultWriter(stream);
210 }
211
212 function IsWritableStream(x) {
213 return hasOwnProperty(x, _writableStreamController);
214 }
215
216 function IsWritableStreamLocked(stream) {
217 TEMP_ASSERT(IsWritableStream(stream),
218 '! IsWritableStream(stream) is true.');
219 return stream[_writer] !== undefined;
220 }
221
222 function WritableStreamAbort(stream, reason) {
223 let state = stream[_state];
224 if (state === CLOSED) {
225 return Promise_resolve(undefined);
226 }
227 if (state === ERRORED) {
228 return Promise_reject(stream[_storedError]);
229 }
230 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
231 'state is "writable" or "closing".');
232 let error = new TypeError(errStreamAborted);
233 WritableStreamError(stream, error);
234 return WritableStreamDefaultControllerAbort(
235 stream[_writableStreamController], reason);
236 }
237
238 // Writable Stream Abstract Operations Used by Controllers
239
240 function WritableStreamAddWriteRequest(stream) {
241 TEMP_ASSERT(IsWritableStreamLocked(stream),
242 '! IsWritableStreamLocked(writer) is true.');
243 TEMP_ASSERT(stream[_state] === WRITABLE,
244 'stream.[[state]] is "writable".');
245 let promise = v8.createPromise();
246 stream[_writeRequests].push(promise);
247 return promise;
248 }
249
250 function WritableStreamError(stream, e) {
251 let state = stream[_state];
252 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
253 'state is "writable" or "closing".');
254 rejectPromises(stream[_writeRequests], e);
255 stream[_writeRequests] = new v8.InternalPackedArray();
256 let writer = stream[_writer];
257 if (writer !== undefined) {
258 v8.rejectPromise(writer[_closedPromise], e);
259 if (state === WRITABLE &&
260 WritableStreamDefaultControllerGetBackpressure(
261 stream[_writableStreamController])) {
262 v8.rejectPromise(writer[_readyPromise], e);
263 } else {
264 writer[_readyPromise] = Promise_reject(e);
265 }
266 }
267 stream[_state] = ERRORED;
268 stream[_storedError] = e;
269 }
270
271 function WritableStreamFinishClose(stream) {
272 TEMP_ASSERT(stream[_state] === CLOSING,
273 'stream.[[state]] is "closing".');
274 TEMP_ASSERT(stream[_writer] !== undefined,
275 'stream.[[writer]] is not undefined.');
276 stream[_state] = CLOSED;
277 v8.resolvePromise(stream[_writer][_closedPromise], undefined);
278 }
279
280 function WritableStreamFulfillWriteRequest(stream) {
281 TEMP_ASSERT(stream[_writeRequests].length !== 0,
282 'stream.[[writeRequests]] is not empty.');
283 let writeRequest = stream[_writeRequests].shift();
284 v8.resolvePromise(writeRequest, undefined);
285 }
286
287 function WritableStreamUpdateBackpressure(stream, backpressure) {
288 TEMP_ASSERT(stream[_state] === WRITABLE,
289 'stream.[[state]] is "writable".');
290 let writer = stream[_writer];
291 if (writer === undefined) {
292 return;
293 }
294 if (backpressure) {
295 writer[_readyPromise] = v8.createPromise();
296 } else {
297 TEMP_ASSERT(backpressure === false,
298 'backpressure is false.');
299 v8.resolvePromise(writer[_readyPromise], undefined);
300 }
301 }
302
303 class WritableStreamDefaultWriter {
304 constructor(stream) {
305 if (!IsWritableStream(stream)) {
306 throw TypeError(errInvalidType);
tyoshino (SeeGerritForStatus) 2016/11/01 05:54:09 For now, this should be errIllegalConstructor? err
Adam Rice 2016/11/02 11:54:42 Fixed. I agree the clarity of the error could be i
307 }
308 if (IsWritableStreamLocked(stream)) {
309 throw TypeError(errIllegalConstructor);
310 }
311 this[_ownerWritableStream] = stream;
312 stream[_writer] = this;
313 let state = stream[_state];
314 if (state === WRITABLE || state === CLOSING) {
315 this[_closedPromise] = v8.createPromise();
316 } else if (state === CLOSED) {
317 this[_closedPromise] = Promise_resolve(undefined);
318 } else {
319 TEMP_ASSERT(state === ERRORED,
320 'state is "errored".');
321 this[_closedPromise] = Promise_reject(stream[_storedError]);
322 }
323 if (state === WRITABLE &&
324 WritableStreamDefaultControllerGetBackpressure(
325 stream[_writableStreamController])) {
326 this[_readyPromise] = v8.createPromise();
327 } else {
328 this[_readyPromise] = Promise_resolve(undefined);
329 }
330 }
331
332 get closed() {
333 if (!IsWritableStreamDefaultWriter(this)) {
334 return Promise_reject(new TypeError(errIllegalInvocation));
335 }
336 return this[_closedPromise];
337 }
338
339 get desiredSize() {
340 if (!IsWritableStreamDefaultWriter(this)) {
341 throw TypeError(errIllegalInvocation);
342 }
343 if (this[_ownerWritableStream] === undefined) {
344 throw createWriterLockReleasedError(verbUsedToGetTheDesiredSize);
345 }
346 return WritableStreamDefaultWriterGetDesiredSize(this);
347 }
348
349 get ready() {
350 if (!IsWritableStreamDefaultWriter(this)) {
351 return Promise_reject(new TypeError(errIllegalInvocation));
352 }
353 return this[_readyPromise];
354 }
355
356 abort(reason) {
357 if (!IsWritableStreamDefaultWriter(this)) {
358 return Promise_reject(new TypeError(errIllegalInvocation));
359 }
360 if (this[_ownerWritableStream] === undefined) {
361 return Promise_reject(createWriterLockReleasedError(verbAborted));
362 }
363 return WritableStreamDefaultWriterAbort(this, reason);
364 }
365
366 close() {
367 if (!IsWritableStreamDefaultWriter(this)) {
368 return Promise_reject(new TypeError(errIllegalInvocation));
369 }
370 let stream = this[_ownerWritableStream];
371 if (stream === undefined) {
372 return Promise_reject(createWriterLockReleasedError(verbClosed));
373 }
374 if (stream[_state] === CLOSING) {
375 return Promise_reject(new TypeError(errCloseCloseRequestedStream));
376 }
377 return WritableStreamDefaultWriterClose(this);
378 }
379
380 releaseLock() {
381 if (!IsWritableStreamDefaultWriter(this)) {
382 return Promise_reject(TypeError(errIllegalInvocation));
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 just throw
Adam Rice 2016/11/02 11:54:42 Fixed. I seem to have done a particular poor job t
tyoshino (SeeGerritForStatus) 2016/11/04 06:04:19 ok!
383 }
384 let stream = this[_ownerWritableStream];
385 if (stream === undefined) {
386 return Promise_reject(createWriterLockReleasedError(verbReleasedAgain));
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 return undefined. i.e. no-op
Adam Rice 2016/11/02 11:54:42 Done. I'm adding a test for this upstream.
tyoshino (SeeGerritForStatus) 2016/11/04 06:04:19 Great!
387 }
388 TEMP_ASSERT(stream[_writer] !== undefined,
389 'stream.[[writer]] is not undefined.');
390 return WritableStreamDefaultWriterRelease(this);
391 }
392
393 write(chunk) {
394 if (!IsWritableStreamDefaultWriter(this)) {
395 return Promise_reject(TypeError(errIllegalInvocation));
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 new TypeError
Adam Rice 2016/11/02 11:54:42 Done.
396 }
397 let stream = this[_ownerWritableStream];
398 if (stream === undefined) {
399 return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
400 }
401 if (stream[_state] === CLOSING) {
402 return Promise_reject(new TypeError(errWriteCloseRequestedStream));
403 }
404 return WritableStreamDefaultWriterWrite(this, chunk);
405 }
406 }
407
408 // Writable Stream Writer Abstract Operations
409
410 function IsWritableStreamDefaultWriter(x) {
411 return hasOwnProperty(x, _ownerWritableStream);
412 }
413
414 function WritableStreamDefaultWriterAbort(writer, reason) {
415 let stream = writer[_ownerWritableStream];
416 TEMP_ASSERT(stream !== undefined,
417 'stream is not undefined.');
418 return WritableStreamAbort(stream, reason);
419 }
420
421 function WritableStreamDefaultWriterClose(writer) {
422 let stream = writer[_ownerWritableStream];
423 TEMP_ASSERT(stream !== undefined,
424 'stream is not undefined.');
425 let state = stream[_state];
426 if (state === CLOSED || state === ERRORED) {
427 return Promise_reject(
428 createCannotActionOnStateStreamError('close', state));
429 }
430 TEMP_ASSERT(state === WRITABLE,
431 'state is "writable".');
432 let promise = WritableStreamAddWriteRequest(stream);
433 if (WritableStreamDefaultControllerGetBackpressure(
434 stream[_writableStreamController])) {
435 v8.resolvePromise(writer[_readyPromise], undefined);
436 }
437 stream[_state] = CLOSING;
438 WritableStreamDefaultControllerClose(stream[_writableStreamController]);
439 return promise;
440 }
441
442 function WritableStreamDefaultWriterGetDesiredSize(writer) {
443 let stream = writer[_ownerWritableStream];
444 let state = stream[_state];
445 if (state === ERRORED) {
446 return null;
447 }
448 if (state === CLOSED) {
449 return 0;
450 }
451 return WritableStreamDefaultControllerGetDesiredSize(
452 stream[_writableStreamController]);
453 }
454
455 function WritableStreamDefaultWriterRelease(writer) {
456 let stream = writer[_ownerWritableStream];
457 TEMP_ASSERT(stream !== undefined,
458 'stream is not undefined.');
459 TEMP_ASSERT(stream[_writer] === writer,
460 'stream.[[writer]] is writer.');
461 let releasedError = new TypeError(errReleasedWriterClosedPromise);
462 let state = stream[_state];
463 if (state === WRITABLE || state === CLOSING) {
464 v8.rejectPromise(writer[_closedPromise], releasedError);
465 } else {
466 writer[_closedPromise] = Promise_reject(releasedError);
467 }
468 if (state === WRITABLE &&
469 WritableStreamDefaultControllerGetBackpressure(
470 stream[_writableStreamController])) {
471 v8.rejectPromise(writer[_readyPromise], releasedError);
472 } else {
473 writer[_readyPromise] = Promise_reject(releasedError);
474 }
475 stream[_writer] = undefined;
476 writer[_ownerWritableStream] = undefined;
477 }
478
479 function WritableStreamDefaultWriterWrite(writer, chunk) {
480 let stream = writer[_ownerWritableStream];
481 TEMP_ASSERT(stream !== undefined,
482 'stream is not undefined.');
483 let state = stream[_state];
484 if (state === CLOSED || state === ERRORED) {
485 return Promise_reject(
486 createCannotActionOnStateStreamError('write to', state));
487 }
488 TEMP_ASSERT(state === WRITABLE,
489 'state is "writable".');
490 let promise = WritableStreamAddWriteRequest(stream);
491 WritableStreamDefaultControllerWrite(stream[_writableStreamController],
492 chunk);
493 return promise;
494 }
495
496 class WritableStreamDefaultController {
497 constructor(stream, underlyingSink, size, highWaterMark) {
498 if (!IsWritableStream(stream)) {
499 throw new TypeError(errIllegalConstructor);
500 }
501 if (stream[_controlledWritableStream] !== undefined) {
502 throw new TypeError(errIllegalInvocation);
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 use errIllegalConstructor for now? or shall we dec
Adam Rice 2016/11/02 11:54:42 I think errIllegalConstructor is fine. Since calli
503 }
504 this[_controlledWritableStream] = stream;
505 this[_underlyingSink] = underlyingSink;
506 this[_queue] = new v8.InternalPackedArray();
507 this[_queueSize] = 0;
508 this[_defaultControllerFlags] = 0;
509 let normalizedStrategy =
510 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
511 this[_strategySize] = normalizedStrategy.size;
512 this[_strategyHWM] = normalizedStrategy.highWaterMark;
513 let backpressure = WritableStreamDefaultControllerGetBackpressure(this);
514 if (backpressure) {
515 WritableStreamUpdateBackpressure(stream, backpressure);
516 }
517 let controller = this;
518 let startResult = InvokeOrNoop(underlyingSink, 'start', [this]);
519 Promise_resolve(startResult)
520 .then(
521 () => {
522 setDefaultControllerStartedFlag(controller, true);
523 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
524 },
525 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
526 );
527 }
528
529 error(e) {
530 if (!IsWritableStreamDefaultController(this)) {
531 throw new TypeError(errIllegalInvocation);
532 }
533 let state = this[_controlledWritableStream][_state];
534 if (state === CLOSED || state === ERRORED) {
535 throw createCannotActionOnStateStreamError('error', state);
536 }
537 WritableStreamDefaultControllerError(this, e);
538 }
539 }
540
541 // Writable Stream Default Controller Abstract Operations
542
543 function IsWritableStreamDefaultController(x) {
544 return hasOwnProperty(x, _underlyingSink);
545 }
546
547 function WritableStreamDefaultControllerAbort(controller, reason) {
548 controller[_queue] = v8.InternalPackedArray();
549 controller[_queueSize] = 0;
550 let sinkAbortPromise =
551 PromiseInvokeOrFallbackOrNoop(controller[_underlyingSink],
552 'abort', [reason], 'close', [controller]);
553 return thenPromise(sinkAbortPromise, () => undefined);
554 }
555
556 function WritableStreamDefaultControllerClose(controller) {
557 EnqueueValueWithSizeForController(controller, 'close', 0);
558 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
559 }
560
561 function WritableStreamDefaultControllerGetDesiredSize(controller) {
562 let queueSize = GetTotalQueueSizeForController(controller);
563 return controller[_strategyHWM] - queueSize;
564 }
565
566 function WritableStreamDefaultControllerWrite(controller, chunk) {
567 let stream = controller[_controlledWritableStream];
568 TEMP_ASSERT(stream[_state] === WRITABLE,
569 'stream.[[state]] is "writable".');
570 let chunkSize = 1;
571 if (controller[_strategySize] !== undefined) {
572 try {
573 chunkSize = Function_call(controller[_strategySize], undefined, chunk);
574 } catch (e) {
575 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
576 return Promise_reject(e);
577 }
578 }
579 let writeRecord = {chunk};
580 let lastBackpressure =
581 WritableStreamDefaultControllerGetBackpressure(controller);
582 try {
583 let enqueueResult =
584 EnqueueValueWithSizeForController(controller, writeRecord, chunkSize);
585 } catch (e) {
586 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
587 return Promise_reject(e);
588 }
589 if (stream[_state] === WRITABLE) {
590 let backpressure =
591 WritableStreamDefaultControllerGetBackpressure(controller);
592 if (lastBackpressure !== backpressure) {
593 WritableStreamUpdateBackpressure(stream, backpressure);
594 }
595 }
596 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
597 }
598
599 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
600 let state = controller[_controlledWritableStream][_state];
601 if (state === CLOSED || state === ERRORED) {
602 return;
603 }
604 if (!getDefaultControllerStartedFlag(controller)) {
605 return;
606 }
607 if (getDefaultControllerWritingFlag(controller)) {
608 return;
609 }
610 if (controller[_queue].length === 0) {
611 return;
612 }
613 let writeRecord = PeekQueueValue(controller[_queue]);
614 if (writeRecord === 'close') {
615 WritableStreamDefaultControllerProcessClose(controller);
616 } else {
617 WritableStreamDefaultControllerProcessWrite(controller,
618 writeRecord.chunk);
619 }
620 }
621
622 function WritableStreamDefaultControllerErrorIfNeeded(controller, e) {
623 let state = controller[_controlledWritableStream][_state];
624 if (state === WRITABLE || state === CLOSING) {
625 WritableStreamDefaultControllerError(controller, e);
626 }
627 }
628
629 function WritableStreamDefaultControllerProcessClose(controller) {
630 let stream = controller[_controlledWritableStream];
631 TEMP_ASSERT(stream[_state] === CLOSING,
632 'stream.[[state]] is "closing".');
633 DequeueValueForController(controller);
634 TEMP_ASSERT(controller[_queue].length === 0,
635 'controller.[[queue]] is empty.');
636 let sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
637 'close', [controller]);
638 thenPromise(sinkClosePromise,
639 () => {
640 if (stream[_state] !== CLOSING) {
641 return;
642 }
643 WritableStreamFulfillWriteRequest(stream);
644 WritableStreamFinishClose(stream);
645 },
646 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
647 );
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:40 this is also formatted by PRESUBMIT?
Adam Rice 2016/11/02 11:54:42 No. Any formatting errors in this file are purely
648 }
649
650 function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
651 setDefaultControllerWritingFlag(controller, true);
652 let sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
653 'write', [chunk, controller]);
654 thenPromise(
655 sinkWritePromise,
656 () => {
657 let stream = controller[_controlledWritableStream];
658 let state = stream[_state];
659 if (state === ERRORED || state === CLOSED) {
660 return;
661 }
662 setDefaultControllerWritingFlag(controller, false);
663 WritableStreamFulfillWriteRequest(stream);
664 let lastBackpressure =
665 WritableStreamDefaultControllerGetBackpressure(controller);
666 DequeueValueForController(controller);
667 if (state !== CLOSING) {
668 let backpressure =
669 WritableStreamDefaultControllerGetBackpressure(controller);
670 if (lastBackpressure !== backpressure) {
671 WritableStreamUpdateBackpressure(
672 controller[_controlledWritableStream], backpressure);
673 }
674 }
675 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
676 },
677 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r)
678 );
679 }
680
681 function WritableStreamDefaultControllerGetBackpressure(controller) {
682 let desiredSize = WritableStreamDefaultControllerGetDesiredSize(controller);
683 return desiredSize <= 0;
684 }
685
686 function WritableStreamDefaultControllerError(controller, e) {
687 let stream = controller[_controlledWritableStream];
688 let state = stream[_state];
689 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
690 'stream.[[state]] is "writable" or "closing".');
691 WritableStreamError(stream, e);
692 controller[_queue] = new v8.InternalPackedArray();
693 controller[_queueSize] = 0;
694 }
695
696 // Queue-with-Sizes Operations
697 //
698 // These differ from the versions in the standard: they take a controller
699 // argument in order to cache the total queue size. This is necessary to avoid
700 // O(N^2) behaviour.
701 function DequeueValueForController(controller) {
702 TEMP_ASSERT(controller[_queue].length !== 0,
703 'queue is not empty.');
704 const result = controller[_queue].shift();
705 controller[_queueSize] -= result.size;
706 return result.value;
707 }
708
709 function EnqueueValueWithSizeForController(controller, value, size) {
710 size = Number(size);
711 if (!IsFiniteNonNegativeNumber(size)) {
712 throw new RangeError(errInvalidSize);
713 }
714
715 controller[_queueSize] += size;
716 controller[_queue].push({value, size});
717 }
718
719 function GetTotalQueueSizeForController(controller) {
720 return controller[_queueSize];
721 }
722
723 function PeekQueueValue(queue) {
724 TEMP_ASSERT(queue.length !== 0,
725 'queue is not empty.');
726 return queue[0].value;
727 }
728
729 // Miscellaneous Operations
730
731 // This differs from "CallOrNoop" in the ReadableStream implementation in
732 // that it takes the arguments as an array, so that multiple arguments can be
733 // passed.
734 function InvokeOrNoop(O, P, args) {
735 TEMP_ASSERT(IsPropertyKey(P),
736 'P is a valid property key.');
737 if (args === undefined) {
738 args = [];
739 }
740 let method = O[P];
741 if (method === undefined) {
742 return undefined;
743 }
744 if (typeof method !== 'function') {
745 throw new TypeError(templateIsNotAFunction(P));
746 }
747 return Function_apply(method, O, args);
748 }
749
750 function IsFiniteNonNegativeNumber(v) {
751 return !Number_isNaN(v) && Number_isFinite(v) && v >= 0;
tyoshino (SeeGerritForStatus) 2016/11/01 07:43:41 Isn't it sufficient to have only Number_isFinite
Adam Rice 2016/11/02 11:54:42 Yes. Done.
752 }
753
754 function PromiseInvokeOrFallbackOrNoop(O, P1, args1, P2, args2) {
755 TEMP_ASSERT(IsPropertyKey(P1),
756 'P1 is a valid property key.');
757 TEMP_ASSERT(IsPropertyKey(P2),
758 'P2 is a valid property key.');
759 try {
760 let method = O[P1];
761 if (method === undefined) {
762 return PromiseInvokeOrNoop(O, P2, args2);
763 }
764 if (typeof method !== 'function') {
765 return Promise_reject(new TypeError(templateIsNotAFunction(P1)));
766 }
767 return Promise_resolve(Function_apply(method, O, args1));
768 } catch (e) {
769 return Promise_reject(e);
770 }
771 }
772
773 function PromiseInvokeOrNoop(O, P, args) {
774 try {
775 return Promise_resolve(InvokeOrNoop(O, P, args));
776 } catch (e) {
777 return Promise_reject(e);
778 }
779 }
780
781 // This is identical to the version in ReadableStream.js and should be shared.
782 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
783 if (size !== undefined && typeof size !== 'function') {
784 throw new TypeError(errSizeNotAFunction);
785 }
786
787 highWaterMark = Number(highWaterMark);
788 if (Number_isNaN(highWaterMark)) {
789 throw new TypeError(errInvalidHWM);
790 }
791 if (highWaterMark < 0) {
792 throw new RangeError(errInvalidHWM);
793 }
794
795 return {size, highWaterMark};
796 }
797
798 //
799 // Additions to the global object
800 //
801
802 defineProperty(global, 'WritableStream', {
803 value: WritableStream,
804 enumerable: false,
805 configurable: true,
806 writable: true
807 });
808
809 // TODO(ricea): Exports to Blink
810 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698