Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2823563002: Unified error handling for WritableStream (Closed)
Patch Set: remove local copy of piping/multiple-propagation test as it is fail Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/WebKit/LayoutTests/http/tests/streams/piping/multiple-propagation.https-expected.txt ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Implementation of WritableStream for Blink. See 5 // Implementation of WritableStream for Blink. See
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the
7 // standard, except where required for performance or integration with Blink. In 7 // standard, except where required for performance or integration with Blink. In
8 // particular, classes, methods and abstract operations are implemented in the 8 // particular, classes, methods and abstract operations are implemented in the
9 // same order as in the standard, to simplify side-by-side reading. 9 // same order as in the standard, to simplify side-by-side reading.
10 10
(...skipping 23 matching lines...) Expand all
34 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]'); 34 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]');
35 const _controlledWritableStream = 35 const _controlledWritableStream =
36 v8.createPrivateSymbol('[[controlledWritableStream]]'); 36 v8.createPrivateSymbol('[[controlledWritableStream]]');
37 const _queue = v8.createPrivateSymbol('[[queue]]'); 37 const _queue = v8.createPrivateSymbol('[[queue]]');
38 const _queueTotalSize = v8.createPrivateSymbol('[[queueTotalSize]]'); 38 const _queueTotalSize = v8.createPrivateSymbol('[[queueTotalSize]]');
39 const _started = v8.createPrivateSymbol('[[started]]'); 39 const _started = v8.createPrivateSymbol('[[started]]');
40 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); 40 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
41 const _strategySize = v8.createPrivateSymbol('[[strategySize]]'); 41 const _strategySize = v8.createPrivateSymbol('[[strategySize]]');
42 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); 42 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]');
43 43
44 // Numeric encodings of states 44 // Numeric encodings of stream states. Stored in the _stateAndFlags slot.
45 const WRITABLE = 0; 45 const WRITABLE = 0;
46 const CLOSED = 1; 46 const CLOSED = 1;
47 const ERRORED = 2; 47 const ERRORING = 2;
48 const ERRORED = 3;
48 49
49 // Mask to extract or assign states to _stateAndFlags 50 // Mask to extract or assign states to _stateAndFlags.
50 const STATE_MASK = 0xF; 51 const STATE_MASK = 0xF;
51 52
53 // Also stored in _stateAndFlags.
52 const BACKPRESSURE_FLAG = 0x10; 54 const BACKPRESSURE_FLAG = 0x10;
53 55
54 // Javascript functions. It is important to use these copies, as the ones on 56 // Javascript functions. It is important to use these copies, as the ones on
55 // the global object may have been overwritten. See "V8 Extras Design Doc", 57 // the global object may have been overwritten. See "V8 Extras Design Doc",
56 // section "Security Considerations". 58 // section "Security Considerations".
57 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r 59 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r
58 const undefined = global.undefined; 60 const undefined = global.undefined;
59 61
60 const defineProperty = global.Object.defineProperty; 62 const defineProperty = global.Object.defineProperty;
61 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 63 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
199 } 201 }
200 202
201 function WritableStreamAbort(stream, reason) { 203 function WritableStreamAbort(stream, reason) {
202 const state = stream[_stateAndFlags] & STATE_MASK; 204 const state = stream[_stateAndFlags] & STATE_MASK;
203 if (state === CLOSED) { 205 if (state === CLOSED) {
204 return Promise_resolve(undefined); 206 return Promise_resolve(undefined);
205 } 207 }
206 if (state === ERRORED) { 208 if (state === ERRORED) {
207 return Promise_reject(stream[_storedError]); 209 return Promise_reject(stream[_storedError]);
208 } 210 }
209 TEMP_ASSERT(state === WRITABLE,
210 'state is "writable".');
211 const error = new TypeError(errStreamAborting); 211 const error = new TypeError(errStreamAborting);
212 if (stream[_pendingAbortRequest] !== undefined) { 212 if (stream[_pendingAbortRequest] !== undefined) {
213 return Promise_reject(error); 213 return Promise_reject(error);
214 } 214 }
215 215
216 const controller = stream[_writableStreamController]; 216 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
217 TEMP_ASSERT(controller !== undefined, 217 '_state_ is `"writable"` or `"erroring"`');
218 'controller is not undefined'); 218
219 if (!WritableStreamHasOperationMarkedInFlight(stream) && 219 const wasAlreadyErroring = state === ERRORING;
220 controller[_started]) { 220 if (wasAlreadyErroring) {
221 WritableStreamFinishAbort(stream); 221 reason = undefined;
222 return WritableStreamDefaultControllerAbortSteps(controller, reason);
223 } 222 }
224 const writer = stream[_writer]; 223
225 if (writer !== undefined) { 224 const promise = v8.createPromise();
226 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error); 225 stream[_pendingAbortRequest] = {promise, reason, wasAlreadyErroring};
226
227 if (!wasAlreadyErroring) {
228 WritableStreamStartErroring(stream, error);
227 } 229 }
228 const promise = v8.createPromise();
229 stream[_pendingAbortRequest] = {promise, reason};
230 return promise; 230 return promise;
231 } 231 }
232 232
233 function WritableStreamError(stream, error) {
234 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORED;
235 stream[_storedError] = error;
236 WritableStreamDefaultControllerErrorSteps(stream[_writableStreamController]) ;
237 if (stream[_pendingAbortRequest] === undefined) {
238 const writer = stream[_writer];
239 if (writer !== undefined) {
240 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error);
241 }
242 }
243 if (!WritableStreamHasOperationMarkedInFlight(stream)) {
244 WritableStreamRejectPromisesInReactionToError(stream);
245 }
246 }
247
248 function WritableStreamFinishAbort(stream) {
249 const error = new TypeError(errStreamAborted);
250 WritableStreamError(stream, error);
251 }
252
253 // Writable Stream Abstract Operations Used by Controllers 233 // Writable Stream Abstract Operations Used by Controllers
254 234
255 function WritableStreamAddWriteRequest(stream) { 235 function WritableStreamAddWriteRequest(stream) {
256 TEMP_ASSERT(IsWritableStreamLocked(stream), 236 TEMP_ASSERT(IsWritableStreamLocked(stream),
257 '! IsWritableStreamLocked(writer) is true.'); 237 '! IsWritableStreamLocked(writer) is true.');
258 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, 238 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
259 'stream.[[state]] is "writable".'); 239 'stream.[[state]] is "writable".');
260 const promise = v8.createPromise(); 240 const promise = v8.createPromise();
261 stream[_writeRequests].push(promise); 241 stream[_writeRequests].push(promise);
262 return promise; 242 return promise;
263 } 243 }
264 244
245 function WritableStreamDealWithRejection(stream, error) {
246 const state = stream[_stateAndFlags] & STATE_MASK;
247 if (state === WRITABLE) {
248 WritableStreamStartErroring(stream, error);
249 return;
250 }
251
252 TEMP_ASSERT(state === ERRORING, '_state_ is `"erroring"`');
253 WritableStreamFinishErroring(stream);
254 }
255
256 function WritableStreamStartErroring(stream, reason) {
257 TEMP_ASSERT(stream[_storedError] === undefined,
258 '_stream_.[[storedError]] is *undefined*');
259 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
260 '_stream_.[[state]] is `"writable"`');
261
262 const controller = stream[_writableStreamController];
263 TEMP_ASSERT(controller !== undefined, '_controller_ is not *undefined*');
264
265 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORING;
266 stream[_storedError] = reason;
267
268 const writer = stream[_writer];
269 if (writer !== undefined) {
270 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
271 }
272
273 if (!WritableStreamHasOperationMarkedInFlight(stream) &&
274 controller[_started]) {
275 WritableStreamFinishErroring(stream);
276 }
277 }
278
279 function WritableStreamFinishErroring(stream) {
280 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORING,
281 '_stream_.[[state]] is `"erroring"`');
282 TEMP_ASSERT(
283 !WritableStreamHasOperationMarkedInFlight(stream),
284 '! WritableStreamHasOperationMarkedInFlight(_stream_) is *false*');
285
286 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | ERRORED;
287
288 WritableStreamDefaultControllerErrorSteps(
289 stream[_writableStreamController]);
290
291 const storedError = stream[_storedError];
292 rejectPromises(stream[_writeRequests], storedError);
293 stream[_writeRequests] = new binding.SimpleQueue();
294
295 if (stream[_pendingAbortRequest] === undefined) {
296 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
297 return;
298 }
299
300 const abortRequest = stream[_pendingAbortRequest];
301 stream[_pendingAbortRequest] = undefined;
302
303 if (abortRequest.wasAlreadyErroring === true) {
304 v8.rejectPromise(abortRequest.promise, storedError);
305 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
306 return;
307 }
308
309 const promise = WritableStreamDefaultControllerAbortSteps(
310 stream[_writableStreamController], abortRequest.reason);
311
312 thenPromise(
313 promise,
314 () => {
315 v8.resolvePromise(abortRequest.promise, undefined);
316 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
317 },
318 reason => {
319 v8.rejectPromise(abortRequest.promise, reason);
320 WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
321 });
322 }
323
265 function WritableStreamFinishInFlightWrite(stream) { 324 function WritableStreamFinishInFlightWrite(stream) {
266 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined, 325 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined,
267 '_stream_.[[inFlightWriteRequest]] is not *undefined*.'); 326 '_stream_.[[inFlightWriteRequest]] is not *undefined*.');
268 v8.resolvePromise(stream[_inFlightWriteRequest], undefined); 327 v8.resolvePromise(stream[_inFlightWriteRequest], undefined);
269 stream[_inFlightWriteRequest] = undefined; 328 stream[_inFlightWriteRequest] = undefined;
270 const state = stream[_stateAndFlags] & STATE_MASK;
271 if (state === ERRORED) {
272 WritableStreamFinishInFlightWriteInErroredState(stream);
273 return;
274 }
275 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
276 WritableStreamHandleAbortRequestIfPending(stream);
277 }
278
279 function WritableStreamFinishInFlightWriteInErroredState(stream) {
280 WritableStreamRejectAbortRequestIfPending(stream);
281 WritableStreamRejectPromisesInReactionToError(stream);
282 } 329 }
283 330
284 function WritableStreamFinishInFlightWriteWithError(stream, error) { 331 function WritableStreamFinishInFlightWriteWithError(stream, error) {
285 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined, 332 TEMP_ASSERT(stream[_inFlightWriteRequest] !== undefined,
286 '_stream_.[[inFlightWriteRequest]] is not *undefined*.'); 333 '_stream_.[[inFlightWriteRequest]] is not *undefined*.');
287 v8.rejectPromise(stream[_inFlightWriteRequest], error); 334 v8.rejectPromise(stream[_inFlightWriteRequest], error);
288 stream[_inFlightWriteRequest] = undefined; 335 stream[_inFlightWriteRequest] = undefined;
289 const state = stream[_stateAndFlags] & STATE_MASK; 336
290 if (state === ERRORED) { 337 let state = stream[_stateAndFlags] & STATE_MASK;
291 WritableStreamFinishInFlightWriteInErroredState(stream); 338 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
292 return; 339 '_stream_.[[state]] is `"writable"` or `"erroring"`');
293 } 340
294 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); 341 WritableStreamDealWithRejection(stream, error);
295 WritableStreamError(stream, error);
296 WritableStreamRejectAbortRequestIfPending(stream);
297 } 342 }
298 343
299 function WritableStreamFinishInFlightClose(stream) { 344 function WritableStreamFinishInFlightClose(stream) {
300 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined, 345 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined,
301 '_stream_.[[inFlightCloseRequest]] is not *undefined*.'); 346 '_stream_.[[inFlightCloseRequest]] is not *undefined*.');
302 v8.resolvePromise(stream[_inFlightCloseRequest], undefined); 347 v8.resolvePromise(stream[_inFlightCloseRequest], undefined);
303 stream[_inFlightCloseRequest] = undefined; 348 stream[_inFlightCloseRequest] = undefined;
349
304 const state = stream[_stateAndFlags] & STATE_MASK; 350 const state = stream[_stateAndFlags] & STATE_MASK;
305 if (state === ERRORED) { 351 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
306 WritableStreamFinishInFlightCloseInErroredState(stream); 352 '_stream_.[[state]] is `"writable"` or `"erroring"`');
307 return; 353
354 if (state === ERRORING) {
355 stream[_storedError] = undefined;
356 if (stream[_pendingAbortRequest] !== undefined) {
357 v8.resolvePromise(stream[_pendingAbortRequest].promise, undefined);
358 stream[_pendingAbortRequest] = undefined;
359 }
308 } 360 }
309 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); 361
310 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | CLOSED; 362 stream[_stateAndFlags] = (stream[_stateAndFlags] & ~STATE_MASK) | CLOSED;
311 const writer = stream[_writer]; 363 const writer = stream[_writer];
312 if (writer !== undefined) { 364 if (writer !== undefined) {
313 v8.resolvePromise(writer[_closedPromise], undefined); 365 v8.resolvePromise(writer[_closedPromise], undefined);
314 } 366 }
315 if (stream[_pendingAbortRequest] !== undefined) {
316 v8.resolvePromise(stream[_pendingAbortRequest].promise, undefined);
317 stream[_pendingAbortRequest] = undefined;
318 }
319 }
320 367
321 function WritableStreamFinishInFlightCloseInErroredState(stream) { 368 TEMP_ASSERT(stream[_pendingAbortRequest] === undefined,
322 WritableStreamRejectAbortRequestIfPending(stream); 369 '_stream_.[[pendingAbortRequest]] is *undefined*');
323 WritableStreamRejectClosedPromiseInReactionToError(stream); 370 TEMP_ASSERT(stream[_storedError] === undefined,
371 '_stream_.[[storedError]] is *undefined*');
324 } 372 }
325 373
326 function WritableStreamFinishInFlightCloseWithError(stream, error) { 374 function WritableStreamFinishInFlightCloseWithError(stream, error) {
327 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined, 375 TEMP_ASSERT(stream[_inFlightCloseRequest] !== undefined,
328 '_stream_.[[inFlightCloseRequest]] is not *undefined*.'); 376 '_stream_.[[inFlightCloseRequest]] is not *undefined*.');
329 v8.rejectPromise(stream[_inFlightCloseRequest], error); 377 v8.rejectPromise(stream[_inFlightCloseRequest], error);
330 stream[_inFlightCloseRequest] = undefined; 378 stream[_inFlightCloseRequest] = undefined;
379
331 const state = stream[_stateAndFlags] & STATE_MASK; 380 const state = stream[_stateAndFlags] & STATE_MASK;
332 if (state === ERRORED) { 381 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
333 WritableStreamFinishInFlightCloseInErroredState(stream); 382 '_stream_.[[state]] is `"writable"` or `"erroring"`');
334 return; 383
384 if (stream[_pendingAbortRequest] !== undefined) {
385 v8.rejectPromise(stream[_pendingAbortRequest].promise, error);
386 stream[_pendingAbortRequest] = undefined;
335 } 387 }
336 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.'); 388
337 WritableStreamError(stream, error); 389 WritableStreamDealWithRejection(stream, error);
338 WritableStreamRejectAbortRequestIfPending(stream);
339 } 390 }
340 391
341 function WritableStreamCloseQueuedOrInFlight(stream) { 392 function WritableStreamCloseQueuedOrInFlight(stream) {
342 return stream[_closeRequest] !== undefined || 393 return stream[_closeRequest] !== undefined ||
343 stream[_inFlightCloseRequest] !== undefined; 394 stream[_inFlightCloseRequest] !== undefined;
344 } 395 }
345 396
346 function WritableStreamHandleAbortRequestIfPending(stream) {
347 if (stream[_pendingAbortRequest] === undefined) {
348 return;
349 }
350 WritableStreamFinishAbort(stream);
351 const abortRequest = stream[_pendingAbortRequest];
352 stream[_pendingAbortRequest] = undefined;
353 const promise =
354 WritableStreamDefaultControllerAbortSteps(stream[_writableStreamControll er],
355 abortRequest.reason);
356 thenPromise(promise,
357 result => v8.resolvePromise(abortRequest.promise, result),
358 reason => v8.rejectPromise(abortRequest.promise, reason));
359 }
360
361 function WritableStreamHasOperationMarkedInFlight(stream) { 397 function WritableStreamHasOperationMarkedInFlight(stream) {
362 return stream[_inFlightWriteRequest] !== undefined || 398 return stream[_inFlightWriteRequest] !== undefined ||
363 stream[_inFlightCloseRequest] !== undefined; 399 stream[_inFlightCloseRequest] !== undefined;
364 } 400 }
365 401
366 function WritableStreamMarkCloseRequestInFlight(stream) { 402 function WritableStreamMarkCloseRequestInFlight(stream) {
367 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined, 403 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
368 '_stream_.[[inFlightCloseRequest]] is *undefined*.'); 404 '_stream_.[[inFlightCloseRequest]] is *undefined*.');
369 TEMP_ASSERT(stream[_closeRequest] !== undefined, 405 TEMP_ASSERT(stream[_closeRequest] !== undefined,
370 '_stream_.[[closeRequest]] is not *undefined*.'); 406 '_stream_.[[closeRequest]] is not *undefined*.');
371 stream[_inFlightCloseRequest] = stream[_closeRequest]; 407 stream[_inFlightCloseRequest] = stream[_closeRequest];
372 stream[_closeRequest] = undefined; 408 stream[_closeRequest] = undefined;
373 } 409 }
374 410
375 function WritableStreamMarkFirstWriteRequestInFlight(stream) { 411 function WritableStreamMarkFirstWriteRequestInFlight(stream) {
376 TEMP_ASSERT(stream[_inFlightWriteRequest] === undefined, 412 TEMP_ASSERT(stream[_inFlightWriteRequest] === undefined,
377 '_stream_.[[inFlightWriteRequest]] is *undefined*.'); 413 '_stream_.[[inFlightWriteRequest]] is *undefined*.');
378 TEMP_ASSERT(stream[_writeRequests].length !== 0, 414 TEMP_ASSERT(stream[_writeRequests].length !== 0,
379 '_stream_.[[writeRequests]] is not empty.'); 415 '_stream_.[[writeRequests]] is not empty.');
380 const writeRequest = stream[_writeRequests].shift(); 416 const writeRequest = stream[_writeRequests].shift();
381 stream[_inFlightWriteRequest] = writeRequest; 417 stream[_inFlightWriteRequest] = writeRequest;
382 } 418 }
383 419
384 function WritableStreamRejectClosedPromiseInReactionToError(stream) { 420 function WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
421 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORED,
422 '_stream_.[[state]] is `"errored"`');
423
424 if (stream[_closeRequest] !== undefined) {
425 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
426 '_stream_.[[inFlightCloseRequest]] is *undefined*');
427 v8.rejectPromise(stream[_closeRequest], stream[_storedError]);
428 stream[_closeRequest] = undefined;
429 }
430
385 const writer = stream[_writer]; 431 const writer = stream[_writer];
386 if (writer !== undefined) { 432 if (writer !== undefined) {
387 v8.rejectPromise(writer[_closedPromise], stream[_storedError]); 433 v8.rejectPromise(writer[_closedPromise], stream[_storedError]);
388 v8.markPromiseAsHandled(writer[_closedPromise]); 434 v8.markPromiseAsHandled(writer[_closedPromise]);
389 } 435 }
390 } 436 }
391 437
392 function WritableStreamRejectAbortRequestIfPending(stream) {
393 if (stream[_pendingAbortRequest] !== undefined) {
394 v8.rejectPromise(stream[_pendingAbortRequest].promise,
395 stream[_storedError]);
396 stream[_pendingAbortRequest] = undefined;
397 }
398 }
399
400 function WritableStreamRejectPromisesInReactionToError(stream) {
401 const storedError = stream[_storedError];
402 rejectPromises(stream[_writeRequests], storedError);
403 stream[_writeRequests] = new binding.SimpleQueue();
404
405 if (stream[_closeRequest] !== undefined) {
406 TEMP_ASSERT(stream[_inFlightCloseRequest] === undefined,
407 '_stream_.[[inFlightCloseRequest]] is *undefined*.');
408 v8.rejectPromise(stream[_closeRequest], storedError);
409 stream[_closeRequest] = undefined;
410 }
411
412 WritableStreamRejectClosedPromiseInReactionToError(stream);
413 }
414
415 function WritableStreamUpdateBackpressure(stream, backpressure) { 438 function WritableStreamUpdateBackpressure(stream, backpressure) {
416 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, 439 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
417 'stream.[[state]] is "writable".'); 440 'stream.[[state]] is "writable".');
418 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream), 441 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream),
419 'WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.'); 442 'WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.');
420 const writer = stream[_writer]; 443 const writer = stream[_writer];
421 if (writer !== undefined && 444 if (writer !== undefined &&
422 backpressure !== Boolean(stream[_stateAndFlags] & BACKPRESSURE_FLAG)) { 445 backpressure !== Boolean(stream[_stateAndFlags] & BACKPRESSURE_FLAG)) {
423 if (backpressure) { 446 if (backpressure) {
424 writer[_readyPromise] = v8.createPromise(); 447 writer[_readyPromise] = v8.createPromise();
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
459 constructor(stream) { 482 constructor(stream) {
460 if (!IsWritableStream(stream)) { 483 if (!IsWritableStream(stream)) {
461 throw new TypeError(streamErrors.illegalConstructor); 484 throw new TypeError(streamErrors.illegalConstructor);
462 } 485 }
463 if (IsWritableStreamLocked(stream)) { 486 if (IsWritableStreamLocked(stream)) {
464 throw new TypeError(streamErrors.illegalConstructor); 487 throw new TypeError(streamErrors.illegalConstructor);
465 } 488 }
466 this[_ownerWritableStream] = stream; 489 this[_ownerWritableStream] = stream;
467 stream[_writer] = this; 490 stream[_writer] = this;
468 const state = stream[_stateAndFlags] & STATE_MASK; 491 const state = stream[_stateAndFlags] & STATE_MASK;
469 if (state === WRITABLE) { 492 switch (state) {
470 if (stream[_pendingAbortRequest] !== undefined) { 493 case WRITABLE:
471 const error = new TypeError(errStreamAborting); 494 {
472 this[_readyPromise] = Promise_reject(error); 495 if (!WritableStreamCloseQueuedOrInFlight(stream) &&
496 stream[_stateAndFlags] & BACKPRESSURE_FLAG) {
497 this[_readyPromise] = v8.createPromise();
498 } else {
499 this[_readyPromise] = Promise_resolve(undefined);
500 }
501 this[_closedPromise] = v8.createPromise();
502 break;
503 }
504
505 case ERRORING:
506 {
507 this[_readyPromise] = Promise_reject(stream[_storedError]);
473 v8.markPromiseAsHandled(this[_readyPromise]); 508 v8.markPromiseAsHandled(this[_readyPromise]);
474 } else if (!WritableStreamCloseQueuedOrInFlight(stream) && 509 this[_closedPromise] = v8.createPromise();
475 stream[_stateAndFlags] & BACKPRESSURE_FLAG) { 510 break;
476 this[_readyPromise] = v8.createPromise(); 511 }
477 } else { 512
513 case CLOSED:
514 {
478 this[_readyPromise] = Promise_resolve(undefined); 515 this[_readyPromise] = Promise_resolve(undefined);
516 this[_closedPromise] = Promise_resolve(undefined);
517 break;
479 } 518 }
480 this[_closedPromise] = v8.createPromise(); 519
481 } else if (state === CLOSED) { 520 default:
482 this[_readyPromise] = Promise_resolve(undefined); 521 {
483 this[_closedPromise] = Promise_resolve(undefined); 522 TEMP_ASSERT(state === ERRORED, '_state_ is `"errored"`.');
484 } else { 523 const storedError = stream[_storedError];
485 TEMP_ASSERT(state === ERRORED, '_state_ is `"errored"`.'); 524 this[_readyPromise] = Promise_reject(storedError);
486 const storedError = stream[_storedError]; 525 v8.markPromiseAsHandled(this[_readyPromise]);
487 this[_readyPromise] = Promise_reject(storedError); 526 this[_closedPromise] = Promise_reject(storedError);
488 v8.markPromiseAsHandled(this[_readyPromise]); 527 v8.markPromiseAsHandled(this[_closedPromise]);
489 this[_closedPromise] = Promise_reject(storedError); 528 break;
490 v8.markPromiseAsHandled(this[_closedPromise]); 529 }
491 } 530 }
492 } 531 }
493 532
494 get closed() { 533 get closed() {
495 if (!IsWritableStreamDefaultWriter(this)) { 534 if (!IsWritableStreamDefaultWriter(this)) {
496 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); 535 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
497 } 536 }
498 return this[_closedPromise]; 537 return this[_closedPromise];
499 } 538 }
500 539
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
577 } 616 }
578 617
579 function WritableStreamDefaultWriterClose(writer) { 618 function WritableStreamDefaultWriterClose(writer) {
580 const stream = writer[_ownerWritableStream]; 619 const stream = writer[_ownerWritableStream];
581 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); 620 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
582 const state = stream[_stateAndFlags] & STATE_MASK; 621 const state = stream[_stateAndFlags] & STATE_MASK;
583 if (state === CLOSED || state === ERRORED) { 622 if (state === CLOSED || state === ERRORED) {
584 return Promise_reject( 623 return Promise_reject(
585 createCannotActionOnStateStreamError('close', state)); 624 createCannotActionOnStateStreamError('close', state));
586 } 625 }
587 if (stream[_pendingAbortRequest] !== undefined) { 626
588 return Promise_reject(new TypeError(errStreamAborting)); 627 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
589 } 628 '_state_ is `"writable"` or `"erroring"`.');
590 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
591 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream), 629 TEMP_ASSERT(!WritableStreamCloseQueuedOrInFlight(stream),
592 '! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.'); 630 '! WritableStreamCloseQueuedOrInFlight(_stream_) is *false*.');
593 const promise = v8.createPromise(); 631 const promise = v8.createPromise();
594 stream[_closeRequest] = promise; 632 stream[_closeRequest] = promise;
595 if (stream[_stateAndFlags] & BACKPRESSURE_FLAG) { 633
634 if ((stream[_stateAndFlags] & BACKPRESSURE_FLAG) &&
635 state === WRITABLE) {
596 v8.resolvePromise(writer[_readyPromise], undefined); 636 v8.resolvePromise(writer[_readyPromise], undefined);
597 } 637 }
598 WritableStreamDefaultControllerClose(stream[_writableStreamController]); 638 WritableStreamDefaultControllerClose(stream[_writableStreamController]);
599 return promise; 639 return promise;
600 } 640 }
601 641
602 function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) { 642 function WritableStreamDefaultWriterCloseWithErrorPropagation(writer) {
603 const stream = writer[_ownerWritableStream]; 643 const stream = writer[_ownerWritableStream];
604 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); 644 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
605 const state = stream[_stateAndFlags] & STATE_MASK; 645 const state = stream[_stateAndFlags] & STATE_MASK;
606 if (WritableStreamCloseQueuedOrInFlight(stream) || state === CLOSED) { 646 if (WritableStreamCloseQueuedOrInFlight(stream) || state === CLOSED) {
607 return Promise_resolve(undefined); 647 return Promise_resolve(undefined);
608 } 648 }
609 if (state === ERRORED) { 649 if (state === ERRORED) {
610 return Promise_reject(stream[_storedError]); 650 return Promise_reject(stream[_storedError]);
611 } 651 }
612 TEMP_ASSERT(state === WRITABLE, 'state is "writable".'); 652
653 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
654 '_state_ is `"writable"` or `"erroring"`.');
655
613 return WritableStreamDefaultWriterClose(writer); 656 return WritableStreamDefaultWriterClose(writer);
614 } 657 }
615 658
616 function WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, 659 function WritableStreamDefaultWriterEnsureClosedPromiseRejected(
617 error) { 660 writer, error) {
661 if (v8.promiseState(writer[_closedPromise]) === v8.kPROMISE_PENDING) {
662 v8.rejectPromise(writer[_closedPromise], error);
663 } else {
664 writer[_closedPromise] = Promise_reject(error);
665 }
666 v8.markPromiseAsHandled(writer[_closedPromise]);
667 }
668
669
670 function WritableStreamDefaultWriterEnsureReadyPromiseRejected(
671 writer, error) {
618 if (v8.promiseState(writer[_readyPromise]) === v8.kPROMISE_PENDING) { 672 if (v8.promiseState(writer[_readyPromise]) === v8.kPROMISE_PENDING) {
619 v8.rejectPromise(writer[_readyPromise], error); 673 v8.rejectPromise(writer[_readyPromise], error);
620 } else { 674 } else {
621 writer[_readyPromise] = Promise_reject(error); 675 writer[_readyPromise] = Promise_reject(error);
622 } 676 }
623 v8.markPromiseAsHandled(writer[_readyPromise]); 677 v8.markPromiseAsHandled(writer[_readyPromise]);
624 } 678 }
625 679
626 function WritableStreamDefaultWriterGetDesiredSize(writer) { 680 function WritableStreamDefaultWriterGetDesiredSize(writer) {
627 const stream = writer[_ownerWritableStream]; 681 const stream = writer[_ownerWritableStream];
628 const state = stream[_stateAndFlags] & STATE_MASK; 682 const state = stream[_stateAndFlags] & STATE_MASK;
629 if (state === ERRORED || stream[_pendingAbortRequest] !== undefined) { 683 if (state === ERRORED || state === ERRORING) {
630 return null; 684 return null;
631 } 685 }
632 if (state === CLOSED) { 686 if (state === CLOSED) {
633 return 0; 687 return 0;
634 } 688 }
635 return WritableStreamDefaultControllerGetDesiredSize( 689 return WritableStreamDefaultControllerGetDesiredSize(
636 stream[_writableStreamController]); 690 stream[_writableStreamController]);
637 } 691 }
638 692
639 function WritableStreamDefaultWriterRelease(writer) { 693 function WritableStreamDefaultWriterRelease(writer) {
640 const stream = writer[_ownerWritableStream]; 694 const stream = writer[_ownerWritableStream];
641 TEMP_ASSERT(stream !== undefined, 695 TEMP_ASSERT(stream !== undefined,
642 'stream is not undefined.'); 696 'stream is not undefined.');
643 TEMP_ASSERT(stream[_writer] === writer, 697 TEMP_ASSERT(stream[_writer] === writer,
644 'stream.[[writer]] is writer.'); 698 'stream.[[writer]] is writer.');
645 const releasedError = new TypeError(errReleasedWriterClosedPromise); 699 const releasedError = new TypeError(errReleasedWriterClosedPromise);
646 const state = stream[_stateAndFlags] & STATE_MASK; 700 WritableStreamDefaultWriterEnsureReadyPromiseRejected(
647 WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, 701 writer, releasedError);
648 releasedError); 702 WritableStreamDefaultWriterEnsureClosedPromiseRejected(
649 if (state === WRITABLE || 703 writer, releasedError);
650 WritableStreamHasOperationMarkedInFlight(stream)) {
651 v8.rejectPromise(writer[_closedPromise], releasedError);
652 } else {
653 writer[_closedPromise] = Promise_reject(releasedError);
654 }
655 v8.markPromiseAsHandled(writer[_closedPromise]);
656 stream[_writer] = undefined; 704 stream[_writer] = undefined;
657 writer[_ownerWritableStream] = undefined; 705 writer[_ownerWritableStream] = undefined;
658 } 706 }
659 707
660 function WritableStreamDefaultWriterWrite(writer, chunk) { 708 function WritableStreamDefaultWriterWrite(writer, chunk) {
661 const stream = writer[_ownerWritableStream]; 709 const stream = writer[_ownerWritableStream];
662 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.'); 710 TEMP_ASSERT(stream !== undefined, 'stream is not undefined.');
663 const controller = stream[_writableStreamController]; 711 const controller = stream[_writableStreamController];
664 const chunkSize = 712 const chunkSize =
665 WritableStreamDefaultControllerGetChunkSize(controller, chunk); 713 WritableStreamDefaultControllerGetChunkSize(controller, chunk);
666 if (stream !== writer[_ownerWritableStream]) { 714 if (stream !== writer[_ownerWritableStream]) {
667 return Promise_reject(createWriterLockReleasedError(verbWrittenTo)); 715 return Promise_reject(createWriterLockReleasedError(verbWrittenTo));
668 } 716 }
669 const state = stream[_stateAndFlags] & STATE_MASK; 717 const state = stream[_stateAndFlags] & STATE_MASK;
670 if (state === ERRORED) { 718 if (state === ERRORED) {
671 return Promise_reject(stream[_storedError]); 719 return Promise_reject(stream[_storedError]);
672 } 720 }
673 if (WritableStreamCloseQueuedOrInFlight(stream)) { 721 if (WritableStreamCloseQueuedOrInFlight(stream)) {
674 return Promise_reject(new TypeError( 722 return Promise_reject(new TypeError(
675 templateErrorCannotActionOnStateStream('write to', 'closing'))); 723 templateErrorCannotActionOnStateStream('write to', 'closing')));
676 } 724 }
677 if (state === CLOSED) { 725 if (state === CLOSED) {
678 return Promise_reject( 726 return Promise_reject(
679 createCannotActionOnStateStreamError('write to', CLOSED)); 727 createCannotActionOnStateStreamError('write to', CLOSED));
680 } 728 }
681 if (stream[_pendingAbortRequest] !== undefined) { 729 if (state === ERRORING) {
682 return Promise_reject(new TypeError(errStreamAborting)); 730 return Promise_reject(stream[_storedError]);
683 } 731 }
732 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`');
684 const promise = WritableStreamAddWriteRequest(stream); 733 const promise = WritableStreamAddWriteRequest(stream);
685 WritableStreamDefaultControllerWrite(controller, chunk, chunkSize); 734 WritableStreamDefaultControllerWrite(controller, chunk, chunkSize);
686 return promise; 735 return promise;
687 } 736 }
688 737
689 // Functions to expose internals for ReadableStream.pipeTo. These do not 738 // Functions to expose internals for ReadableStream.pipeTo. These do not
690 // appear in the standard. 739 // appear in the standard.
691 function getWritableStreamDefaultWriterClosedPromise(writer) { 740 function getWritableStreamDefaultWriterClosedPromise(writer) {
692 TEMP_ASSERT( 741 TEMP_ASSERT(
693 IsWritableStreamDefaultWriter(writer), 742 IsWritableStreamDefaultWriter(writer),
(...skipping 29 matching lines...) Expand all
723 this[_strategySize] = normalizedStrategy.size; 772 this[_strategySize] = normalizedStrategy.size;
724 this[_strategyHWM] = normalizedStrategy.highWaterMark; 773 this[_strategyHWM] = normalizedStrategy.highWaterMark;
725 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); 774 const backpressure = WritableStreamDefaultControllerGetBackpressure(this);
726 WritableStreamUpdateBackpressure(stream, backpressure); 775 WritableStreamUpdateBackpressure(stream, backpressure);
727 } 776 }
728 777
729 error(e) { 778 error(e) {
730 if (!IsWritableStreamDefaultController(this)) { 779 if (!IsWritableStreamDefaultController(this)) {
731 throw new TypeError(streamErrors.illegalInvocation); 780 throw new TypeError(streamErrors.illegalInvocation);
732 } 781 }
733 const state = this[_controlledWritableStream][_stateAndFlags] & STATE_MASK ; 782 const state =
734 if (state === CLOSED || state === ERRORED) { 783 this[_controlledWritableStream][_stateAndFlags] & STATE_MASK;
735 throw createCannotActionOnStateStreamError('error', state); 784 if (state !== WRITABLE) {
785 return;
736 } 786 }
737 WritableStreamDefaultControllerError(this, e); 787 WritableStreamDefaultControllerError(this, e);
738 } 788 }
739 } 789 }
740 790
741 // Writable Stream Default Controller Internal Methods 791 // Writable Stream Default Controller Internal Methods
742 792
743 // TODO(ricea): Virtual dispatch via V8 Private Symbols seems to be difficult 793 // TODO(ricea): Virtual dispatch via V8 Private Symbols seems to be difficult
744 // or impossible, so use static dispatch for now. This will have to be fixed 794 // or impossible, so use static dispatch for now. This will have to be fixed
745 // when adding a byte controller. 795 // when adding a byte controller.
746 function WritableStreamDefaultControllerAbortSteps(controller, reason) { 796 function WritableStreamDefaultControllerAbortSteps(controller, reason) {
747 const sinkAbortPromise = 797 return PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
748 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
749 return thenPromise(sinkAbortPromise, () => undefined);
750 } 798 }
751 799
752 function WritableStreamDefaultControllerErrorSteps(controller) { 800 function WritableStreamDefaultControllerErrorSteps(controller) {
753 ResetQueue(controller); 801 ResetQueue(controller);
754 } 802 }
755 803
756 function WritableStreamDefaultControllerStartSteps(controller) { 804 function WritableStreamDefaultControllerStartSteps(controller) {
757 const startResult = 805 const startResult =
758 InvokeOrNoop(controller[_underlyingSink], 'start', [controller]); 806 InvokeOrNoop(controller[_underlyingSink], 'start', [controller]);
759 const stream = controller[_controlledWritableStream]; 807 const stream = controller[_controlledWritableStream];
760 const startPromise = Promise_resolve(startResult); 808 const startPromise = Promise_resolve(startResult);
761 thenPromise( 809 thenPromise(
762 startPromise, 810 startPromise,
763 () => { 811 () => {
812 const state = stream[_stateAndFlags] & STATE_MASK;
813 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
814 '_stream_.[[state]] is `"writable"` or `"erroring"`');
764 controller[_started] = true; 815 controller[_started] = true;
765 if ((stream[_stateAndFlags] & STATE_MASK) === ERRORED) {
766 WritableStreamRejectAbortRequestIfPending(stream);
767 } else {
768 WritableStreamHandleAbortRequestIfPending(stream);
769 }
770 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 816 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
771 }, 817 },
772 r => { 818 r => {
773 TEMP_ASSERT( 819 const state = stream[_stateAndFlags] & STATE_MASK;
774 (stream[_stateAndFlags] & STATE_MASK) === WRITABLE || 820 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
775 (stream[_stateAndFlags] & STATE_MASK) === ERRORED, 821 '_stream_.[[state]] is `"writable"` or `"erroring"`');
776 '_stream_.[[state]] is `"writable"` or `"errored"`.'); 822 controller[_started] = true;
777 WritableStreamDefaultControllerErrorIfNeeded(controller, r); 823 WritableStreamDealWithRejection(stream, r);
778 WritableStreamRejectAbortRequestIfPending(stream);
779 }); 824 });
780 } 825 }
781 826
782 // Writable Stream Default Controller Abstract Operations 827 // Writable Stream Default Controller Abstract Operations
783 828
784 function IsWritableStreamDefaultController(x) { 829 function IsWritableStreamDefaultController(x) {
785 return hasOwnProperty(x, _underlyingSink); 830 return hasOwnProperty(x, _underlyingSink);
786 } 831 }
787 832
788 function WritableStreamDefaultControllerClose(controller) { 833 function WritableStreamDefaultControllerClose(controller) {
(...skipping 22 matching lines...) Expand all
811 856
812 function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { 857 function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
813 const writeRecord = {chunk}; 858 const writeRecord = {chunk};
814 try { 859 try {
815 EnqueueValueWithSize(controller, writeRecord, chunkSize); 860 EnqueueValueWithSize(controller, writeRecord, chunkSize);
816 } catch (e) { 861 } catch (e) {
817 WritableStreamDefaultControllerErrorIfNeeded(controller, e); 862 WritableStreamDefaultControllerErrorIfNeeded(controller, e);
818 return; 863 return;
819 } 864 }
820 const stream = controller[_controlledWritableStream]; 865 const stream = controller[_controlledWritableStream];
821 if (!WritableStreamCloseQueuedOrInFlight(stream)) { 866 if (!WritableStreamCloseQueuedOrInFlight(stream) &&
867 (stream[_stateAndFlags] & STATE_MASK) === WRITABLE) {
822 const backpressure = 868 const backpressure =
823 WritableStreamDefaultControllerGetBackpressure(controller); 869 WritableStreamDefaultControllerGetBackpressure(controller);
824 WritableStreamUpdateBackpressure(stream, backpressure); 870 WritableStreamUpdateBackpressure(stream, backpressure);
825 } 871 }
826 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 872 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
827 } 873 }
828 874
829 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { 875 function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
830 const stream = controller[_controlledWritableStream]; 876 const stream = controller[_controlledWritableStream];
831 const state = stream[_stateAndFlags] & STATE_MASK;
832 if (state === CLOSED || state === ERRORED) {
833 return;
834 }
835 if (!controller[_started]) { 877 if (!controller[_started]) {
836 return; 878 return;
837 } 879 }
838 if (stream[_inFlightWriteRequest] !== undefined) { 880 if (stream[_inFlightWriteRequest] !== undefined) {
839 return; 881 return;
840 } 882 }
883 const state = stream[_stateAndFlags] & STATE_MASK;
884 if (state === CLOSED || state === ERRORED) {
885 return;
886 }
887 if (state === ERRORING) {
888 WritableStreamFinishErroring(stream);
889 return;
890 }
841 if (controller[_queue].length === 0) { 891 if (controller[_queue].length === 0) {
842 return; 892 return;
843 } 893 }
844 const writeRecord = PeekQueueValue(controller); 894 const writeRecord = PeekQueueValue(controller);
845 if (writeRecord === 'close') { 895 if (writeRecord === 'close') {
846 WritableStreamDefaultControllerProcessClose(controller); 896 WritableStreamDefaultControllerProcessClose(controller);
847 } else { 897 } else {
848 WritableStreamDefaultControllerProcessWrite(controller, 898 WritableStreamDefaultControllerProcessWrite(controller,
849 writeRecord.chunk); 899 writeRecord.chunk);
850 } 900 }
851 } 901 }
852 902
853 function WritableStreamDefaultControllerErrorIfNeeded(controller, error) { 903 function WritableStreamDefaultControllerErrorIfNeeded(controller, error) {
854 const state = controller[_controlledWritableStream][_stateAndFlags] & STATE_ MASK; 904 const state =
905 controller[_controlledWritableStream][_stateAndFlags] & STATE_MASK;
855 if (state === WRITABLE) { 906 if (state === WRITABLE) {
856 WritableStreamDefaultControllerError(controller, error); 907 WritableStreamDefaultControllerError(controller, error);
857 } 908 }
858 } 909 }
859 910
860 function WritableStreamDefaultControllerProcessClose(controller) { 911 function WritableStreamDefaultControllerProcessClose(controller) {
861 const stream = controller[_controlledWritableStream]; 912 const stream = controller[_controlledWritableStream];
862 WritableStreamMarkCloseRequestInFlight(stream); 913 WritableStreamMarkCloseRequestInFlight(stream);
863 DequeueValue(controller); 914 DequeueValue(controller);
864 TEMP_ASSERT(controller[_queue].length === 0, 915 TEMP_ASSERT(controller[_queue].length === 0,
865 'controller.[[queue]] is empty.'); 916 'controller.[[queue]] is empty.');
866 const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], 917 const sinkClosePromise = PromiseInvokeOrNoop(
867 'close', [controller]); 918 controller[_underlyingSink], 'close', []);
868 thenPromise( 919 thenPromise(
869 sinkClosePromise, 920 sinkClosePromise,
870 () => WritableStreamFinishInFlightClose(stream), 921 () => WritableStreamFinishInFlightClose(stream),
871 reason => WritableStreamFinishInFlightCloseWithError(stream, reason) 922 reason => WritableStreamFinishInFlightCloseWithError(stream, reason)
872 ); 923 );
873 } 924 }
874 925
875 function WritableStreamDefaultControllerProcessWrite(controller, chunk) { 926 function WritableStreamDefaultControllerProcessWrite(controller, chunk) {
876 const stream = controller[_controlledWritableStream]; 927 const stream = controller[_controlledWritableStream];
877 WritableStreamMarkFirstWriteRequestInFlight(stream); 928 WritableStreamMarkFirstWriteRequestInFlight(stream);
878 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], 929 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
879 'write', [chunk, controller]); 930 'write', [chunk, controller]);
880 thenPromise( 931 thenPromise(
881 sinkWritePromise, 932 sinkWritePromise,
882 () => { 933 () => {
883 WritableStreamFinishInFlightWrite(stream); 934 WritableStreamFinishInFlightWrite(stream);
884 const state = stream[_stateAndFlags] & STATE_MASK; 935 const state = stream[_stateAndFlags] & STATE_MASK;
885 if (state === ERRORED) { 936 TEMP_ASSERT(state === WRITABLE || state === ERRORING,
886 return; 937 '_state_ is `"writable"` or `"erroring"`');
887 }
888 TEMP_ASSERT(state === WRITABLE, '_state_ is `"writable"`.');
889 DequeueValue(controller); 938 DequeueValue(controller);
890 if (!WritableStreamCloseQueuedOrInFlight(stream)) { 939 if (!WritableStreamCloseQueuedOrInFlight(stream) &&
940 state === WRITABLE) {
891 const backpressure = 941 const backpressure =
892 WritableStreamDefaultControllerGetBackpressure(controller); 942 WritableStreamDefaultControllerGetBackpressure(controller);
893 WritableStreamUpdateBackpressure(stream, backpressure); 943 WritableStreamUpdateBackpressure(stream, backpressure);
894 } 944 }
895 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 945 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
896 }, 946 },
897 reason => { 947 reason => {
898 const wasErrored = (stream[_stateAndFlags] & STATE_MASK) === ERRORED;
899 WritableStreamFinishInFlightWriteWithError(stream, reason); 948 WritableStreamFinishInFlightWriteWithError(stream, reason);
900 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === ERRORED,
901 '_stream_.[[state]] is `"errored"`.');
902 if (!wasErrored) {
903 ResetQueue(controller);
904 }
905 }); 949 });
906 } 950 }
907 951
908 function WritableStreamDefaultControllerGetBackpressure(controller) { 952 function WritableStreamDefaultControllerGetBackpressure(controller) {
909 const desiredSize = 953 const desiredSize =
910 WritableStreamDefaultControllerGetDesiredSize(controller); 954 WritableStreamDefaultControllerGetDesiredSize(controller);
911 return desiredSize <= 0; 955 return desiredSize <= 0;
912 } 956 }
913 957
914 function WritableStreamDefaultControllerError(controller, error) { 958 function WritableStreamDefaultControllerError(controller, error) {
915 const stream = controller[_controlledWritableStream]; 959 const stream = controller[_controlledWritableStream];
916 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE, 960 TEMP_ASSERT((stream[_stateAndFlags] & STATE_MASK) === WRITABLE,
917 '_stream_.[[state]] is `"writable"`.'); 961 '_stream_.[[state]] is `"writable"`.');
918 WritableStreamError(stream, error); 962 WritableStreamStartErroring(stream, error);
919 } 963 }
920 964
921 // Queue-with-Sizes Operations 965 // Queue-with-Sizes Operations
922 // 966 //
923 // TODO(ricea): Share these operations with ReadableStream.js. 967 // TODO(ricea): Share these operations with ReadableStream.js.
924 function DequeueValue(container) { 968 function DequeueValue(container) {
925 TEMP_ASSERT( 969 TEMP_ASSERT(
926 hasOwnProperty(container, _queue) && 970 hasOwnProperty(container, _queue) &&
927 hasOwnProperty(container, _queueTotalSize), 971 hasOwnProperty(container, _queueTotalSize),
928 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' + 972 'Assert: _container_ has [[queue]] and [[queueTotalSize]] internal ' +
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
1053 getWritableStreamDefaultWriterClosedPromise; 1097 getWritableStreamDefaultWriterClosedPromise;
1054 binding.WritableStreamDefaultWriterGetDesiredSize = 1098 binding.WritableStreamDefaultWriterGetDesiredSize =
1055 WritableStreamDefaultWriterGetDesiredSize; 1099 WritableStreamDefaultWriterGetDesiredSize;
1056 binding.getWritableStreamDefaultWriterReadyPromise = 1100 binding.getWritableStreamDefaultWriterReadyPromise =
1057 getWritableStreamDefaultWriterReadyPromise; 1101 getWritableStreamDefaultWriterReadyPromise;
1058 binding.WritableStreamDefaultWriterRelease = 1102 binding.WritableStreamDefaultWriterRelease =
1059 WritableStreamDefaultWriterRelease; 1103 WritableStreamDefaultWriterRelease;
1060 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; 1104 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite;
1061 binding.getWritableStreamStoredError = getWritableStreamStoredError; 1105 binding.getWritableStreamStoredError = getWritableStreamStoredError;
1062 }); 1106 });
OLDNEW
« no previous file with comments | « third_party/WebKit/LayoutTests/http/tests/streams/piping/multiple-propagation.https-expected.txt ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698