Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(503)

Side by Side Diff: third_party/WebKit/Source/core/streams/WritableStream.js

Issue 2752133003: Use SimpleQueue in WritableStream implementation (Closed)
Patch Set: Remove stale comments and correct copyright date Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/WebKit/Source/core/streams/SimpleQueue.js ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Implementation of WritableStream for Blink. See 5 // Implementation of WritableStream for Blink. See
6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the 6 // https://streams.spec.whatwg.org/#ws. The implementation closely follows the
7 // standard, except where required for performance or integration with Blink. In 7 // standard, except where required for performance or integration with Blink. In
8 // particular, classes, methods and abstract operations are implemented in the 8 // particular, classes, methods and abstract operations are implemented in the
9 // same order as in the standard, to simplify side-by-side reading. 9 // same order as in the standard, to simplify side-by-side reading.
10 10
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
134 } 134 }
135 135
136 function getDefaultControllerInCloseFlag(controller) { 136 function getDefaultControllerInCloseFlag(controller) {
137 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE); 137 return Boolean(controller[_defaultControllerFlags] & FLAG_INCLOSE);
138 } 138 }
139 139
140 function setDefaultControllerInCloseFlag(controller, value) { 140 function setDefaultControllerInCloseFlag(controller, value) {
141 setDefaultControllerFlag(controller, FLAG_INCLOSE, value); 141 setDefaultControllerFlag(controller, FLAG_INCLOSE, value);
142 } 142 }
143 143
144 function rejectPromises(array, e) { 144 function rejectPromises(queue, e) {
145 // array is an InternalPackedArray so forEach won't work. 145 queue.forEach(promise => v8.rejectPromise(promise, e));
146 for (let i = 0; i < array.length; ++i) {
147 v8.rejectPromise(array[i], e);
148 }
149 } 146 }
150 147
151 // https://tc39.github.io/ecma262/#sec-ispropertykey 148 // https://tc39.github.io/ecma262/#sec-ispropertykey
152 // TODO(ricea): Remove this when the asserts using it are removed. 149 // TODO(ricea): Remove this when the asserts using it are removed.
153 function IsPropertyKey(argument) { 150 function IsPropertyKey(argument) {
154 return typeof argument === 'string' || typeof argument === 'symbol'; 151 return typeof argument === 'string' || typeof argument === 'symbol';
155 } 152 }
156 153
157 // TODO(ricea): Remove all asserts once the implementation has stabilised. 154 // TODO(ricea): Remove all asserts once the implementation has stabilised.
158 function TEMP_ASSERT(predicate, message) { 155 function TEMP_ASSERT(predicate, message) {
(...skipping 12 matching lines...) Expand all
171 168
172 class WritableStream { 169 class WritableStream {
173 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) { 170 constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {}) {
174 this[_state] = WRITABLE; 171 this[_state] = WRITABLE;
175 this[_storedError] = undefined; 172 this[_storedError] = undefined;
176 this[_writer] = undefined; 173 this[_writer] = undefined;
177 this[_writableStreamController] = undefined; 174 this[_writableStreamController] = undefined;
178 this[_pendingWriteRequest] = undefined; 175 this[_pendingWriteRequest] = undefined;
179 this[_pendingCloseRequest] = undefined; 176 this[_pendingCloseRequest] = undefined;
180 this[_pendingAbortRequest] = undefined; 177 this[_pendingAbortRequest] = undefined;
181 this[_writeRequests] = new v8.InternalPackedArray(); 178 this[_writeRequests] = new binding.SimpleQueue();
182 const type = underlyingSink.type; 179 const type = underlyingSink.type;
183 if (type !== undefined) { 180 if (type !== undefined) {
184 throw new RangeError(streamErrors.invalidType); 181 throw new RangeError(streamErrors.invalidType);
185 } 182 }
186 this[_writableStreamController] = 183 this[_writableStreamController] =
187 new WritableStreamDefaultController(this, underlyingSink, size, 184 new WritableStreamDefaultController(this, underlyingSink, size,
188 highWaterMark); 185 highWaterMark);
189 } 186 }
190 187
191 get locked() { 188 get locked() {
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
324 } 321 }
325 } 322 }
326 323
327 function WritableStreamRejectPromisesInReactionToError(stream) { 324 function WritableStreamRejectPromisesInReactionToError(stream) {
328 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"'); 325 TEMP_ASSERT(stream[_state] === ERRORED, 'stream.[[state]] is "errored"');
329 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, 326 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined,
330 'stream.[[pendingWriteRequest]] is undefined'); 327 'stream.[[pendingWriteRequest]] is undefined');
331 328
332 const storedError = stream[_storedError]; 329 const storedError = stream[_storedError];
333 rejectPromises(stream[_writeRequests], storedError); 330 rejectPromises(stream[_writeRequests], storedError);
334 stream[_writeRequests] = new v8.InternalPackedArray(); 331 stream[_writeRequests] = new binding.SimpleQueue();
335 332
336 if (stream[_pendingCloseRequest] !== undefined) { 333 if (stream[_pendingCloseRequest] !== undefined) {
337 TEMP_ASSERT( 334 TEMP_ASSERT(
338 getDefaultControllerInCloseFlag(stream[_writableStreamController]) === 335 getDefaultControllerInCloseFlag(stream[_writableStreamController]) ===
339 false, 'stream.[[writableStreamController]].[[inClose]] === false'); 336 false, 'stream.[[writableStreamController]].[[inClose]] === false');
340 v8.rejectPromise(stream[_pendingCloseRequest], storedError); 337 v8.rejectPromise(stream[_pendingCloseRequest], storedError);
341 stream[_pendingCloseRequest] = undefined; 338 stream[_pendingCloseRequest] = undefined;
342 } 339 }
343 340
344 const writer = stream[_writer]; 341 const writer = stream[_writer];
(...skipping 271 matching lines...) Expand 10 before | Expand all | Expand 10 after
616 class WritableStreamDefaultController { 613 class WritableStreamDefaultController {
617 constructor(stream, underlyingSink, size, highWaterMark) { 614 constructor(stream, underlyingSink, size, highWaterMark) {
618 if (!IsWritableStream(stream)) { 615 if (!IsWritableStream(stream)) {
619 throw new TypeError(streamErrors.illegalConstructor); 616 throw new TypeError(streamErrors.illegalConstructor);
620 } 617 }
621 if (stream[_writableStreamController] !== undefined) { 618 if (stream[_writableStreamController] !== undefined) {
622 throw new TypeError(streamErrors.illegalConstructor); 619 throw new TypeError(streamErrors.illegalConstructor);
623 } 620 }
624 this[_controlledWritableStream] = stream; 621 this[_controlledWritableStream] = stream;
625 this[_underlyingSink] = underlyingSink; 622 this[_underlyingSink] = underlyingSink;
626 this[_queue] = new v8.InternalPackedArray(); 623 this[_queue] = new binding.SimpleQueue();
627 this[_queueSize] = 0; 624 this[_queueSize] = 0;
628 this[_defaultControllerFlags] = 0; 625 this[_defaultControllerFlags] = 0;
629 const normalizedStrategy = 626 const normalizedStrategy =
630 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); 627 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
631 this[_strategySize] = normalizedStrategy.size; 628 this[_strategySize] = normalizedStrategy.size;
632 this[_strategyHWM] = normalizedStrategy.highWaterMark; 629 this[_strategyHWM] = normalizedStrategy.highWaterMark;
633 const backpressure = WritableStreamDefaultControllerGetBackpressure(this); 630 const backpressure = WritableStreamDefaultControllerGetBackpressure(this);
634 if (backpressure) { 631 if (backpressure) {
635 WritableStreamUpdateBackpressure(stream, backpressure); 632 WritableStreamUpdateBackpressure(stream, backpressure);
636 } 633 }
(...skipping 21 matching lines...) Expand all
658 } 655 }
659 } 656 }
660 657
661 // Writable Stream Default Controller Abstract Operations 658 // Writable Stream Default Controller Abstract Operations
662 659
663 function IsWritableStreamDefaultController(x) { 660 function IsWritableStreamDefaultController(x) {
664 return hasOwnProperty(x, _underlyingSink); 661 return hasOwnProperty(x, _underlyingSink);
665 } 662 }
666 663
667 function WritableStreamDefaultControllerAbort(controller, reason) { 664 function WritableStreamDefaultControllerAbort(controller, reason) {
668 controller[_queue] = v8.InternalPackedArray(); 665 controller[_queue] = new binding.SimpleQueue();
669 controller[_queueSize] = 0; 666 controller[_queueSize] = 0;
670 const sinkAbortPromise = 667 const sinkAbortPromise =
671 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]); 668 PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
672 return thenPromise(sinkAbortPromise, () => undefined); 669 return thenPromise(sinkAbortPromise, () => undefined);
673 } 670 }
674 671
675 function WritableStreamDefaultControllerClose(controller) { 672 function WritableStreamDefaultControllerClose(controller) {
676 EnqueueValueWithSizeForController(controller, 'close', 0); 673 EnqueueValueWithSizeForController(controller, 'close', 0);
677 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 674 WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
678 } 675 }
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
798 795
799 const stream = controller[_controlledWritableStream]; 796 const stream = controller[_controlledWritableStream];
800 797
801 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined, 798 TEMP_ASSERT(stream[_pendingWriteRequest] === undefined,
802 'stream.[[pendingWriteRequest]] is undefined'); 799 'stream.[[pendingWriteRequest]] is undefined');
803 TEMP_ASSERT(stream[_writeRequests].length > 0, 800 TEMP_ASSERT(stream[_writeRequests].length > 0,
804 'stream.[[writeRequests]] is not empty'); 801 'stream.[[writeRequests]] is not empty');
805 stream[_pendingWriteRequest] = stream[_writeRequests].shift(); 802 stream[_pendingWriteRequest] = stream[_writeRequests].shift();
806 803
807 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink], 804 const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
808 'write', [chunk, controller]); 805 'write', [chunk, controller]);
809 thenPromise( 806 thenPromise(
810 sinkWritePromise, 807 sinkWritePromise,
811 () => { 808 () => {
812 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true, 809 TEMP_ASSERT(getDefaultControllerWritingFlag(controller) === true,
813 'controller.[[writing]] is true'); 810 'controller.[[writing]] is true');
814 setDefaultControllerWritingFlag(controller, false); 811 setDefaultControllerWritingFlag(controller, false);
815 812
816 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined, 813 TEMP_ASSERT(stream[_pendingWriteRequest] !== undefined,
817 'stream.[[pendingWriteRequest]] is not undefined'); 814 'stream.[[pendingWriteRequest]] is not undefined');
818 v8.resolvePromise(stream[_pendingWriteRequest], undefined); 815 v8.resolvePromise(stream[_pendingWriteRequest], undefined);
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
874 WritableStreamDefaultControllerGetDesiredSize(controller); 871 WritableStreamDefaultControllerGetDesiredSize(controller);
875 return desiredSize <= 0; 872 return desiredSize <= 0;
876 } 873 }
877 874
878 function WritableStreamDefaultControllerError(controller, e) { 875 function WritableStreamDefaultControllerError(controller, e) {
879 const stream = controller[_controlledWritableStream]; 876 const stream = controller[_controlledWritableStream];
880 const state = stream[_state]; 877 const state = stream[_state];
881 TEMP_ASSERT(state === WRITABLE || state === CLOSING, 878 TEMP_ASSERT(state === WRITABLE || state === CLOSING,
882 'stream.[[state]] is "writable" or "closing".'); 879 'stream.[[state]] is "writable" or "closing".');
883 WritableStreamError(stream, e); 880 WritableStreamError(stream, e);
884 controller[_queue] = new v8.InternalPackedArray(); 881 controller[_queue] = new binding.SimpleQueue();
885 controller[_queueSize] = 0; 882 controller[_queueSize] = 0;
886 } 883 }
887 884
888 // Queue-with-Sizes Operations 885 // Queue-with-Sizes Operations
889 // 886 //
890 // These differ from the versions in the standard: they take a controller
891 // argument in order to cache the total queue size. This is necessary to avoid
892 // O(N^2) behaviour.
893 //
894 // TODO(ricea): Share these operations with ReadableStream.js. 887 // TODO(ricea): Share these operations with ReadableStream.js.
895 function DequeueValueForController(controller) { 888 function DequeueValueForController(controller) {
896 TEMP_ASSERT(controller[_queue].length !== 0, 889 TEMP_ASSERT(controller[_queue].length !== 0,
897 'queue is not empty.'); 890 'queue is not empty.');
898 const result = controller[_queue].shift(); 891 const result = controller[_queue].shift();
899 controller[_queueSize] -= result.size; 892 controller[_queueSize] -= result.size;
900 return result.value; 893 return result.value;
901 } 894 }
902 895
903 function EnqueueValueWithSizeForController(controller, value, size) { 896 function EnqueueValueWithSizeForController(controller, value, size) {
904 size = Number(size); 897 size = Number(size);
905 if (!IsFiniteNonNegativeNumber(size)) { 898 if (!IsFiniteNonNegativeNumber(size)) {
906 throw new RangeError(streamErrors.invalidSize); 899 throw new RangeError(streamErrors.invalidSize);
907 } 900 }
908 901
909 controller[_queueSize] += size; 902 controller[_queueSize] += size;
910 controller[_queue].push({value, size}); 903 controller[_queue].push({value, size});
911 } 904 }
912 905
913 function GetTotalQueueSizeForController(controller) { 906 function GetTotalQueueSizeForController(controller) {
914 return controller[_queueSize]; 907 return controller[_queueSize];
915 } 908 }
916 909
917 function PeekQueueValue(queue) { 910 function PeekQueueValue(queue) {
918 TEMP_ASSERT(queue.length !== 0, 911 TEMP_ASSERT(queue.length !== 0,
919 'queue is not empty.'); 912 'queue is not empty.');
920 return queue[0].value; 913 return queue.peek().value;
921 } 914 }
922 915
923 // Miscellaneous Operations 916 // Miscellaneous Operations
924 917
925 // This differs from "CallOrNoop" in the ReadableStream implementation in 918 // This differs from "CallOrNoop" in the ReadableStream implementation in
926 // that it takes the arguments as an array, so that multiple arguments can be 919 // that it takes the arguments as an array, so that multiple arguments can be
927 // passed. 920 // passed.
928 // 921 //
929 // TODO(ricea): Consolidate with ReadableStream implementation. 922 // TODO(ricea): Consolidate with ReadableStream implementation.
930 function InvokeOrNoop(O, P, args) { 923 function InvokeOrNoop(O, P, args) {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
999 getWritableStreamDefaultWriterClosedPromise; 992 getWritableStreamDefaultWriterClosedPromise;
1000 binding.WritableStreamDefaultWriterGetDesiredSize = 993 binding.WritableStreamDefaultWriterGetDesiredSize =
1001 WritableStreamDefaultWriterGetDesiredSize; 994 WritableStreamDefaultWriterGetDesiredSize;
1002 binding.getWritableStreamDefaultWriterReadyPromise = 995 binding.getWritableStreamDefaultWriterReadyPromise =
1003 getWritableStreamDefaultWriterReadyPromise; 996 getWritableStreamDefaultWriterReadyPromise;
1004 binding.WritableStreamDefaultWriterRelease = 997 binding.WritableStreamDefaultWriterRelease =
1005 WritableStreamDefaultWriterRelease; 998 WritableStreamDefaultWriterRelease;
1006 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite; 999 binding.WritableStreamDefaultWriterWrite = WritableStreamDefaultWriterWrite;
1007 binding.getWritableStreamStoredError = getWritableStreamStoredError; 1000 binding.getWritableStreamStoredError = getWritableStreamStoredError;
1008 }); 1001 });
OLDNEW
« no previous file with comments | « third_party/WebKit/Source/core/streams/SimpleQueue.js ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698