OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Implementation of WritableStream for Blink. See | 5 // Implementation of WritableStream for Blink. See |
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the | 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the |
7 // standard, except where required for performance or integration with Blink. In | 7 // standard, except where required for performance or integration with Blink. In |
8 // particular, classes, methods and abstract operations are implemented in the | 8 // particular, classes, methods and abstract operations are implemented in the |
9 // same order as in the standard, to simplify side-by-side reading. | 9 // same order as in the standard, to simplify side-by-side reading. |
10 | 10 |
11 (function(global, binding, v8) { | 11 (function(global, binding, v8) { |
12 'use strict'; | 12 'use strict'; |
13 | 13 |
14 // Private symbols. These correspond to the internal slots in the standard. | 14 // Private symbols. These correspond to the internal slots in the standard. |
15 // "[[X]]" in the standard is spelt _X in this implementation. | 15 // "[[X]]" in the standard is spelt _X in this implementation. |
16 const _pendingWriteRequest = v8.createPrivateSymbol('[[pendingWriteRequest]]') ; | |
17 const _pendingCloseRequest = v8.createPrivateSymbol('[[pendingCloseRequest]]') ; | |
18 const _pendingAbortRequest = v8.createPrivateSymbol('[[pendingAbortRequest]]') ; | |
16 const _state = v8.createPrivateSymbol('[[state]]'); | 19 const _state = v8.createPrivateSymbol('[[state]]'); |
17 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 20 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
18 const _writer = v8.createPrivateSymbol('[[writer]]'); | 21 const _writer = v8.createPrivateSymbol('[[writer]]'); |
19 const _writableStreamController = | 22 const _writableStreamController = |
20 v8.createPrivateSymbol('[[writableStreamController]]'); | 23 v8.createPrivateSymbol('[[writableStreamController]]'); |
21 const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]'); | 24 const _writeRequests = v8.createPrivateSymbol('[[writeRequests]]'); |
22 const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]'); | 25 const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]'); |
23 const _ownerWritableStream = | 26 const _ownerWritableStream = |
24 v8.createPrivateSymbol('[[ownerWritableStream]]'); | 27 v8.createPrivateSymbol('[[ownerWritableStream]]'); |
25 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]'); | 28 const _readyPromise = v8.createPrivateSymbol('[[readyPromise]]'); |
26 const _controlledWritableStream = | 29 const _controlledWritableStream = |
27 v8.createPrivateSymbol('[[controlledWritableStream]]'); | 30 v8.createPrivateSymbol('[[controlledWritableStream]]'); |
28 const _queue = v8.createPrivateSymbol('[[queue]]'); | 31 const _queue = v8.createPrivateSymbol('[[queue]]'); |
29 const _queueSize = v8.createPrivateSymbol('[[queueSize]]'); | 32 const _queueSize = v8.createPrivateSymbol('[[queueSize]]'); |
30 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); | 33 const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); |
31 const _strategySize = v8.createPrivateSymbol('[[strategySize]]'); | 34 const _strategySize = v8.createPrivateSymbol('[[strategySize]]'); |
32 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); | 35 const _underlyingSink = v8.createPrivateSymbol('[[underlyingSink]]'); |
33 | 36 |
34 // _defaultControllerFlags combines WritableStreamDefaultController's internal | 37 // _defaultControllerFlags combines WritableStreamDefaultController's internal |
35 // slots [[started]] and [[writing]] into a single bitmask for efficiency. | 38 // slots [[started]], [[writing]], and [[inClose]] into a single bitmask for |
39 // efficiency. | |
36 const _defaultControllerFlags = | 40 const _defaultControllerFlags = |
37 v8.createPrivateSymbol('[[defaultControllerFlags]]'); | 41 v8.createPrivateSymbol('[[defaultControllerFlags]]'); |
38 const FLAG_STARTED = 0b1; | 42 const FLAG_STARTED = 0b1; |
39 const FLAG_WRITING = 0b10; | 43 const FLAG_WRITING = 0b10; |
44 const FLAG_INCLOSE = 0b100; | |
40 | 45 |
41 // For efficiency, WritableStream [[state]] contains numeric values. | 46 // For efficiency, WritableStream [[state]] contains numeric values. |
42 const WRITABLE = 0; | 47 const WRITABLE = 0; |
43 const CLOSING = 1; | 48 const CLOSING = 1; |
44 const CLOSED = 2; | 49 const CLOSED = 2; |
45 const ERRORED = 3; | 50 const ERRORED = 3; |
46 | 51 |
47 // Javascript functions. It is important to use these copies, as the ones on | 52 // Javascript functions. It is important to use these copies, as the ones on |
48 // the global object may have been overwritten. See "V8 Extras Design Doc", | 53 // the global object may have been overwritten. See "V8 Extras Design Doc", |
49 // section "Security Considerations". | 54 // section "Security Considerations". |
50 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r | 55 // https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA 8Ec0/edit#heading=h.9yixony1a18r |
51 const undefined = global.undefined; | 56 const undefined = global.undefined; |
52 | 57 |
53 const defineProperty = global.Object.defineProperty; | 58 const defineProperty = global.Object.defineProperty; |
54 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | 59 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
55 | 60 |
56 const Function_call = v8.uncurryThis(global.Function.prototype.call); | 61 const Function_call = v8.uncurryThis(global.Function.prototype.call); |
Adam Rice
2017/01/05 11:52:39
I think this can be removed now.
| |
57 const Function_apply = v8.uncurryThis(global.Function.prototype.apply); | 62 const Function_apply = v8.uncurryThis(global.Function.prototype.apply); |
58 | 63 |
59 const TypeError = global.TypeError; | 64 const TypeError = global.TypeError; |
60 const RangeError = global.RangeError; | 65 const RangeError = global.RangeError; |
61 | 66 |
62 const Boolean = global.Boolean; | 67 const Boolean = global.Boolean; |
63 const Number = global.Number; | 68 const Number = global.Number; |
64 const Number_isNaN = Number.isNaN; | 69 const Number_isNaN = Number.isNaN; |
65 const Number_isFinite = Number.isFinite; | 70 const Number_isFinite = Number.isFinite; |
66 | 71 |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
122 } | 127 } |
123 | 128 |
124 function getDefaultControllerWritingFlag(controller) { | 129 function getDefaultControllerWritingFlag(controller) { |
125 return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING); | 130 return Boolean(controller[_defaultControllerFlags] & FLAG_WRITING); |
126 } | 131 } |
127 | 132 |
128 function setDefaultControllerWritingFlag(controller, value) { | 133 function setDefaultControllerWritingFlag(controller, value) { |
129 setDefaultControllerFlag(controller, FLAG_WRITING, value); | 134 setDefaultControllerFlag(controller, FLAG_WRITING, value); |
130 } | 135 } |
131 | 136 |
137 function getDefaultControllerInCloseFlag(controller) { | |
138 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); | |
139 } | |
140 | |
141 function setDefaultControllerInCloseFlag(controller, value) { | |
142 setDefaultControllerFlag(controller, FLAG_INCLOSE, value); | |
143 } | |
144 | |
132 function rejectPromises(array, e) { | 145 function rejectPromises(array, e) { |
133 // array is an InternalPackedArray so forEach won't work. | 146 // array is an InternalPackedArray so forEach won't work. |
134 for (let i = 0; i < array.length; ++i) { | 147 for (let i = 0; i < array.length; ++i) { |
135 v8.rejectPromise(array[i], e); | 148 v8.rejectPromise(array[i], e); |
136 } | 149 } |
137 } | 150 } |
138 | 151 |
139 // https://tc39.github.io/ecma262/#sec-ispropertykey | 152 // https://tc39.github.io/ecma262/#sec-ispropertykey |
140 // TODO(ricea): Remove this when the asserts using it are removed. | 153 // TODO(ricea): Remove this when the asserts using it are removed. |
141 function IsPropertyKey(argument) { | 154 function IsPropertyKey(argument) { |
142 return typeof argument === 'string' || typeof argument === 'symbol'; | 155 return typeof argument === 'string' || typeof argument === 'symbol'; |
143 } | 156 } |
144 | 157 |
145 // TODO(ricea): Remove all asserts once the implementation has stabilised. | 158 // TODO(ricea): Remove all asserts once the implementation has stabilised. |
146 function TEMP_ASSERT(predicate, message) { | 159 function TEMP_ASSERT(predicate, message) { |
147 if (predicate) { | 160 if (predicate) { |
148 return; | 161 return; |
149 } | 162 } |
150 v8.log(`Assertion failed: ${message}\n`); | 163 v8.log(`Assertion failed: ${message}\n`); |
151 v8.logStackTrace(); | 164 v8.logStackTrace(); |
152 class WritableStreamInternalError { | 165 class WritableStreamInternalError extends Error { |
Adam Rice
2017/01/05 11:48:24
I actually had a reason for not inheriting from Er
domenic
2017/01/05 21:51:00
Yeah, in debugging in content_shell this was quite
| |
166 constructor(message) { | |
167 super(message); | |
168 } | |
153 } | 169 } |
154 throw new WritableStreamInternalError(); | 170 throw new WritableStreamInternalError(message); |
155 } | 171 } |
156 | 172 |
157 class WritableStream { | 173 class WritableStream { |
158 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { | 174 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { |
159 this[_state] = WRITABLE; | 175 this[_state] = WRITABLE; |
160 this[_storedError] = undefined; | 176 this[_storedError] = undefined; |
161 this[_writer] = undefined; | 177 this[_writer] = undefined; |
162 this[_writableStreamController] = undefined; | 178 this[_writableStreamController] = undefined; |
179 this[_pendingWriteRequest] = undefined; | |
180 this[_pendingCloseRequest] = undefined; | |
181 this[_pendingAbortRequest] = undefined; | |
163 this[_writeRequests] = new v8.InternalPackedArray(); | 182 this[_writeRequests] = new v8.InternalPackedArray(); |
164 const type = underlyingSink.type; | 183 const type = underlyingSink.type; |
165 if (type !== undefined) { | 184 if (type !== undefined) { |
166 throw new RangeError(streamErrors.invalidType); | 185 throw new RangeError(streamErrors.invalidType); |
167 } | 186 } |
168 this[_writableStreamController] = | 187 this[_writableStreamController] = |
169 new WritableStreamDefaultController(this, underlyingSink, size, | 188 new WritableStreamDefaultController(this, underlyingSink, size, |
170 highWaterMark); | 189 highWaterMark); |
171 } | 190 } |
172 | 191 |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
216 if (state === CLOSED) { | 235 if (state === CLOSED) { |
217 return Promise_resolve(undefined); | 236 return Promise_resolve(undefined); |
218 } | 237 } |
219 if (state === ERRORED) { | 238 if (state === ERRORED) { |
220 return Promise_reject(stream[_storedError]); | 239 return Promise_reject(stream[_storedError]); |
221 } | 240 } |
222 TEMP_ASSERT(state === WRITABLE || state === CLOSING, | 241 TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
223 'state is "writable" or "closing".'); | 242 'state is "writable" or "closing".'); |
224 const error = new TypeError(errStreamAborted); | 243 const error = new TypeError(errStreamAborted); |
225 WritableStreamError(stream, error); | 244 WritableStreamError(stream, error); |
226 return WritableStreamDefaultControllerAbort( | 245 |
227 stream[_writableStreamController], reason); | 246 const controller = stream[_writableStreamController]; |
247 TEMP_ASSERT(controller !== undefined); | |
Adam Rice
2017/01/05 11:48:24
Please add ", 'controller is not undefined.'"
I r
| |
248 | |
249 const isWriting = getDefaultControllerWritingFlag(controller); | |
250 if (isWriting || getDefaultControllerInCloseFlag(controller)) { | |
251 const promise = v8.createPromise(); | |
252 stream[_pendingAbortRequest] = promise; | |
253 | |
254 if (isWriting) { | |
255 return thenPromise(promise, () => { | |
256 WritableStreamDefaultControllerAbort(controller, reason); | |
Adam Rice
2017/01/05 11:48:24
Needs a return statement.
domenic
2017/01/05 21:51:00
The fact that no tests caught this is worrying. Le
| |
257 }); | |
258 } | |
259 return promise; | |
260 } | |
261 | |
262 return WritableStreamDefaultControllerAbort(controller, reason); | |
228 } | 263 } |
229 | 264 |
230 // Writable Stream Abstract Operations Used by Controllers | 265 // Writable Stream Abstract Operations Used by Controllers |
231 | 266 |
232 function WritableStreamAddWriteRequest(stream) { | 267 function WritableStreamAddWriteRequest(stream) { |
233 TEMP_ASSERT(IsWritableStreamLocked(stream), | 268 TEMP_ASSERT(IsWritableStreamLocked(stream), |
234 '! IsWritableStreamLocked(writer) is true.'); | 269 '! IsWritableStreamLocked(writer) is true.'); |
235 TEMP_ASSERT(stream[_state] === WRITABLE, | 270 TEMP_ASSERT(stream[_state] === WRITABLE, |
236 'stream.[[state]] is "writable".'); | 271 'stream.[[state]] is "writable".'); |
237 const promise = v8.createPromise(); | 272 const promise = v8.createPromise(); |
238 stream[_writeRequests].push(promise); | 273 stream[_writeRequests].push(promise); |
239 return promise; | 274 return promise; |
240 } | 275 } |
241 | 276 |
242 function WritableStreamError(stream, e) { | 277 function WritableStreamError(stream, e) { |
243 const state = stream[_state]; | 278 const oldState = stream[_state]; |
244 TEMP_ASSERT(state === WRITABLE || state === CLOSING, | 279 TEMP_ASSERT(oldState === WRITABLE || oldState === CLOSING, |
245 'state is "writable" or "closing".'); | 280 'oldState is "writable" or "closing".'); |
246 rejectPromises(stream[_writeRequests], e); | 281 |
247 stream[_writeRequests] = new v8.InternalPackedArray(); | 282 stream[_state] = ERRORED; |
283 stream[_storedError] = e; | |
284 | |
285 const controller = stream[_writableStreamController]; | |
286 if (controller === undefined || | |
287 (getDefaultControllerWritingFlag(controller) === false && | |
Adam Rice
2017/01/05 11:48:24
I would prefer just `!getDefaultControllerWritingF
| |
288 getDefaultControllerInCloseFlag(controller) === false)) { | |
289 WritableStreamRejectPromisesInReactionToError(stream); | |
290 } | |
291 | |
248 const writer = stream[_writer]; | 292 const writer = stream[_writer]; |
249 if (writer !== undefined) { | 293 if (writer !== undefined) { |
250 v8.rejectPromise(writer[_closedPromise], e); | 294 if (oldState === WRITABLE && |
251 if (state === WRITABLE && | 295 WritableStreamDefaultControllerGetBackpressure(controller) === true) { |
252 WritableStreamDefaultControllerGetBackpressure( | |
253 stream[_writableStreamController])) { | |
254 v8.rejectPromise(writer[_readyPromise], e); | 296 v8.rejectPromise(writer[_readyPromise], e); |
255 } else { | 297 } else { |
256 writer[_readyPromise] = Promise_reject(e); | 298 writer[_readyPromise] = Promise_reject(e); |
257 } | 299 } |
300 v8.markPromiseAsHandled(writer[_readyPromise]); | |
258 } | 301 } |
259 stream[_state] = ERRORED; | |
260 stream[_storedError] = e; | |
261 } | 302 } |
262 | 303 |
263 function WritableStreamFinishClose(stream) { | 304 function WritableStreamFinishClose(stream) { |
264 TEMP_ASSERT(stream[_state] === CLOSING, | 305 const state = stream[_state]; |
265 'stream.[[state]] is "closing".'); | 306 TEMP_ASSERT(state === CLOSING || state === ERRORED, |
266 TEMP_ASSERT(stream[_writer] !== undefined, | 307 'state is "closing" or "errored".'); |
267 'stream.[[writer]] is not undefined.'); | 308 |
268 stream[_state] = CLOSED; | 309 if (state === CLOSING) { |
269 v8.resolvePromise(stream[_writer][_closedPromise], undefined); | 310 v8.resolvePromise(stream[_writer][_closedPromise], undefined); |
311 stream[_state] = CLOSED; | |
312 } else { | |
313 TEMP_ASSERT(state === ERRORED); | |
Adam Rice
2017/01/05 11:48:24
Message please.
| |
314 v8.rejectPromise(stream[_writer][_closedPromise], stream[_storedError]); | |
315 v8.markPromiseAsHandled(stream[_writer][_closedPromise]); | |
316 } | |
317 | |
318 if (stream[_pendingAbortRequest] !== undefined) { | |
319 v8.resolvePromise(stream[_pendingAbortRequest], undefined); | |
320 stream[_pendingAbortRequest] = undefined; | |
321 } | |
270 } | 322 } |
271 | 323 |
272 function WritableStreamFulfillWriteRequest(stream) { | 324 function WritableStreamRejectPromisesInReactionToError(stream) { |
273 TEMP_ASSERT(stream[_writeRequests].length !== 0, | 325 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); |
274 'stream.[[writeRequests]] is not empty.'); | 326 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
275 const writeRequest = stream[_writeRequests].shift(); | 327 'stream.[[pendingWriteRequest]] is undefined'); |
276 v8.resolvePromise(writeRequest, undefined); | 328 |
329 const storedError = stream[_storedError]; | |
330 rejectPromises(stream[_writeRequests], storedError); | |
331 stream[_writeRequests] = new v8.InternalPackedArray(); | |
332 | |
333 if (stream[_pendingCloseRequest] !== undefined) { | |
334 TEMP_ASSERT( | |
335 getDefaultControllerInCloseFlag(stream[_writableStreamController]) === | |
336 false, 'stream.[[writableStreamController]].[[inClose]] === false'); | |
337 v8.rejectPromise(stream[_pendingCloseRequest], storedError); | |
338 stream[_pendingCloseRequest] = undefined; | |
339 } | |
340 | |
341 const writer = stream[_writer]; | |
342 if (writer !== undefined) { | |
343 v8.rejectPromise(writer[_closedPromise], storedError); | |
344 v8.markPromiseAsHandled(writer[_closedPromise]); | |
345 } | |
277 } | 346 } |
278 | 347 |
279 function WritableStreamUpdateBackpressure(stream, backpressure) { | 348 function WritableStreamUpdateBackpressure(stream, backpressure) { |
280 TEMP_ASSERT(stream[_state] === WRITABLE, | 349 TEMP_ASSERT(stream[_state] === WRITABLE, |
281 'stream.[[state]] is "writable".'); | 350 'stream.[[state]] is "writable".'); |
282 const writer = stream[_writer]; | 351 const writer = stream[_writer]; |
283 if (writer === undefined) { | 352 if (writer === undefined) { |
284 return; | 353 return; |
285 } | 354 } |
286 if (backpressure) { | 355 if (backpressure) { |
(...skipping 17 matching lines...) Expand all Loading... | |
304 stream[_writer] = this; | 373 stream[_writer] = this; |
305 const state = stream[_state]; | 374 const state = stream[_state]; |
306 if (state === WRITABLE || state === CLOSING) { | 375 if (state === WRITABLE || state === CLOSING) { |
307 this[_closedPromise] = v8.createPromise(); | 376 this[_closedPromise] = v8.createPromise(); |
308 } else if (state === CLOSED) { | 377 } else if (state === CLOSED) { |
309 this[_closedPromise] = Promise_resolve(undefined); | 378 this[_closedPromise] = Promise_resolve(undefined); |
310 } else { | 379 } else { |
311 TEMP_ASSERT(state === ERRORED, | 380 TEMP_ASSERT(state === ERRORED, |
312 'state is "errored".'); | 381 'state is "errored".'); |
313 this[_closedPromise] = Promise_reject(stream[_storedError]); | 382 this[_closedPromise] = Promise_reject(stream[_storedError]); |
383 v8.markPromiseAsHandled(this[_closedPromise]); | |
314 } | 384 } |
315 if (state === WRITABLE && | 385 if (state === WRITABLE && |
316 WritableStreamDefaultControllerGetBackpressure( | 386 WritableStreamDefaultControllerGetBackpressure( |
317 stream[_writableStreamController])) { | 387 stream[_writableStreamController])) { |
318 this[_readyPromise] = v8.createPromise(); | 388 this[_readyPromise] = v8.createPromise(); |
319 } else { | 389 } else { |
320 this[_readyPromise] = Promise_resolve(undefined); | 390 this[_readyPromise] = Promise_resolve(undefined); |
321 } | 391 } |
322 } | 392 } |
323 | 393 |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
414 const stream = writer[_ownerWritableStream]; | 484 const stream = writer[_ownerWritableStream]; |
415 TEMP_ASSERT(stream !== undefined, | 485 TEMP_ASSERT(stream !== undefined, |
416 'stream is not undefined.'); | 486 'stream is not undefined.'); |
417 const state = stream[_state]; | 487 const state = stream[_state]; |
418 if (state === CLOSED || state === ERRORED) { | 488 if (state === CLOSED || state === ERRORED) { |
419 return Promise_reject( | 489 return Promise_reject( |
420 createCannotActionOnStateStreamError('close', state)); | 490 createCannotActionOnStateStreamError('close', state)); |
421 } | 491 } |
422 TEMP_ASSERT(state === WRITABLE, | 492 TEMP_ASSERT(state === WRITABLE, |
423 'state is "writable".'); | 493 'state is "writable".'); |
424 const promise = WritableStreamAddWriteRequest(stream); | 494 stream[_pendingCloseRequest] = v8.createPromise(); |
425 if (WritableStreamDefaultControllerGetBackpressure( | 495 if (WritableStreamDefaultControllerGetBackpressure( |
426 stream[_writableStreamController])) { | 496 stream[_writableStreamController])) { |
427 v8.resolvePromise(writer[_readyPromise], undefined); | 497 v8.resolvePromise(writer[_readyPromise], undefined); |
428 } | 498 } |
429 stream[_state] = CLOSING; | 499 stream[_state] = CLOSING; |
430 WritableStreamDefaultControllerClose(stream[_writableStreamController]); | 500 WritableStreamDefaultControllerClose(stream[_writableStreamController]); |
431 return promise; | 501 return stream[_pendingCloseRequest]; |
432 } | 502 } |
433 | 503 |
434 function WritableStreamDefaultWriterGetDesiredSize(writer) { | 504 function WritableStreamDefaultWriterGetDesiredSize(writer) { |
435 const stream = writer[_ownerWritableStream]; | 505 const stream = writer[_ownerWritableStream]; |
436 const state = stream[_state]; | 506 const state = stream[_state]; |
437 if (state === ERRORED) { | 507 if (state === ERRORED) { |
438 return null; | 508 return null; |
439 } | 509 } |
440 if (state === CLOSED) { | 510 if (state === CLOSED) { |
441 return 0; | 511 return 0; |
442 } | 512 } |
443 return WritableStreamDefaultControllerGetDesiredSize( | 513 return WritableStreamDefaultControllerGetDesiredSize( |
444 stream[_writableStreamController]); | 514 stream[_writableStreamController]); |
445 } | 515 } |
446 | 516 |
447 function WritableStreamDefaultWriterRelease(writer) { | 517 function WritableStreamDefaultWriterRelease(writer) { |
448 const stream = writer[_ownerWritableStream]; | 518 const stream = writer[_ownerWritableStream]; |
449 TEMP_ASSERT(stream !== undefined, | 519 TEMP_ASSERT(stream !== undefined, |
450 'stream is not undefined.'); | 520 'stream is not undefined.'); |
451 TEMP_ASSERT(stream[_writer] === writer, | 521 TEMP_ASSERT(stream[_writer] === writer, |
452 'stream.[[writer]] is writer.'); | 522 'stream.[[writer]] is writer.'); |
453 const releasedError = new TypeError(errReleasedWriterClosedPromise); | 523 const releasedError = new TypeError(errReleasedWriterClosedPromise); |
454 const state = stream[_state]; | 524 const state = stream[_state]; |
455 if (state === WRITABLE || state === CLOSING) { | 525 if (state === WRITABLE || state === CLOSING || |
526 stream[_pendingAbortRequest] !== undefined) { | |
456 v8.rejectPromise(writer[_closedPromise], releasedError); | 527 v8.rejectPromise(writer[_closedPromise], releasedError); |
457 } else { | 528 } else { |
458 writer[_closedPromise] = Promise_reject(releasedError); | 529 writer[_closedPromise] = Promise_reject(releasedError); |
459 } | 530 } |
531 v8.markPromiseAsHandled(writer[_closedPromise]); | |
532 | |
460 if (state === WRITABLE && | 533 if (state === WRITABLE && |
461 WritableStreamDefaultControllerGetBackpressure( | 534 WritableStreamDefaultControllerGetBackpressure( |
462 stream[_writableStreamController])) { | 535 stream[_writableStreamController])) { |
463 v8.rejectPromise(writer[_readyPromise], releasedError); | 536 v8.rejectPromise(writer[_readyPromise], releasedError); |
464 } else { | 537 } else { |
465 writer[_readyPromise] = Promise_reject(releasedError); | 538 writer[_readyPromise] = Promise_reject(releasedError); |
466 } | 539 } |
540 v8.markPromiseAsHandled(writer[_readyPromise]); | |
541 | |
467 stream[_writer] = undefined; | 542 stream[_writer] = undefined; |
468 writer[_ownerWritableStream] = undefined; | 543 writer[_ownerWritableStream] = undefined; |
469 } | 544 } |
470 | 545 |
471 function WritableStreamDefaultWriterWrite(writer, chunk) { | 546 function WritableStreamDefaultWriterWrite(writer, chunk) { |
472 const stream = writer[_ownerWritableStream]; | 547 const stream = writer[_ownerWritableStream]; |
473 TEMP_ASSERT(stream !== undefined, | 548 TEMP_ASSERT(stream !== undefined, |
474 'stream is not undefined.'); | 549 'stream is not undefined.'); |
475 const state = stream[_state]; | 550 const state = stream[_state]; |
476 if (state === CLOSED || state === ERRORED) { | 551 if (state === CLOSED || state === ERRORED) { |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
553 function WritableStreamDefaultControllerGetDesiredSize(controller) { | 628 function WritableStreamDefaultControllerGetDesiredSize(controller) { |
554 const queueSize = GetTotalQueueSizeForController(controller); | 629 const queueSize = GetTotalQueueSizeForController(controller); |
555 return controller[_strategyHWM] - queueSize; | 630 return controller[_strategyHWM] - queueSize; |
556 } | 631 } |
557 | 632 |
558 function WritableStreamDefaultControllerWrite(controller, chunk) { | 633 function WritableStreamDefaultControllerWrite(controller, chunk) { |
559 const stream = controller[_controlledWritableStream]; | 634 const stream = controller[_controlledWritableStream]; |
560 TEMP_ASSERT(stream[_state] === WRITABLE, | 635 TEMP_ASSERT(stream[_state] === WRITABLE, |
561 'stream.[[state]] is "writable".'); | 636 'stream.[[state]] is "writable".'); |
562 let chunkSize = 1; | 637 let chunkSize = 1; |
563 if (controller[_strategySize] !== undefined) { | 638 const strategySize = controller[_strategySize]; |
639 if (strategySize !== undefined) { | |
564 try { | 640 try { |
565 chunkSize = Function_call(controller[_strategySize], undefined, chunk); | 641 chunkSize = strategySize(chunk); |
566 } catch (e) { | 642 } catch (e) { |
567 WritableStreamDefaultControllerErrorIfNeeded(controller, e); | 643 WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
568 return Promise_reject(e); | 644 return; |
569 } | 645 } |
570 } | 646 } |
571 const writeRecord = {chunk}; | 647 const writeRecord = {chunk}; |
572 const lastBackpressure = | 648 const lastBackpressure = |
573 WritableStreamDefaultControllerGetBackpressure(controller); | 649 WritableStreamDefaultControllerGetBackpressure(controller); |
574 try { | 650 try { |
575 const enqueueResult = | 651 EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); |
576 EnqueueValueWithSizeForController(controller, writeRecord, chunkSize); | |
577 } catch (e) { | 652 } catch (e) { |
578 WritableStreamDefaultControllerErrorIfNeeded(controller, e); | 653 WritableStreamDefaultControllerErrorIfNeeded(controller, e); |
579 return Promise_reject(e); | 654 return; |
580 } | 655 } |
581 if (stream[_state] === WRITABLE) { | 656 if (stream[_state] === WRITABLE) { |
582 const backpressure = | 657 const backpressure = |
583 WritableStreamDefaultControllerGetBackpressure(controller); | 658 WritableStreamDefaultControllerGetBackpressure(controller); |
584 if (lastBackpressure !== backpressure) { | 659 if (lastBackpressure !== backpressure) { |
585 WritableStreamUpdateBackpressure(stream, backpressure); | 660 WritableStreamUpdateBackpressure(stream, backpressure); |
586 } | 661 } |
587 } | 662 } |
588 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); | 663 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
589 } | 664 } |
(...skipping 28 matching lines...) Expand all Loading... | |
618 } | 693 } |
619 } | 694 } |
620 | 695 |
621 function WritableStreamDefaultControllerProcessClose(controller) { | 696 function WritableStreamDefaultControllerProcessClose(controller) { |
622 const stream = controller[_controlledWritableStream]; | 697 const stream = controller[_controlledWritableStream]; |
623 TEMP_ASSERT(stream[_state] === CLOSING, | 698 TEMP_ASSERT(stream[_state] === CLOSING, |
624 'stream.[[state]] is "closing".'); | 699 'stream.[[state]] is "closing".'); |
625 DequeueValueForController(controller); | 700 DequeueValueForController(controller); |
626 TEMP_ASSERT(controller[_queue].length === 0, | 701 TEMP_ASSERT(controller[_queue].length === 0, |
627 'controller.[[queue]] is empty.'); | 702 'controller.[[queue]] is empty.'); |
703 setDefaultControllerInCloseFlag(controller, true); | |
628 const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], | 704 const sinkClosePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
629 'close', [controller]); | 705 'close', [controller]); |
630 thenPromise(sinkClosePromise, | 706 thenPromise(sinkClosePromise, |
631 () => { | 707 () => { |
632 if (stream[_state] !== CLOSING) { | 708 TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) |
709 === true, | |
710 'controller.[[inClose]] is true'); | |
711 setDefaultControllerInCloseFlag(controller, false); | |
712 | |
713 if (stream[_state] !== CLOSING && | |
714 stream[_state] !== ERRORED) { | |
633 return; | 715 return; |
634 } | 716 } |
635 WritableStreamFulfillWriteRequest(stream); | 717 |
718 TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); | |
719 v8.resolvePromise(stream[_pendingCloseRequest], undefined); | |
720 stream[_pendingCloseRequest] = undefined; | |
721 | |
636 WritableStreamFinishClose(stream); | 722 WritableStreamFinishClose(stream); |
637 }, | 723 }, |
638 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r) | 724 r => { |
725 TEMP_ASSERT(getDefaultControllerInCloseFlag(controller) | |
726 === true, | |
727 'controller.[[inClose]] is true'); | |
728 setDefaultControllerInCloseFlag(controller, false); | |
729 | |
730 TEMP_ASSERT(stream[_pendingCloseRequest] !== undefined); | |
731 v8.rejectPromise(stream[_pendingCloseRequest], r); | |
732 stream[_pendingCloseRequest] = undefined; | |
733 | |
734 if (stream[_pendingAbortRequest] !== undefined) { | |
735 v8.rejectPromise(stream[_pendingAbortRequest]); | |
Adam Rice
2017/01/05 11:48:24
I think we need to supply r as the second argument
domenic
2017/01/05 21:51:00
More test coverage gaps!
| |
736 stream[_pendingAbortRequest] = undefined; | |
737 } | |
738 | |
739 WritableStreamDefaultControllerErrorIfNeeded(controller, r); | |
740 } | |
639 ); | 741 ); |
640 } | 742 } |
641 | 743 |
642 function WritableStreamDefaultControllerProcessWrite(controller, chunk) { | 744 function WritableStreamDefaultControllerProcessWrite(controller, chunk) { |
643 setDefaultControllerWritingFlag(controller, true); | 745 setDefaultControllerWritingFlag(controller, true); |
746 | |
747 const stream = controller[_controlledWritableStream]; | |
748 | |
749 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, | |
750 'stream.[[pendingWriteRequest]] is undefined'); | |
751 TEMP_ASSERT(stream[_writeRequests].length > 0, | |
752 'stream.[[writeRequests]] is not empty'); | |
753 stream[_pendingWriteRequest] = stream[_writeRequests].shift(); | |
754 | |
644 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], | 755 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
645 'write', [chunk, controller]); | 756 'write', [chunk, controller]); |
646 thenPromise( | 757 thenPromise( |
647 sinkWritePromise, | 758 sinkWritePromise, |
648 () => { | 759 () => { |
649 const stream = controller[_controlledWritableStream]; | 760 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
761 'controller.[[writing]] is true'); | |
762 setDefaultControllerWritingFlag(controller, false); | |
763 | |
764 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, | |
765 'stream.[[pendingWriteRequest]] is not undefined'); | |
766 v8.resolvePromise(stream[_pendingWriteRequest], undefined); | |
767 stream[_pendingWriteRequest] = undefined; | |
768 | |
650 const state = stream[_state]; | 769 const state = stream[_state]; |
651 if (state === ERRORED || state === CLOSED) { | 770 if (state === ERRORED) { |
771 WritableStreamRejectPromisesInReactionToError(stream); | |
772 | |
773 if (stream[_pendingAbortRequest] !== undefined) { | |
774 v8.resolvePromise(stream[_pendingAbortRequest], undefined); | |
775 stream[_pendingAbortRequest] = undefined; | |
776 } | |
652 return; | 777 return; |
653 } | 778 } |
654 setDefaultControllerWritingFlag(controller, false); | 779 |
655 WritableStreamFulfillWriteRequest(stream); | |
656 const lastBackpressure = | 780 const lastBackpressure = |
657 WritableStreamDefaultControllerGetBackpressure(controller); | 781 WritableStreamDefaultControllerGetBackpressure(controller); |
658 DequeueValueForController(controller); | 782 DequeueValueForController(controller); |
783 | |
659 if (state !== CLOSING) { | 784 if (state !== CLOSING) { |
660 const backpressure = | 785 const backpressure = |
661 WritableStreamDefaultControllerGetBackpressure(controller); | 786 WritableStreamDefaultControllerGetBackpressure(controller); |
662 if (lastBackpressure !== backpressure) { | 787 if (lastBackpressure !== backpressure) { |
663 WritableStreamUpdateBackpressure( | 788 WritableStreamUpdateBackpressure( |
664 controller[_controlledWritableStream], backpressure); | 789 controller[_controlledWritableStream], backpressure); |
665 } | 790 } |
666 } | 791 } |
792 | |
667 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); | 793 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
668 }, | 794 }, |
669 r => WritableStreamDefaultControllerErrorIfNeeded(controller, r) | 795 r => { |
670 ); | 796 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
797 'controller.[[writing]] is true'); | |
798 setDefaultControllerWritingFlag(controller, false); | |
799 | |
800 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, | |
801 'stream.[[pendingWriteRequest]] is not undefined'); | |
802 v8.rejectPromise(stream[_pendingWriteRequest], r); | |
803 stream[_pendingWriteRequest] = undefined; | |
804 | |
805 if (stream[_state] === ERRORED) { | |
806 stream[_storedError] = r; | |
807 WritableStreamRejectPromisesInReactionToError(stream); | |
808 } | |
809 | |
810 if (stream[_pendingAbortRequest] !== undefined) { | |
811 v8.rejectPromise(stream[_pendingAbortRequest], r); | |
812 stream[_pendingAbortRequest] = undefined; | |
813 } | |
814 | |
815 WritableStreamDefaultControllerErrorIfNeeded(controller, r); | |
816 } | |
817 ); | |
671 } | 818 } |
672 | 819 |
673 function WritableStreamDefaultControllerGetBackpressure(controller) { | 820 function WritableStreamDefaultControllerGetBackpressure(controller) { |
674 const desiredSize = | 821 const desiredSize = |
675 WritableStreamDefaultControllerGetDesiredSize(controller); | 822 WritableStreamDefaultControllerGetDesiredSize(controller); |
676 return desiredSize <= 0; | 823 return desiredSize <= 0; |
677 } | 824 } |
678 | 825 |
679 function WritableStreamDefaultControllerError(controller, e) { | 826 function WritableStreamDefaultControllerError(controller, e) { |
680 const stream = controller[_controlledWritableStream]; | 827 const stream = controller[_controlledWritableStream]; |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
798 | 945 |
799 defineProperty(global, 'WritableStream', { | 946 defineProperty(global, 'WritableStream', { |
800 value: WritableStream, | 947 value: WritableStream, |
801 enumerable: false, | 948 enumerable: false, |
802 configurable: true, | 949 configurable: true, |
803 writable: true | 950 writable: true |
804 }); | 951 }); |
805 | 952 |
806 // TODO(ricea): Exports to Blink | 953 // TODO(ricea): Exports to Blink |
807 }); | 954 }); |
OLD | NEW |