| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Implementation of WritableStream for Blink. See | 5 // Implementation of WritableStream for Blink. See |
| 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the | 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the |
| 7 // standard, except where required for performance or integration with Blink. In | 7 // standard, except where required for performance or integration with Blink. In |
| 8 // particular, classes, methods and abstract operations are implemented in the | 8 // particular, classes, methods and abstract operations are implemented in the |
| 9 // same order as in the standard, to simplify side-by-side reading. | 9 // same order as in the standard, to simplify side-by-side reading. |
| 10 | 10 |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 134 } | 134 } |
| 135 | 135 |
| 136 function getDefaultControllerInCloseFlag(controller) { | 136 function getDefaultControllerInCloseFlag(controller) { |
| 137 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); | 137 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); |
| 138 } | 138 } |
| 139 | 139 |
| 140 function setDefaultControllerInCloseFlag(controller, value) { | 140 function setDefaultControllerInCloseFlag(controller, value) { |
| 141 setDefaultControllerFlag(controller, FLAG_INCLOSE, value); | 141 setDefaultControllerFlag(controller, FLAG_INCLOSE, value); |
| 142 } | 142 } |
| 143 | 143 |
| 144 function rejectPromises(array, e) { | 144 function rejectPromises(queue, e) { |
| 145 // array is an InternalPackedArray so forEach won't work. | 145 queue.forEach(promise => v8.rejectPromise(promise, e)); |
| 146 for (let i = 0; i < array.length; ++i) { | |
| 147 v8.rejectPromise(array[i], e); | |
| 148 } | |
| 149 } | 146 } |
| 150 | 147 |
| 151 // https://tc39.github.io/ecma262/#sec-ispropertykey | 148 // https://tc39.github.io/ecma262/#sec-ispropertykey |
| 152 // TODO(ricea): Remove this when the asserts using it are removed. | 149 // TODO(ricea): Remove this when the asserts using it are removed. |
| 153 function IsPropertyKey(argument) { | 150 function IsPropertyKey(argument) { |
| 154 return typeof argument === 'string' || typeof argument === 'symbol'; | 151 return typeof argument === 'string' || typeof argument === 'symbol'; |
| 155 } | 152 } |
| 156 | 153 |
| 157 // TODO(ricea): Remove all asserts once the implementation has stabilised. | 154 // TODO(ricea): Remove all asserts once the implementation has stabilised. |
| 158 function TEMP_ASSERT(predicate, message) { | 155 function TEMP_ASSERT(predicate, message) { |
| (...skipping 12 matching lines...) Expand all Loading... |
| 171 | 168 |
| 172 class WritableStream { | 169 class WritableStream { |
| 173 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { | 170 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { |
| 174 this[_state] = WRITABLE; | 171 this[_state] = WRITABLE; |
| 175 this[_storedError] = undefined; | 172 this[_storedError] = undefined; |
| 176 this[_writer] = undefined; | 173 this[_writer] = undefined; |
| 177 this[_writableStreamController] = undefined; | 174 this[_writableStreamController] = undefined; |
| 178 this[_pendingWriteRequest] = undefined; | 175 this[_pendingWriteRequest] = undefined; |
| 179 this[_pendingCloseRequest] = undefined; | 176 this[_pendingCloseRequest] = undefined; |
| 180 this[_pendingAbortRequest] = undefined; | 177 this[_pendingAbortRequest] = undefined; |
| 181 this[_writeRequests] = new v8.InternalPackedArray(); | 178 this[_writeRequests] = new binding.SimpleQueue(); |
| 182 const type = underlyingSink.type; | 179 const type = underlyingSink.type; |
| 183 if (type !== undefined) { | 180 if (type !== undefined) { |
| 184 throw new RangeError(streamErrors.invalidType); | 181 throw new RangeError(streamErrors.invalidType); |
| 185 } | 182 } |
| 186 this[_writableStreamController] = | 183 this[_writableStreamController] = |
| 187 new WritableStreamDefaultController(this, underlyingSink, size, | 184 new WritableStreamDefaultController(this, underlyingSink, size, |
| 188 highWaterMark); | 185 highWaterMark); |
| 189 } | 186 } |
| 190 | 187 |
| 191 get locked() { | 188 get locked() { |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 324 } | 321 } |
| 325 } | 322 } |
| 326 | 323 |
| 327 function WritableStreamRejectPromisesInReactionToError(stream) { | 324 function WritableStreamRejectPromisesInReactionToError(stream) { |
| 328 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); | 325 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); |
| 329 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, | 326 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
| 330 'stream.[[pendingWriteRequest]] is undefined'); | 327 'stream.[[pendingWriteRequest]] is undefined'); |
| 331 | 328 |
| 332 const storedError = stream[_storedError]; | 329 const storedError = stream[_storedError]; |
| 333 rejectPromises(stream[_writeRequests], storedError); | 330 rejectPromises(stream[_writeRequests], storedError); |
| 334 stream[_writeRequests] = new v8.InternalPackedArray(); | 331 stream[_writeRequests] = new binding.SimpleQueue(); |
| 335 | 332 |
| 336 if (stream[_pendingCloseRequest] !== undefined) { | 333 if (stream[_pendingCloseRequest] !== undefined) { |
| 337 TEMP_ASSERT( | 334 TEMP_ASSERT( |
| 338 getDefaultControllerInCloseFlag(stream[_writableStreamController]) === | 335 getDefaultControllerInCloseFlag(stream[_writableStreamController]) === |
| 339 false, 'stream.[[writableStreamController]].[[inClose]] === false'); | 336 false, 'stream.[[writableStreamController]].[[inClose]] === false'); |
| 340 v8.rejectPromise(stream[_pendingCloseRequest], storedError); | 337 v8.rejectPromise(stream[_pendingCloseRequest], storedError); |
| 341 stream[_pendingCloseRequest] = undefined; | 338 stream[_pendingCloseRequest] = undefined; |
| 342 } | 339 } |
| 343 | 340 |
| 344 const writer = stream[_writer]; | 341 const writer = stream[_writer]; |
| (...skipping 271 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 616 class WritableStreamDefaultController { | 613 class WritableStreamDefaultController { |
| 617 constructor(stream, underlyingSink, size, highWaterMark) { | 614 constructor(stream, underlyingSink, size, highWaterMark) { |
| 618 if (!IsWritableStream(stream)) { | 615 if (!IsWritableStream(stream)) { |
| 619 throw new TypeError(streamErrors.illegalConstructor); | 616 throw new TypeError(streamErrors.illegalConstructor); |
| 620 } | 617 } |
| 621 if (stream[_writableStreamController] !== undefined) { | 618 if (stream[_writableStreamController] !== undefined) { |
| 622 throw new TypeError(streamErrors.illegalConstructor); | 619 throw new TypeError(streamErrors.illegalConstructor); |
| 623 } | 620 } |
| 624 this[_controlledWritableStream] = stream; | 621 this[_controlledWritableStream] = stream; |
| 625 this[_underlyingSink] = underlyingSink; | 622 this[_underlyingSink] = underlyingSink; |
| 626 this[_queue] = new v8.InternalPackedArray(); | 623 this[_queue] = new binding.SimpleQueue(); |
| 627 this[_queueSize] = 0; | 624 this[_queueSize] = 0; |
| 628 this[_defaultControllerFlags] = 0; | 625 this[_defaultControllerFlags] = 0; |
| 629 const normalizedStrategy = | 626 const normalizedStrategy = |
| 630 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | 627 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| 631 this[_strategySize] = normalizedStrategy.size; | 628 this[_strategySize] = normalizedStrategy.size; |
| 632 this[_strategyHWM] = normalizedStrategy.highWaterMark; | 629 this[_strategyHWM] = normalizedStrategy.highWaterMark; |
| 633 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); | 630 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); |
| 634 if (backpressure) { | 631 if (backpressure) { |
| 635 WritableStreamUpdateBackpressure(stream, backpressure); | 632 WritableStreamUpdateBackpressure(stream, backpressure); |
| 636 } | 633 } |
| (...skipping 21 matching lines...) Expand all Loading... |
| 658 } | 655 } |
| 659 } | 656 } |
| 660 | 657 |
| 661 // Writable Stream Default Controller Abstract Operations | 658 // Writable Stream Default Controller Abstract Operations |
| 662 | 659 |
| 663 function IsWritableStreamDefaultController(x) { | 660 function IsWritableStreamDefaultController(x) { |
| 664 return hasOwnProperty(x, _underlyingSink); | 661 return hasOwnProperty(x, _underlyingSink); |
| 665 } | 662 } |
| 666 | 663 |
| 667 function WritableStreamDefaultControllerAbort(controller, reason) { | 664 function WritableStreamDefaultControllerAbort(controller, reason) { |
| 668 controller[_queue] = v8.InternalPackedArray(); | 665 controller[_queue] = new binding.SimpleQueue(); |
| 669 controller[_queueSize] = 0; | 666 controller[_queueSize] = 0; |
| 670 const sinkAbortPromise = | 667 const sinkAbortPromise = |
| 671 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]); | 668 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]); |
| 672 return thenPromise(sinkAbortPromise, () => undefined); | 669 return thenPromise(sinkAbortPromise, () => undefined); |
| 673 } | 670 } |
| 674 | 671 |
| 675 function WritableStreamDefaultControllerClose(controller) { | 672 function WritableStreamDefaultControllerClose(controller) { |
| 676 EnqueueValueWithSizeForController(controller, 'close', 0); | 673 EnqueueValueWithSizeForController(controller, 'close', 0); |
| 677 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); | 674 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
| 678 } | 675 } |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 798 | 795 |
| 799 const stream = controller[_controlledWritableStream]; | 796 const stream = controller[_controlledWritableStream]; |
| 800 | 797 |
| 801 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, | 798 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
| 802 'stream.[[pendingWriteRequest]] is undefined'); | 799 'stream.[[pendingWriteRequest]] is undefined'); |
| 803 TEMP_ASSERT(stream[_writeRequests].length > 0, | 800 TEMP_ASSERT(stream[_writeRequests].length > 0, |
| 804 'stream.[[writeRequests]] is not empty'); | 801 'stream.[[writeRequests]] is not empty'); |
| 805 stream[_pendingWriteRequest] = stream[_writeRequests].shift(); | 802 stream[_pendingWriteRequest] = stream[_writeRequests].shift(); |
| 806 | 803 |
| 807 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], | 804 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
| 808 'write', [chunk, controller]); | 805 'write', [chunk, controller]); |
| 809 thenPromise( | 806 thenPromise( |
| 810 sinkWritePromise, | 807 sinkWritePromise, |
| 811 () => { | 808 () => { |
| 812 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, | 809 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
| 813 'controller.[[writing]] is true'); | 810 'controller.[[writing]] is true'); |
| 814 setDefaultControllerWritingFlag(controller, false); | 811 setDefaultControllerWritingFlag(controller, false); |
| 815 | 812 |
| 816 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, | 813 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
| 817 'stream.[[pendingWriteRequest]] is not undefined'); | 814 'stream.[[pendingWriteRequest]] is not undefined'); |
| 818 v8.resolvePromise(stream[_pendingWriteRequest], undefined); | 815 v8.resolvePromise(stream[_pendingWriteRequest], undefined); |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 874 WritableStreamDefaultControllerGetDesiredSize(controller); | 871 WritableStreamDefaultControllerGetDesiredSize(controller); |
| 875 return desiredSize <= 0; | 872 return desiredSize <= 0; |
| 876 } | 873 } |
| 877 | 874 |
| 878 function WritableStreamDefaultControllerError(controller, e) { | 875 function WritableStreamDefaultControllerError(controller, e) { |
| 879 const stream = controller[_controlledWritableStream]; | 876 const stream = controller[_controlledWritableStream]; |
| 880 const state = stream[_state]; | 877 const state = stream[_state]; |
| 881 TEMP_ASSERT(state === WRITABLE || state === CLOSING, | 878 TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
| 882 'stream.[[state]] is "writable" or "closing".'); | 879 'stream.[[state]] is "writable" or "closing".'); |
| 883 WritableStreamError(stream, e); | 880 WritableStreamError(stream, e); |
| 884 controller[_queue] = new v8.InternalPackedArray(); | 881 controller[_queue] = new binding.SimpleQueue(); |
| 885 controller[_queueSize] = 0; | 882 controller[_queueSize] = 0; |
| 886 } | 883 } |
| 887 | 884 |
| 888 // Queue-with-Sizes Operations | 885 // Queue-with-Sizes Operations |
| 889 // | 886 // |
| 890 // These differ from the versions in the standard: they take a controller | |
| 891 // argument in order to cache the total queue size. This is necessary to avoid | |
| 892 // O(N^2) behaviour. | |
| 893 // | |
| 894 // TODO(ricea): Share these operations with ReadableStream.js. | 887 // TODO(ricea): Share these operations with ReadableStream.js. |
| 895 function DequeueValueForController(controller) { | 888 function DequeueValueForController(controller) { |
| 896 TEMP_ASSERT(controller[_queue].length !== 0, | 889 TEMP_ASSERT(controller[_queue].length !== 0, |
| 897 'queue is not empty.'); | 890 'queue is not empty.'); |
| 898 const result = controller[_queue].shift(); | 891 const result = controller[_queue].shift(); |
| 899 controller[_queueSize] -= result.size; | 892 controller[_queueSize] -= result.size; |
| 900 return result.value; | 893 return result.value; |
| 901 } | 894 } |
| 902 | 895 |
| 903 function EnqueueValueWithSizeForController(controller, value, size) { | 896 function EnqueueValueWithSizeForController(controller, value, size) { |
| 904 size = Number(size); | 897 size = Number(size); |
| 905 if (!IsFiniteNonNegativeNumber(size)) { | 898 if (!IsFiniteNonNegativeNumber(size)) { |
| 906 throw new RangeError(streamErrors.invalidSize); | 899 throw new RangeError(streamErrors.invalidSize); |
| 907 } | 900 } |
| 908 | 901 |
| 909 controller[_queueSize] += size; | 902 controller[_queueSize] += size; |
| 910 controller[_queue].push({value, size}); | 903 controller[_queue].push({value, size}); |
| 911 } | 904 } |
| 912 | 905 |
| 913 function GetTotalQueueSizeForController(controller) { | 906 function GetTotalQueueSizeForController(controller) { |
| 914 return controller[_queueSize]; | 907 return controller[_queueSize]; |
| 915 } | 908 } |
| 916 | 909 |
| 917 function PeekQueueValue(queue) { | 910 function PeekQueueValue(queue) { |
| 918 TEMP_ASSERT(queue.length !== 0, | 911 TEMP_ASSERT(queue.length !== 0, |
| 919 'queue is not empty.'); | 912 'queue is not empty.'); |
| 920 return queue[0].value; | 913 return queue.peek().value; |
| 921 } | 914 } |
| 922 | 915 |
| 923 // Miscellaneous Operations | 916 // Miscellaneous Operations |
| 924 | 917 |
| 925 // This differs from "CallOrNoop" in the ReadableStream implementation in | 918 // This differs from "CallOrNoop" in the ReadableStream implementation in |
| 926 // that it takes the arguments as an array, so that multiple arguments can be | 919 // that it takes the arguments as an array, so that multiple arguments can be |
| 927 // passed. | 920 // passed. |
| 928 // | 921 // |
| 929 // TODO(ricea): Consolidate with ReadableStream implementation. | 922 // TODO(ricea): Consolidate with ReadableStream implementation. |
| 930 function InvokeOrNoop(O, P, args) { | 923 function InvokeOrNoop(O, P, args) { |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 999 getWritableStreamDefaultWriterClosedPromise; | 992 getWritableStreamDefaultWriterClosedPromise; |
| 1000 binding.WritableStreamDefaultWriterGetDesiredSize = | 993 binding.WritableStreamDefaultWriterGetDesiredSize = |
| 1001 WritableStreamDefaultWriterGetDesiredSize; | 994 WritableStreamDefaultWriterGetDesiredSize; |
| 1002 binding.getWritableStreamDefaultWriterReadyPromise = | 995 binding.getWritableStreamDefaultWriterReadyPromise = |
| 1003 getWritableStreamDefaultWriterReadyPromise; | 996 getWritableStreamDefaultWriterReadyPromise; |
| 1004 binding.WritableStreamDefaultWriterRelease = | 997 binding.WritableStreamDefaultWriterRelease = |
| 1005 WritableStreamDefaultWriterRelease; | 998 WritableStreamDefaultWriterRelease; |
| 1006 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; | 999 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; |
| 1007 binding.getWritableStreamStoredError = getWritableStreamStoredError; | 1000 binding.getWritableStreamStoredError = getWritableStreamStoredError; |
| 1008 }); | 1001 }); |
| OLD | NEW |