Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2823563002: Unified error handling for WritableStream (Closed)
Patch Set: SharedWorker tests failing due to issue 712124. Redisable them. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Implementation of WritableStream for Blink. See 5 // Implementation of WritableStream for Blink. See
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the
7 // standard, except where required for performance or integration with Blink. In 7 // standard, except where required for performance or integration with Blink. In
8 // particular, classes, methods and abstract operations are implemented in the 8 // particular, classes, methods and abstract operations are implemented in the
9 // same order as in the standard, to simplify side-by-side reading. 9 // same order as in the standard, to simplify side-by-side reading.
10 10
(...skipping 26 matching lines...) Expand all
37 const _queue = v8.createPrivateSymbol('[[queue]]'); 37 const _queue = v8.createPrivateSymbol('[[queue]]');
38 const _queueTotalSize = v8.createPrivateSymbol('[[queueTotalSize]]'); 38 const _queueTotalSize = v8.createPrivateSymbol('[[queueTotalSize]]');
39 const _started = v8.createPrivateSymbol('[[started]]'); 39 const _started = v8.createPrivateSymbol('[[started]]');
40 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); 40 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
41 const _strategySize = v8.createPrivateSymbol('[[strategySize]]'); 41 const _strategySize = v8.createPrivateSymbol('[[strategySize]]');
42 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); 42 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
43 43
44 // Numeric encodings of states 44 // Numeric encodings of states
45 const WRITABLE = 0; 45 const WRITABLE = 0;
46 const CLOSED = 1; 46 const CLOSED = 1;
47 const ERRORED = 2; 47 const ERRORING = 2;
48 const ERRORED = 3;
domenic 2017/04/17 22:20:28 I guess the tests prove me wrong, but I would have
Adam Rice 2017/04/18 02:55:45 Only one state is active at a time. That's why the
48 49
49 // Mask to extract or assign states to _stateAndFlags 50 // Mask to extract or assign states to _stateAndFlags
50 const STATE_MASK = 0xF; 51 const STATE_MASK = 0xF;
51 52
52 const BACKPRESSURE_FLAG = 0x10; 53 const BACKPRESSURE_FLAG = 0x10;
53 54
54 // Javascript functions. It is important to use these copies, as the ones on 55 // Javascript functions. It is important to use these copies, as the ones on
55 // the global object may have been overwritten. See "V8 Extras Design Doc", 56 // the global object may have been overwritten. See "V8 Extras Design Doc",
56 // section "Security Considerations". 57 // section "Security Considerations".
57 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r 58 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after
199 } 200 }
200 201
201 function WritableStreamAbort(stream, reason) { 202 function WritableStreamAbort(stream, reason) {
202 const state = stream[_stateAndFlags] & STATE_MASK; 203 const state = stream[_stateAndFlags] & STATE_MASK;
203 if (state === CLOSED) { 204 if (state === CLOSED) {
204 return Promise_resolve(undefined); 205 return Promise_resolve(undefined);
205 } 206 }
206 if (state === ERRORED) { 207 if (state === ERRORED) {
207 return Promise_reject(stream[_storedError]); 208 return Promise_reject(stream[_storedError]);
208 } 209 }
209 TEMP_ASSERT(state === WRITABLE,
210 'state is "writable".');
211 const error = new TypeError(errStreamAborting); 210 const error = new TypeError(errStreamAborting);
212 if (stream[_pendingAbortRequest] !== undefined) { 211 if (stream[_pendingAbortRequest] !== undefined) {
213 return Promise_reject(error); 212 return Promise_reject(error);
214 } 213 }
215 214
216 const controller = stream[_writableStreamController]; 215 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
217 TEMP_ASSERT(controller !== undefined, 216 '_state_ is `"writable"` or `"erroring"`');
218 'controller is not undefined'); 217
219 if (!WritableStreamHasOperationMarkedInFlight(stream) && 218 const wasAlreadyErroring = state === ERRORING;
220 controller[_started]) { 219 if (wasAlreadyErroring) {
221 WritableStreamFinishAbort(stream); 220 reason = undefined;
222 return WritableStreamDefaultControllerAbortSteps(controller, reason);
223 } 221 }
224 const writer = stream[_writer]; 222
225 if (writer !== undefined) { 223 const promise = v8.createPromise();
226 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error); 224 stream[_pendingAbortRequest] = {promise, reason, wasAlreadyErroring};
225
226 if (!wasAlreadyErroring) {
227 WritableStreamStartErroring(stream, error);
227 } 228 }
228 const promise = v8.createPromise();
229 stream[_pendingAbortRequest] = {promise, reason};
230 return promise; 229 return promise;
231 } 230 }
232 231
233 function WritableStreamError(stream, error) {
234 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORED;
235 stream[_storedError] = error;
236 WritableStreamDefaultControllerErrorSteps(stream[_writableStreamController]) ;
237 if (stream[_pendingAbortRequest] === undefined) {
238 const writer = stream[_writer];
239 if (writer !== undefined) {
240 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error);
241 }
242 }
243 if (!WritableStreamHasOperationMarkedInFlight(stream)) {
244 WritableStreamRejectPromisesInReactionToError(stream);
245 }
246 }
247
248 function WritableStreamFinishAbort(stream) {
249 const error = new TypeError(errStreamAborted);
250 WritableStreamError(stream, error);
251 }
252
253 // Writable Stream Abstract Operations Used by Controllers 232 // Writable Stream Abstract Operations Used by Controllers
254 233
255 function WritableStreamAddWriteRequest(stream) { 234 function WritableStreamAddWriteRequest(stream) {
256 TEMP_ASSERT(IsWritableStreamLocked(stream), 235 TEMP_ASSERT(IsWritableStreamLocked(stream),
257 '! IsWritableStreamLocked(writer) is true.'); 236 '! IsWritableStreamLocked(writer) is true.');
258 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, 237 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
259 'stream.[[state]] is "writable".'); 238 'stream.[[state]] is "writable".');
260 const promise = v8.createPromise(); 239 const promise = v8.createPromise();
261 stream[_writeRequests].push(promise); 240 stream[_writeRequests].push(promise);
262 return promise; 241 return promise;
263 } 242 }
264 243
244 function WritableStreamDealWithRejection(stream, error) {
245 const state = stream[_stateAndFlags] & STATE_MASK;
246 if (state === WRITABLE) {
247 WritableStreamStartErroring(stream, error);
248 return;
249 }
250
251 TEMP_ASSERT(state === ERRORING, '_state_ is `"erroring"`');
252 WritableStreamFinishErroring(stream);
253 }
254
255 function WritableStreamStartErroring(stream, reason) {
256 TEMP_ASSERT(stream[_storedError] === undefined,
257 '_stream_.[[storedError]] is *undefined*');
258 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
259 '_stream_.[[state]] is `"writable"`');
260
261 const controller = stream[_writableStreamController];
262 TEMP_ASSERT(controller !== undefined, '_controller_ is not *undefined*');
263
264 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORING;
265 stream[_storedError] = reason;
266
267 const writer = stream[_writer];
268 if (writer !== undefined) {
269 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
270 }
271
272 if (!WritableStreamHasOperationMarkedInFlight(stream) &&
273 controller[_started]) {
274 WritableStreamFinishErroring(stream);
275 }
276 }
277
278 function WritableStreamFinishErroring(stream) {
279 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORING,
280 '_stream_.[[state]] is `"erroring"`');
281 TEMP_ASSERT(
282 !WritableStreamHasOperationMarkedInFlight(stream),
283 '! WritableStreamHasOperationMarkedInFlight(_stream_) is *false*');
284
285 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORED;
286
287 WritableStreamDefaultControllerErrorSteps(
288 stream[_writableStreamController]);
289
290 const storedError = stream[_storedError];
291 rejectPromises(stream[_writeRequests], storedError);
292 stream[_writeRequests] = new binding.SimpleQueue();
293
294 if (stream[_pendingAbortRequest] === undefined) {
295 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
296 return;
297 }
298
299 const abortRequest = stream[_pendingAbortRequest];
300 stream[_pendingAbortRequest] = undefined;
301
302 if (abortRequest.wasAlreadyErroring === true) {
303 v8.rejectPromise(abortRequest.promise, storedError);
304 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
305 return;
306 }
307
308 const promise = WritableStreamDefaultControllerAbortSteps(
309 stream[_writableStreamController], abortRequest.reason);
310
311 thenPromise(
312 promise,
313 () => {
314 v8.resolvePromise(abortRequest.promise, undefined);
315 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
316 },
317 reason => {
318 v8.rejectPromise(abortRequest.promise, reason);
319 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
320 });
321 }
322
265 function WritableStreamFinishInFlightWrite(stream) { 323 function WritableStreamFinishInFlightWrite(stream) {
266 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined, 324 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined,
267 '_stream_.[[inFlightWriteRequest]] is not *undefined*.'); 325 '_stream_.[[inFlightWriteRequest]] is not *undefined*.');
268 v8.resolvePromise(stream[_inFlightWriteRequest], undefined); 326 v8.resolvePromise(stream[_inFlightWriteRequest], undefined);
269 stream[_inFlightWriteRequest] = undefined; 327 stream[_inFlightWriteRequest] = undefined;
270 const state = stream[_stateAndFlags] & STATE_MASK;
271 if (state === ERRORED) {
272 WritableStreamFinishInFlightWriteInErroredState(stream);
273 return;
274 }
275 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
276 WritableStreamHandleAbortRequestIfPending(stream);
277 }
278
279 function WritableStreamFinishInFlightWriteInErroredState(stream) {
280 WritableStreamRejectAbortRequestIfPending(stream);
281 WritableStreamRejectPromisesInReactionToError(stream);
282 } 328 }
283 329
284 function WritableStreamFinishInFlightWriteWithError(stream, error) { 330 function WritableStreamFinishInFlightWriteWithError(stream, error) {
285 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined, 331 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined,
286 '_stream_.[[inFlightWriteRequest]] is not *undefined*.'); 332 '_stream_.[[inFlightWriteRequest]] is not *undefined*.');
287 v8.rejectPromise(stream[_inFlightWriteRequest], error); 333 v8.rejectPromise(stream[_inFlightWriteRequest], error);
288 stream[_inFlightWriteRequest] = undefined; 334 stream[_inFlightWriteRequest] = undefined;
289 const state = stream[_stateAndFlags] & STATE_MASK; 335
290 if (state === ERRORED) { 336 let state = stream[_stateAndFlags] & STATE_MASK;
291 WritableStreamFinishInFlightWriteInErroredState(stream); 337 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
292 return; 338 '_stream_.[[state]] is `"writable"` or `"erroring"`');
293 } 339
294 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); 340 WritableStreamDealWithRejection(stream, error);
295 WritableStreamError(stream, error);
296 WritableStreamRejectAbortRequestIfPending(stream);
297 } 341 }
298 342
299 function WritableStreamFinishInFlightClose(stream) { 343 function WritableStreamFinishInFlightClose(stream) {
300 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined, 344 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined,
301 '_stream_.[[inFlightCloseRequest]] is not *undefined*.'); 345 '_stream_.[[inFlightCloseRequest]] is not *undefined*.');
302 v8.resolvePromise(stream[_inFlightCloseRequest], undefined); 346 v8.resolvePromise(stream[_inFlightCloseRequest], undefined);
303 stream[_inFlightCloseRequest] = undefined; 347 stream[_inFlightCloseRequest] = undefined;
348
304 const state = stream[_stateAndFlags] & STATE_MASK; 349 const state = stream[_stateAndFlags] & STATE_MASK;
305 if (state === ERRORED) { 350 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
306 WritableStreamFinishInFlightCloseInErroredState(stream); 351 '_stream_.[[state]] is `"writable"` or `"erroring"`');
307 return; 352
353 if (state === ERRORING) {
354 stream[_storedError] = undefined;
355 if (stream[_pendingAbortRequest] !== undefined) {
356 v8.resolvePromise(stream[_pendingAbortRequest].promise, undefined);
357 stream[_pendingAbortRequest] = undefined;
358 }
308 } 359 }
309 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); 360
310 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | CLOSED; 361 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | CLOSED;
311 const writer = stream[_writer]; 362 const writer = stream[_writer];
312 if (writer !== undefined) { 363 if (writer !== undefined) {
313 v8.resolvePromise(writer[_closedPromise], undefined); 364 v8.resolvePromise(writer[_closedPromise], undefined);
314 } 365 }
315 if (stream[_pendingAbortRequest] !== undefined) {
316 v8.resolvePromise(stream[_pendingAbortRequest].promise, undefined);
317 stream[_pendingAbortRequest] = undefined;
318 }
319 }
320 366
321 function WritableStreamFinishInFlightCloseInErroredState(stream) { 367 TEMP_ASSERT(stream[_pendingAbortRequest] === undefined,
322 WritableStreamRejectAbortRequestIfPending(stream); 368 '_stream_.[[pendingAbortRequest]] is *undefined*');
323 WritableStreamRejectClosedPromiseInReactionToError(stream); 369 TEMP_ASSERT(stream[_storedError] === undefined,
370 '_stream_.[[storedError]] is *undefined*');
324 } 371 }
325 372
326 function WritableStreamFinishInFlightCloseWithError(stream, error) { 373 function WritableStreamFinishInFlightCloseWithError(stream, error) {
327 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined, 374 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined,
328 '_stream_.[[inFlightCloseRequest]] is not *undefined*.'); 375 '_stream_.[[inFlightCloseRequest]] is not *undefined*.');
329 v8.rejectPromise(stream[_inFlightCloseRequest], error); 376 v8.rejectPromise(stream[_inFlightCloseRequest], error);
330 stream[_inFlightCloseRequest] = undefined; 377 stream[_inFlightCloseRequest] = undefined;
378
331 const state = stream[_stateAndFlags] & STATE_MASK; 379 const state = stream[_stateAndFlags] & STATE_MASK;
332 if (state === ERRORED) { 380 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
333 WritableStreamFinishInFlightCloseInErroredState(stream); 381 '_stream_.[[state]] is `"writable"` or `"erroring"`');
334 return; 382
383 if (stream[_pendingAbortRequest] !== undefined) {
384 v8.rejectPromise(stream[_pendingAbortRequest].promise, error);
385 stream[_pendingAbortRequest] = undefined;
335 } 386 }
336 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); 387
337 WritableStreamError(stream, error); 388 WritableStreamDealWithRejection(stream, error);
338 WritableStreamRejectAbortRequestIfPending(stream);
339 } 389 }
340 390
341 function WritableStreamCloseQueuedOrInFlight(stream) { 391 function WritableStreamCloseQueuedOrInFlight(stream) {
342 return stream[_closeRequest] !== undefined || 392 return stream[_closeRequest] !== undefined ||
343 stream[_inFlightCloseRequest] !== undefined; 393 stream[_inFlightCloseRequest] !== undefined;
344 } 394 }
345 395
346 function WritableStreamHandleAbortRequestIfPending(stream) {
347 if (stream[_pendingAbortRequest] === undefined) {
348 return;
349 }
350 WritableStreamFinishAbort(stream);
351 const abortRequest = stream[_pendingAbortRequest];
352 stream[_pendingAbortRequest] = undefined;
353 const promise =
354 WritableStreamDefaultControllerAbortSteps(stream[_writableStreamControll er],
355 abortRequest.reason);
356 thenPromise(promise,
357 result => v8.resolvePromise(abortRequest.promise, result),
358 reason => v8.rejectPromise(abortRequest.promise, reason));
359 }
360
361 function WritableStreamHasOperationMarkedInFlight(stream) { 396 function WritableStreamHasOperationMarkedInFlight(stream) {
362 return stream[_inFlightWriteRequest] !== undefined || 397 return stream[_inFlightWriteRequest] !== undefined ||
363 stream[_inFlightCloseRequest] !== undefined; 398 stream[_inFlightCloseRequest] !== undefined;
364 } 399 }
365 400
366 function WritableStreamMarkCloseRequestInFlight(stream) { 401 function WritableStreamMarkCloseRequestInFlight(stream) {
367 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined, 402 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
368 '_stream_.[[inFlightCloseRequest]] is *undefined*.'); 403 '_stream_.[[inFlightCloseRequest]] is *undefined*.');
369 TEMP_ASSERT(stream[_closeRequest] !== undefined, 404 TEMP_ASSERT(stream[_closeRequest] !== undefined,
370 '_stream_.[[closeRequest]] is not *undefined*.'); 405 '_stream_.[[closeRequest]] is not *undefined*.');
371 stream[_inFlightCloseRequest] = stream[_closeRequest]; 406 stream[_inFlightCloseRequest] = stream[_closeRequest];
372 stream[_closeRequest] = undefined; 407 stream[_closeRequest] = undefined;
373 } 408 }
374 409
375 function WritableStreamMarkFirstWriteRequestInFlight(stream) { 410 function WritableStreamMarkFirstWriteRequestInFlight(stream) {
376 TEMP_ASSERT(stream[_inFlightWriteRequest] === undefined, 411 TEMP_ASSERT(stream[_inFlightWriteRequest] === undefined,
377 '_stream_.[[inFlightWriteRequest]] is *undefined*.'); 412 '_stream_.[[inFlightWriteRequest]] is *undefined*.');
378 TEMP_ASSERT(stream[_writeRequests].length !== 0, 413 TEMP_ASSERT(stream[_writeRequests].length !== 0,
379 '_stream_.[[writeRequests]] is not empty.'); 414 '_stream_.[[writeRequests]] is not empty.');
380 const writeRequest = stream[_writeRequests].shift(); 415 const writeRequest = stream[_writeRequests].shift();
381 stream[_inFlightWriteRequest] = writeRequest; 416 stream[_inFlightWriteRequest] = writeRequest;
382 } 417 }
383 418
384 function WritableStreamRejectClosedPromiseInReactionToError(stream) { 419 function WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
420 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORED,
421 '_stream_.[[state]] is `"errored"`');
422
423 if (stream[_closeRequest] !== undefined) {
424 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
425 '_stream_.[[inFlightCloseRequest]] is *undefined*');
426 v8.rejectPromise(stream[_closeRequest], stream[_storedError]);
427 stream[_closeRequest] = undefined;
428 }
429
385 const writer = stream[_writer]; 430 const writer = stream[_writer];
386 if (writer !== undefined) { 431 if (writer !== undefined) {
387 v8.rejectPromise(writer[_closedPromise], stream[_storedError]); 432 v8.rejectPromise(writer[_closedPromise], stream[_storedError]);
388 v8.markPromiseAsHandled(writer[_closedPromise]); 433 v8.markPromiseAsHandled(writer[_closedPromise]);
389 } 434 }
390 } 435 }
391 436
392 function WritableStreamRejectAbortRequestIfPending(stream) {
393 if (stream[_pendingAbortRequest] !== undefined) {
394 v8.rejectPromise(stream[_pendingAbortRequest].promise,
395 stream[_storedError]);
396 stream[_pendingAbortRequest] = undefined;
397 }
398 }
399
400 function WritableStreamRejectPromisesInReactionToError(stream) {
401 const storedError = stream[_storedError];
402 rejectPromises(stream[_writeRequests], storedError);
403 stream[_writeRequests] = new binding.SimpleQueue();
404
405 if (stream[_closeRequest] !== undefined) {
406 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
407 '_stream_.[[inFlightCloseRequest]] is *undefined*.');
408 v8.rejectPromise(stream[_closeRequest], storedError);
409 stream[_closeRequest] = undefined;
410 }
411
412 WritableStreamRejectClosedPromiseInReactionToError(stream);
413 }
414
415 function WritableStreamUpdateBackpressure(stream, backpressure) { 437 function WritableStreamUpdateBackpressure(stream, backpressure) {
416 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, 438 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
417 'stream.[[state]] is "writable".'); 439 'stream.[[state]] is "writable".');
418 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream), 440 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream),
419 'WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.'); 441 'WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.');
420 const writer = stream[_writer]; 442 const writer = stream[_writer];
421 if (writer !== undefined && 443 if (writer !== undefined &&
422 backpressure !== Boolean(stream[_stateAndFlags] & BACKPRESSURE_FLAG)) { 444 backpressure !== Boolean(stream[_stateAndFlags] & BACKPRESSURE_FLAG)) {
423 if (backpressure) { 445 if (backpressure) {
424 writer[_readyPromise] = v8.createPromise(); 446 writer[_readyPromise] = v8.createPromise();
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
459 constructor(stream) { 481 constructor(stream) {
460 if (!IsWritableStream(stream)) { 482 if (!IsWritableStream(stream)) {
461 throw new TypeError(streamErrors.illegalConstructor); 483 throw new TypeError(streamErrors.illegalConstructor);
462 } 484 }
463 if (IsWritableStreamLocked(stream)) { 485 if (IsWritableStreamLocked(stream)) {
464 throw new TypeError(streamErrors.illegalConstructor); 486 throw new TypeError(streamErrors.illegalConstructor);
465 } 487 }
466 this[_ownerWritableStream] = stream; 488 this[_ownerWritableStream] = stream;
467 stream[_writer] = this; 489 stream[_writer] = this;
468 const state = stream[_stateAndFlags] & STATE_MASK; 490 const state = stream[_stateAndFlags] & STATE_MASK;
469 if (state === WRITABLE) { 491 switch (state) {
470 if (stream[_pendingAbortRequest] !== undefined) { 492 case WRITABLE:
domenic 2017/04/17 22:20:28 Nit: I personally like wrapping {}s around my case
Adam Rice 2017/04/18 02:55:45 Done. Looks better. I was concerned that this was
471 const error = new TypeError(errStreamAborting); 493 if (!WritableStreamCloseQueuedOrInFlight(stream) &&
472 this[_readyPromise] = Promise_reject(error);
473 v8.markPromiseAsHandled(this[_readyPromise]);
474 } else if (!WritableStreamCloseQueuedOrInFlight(stream) &&
475 stream[_stateAndFlags] & BACKPRESSURE_FLAG) { 494 stream[_stateAndFlags] & BACKPRESSURE_FLAG) {
476 this[_readyPromise] = v8.createPromise(); 495 this[_readyPromise] = v8.createPromise();
477 } else { 496 } else {
478 this[_readyPromise] = Promise_resolve(undefined); 497 this[_readyPromise] = Promise_resolve(undefined);
479 } 498 }
480 this[_closedPromise] = v8.createPromise(); 499 this[_closedPromise] = v8.createPromise();
481 } else if (state === CLOSED) { 500 break;
501
502 case ERRORING:
503 this[_readyPromise] = Promise_reject(stream[_storedError]);
504 v8.markPromiseAsHandled(this[_readyPromise]);
505 this[_closedPromise] = v8.createPromise();
506 break;
507
508 case CLOSED:
482 this[_readyPromise] = Promise_resolve(undefined); 509 this[_readyPromise] = Promise_resolve(undefined);
483 this[_closedPromise] = Promise_resolve(undefined); 510 this[_closedPromise] = Promise_resolve(undefined);
484 } else { 511 break;
512
513 default:
485 TEMP_ASSERT(state === ERRORED, '_state_ is `"errored"`.'); 514 TEMP_ASSERT(state === ERRORED, '_state_ is `"errored"`.');
486 const storedError = stream[_storedError]; 515 const storedError = stream[_storedError];
487 this[_readyPromise] = Promise_reject(storedError); 516 this[_readyPromise] = Promise_reject(storedError);
488 v8.markPromiseAsHandled(this[_readyPromise]); 517 v8.markPromiseAsHandled(this[_readyPromise]);
489 this[_closedPromise] = Promise_reject(storedError); 518 this[_closedPromise] = Promise_reject(storedError);
490 v8.markPromiseAsHandled(this[_closedPromise]); 519 v8.markPromiseAsHandled(this[_closedPromise]);
520 break;
491 } 521 }
492 } 522 }
493 523
494 get closed() { 524 get closed() {
495 if (!IsWritableStreamDefaultWriter(this)) { 525 if (!IsWritableStreamDefaultWriter(this)) {
496 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); 526 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
497 } 527 }
498 return this[_closedPromise]; 528 return this[_closedPromise];
499 } 529 }
500 530
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
577 } 607 }
578 608
579 function WritableStreamDefaultWriterClose(writer) { 609 function WritableStreamDefaultWriterClose(writer) {
580 const stream = writer[_ownerWritableStream]; 610 const stream = writer[_ownerWritableStream];
581 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); 611 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
582 const state = stream[_stateAndFlags] & STATE_MASK; 612 const state = stream[_stateAndFlags] & STATE_MASK;
583 if (state === CLOSED || state === ERRORED) { 613 if (state === CLOSED || state === ERRORED) {
584 return Promise_reject( 614 return Promise_reject(
585 createCannotActionOnStateStreamError('close', state)); 615 createCannotActionOnStateStreamError('close', state));
586 } 616 }
587 if (stream[_pendingAbortRequest] !== undefined) { 617
588 return Promise_reject(new TypeError(errStreamAborting)); 618 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
589 } 619 '_state_ is `"writable"` or `"erroring"`.');
590 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
591 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream), 620 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream),
592 '! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.'); 621 '! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.');
593 const promise = v8.createPromise(); 622 const promise = v8.createPromise();
594 stream[_closeRequest] = promise; 623 stream[_closeRequest] = promise;
595 if (stream[_stateAndFlags] & BACKPRESSURE_FLAG) { 624
625 if ((stream[_stateAndFlags] & BACKPRESSURE_FLAG) &&
626 state === WRITABLE) {
596 v8.resolvePromise(writer[_readyPromise], undefined); 627 v8.resolvePromise(writer[_readyPromise], undefined);
597 } 628 }
598 WritableStreamDefaultControllerClose(stream[_writableStreamController]); 629 WritableStreamDefaultControllerClose(stream[_writableStreamController]);
599 return promise; 630 return promise;
600 } 631 }
601 632
602 function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) { 633 function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) {
603 const stream = writer[_ownerWritableStream]; 634 const stream = writer[_ownerWritableStream];
604 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); 635 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
605 const state = stream[_stateAndFlags] & STATE_MASK; 636 const state = stream[_stateAndFlags] & STATE_MASK;
606 if (WritableStreamCloseQueuedOrInFlight(stream) || state === CLOSED) { 637 if (WritableStreamCloseQueuedOrInFlight(stream) || state === CLOSED) {
607 return Promise_resolve(undefined); 638 return Promise_resolve(undefined);
608 } 639 }
609 if (state === ERRORED) { 640 if (state === ERRORED) {
610 return Promise_reject(stream[_storedError]); 641 return Promise_reject(stream[_storedError]);
611 } 642 }
612 TEMP_ASSERT(state === WRITABLE, 'state is "writable".'); 643
644 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
645 '_state_ is `"writable"` or `"erroring"`.');
646
613 return WritableStreamDefaultWriterClose(writer); 647 return WritableStreamDefaultWriterClose(writer);
614 } 648 }
615 649
616 function WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, 650 function WritableStreamDefaultWriterEnsureClosedPromiseRejected(
617 error) { 651 writer, error) {
652 if (v8.promiseState(writer[_closedPromise]) === v8.kPROMISE_PENDING) {
653 v8.rejectPromise(writer[_closedPromise], error);
654 } else {
655 writer[_closedPromise] = Promise_reject(error);
656 }
657 v8.markPromiseAsHandled(writer[_closedPromise]);
658 }
659
660
661 function WritableStreamDefaultWriterEnsureReadyPromiseRejected(
662 writer, error) {
618 if (v8.promiseState(writer[_readyPromise]) === v8.kPROMISE_PENDING) { 663 if (v8.promiseState(writer[_readyPromise]) === v8.kPROMISE_PENDING) {
619 v8.rejectPromise(writer[_readyPromise], error); 664 v8.rejectPromise(writer[_readyPromise], error);
620 } else { 665 } else {
621 writer[_readyPromise] = Promise_reject(error); 666 writer[_readyPromise] = Promise_reject(error);
622 } 667 }
623 v8.markPromiseAsHandled(writer[_readyPromise]); 668 v8.markPromiseAsHandled(writer[_readyPromise]);
624 } 669 }
625 670
626 function WritableStreamDefaultWriterGetDesiredSize(writer) { 671 function WritableStreamDefaultWriterGetDesiredSize(writer) {
627 const stream = writer[_ownerWritableStream]; 672 const stream = writer[_ownerWritableStream];
628 const state = stream[_stateAndFlags] & STATE_MASK; 673 const state = stream[_stateAndFlags] & STATE_MASK;
629 if (state === ERRORED || stream[_pendingAbortRequest] !== undefined) { 674 if (state === ERRORED || state === ERRORING) {
630 return null; 675 return null;
631 } 676 }
632 if (state === CLOSED) { 677 if (state === CLOSED) {
633 return 0; 678 return 0;
634 } 679 }
635 return WritableStreamDefaultControllerGetDesiredSize( 680 return WritableStreamDefaultControllerGetDesiredSize(
636 stream[_writableStreamController]); 681 stream[_writableStreamController]);
637 } 682 }
638 683
639 function WritableStreamDefaultWriterRelease(writer) { 684 function WritableStreamDefaultWriterRelease(writer) {
640 const stream = writer[_ownerWritableStream]; 685 const stream = writer[_ownerWritableStream];
641 TEMP_ASSERT(stream !== undefined, 686 TEMP_ASSERT(stream !== undefined,
642 'stream is not undefined.'); 687 'stream is not undefined.');
643 TEMP_ASSERT(stream[_writer] === writer, 688 TEMP_ASSERT(stream[_writer] === writer,
644 'stream.[[writer]] is writer.'); 689 'stream.[[writer]] is writer.');
645 const releasedError = new TypeError(errReleasedWriterClosedPromise); 690 const releasedError = new TypeError(errReleasedWriterClosedPromise);
646 const state = stream[_stateAndFlags] & STATE_MASK; 691 WritableStreamDefaultWriterEnsureReadyPromiseRejected(
647 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, 692 writer, releasedError);
648 releasedError); 693 WritableStreamDefaultWriterEnsureClosedPromiseRejected(
649 if (state === WRITABLE || 694 writer, releasedError);
650 WritableStreamHasOperationMarkedInFlight(stream)) {
651 v8.rejectPromise(writer[_closedPromise], releasedError);
652 } else {
653 writer[_closedPromise] = Promise_reject(releasedError);
654 }
655 v8.markPromiseAsHandled(writer[_closedPromise]);
656 stream[_writer] = undefined; 695 stream[_writer] = undefined;
657 writer[_ownerWritableStream] = undefined; 696 writer[_ownerWritableStream] = undefined;
658 } 697 }
659 698
660 function WritableStreamDefaultWriterWrite(writer, chunk) { 699 function WritableStreamDefaultWriterWrite(writer, chunk) {
661 const stream = writer[_ownerWritableStream]; 700 const stream = writer[_ownerWritableStream];
662 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); 701 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
663 const controller = stream[_writableStreamController]; 702 const controller = stream[_writableStreamController];
664 const chunkSize = 703 const chunkSize =
665 WritableStreamDefaultControllerGetChunkSize(controller, chunk); 704 WritableStreamDefaultControllerGetChunkSize(controller, chunk);
666 if (stream !== writer[_ownerWritableStream]) { 705 if (stream !== writer[_ownerWritableStream]) {
667 return Promise_reject(createWriterLockReleasedError(verbWrittenTo)); 706 return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
668 } 707 }
669 const state = stream[_stateAndFlags] & STATE_MASK; 708 const state = stream[_stateAndFlags] & STATE_MASK;
670 if (state === ERRORED) { 709 if (state === ERRORED) {
671 return Promise_reject(stream[_storedError]); 710 return Promise_reject(stream[_storedError]);
672 } 711 }
673 if (WritableStreamCloseQueuedOrInFlight(stream)) { 712 if (WritableStreamCloseQueuedOrInFlight(stream)) {
674 return Promise_reject(new TypeError( 713 return Promise_reject(new TypeError(
675 templateErrorCannotActionOnStateStream('write to', 'closing'))); 714 templateErrorCannotActionOnStateStream('write to', 'closing')));
676 } 715 }
677 if (state === CLOSED) { 716 if (state === CLOSED) {
678 return Promise_reject( 717 return Promise_reject(
679 createCannotActionOnStateStreamError('write to', CLOSED)); 718 createCannotActionOnStateStreamError('write to', CLOSED));
680 } 719 }
681 if (stream[_pendingAbortRequest] !== undefined) { 720 if (state === ERRORING) {
682 return Promise_reject(new TypeError(errStreamAborting)); 721 return Promise_reject(stream[_storedError]);
683 } 722 }
723 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`');
684 const promise = WritableStreamAddWriteRequest(stream); 724 const promise = WritableStreamAddWriteRequest(stream);
685 WritableStreamDefaultControllerWrite(controller, chunk, chunkSize); 725 WritableStreamDefaultControllerWrite(controller, chunk, chunkSize);
686 return promise; 726 return promise;
687 } 727 }
688 728
689 // Functions to expose internals for ReadableStream.pipeTo. These do not 729 // Functions to expose internals for ReadableStream.pipeTo. These do not
690 // appear in the standard. 730 // appear in the standard.
691 function getWritableStreamDefaultWriterClosedPromise(writer) { 731 function getWritableStreamDefaultWriterClosedPromise(writer) {
692 TEMP_ASSERT( 732 TEMP_ASSERT(
693 IsWritableStreamDefaultWriter(writer), 733 IsWritableStreamDefaultWriter(writer),
(...skipping 29 matching lines...) Expand all
723 this[_strategySize] = normalizedStrategy.size; 763 this[_strategySize] = normalizedStrategy.size;
724 this[_strategyHWM] = normalizedStrategy.highWaterMark; 764 this[_strategyHWM] = normalizedStrategy.highWaterMark;
725 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); 765 const backpressure = WritableStreamDefaultControllerGetBackpressure(this);
726 WritableStreamUpdateBackpressure(stream, backpressure); 766 WritableStreamUpdateBackpressure(stream, backpressure);
727 } 767 }
728 768
729 error(e) { 769 error(e) {
730 if (!IsWritableStreamDefaultController(this)) { 770 if (!IsWritableStreamDefaultController(this)) {
731 throw new TypeError(streamErrors.illegalInvocation); 771 throw new TypeError(streamErrors.illegalInvocation);
732 } 772 }
733 const state = this[_controlledWritableStream][_stateAndFlags] & STATE_MASK ; 773 const state =
734 if (state === CLOSED || state === ERRORED) { 774 this[_controlledWritableStream][_stateAndFlags] & STATE_MASK;
735 throw createCannotActionOnStateStreamError('error', state); 775 if (state !== WRITABLE) {
776 return;
736 } 777 }
737 WritableStreamDefaultControllerError(this, e); 778 WritableStreamDefaultControllerError(this, e);
738 } 779 }
739 } 780 }
740 781
741 // Writable Stream Default Controller Internal Methods 782 // Writable Stream Default Controller Internal Methods
742 783
743 // TODO(ricea): Virtual dispatch via V8 Private Symbols seems to be difficult 784 // TODO(ricea): Virtual dispatch via V8 Private Symbols seems to be difficult
744 // or impossible, so use static dispatch for now. This will have to be fixed 785 // or impossible, so use static dispatch for now. This will have to be fixed
745 // when adding a byte controller. 786 // when adding a byte controller.
746 function WritableStreamDefaultControllerAbortSteps(controller, reason) { 787 function WritableStreamDefaultControllerAbortSteps(controller, reason) {
747 const sinkAbortPromise = 788 return PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
748 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
749 return thenPromise(sinkAbortPromise, () => undefined);
750 } 789 }
751 790
752 function WritableStreamDefaultControllerErrorSteps(controller) { 791 function WritableStreamDefaultControllerErrorSteps(controller) {
753 ResetQueue(controller); 792 ResetQueue(controller);
754 } 793 }
755 794
756 function WritableStreamDefaultControllerStartSteps(controller) { 795 function WritableStreamDefaultControllerStartSteps(controller) {
757 const startResult = 796 const startResult =
758 InvokeOrNoop(controller[_underlyingSink], 'start', [controller]); 797 InvokeOrNoop(controller[_underlyingSink], 'start', [controller]);
759 const stream = controller[_controlledWritableStream]; 798 const stream = controller[_controlledWritableStream];
760 const startPromise = Promise_resolve(startResult); 799 const startPromise = Promise_resolve(startResult);
761 thenPromise( 800 thenPromise(
762 startPromise, 801 startPromise,
763 () => { 802 () => {
803 const state = stream[_stateAndFlags] & STATE_MASK;
804 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
805 '_stream_.[[state]] is `"writable"` or `"erroring"`');
764 controller[_started] = true; 806 controller[_started] = true;
765 if ((stream[_stateAndFlags] & STATE_MASK) === ERRORED) {
766 WritableStreamRejectAbortRequestIfPending(stream);
767 } else {
768 WritableStreamHandleAbortRequestIfPending(stream);
769 }
770 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 807 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
771 }, 808 },
772 r => { 809 r => {
773 TEMP_ASSERT( 810 const state = stream[_stateAndFlags] & STATE_MASK;
774 (stream[_stateAndFlags] & STATE_MASK) === WRITABLE || 811 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
775 (stream[_stateAndFlags] & STATE_MASK) === ERRORED, 812 '_stream_.[[state]] is `"writable"` or `"erroring"`');
776 '_stream_.[[state]] is `"writable"` or `"errored"`.'); 813 controller[_started] = true;
777 WritableStreamDefaultControllerErrorIfNeeded(controller, r); 814 WritableStreamDealWithRejection(stream, r);
778 WritableStreamRejectAbortRequestIfPending(stream);
779 }); 815 });
780 } 816 }
781 817
782 // Writable Stream Default Controller Abstract Operations 818 // Writable Stream Default Controller Abstract Operations
783 819
784 function IsWritableStreamDefaultController(x) { 820 function IsWritableStreamDefaultController(x) {
785 return hasOwnProperty(x, _underlyingSink); 821 return hasOwnProperty(x, _underlyingSink);
786 } 822 }
787 823
788 function WritableStreamDefaultControllerClose(controller) { 824 function WritableStreamDefaultControllerClose(controller) {
(...skipping 22 matching lines...) Expand all
811 847
812 function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { 848 function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
813 const writeRecord = {chunk}; 849 const writeRecord = {chunk};
814 try { 850 try {
815 EnqueueValueWithSize(controller, writeRecord, chunkSize); 851 EnqueueValueWithSize(controller, writeRecord, chunkSize);
816 } catch (e) { 852 } catch (e) {
817 WritableStreamDefaultControllerErrorIfNeeded(controller, e); 853 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
818 return; 854 return;
819 } 855 }
820 const stream = controller[_controlledWritableStream]; 856 const stream = controller[_controlledWritableStream];
821 if (!WritableStreamCloseQueuedOrInFlight(stream)) { 857 if (!WritableStreamCloseQueuedOrInFlight(stream) &&
858 (stream[_stateAndFlags] & STATE_MASK) === WRITABLE) {
822 const backpressure = 859 const backpressure =
823 WritableStreamDefaultControllerGetBackpressure(controller); 860 WritableStreamDefaultControllerGetBackpressure(controller);
824 WritableStreamUpdateBackpressure(stream, backpressure); 861 WritableStreamUpdateBackpressure(stream, backpressure);
825 } 862 }
826 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 863 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
827 } 864 }
828 865
829 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { 866 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
830 const stream = controller[_controlledWritableStream]; 867 const stream = controller[_controlledWritableStream];
831 const state = stream[_stateAndFlags] & STATE_MASK;
832 if (state === CLOSED || state === ERRORED) {
833 return;
834 }
835 if (!controller[_started]) { 868 if (!controller[_started]) {
836 return; 869 return;
837 } 870 }
838 if (stream[_inFlightWriteRequest] !== undefined) { 871 if (stream[_inFlightWriteRequest] !== undefined) {
839 return; 872 return;
840 } 873 }
874 const state = stream[_stateAndFlags] & STATE_MASK;
875 if (state === CLOSED || state === ERRORED) {
876 return;
877 }
878 if (state === ERRORING) {
879 WritableStreamFinishErroring(stream);
880 return;
881 }
841 if (controller[_queue].length === 0) { 882 if (controller[_queue].length === 0) {
842 return; 883 return;
843 } 884 }
844 const writeRecord = PeekQueueValue(controller); 885 const writeRecord = PeekQueueValue(controller);
845 if (writeRecord === 'close') { 886 if (writeRecord === 'close') {
846 WritableStreamDefaultControllerProcessClose(controller); 887 WritableStreamDefaultControllerProcessClose(controller);
847 } else { 888 } else {
848 WritableStreamDefaultControllerProcessWrite(controller, 889 WritableStreamDefaultControllerProcessWrite(controller,
849 writeRecord.chunk); 890 writeRecord.chunk);
850 } 891 }
851 } 892 }
852 893
853 function WritableStreamDefaultControllerErrorIfNeeded(controller, error) { 894 function WritableStreamDefaultControllerErrorIfNeeded(controller, error) {
854 const state = controller[_controlledWritableStream][_stateAndFlags] & STATE_ MASK; 895 const state =
896 controller[_controlledWritableStream][_stateAndFlags] & STATE_MASK;
855 if (state === WRITABLE) { 897 if (state === WRITABLE) {
856 WritableStreamDefaultControllerError(controller, error); 898 WritableStreamDefaultControllerError(controller, error);
857 } 899 }
858 } 900 }
859 901
860 function WritableStreamDefaultControllerProcessClose(controller) { 902 function WritableStreamDefaultControllerProcessClose(controller) {
861 const stream = controller[_controlledWritableStream]; 903 const stream = controller[_controlledWritableStream];
862 WritableStreamMarkCloseRequestInFlight(stream); 904 WritableStreamMarkCloseRequestInFlight(stream);
863 DequeueValue(controller); 905 DequeueValue(controller);
864 TEMP_ASSERT(controller[_queue].length === 0, 906 TEMP_ASSERT(controller[_queue].length === 0,
865 'controller.[[queue]] is empty.'); 907 'controller.[[queue]] is empty.');
866 const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], 908 const sinkClosePromise = PromiseInvokeOrNoop(
867 'close', [controller]); 909 controller[_underlyingSink], 'close', []);
868 thenPromise( 910 thenPromise(
869 sinkClosePromise, 911 sinkClosePromise,
870 () => WritableStreamFinishInFlightClose(stream), 912 () => WritableStreamFinishInFlightClose(stream),
871 reason => WritableStreamFinishInFlightCloseWithError(stream, reason) 913 reason => WritableStreamFinishInFlightCloseWithError(stream, reason)
872 ); 914 );
873 } 915 }
874 916
875 function WritableStreamDefaultControllerProcessWrite(controller, chunk) { 917 function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
876 const stream = controller[_controlledWritableStream]; 918 const stream = controller[_controlledWritableStream];
877 WritableStreamMarkFirstWriteRequestInFlight(stream); 919 WritableStreamMarkFirstWriteRequestInFlight(stream);
878 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], 920 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
879 'write', [chunk, controller]); 921 'write', [chunk, controller]);
880 thenPromise( 922 thenPromise(
881 sinkWritePromise, 923 sinkWritePromise,
882 () => { 924 () => {
883 WritableStreamFinishInFlightWrite(stream); 925 WritableStreamFinishInFlightWrite(stream);
884 const state = stream[_stateAndFlags] & STATE_MASK; 926 const state = stream[_stateAndFlags] & STATE_MASK;
885 if (state === ERRORED) { 927 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
886 return; 928 '_state_ is `"writable"` or `"erroring"`');
887 }
888 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
889 DequeueValue(controller); 929 DequeueValue(controller);
890 if (!WritableStreamCloseQueuedOrInFlight(stream)) { 930 if (!WritableStreamCloseQueuedOrInFlight(stream) &&
931 state === WRITABLE) {
891 const backpressure = 932 const backpressure =
892 WritableStreamDefaultControllerGetBackpressure(controller); 933 WritableStreamDefaultControllerGetBackpressure(controller);
893 WritableStreamUpdateBackpressure(stream, backpressure); 934 WritableStreamUpdateBackpressure(stream, backpressure);
894 } 935 }
895 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 936 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
896 }, 937 },
897 reason => { 938 reason => {
898 const wasErrored = (stream[_stateAndFlags] & STATE_MASK) === ERRORED;
899 WritableStreamFinishInFlightWriteWithError(stream, reason); 939 WritableStreamFinishInFlightWriteWithError(stream, reason);
900 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORED,
901 '_stream_.[[state]] is `"errored"`.');
902 if (!wasErrored) {
903 ResetQueue(controller);
904 }
905 }); 940 });
906 } 941 }
907 942
908 function WritableStreamDefaultControllerGetBackpressure(controller) { 943 function WritableStreamDefaultControllerGetBackpressure(controller) {
909 const desiredSize = 944 const desiredSize =
910 WritableStreamDefaultControllerGetDesiredSize(controller); 945 WritableStreamDefaultControllerGetDesiredSize(controller);
911 return desiredSize <= 0; 946 return desiredSize <= 0;
912 } 947 }
913 948
914 function WritableStreamDefaultControllerError(controller, error) { 949 function WritableStreamDefaultControllerError(controller, error) {
915 const stream = controller[_controlledWritableStream]; 950 const stream = controller[_controlledWritableStream];
916 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, 951 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
917 '_stream_.[[state]] is `"writable"`.'); 952 '_stream_.[[state]] is `"writable"`.');
918 WritableStreamError(stream, error); 953 WritableStreamStartErroring(stream, error);
919 } 954 }
920 955
921 // Queue-with-Sizes Operations 956 // Queue-with-Sizes Operations
922 // 957 //
923 // TODO(ricea): Share these operations with ReadableStream.js. 958 // TODO(ricea): Share these operations with ReadableStream.js.
924 function DequeueValue(container) { 959 function DequeueValue(container) {
925 TEMP_ASSERT( 960 TEMP_ASSERT(
926 hasOwnProperty(container, _queue) && 961 hasOwnProperty(container, _queue) &&
927 hasOwnProperty(container, _queueTotalSize), 962 hasOwnProperty(container, _queueTotalSize),
928 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' + 963 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' +
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
1053 getWritableStreamDefaultWriterClosedPromise; 1088 getWritableStreamDefaultWriterClosedPromise;
1054 binding.WritableStreamDefaultWriterGetDesiredSize = 1089 binding.WritableStreamDefaultWriterGetDesiredSize =
1055 WritableStreamDefaultWriterGetDesiredSize; 1090 WritableStreamDefaultWriterGetDesiredSize;
1056 binding.getWritableStreamDefaultWriterReadyPromise = 1091 binding.getWritableStreamDefaultWriterReadyPromise =
1057 getWritableStreamDefaultWriterReadyPromise; 1092 getWritableStreamDefaultWriterReadyPromise;
1058 binding.WritableStreamDefaultWriterRelease = 1093 binding.WritableStreamDefaultWriterRelease =
1059 WritableStreamDefaultWriterRelease; 1094 WritableStreamDefaultWriterRelease;
1060 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; 1095 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite;
1061 binding.getWritableStreamStoredError = getWritableStreamStoredError; 1096 binding.getWritableStreamStoredError = getWritableStreamStoredError;
1062 }); 1097 });
OLDNEW
« no previous file with comments | « third_party/WebKit/LayoutTests/external/wpt/streams/writable-streams/properties.serviceworker.https-expected.txt ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698