OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Implementation of WritableStream for Blink. See | 5 // Implementation of WritableStream for Blink. See |
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the | 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the |
7 // standard, except where required for performance or integration with Blink. In | 7 // standard, except where required for performance or integration with Blink. In |
8 // particular, classes, methods and abstract operations are implemented in the | 8 // particular, classes, methods and abstract operations are implemented in the |
9 // same order as in the standard, to simplify side-by-side reading. | 9 // same order as in the standard, to simplify side-by-side reading. |
10 | 10 |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
134 } | 134 } |
135 | 135 |
136 function getDefaultControllerInCloseFlag(controller) { | 136 function getDefaultControllerInCloseFlag(controller) { |
137 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); | 137 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); |
138 } | 138 } |
139 | 139 |
140 function setDefaultControllerInCloseFlag(controller, value) { | 140 function setDefaultControllerInCloseFlag(controller, value) { |
141 setDefaultControllerFlag(controller, FLAG_INCLOSE, value); | 141 setDefaultControllerFlag(controller, FLAG_INCLOSE, value); |
142 } | 142 } |
143 | 143 |
144 function rejectPromises(array, e) { | 144 function rejectPromises(queue, e) { |
145 // array is an InternalPackedArray so forEach won't work. | 145 queue.forEach(promise => v8.rejectPromise(promise, e)); |
146 for (let i = 0; i < array.length; ++i) { | |
147 v8.rejectPromise(array[i], e); | |
148 } | |
149 } | 146 } |
150 | 147 |
151 // https://tc39.github.io/ecma262/#sec-ispropertykey | 148 // https://tc39.github.io/ecma262/#sec-ispropertykey |
152 // TODO(ricea): Remove this when the asserts using it are removed. | 149 // TODO(ricea): Remove this when the asserts using it are removed. |
153 function IsPropertyKey(argument) { | 150 function IsPropertyKey(argument) { |
154 return typeof argument === 'string' || typeof argument === 'symbol'; | 151 return typeof argument === 'string' || typeof argument === 'symbol'; |
155 } | 152 } |
156 | 153 |
157 // TODO(ricea): Remove all asserts once the implementation has stabilised. | 154 // TODO(ricea): Remove all asserts once the implementation has stabilised. |
158 function TEMP_ASSERT(predicate, message) { | 155 function TEMP_ASSERT(predicate, message) { |
(...skipping 12 matching lines...) Expand all Loading... |
171 | 168 |
172 class WritableStream { | 169 class WritableStream { |
173 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { | 170 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { |
174 this[_state] = WRITABLE; | 171 this[_state] = WRITABLE; |
175 this[_storedError] = undefined; | 172 this[_storedError] = undefined; |
176 this[_writer] = undefined; | 173 this[_writer] = undefined; |
177 this[_writableStreamController] = undefined; | 174 this[_writableStreamController] = undefined; |
178 this[_pendingWriteRequest] = undefined; | 175 this[_pendingWriteRequest] = undefined; |
179 this[_pendingCloseRequest] = undefined; | 176 this[_pendingCloseRequest] = undefined; |
180 this[_pendingAbortRequest] = undefined; | 177 this[_pendingAbortRequest] = undefined; |
181 this[_writeRequests] = new v8.InternalPackedArray(); | 178 this[_writeRequests] = new binding.SimpleQueue(); |
182 const type = underlyingSink.type; | 179 const type = underlyingSink.type; |
183 if (type !== undefined) { | 180 if (type !== undefined) { |
184 throw new RangeError(streamErrors.invalidType); | 181 throw new RangeError(streamErrors.invalidType); |
185 } | 182 } |
186 this[_writableStreamController] = | 183 this[_writableStreamController] = |
187 new WritableStreamDefaultController(this, underlyingSink, size, | 184 new WritableStreamDefaultController(this, underlyingSink, size, |
188 highWaterMark); | 185 highWaterMark); |
189 } | 186 } |
190 | 187 |
191 get locked() { | 188 get locked() { |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
324 } | 321 } |
325 } | 322 } |
326 | 323 |
327 function WritableStreamRejectPromisesInReactionToError(stream) { | 324 function WritableStreamRejectPromisesInReactionToError(stream) { |
328 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); | 325 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); |
329 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, | 326 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
330 'stream.[[pendingWriteRequest]] is undefined'); | 327 'stream.[[pendingWriteRequest]] is undefined'); |
331 | 328 |
332 const storedError = stream[_storedError]; | 329 const storedError = stream[_storedError]; |
333 rejectPromises(stream[_writeRequests], storedError); | 330 rejectPromises(stream[_writeRequests], storedError); |
334 stream[_writeRequests] = new v8.InternalPackedArray(); | 331 stream[_writeRequests] = new binding.SimpleQueue(); |
335 | 332 |
336 if (stream[_pendingCloseRequest] !== undefined) { | 333 if (stream[_pendingCloseRequest] !== undefined) { |
337 TEMP_ASSERT( | 334 TEMP_ASSERT( |
338 getDefaultControllerInCloseFlag(stream[_writableStreamController]) === | 335 getDefaultControllerInCloseFlag(stream[_writableStreamController]) === |
339 false, 'stream.[[writableStreamController]].[[inClose]] === false'); | 336 false, 'stream.[[writableStreamController]].[[inClose]] === false'); |
340 v8.rejectPromise(stream[_pendingCloseRequest], storedError); | 337 v8.rejectPromise(stream[_pendingCloseRequest], storedError); |
341 stream[_pendingCloseRequest] = undefined; | 338 stream[_pendingCloseRequest] = undefined; |
342 } | 339 } |
343 | 340 |
344 const writer = stream[_writer]; | 341 const writer = stream[_writer]; |
(...skipping 271 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
616 class WritableStreamDefaultController { | 613 class WritableStreamDefaultController { |
617 constructor(stream, underlyingSink, size, highWaterMark) { | 614 constructor(stream, underlyingSink, size, highWaterMark) { |
618 if (!IsWritableStream(stream)) { | 615 if (!IsWritableStream(stream)) { |
619 throw new TypeError(streamErrors.illegalConstructor); | 616 throw new TypeError(streamErrors.illegalConstructor); |
620 } | 617 } |
621 if (stream[_writableStreamController] !== undefined) { | 618 if (stream[_writableStreamController] !== undefined) { |
622 throw new TypeError(streamErrors.illegalConstructor); | 619 throw new TypeError(streamErrors.illegalConstructor); |
623 } | 620 } |
624 this[_controlledWritableStream] = stream; | 621 this[_controlledWritableStream] = stream; |
625 this[_underlyingSink] = underlyingSink; | 622 this[_underlyingSink] = underlyingSink; |
626 this[_queue] = new v8.InternalPackedArray(); | 623 this[_queue] = new binding.SimpleQueue(); |
627 this[_queueSize] = 0; | 624 this[_queueSize] = 0; |
628 this[_defaultControllerFlags] = 0; | 625 this[_defaultControllerFlags] = 0; |
629 const normalizedStrategy = | 626 const normalizedStrategy = |
630 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | 627 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
631 this[_strategySize] = normalizedStrategy.size; | 628 this[_strategySize] = normalizedStrategy.size; |
632 this[_strategyHWM] = normalizedStrategy.highWaterMark; | 629 this[_strategyHWM] = normalizedStrategy.highWaterMark; |
633 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); | 630 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); |
634 if (backpressure) { | 631 if (backpressure) { |
635 WritableStreamUpdateBackpressure(stream, backpressure); | 632 WritableStreamUpdateBackpressure(stream, backpressure); |
636 } | 633 } |
(...skipping 21 matching lines...) Expand all Loading... |
658 } | 655 } |
659 } | 656 } |
660 | 657 |
661 // Writable Stream Default Controller Abstract Operations | 658 // Writable Stream Default Controller Abstract Operations |
662 | 659 |
663 function IsWritableStreamDefaultController(x) { | 660 function IsWritableStreamDefaultController(x) { |
664 return hasOwnProperty(x, _underlyingSink); | 661 return hasOwnProperty(x, _underlyingSink); |
665 } | 662 } |
666 | 663 |
667 function WritableStreamDefaultControllerAbort(controller, reason) { | 664 function WritableStreamDefaultControllerAbort(controller, reason) { |
668 controller[_queue] = v8.InternalPackedArray(); | 665 controller[_queue] = new binding.SimpleQueue(); |
669 controller[_queueSize] = 0; | 666 controller[_queueSize] = 0; |
670 const sinkAbortPromise = | 667 const sinkAbortPromise = |
671 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]); | 668 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]); |
672 return thenPromise(sinkAbortPromise, () => undefined); | 669 return thenPromise(sinkAbortPromise, () => undefined); |
673 } | 670 } |
674 | 671 |
675 function WritableStreamDefaultControllerClose(controller) { | 672 function WritableStreamDefaultControllerClose(controller) { |
676 EnqueueValueWithSizeForController(controller, 'close', 0); | 673 EnqueueValueWithSizeForController(controller, 'close', 0); |
677 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); | 674 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); |
678 } | 675 } |
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
798 | 795 |
799 const stream = controller[_controlledWritableStream]; | 796 const stream = controller[_controlledWritableStream]; |
800 | 797 |
801 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, | 798 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, |
802 'stream.[[pendingWriteRequest]] is undefined'); | 799 'stream.[[pendingWriteRequest]] is undefined'); |
803 TEMP_ASSERT(stream[_writeRequests].length > 0, | 800 TEMP_ASSERT(stream[_writeRequests].length > 0, |
804 'stream.[[writeRequests]] is not empty'); | 801 'stream.[[writeRequests]] is not empty'); |
805 stream[_pendingWriteRequest] = stream[_writeRequests].shift(); | 802 stream[_pendingWriteRequest] = stream[_writeRequests].shift(); |
806 | 803 |
807 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], | 804 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], |
808 'write', [chunk, controller]); | 805 'write', [chunk, controller]); |
809 thenPromise( | 806 thenPromise( |
810 sinkWritePromise, | 807 sinkWritePromise, |
811 () => { | 808 () => { |
812 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, | 809 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, |
813 'controller.[[writing]] is true'); | 810 'controller.[[writing]] is true'); |
814 setDefaultControllerWritingFlag(controller, false); | 811 setDefaultControllerWritingFlag(controller, false); |
815 | 812 |
816 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, | 813 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, |
817 'stream.[[pendingWriteRequest]] is not undefined'); | 814 'stream.[[pendingWriteRequest]] is not undefined'); |
818 v8.resolvePromise(stream[_pendingWriteRequest], undefined); | 815 v8.resolvePromise(stream[_pendingWriteRequest], undefined); |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
874 WritableStreamDefaultControllerGetDesiredSize(controller); | 871 WritableStreamDefaultControllerGetDesiredSize(controller); |
875 return desiredSize <= 0; | 872 return desiredSize <= 0; |
876 } | 873 } |
877 | 874 |
878 function WritableStreamDefaultControllerError(controller, e) { | 875 function WritableStreamDefaultControllerError(controller, e) { |
879 const stream = controller[_controlledWritableStream]; | 876 const stream = controller[_controlledWritableStream]; |
880 const state = stream[_state]; | 877 const state = stream[_state]; |
881 TEMP_ASSERT(state === WRITABLE || state === CLOSING, | 878 TEMP_ASSERT(state === WRITABLE || state === CLOSING, |
882 'stream.[[state]] is "writable" or "closing".'); | 879 'stream.[[state]] is "writable" or "closing".'); |
883 WritableStreamError(stream, e); | 880 WritableStreamError(stream, e); |
884 controller[_queue] = new v8.InternalPackedArray(); | 881 controller[_queue] = new binding.SimpleQueue(); |
885 controller[_queueSize] = 0; | 882 controller[_queueSize] = 0; |
886 } | 883 } |
887 | 884 |
888 // Queue-with-Sizes Operations | 885 // Queue-with-Sizes Operations |
889 // | 886 // |
890 // These differ from the versions in the standard: they take a controller | 887 // These differ from the versions in the standard: they take a controller |
891 // argument in order to cache the total queue size. This is necessary to avoid | 888 // argument in order to cache the total queue size. This is necessary to avoid |
892 // O(N^2) behaviour. | 889 // O(N^2) behaviour. |
893 // | 890 // |
894 // TODO(ricea): Share these operations with ReadableStream.js. | 891 // TODO(ricea): Share these operations with ReadableStream.js. |
(...skipping 15 matching lines...) Expand all Loading... |
910 controller[_queue].push({value, size}); | 907 controller[_queue].push({value, size}); |
911 } | 908 } |
912 | 909 |
913 function GetTotalQueueSizeForController(controller) { | 910 function GetTotalQueueSizeForController(controller) { |
914 return controller[_queueSize]; | 911 return controller[_queueSize]; |
915 } | 912 } |
916 | 913 |
917 function PeekQueueValue(queue) { | 914 function PeekQueueValue(queue) { |
918 TEMP_ASSERT(queue.length !== 0, | 915 TEMP_ASSERT(queue.length !== 0, |
919 'queue is not empty.'); | 916 'queue is not empty.'); |
920 return queue[0].value; | 917 return queue.peek().value; |
921 } | 918 } |
922 | 919 |
923 // Miscellaneous Operations | 920 // Miscellaneous Operations |
924 | 921 |
925 // This differs from "CallOrNoop" in the ReadableStream implementation in | 922 // This differs from "CallOrNoop" in the ReadableStream implementation in |
926 // that it takes the arguments as an array, so that multiple arguments can be | 923 // that it takes the arguments as an array, so that multiple arguments can be |
927 // passed. | 924 // passed. |
928 // | 925 // |
929 // TODO(ricea): Consolidate with ReadableStream implementation. | 926 // TODO(ricea): Consolidate with ReadableStream implementation. |
930 function InvokeOrNoop(O, P, args) { | 927 function InvokeOrNoop(O, P, args) { |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
999 getWritableStreamDefaultWriterClosedPromise; | 996 getWritableStreamDefaultWriterClosedPromise; |
1000 binding.WritableStreamDefaultWriterGetDesiredSize = | 997 binding.WritableStreamDefaultWriterGetDesiredSize = |
1001 WritableStreamDefaultWriterGetDesiredSize; | 998 WritableStreamDefaultWriterGetDesiredSize; |
1002 binding.getWritableStreamDefaultWriterReadyPromise = | 999 binding.getWritableStreamDefaultWriterReadyPromise = |
1003 getWritableStreamDefaultWriterReadyPromise; | 1000 getWritableStreamDefaultWriterReadyPromise; |
1004 binding.WritableStreamDefaultWriterRelease = | 1001 binding.WritableStreamDefaultWriterRelease = |
1005 WritableStreamDefaultWriterRelease; | 1002 WritableStreamDefaultWriterRelease; |
1006 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; | 1003 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; |
1007 binding.getWritableStreamStoredError = getWritableStreamStoredError; | 1004 binding.getWritableStreamStoredError = getWritableStreamStoredError; |
1008 }); | 1005 }); |
OLD | NEW |