Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(432)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2637863002: Fix ReadableStream scalability issue (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after
188 } 188 }
189 189
190 if (stream[_controller] !== undefined) { 190 if (stream[_controller] !== undefined) {
191 throw new TypeError(streamErrors.illegalConstructor); 191 throw new TypeError(streamErrors.illegalConstructor);
192 } 192 }
193 193
194 this[_controlledReadableStream] = stream; 194 this[_controlledReadableStream] = stream;
195 195
196 this[_underlyingSource] = underlyingSource; 196 this[_underlyingSource] = underlyingSource;
197 197
198 this[_queue] = new v8.InternalPackedArray(); 198 this[_queue] = new Queue();
199 this[_totalQueuedSize] = 0; 199 this[_totalQueuedSize] = 0;
200 200
201 this[_readableStreamDefaultControllerBits] = 0b0; 201 this[_readableStreamDefaultControllerBits] = 0b0;
202 if (isExternallyControlled === true) { 202 if (isExternallyControlled === true) {
203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; 203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
204 } 204 }
205 205
206 const normalizedStrategy = 206 const normalizedStrategy =
207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); 207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
208 this[_strategySize] = normalizedStrategy.size; 208 this[_strategySize] = normalizedStrategy.size;
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 } 289 }
290 if (state === STATE_CLOSED) { 290 if (state === STATE_CLOSED) {
291 throw new TypeError(errErrorClosedStream); 291 throw new TypeError(errErrorClosedStream);
292 } 292 }
293 293
294 return ReadableStreamDefaultControllerError(this, e); 294 return ReadableStreamDefaultControllerError(this, e);
295 } 295 }
296 } 296 }
297 297
298 function ReadableStreamDefaultControllerCancel(controller, reason) { 298 function ReadableStreamDefaultControllerCancel(controller, reason) {
299 controller[_queue] = new v8.InternalPackedArray(); 299 controller[_queue] = new Queue();
300 300
301 const underlyingSource = controller[_underlyingSource]; 301 const underlyingSource = controller[_underlyingSource];
302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); 302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
303 } 303 }
304 304
305 function ReadableStreamDefaultControllerPull(controller) { 305 function ReadableStreamDefaultControllerPull(controller) {
306 const stream = controller[_controlledReadableStream]; 306 const stream = controller[_controlledReadableStream];
307 307
308 if (controller[_queue].length > 0) { 308 if (controller[_queue].length > 0) {
309 const chunk = DequeueValue(controller); 309 const chunk = DequeueValue(controller);
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
476 function ReadableStreamGetState(stream) { 476 function ReadableStreamGetState(stream) {
477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; 477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
478 } 478 }
479 479
480 function ReadableStreamSetState(stream, state) { 480 function ReadableStreamSetState(stream, state) {
481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | 481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) |
482 (state << STATE_BITS_OFFSET); 482 (state << STATE_BITS_OFFSET);
483 } 483 }
484 484
485 function ReadableStreamDefaultControllerError(controller, e) { 485 function ReadableStreamDefaultControllerError(controller, e) {
486 controller[_queue] = new v8.InternalPackedArray(); 486 controller[_queue] = new Queue();
487 const stream = controller[_controlledReadableStream]; 487 const stream = controller[_controlledReadableStream];
488 ReadableStreamError(stream, e); 488 ReadableStreamError(stream, e);
489 } 489 }
490 490
491 function ReadableStreamError(stream, e) { 491 function ReadableStreamError(stream, e) {
492 stream[_storedError] = e; 492 stream[_storedError] = e;
493 ReadableStreamSetState(stream, STATE_ERRORED); 493 ReadableStreamSetState(stream, STATE_ERRORED);
494 494
495 const reader = stream[_reader]; 495 const reader = stream[_reader];
496 if (reader === undefined) { 496 if (reader === undefined) {
(...skipping 292 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 return promise; 789 return promise;
790 } 790 }
791 } 791 }
792 792
793 // 793 //
794 // Queue-with-sizes 794 // Queue-with-sizes
795 // Modified from taking the queue (as in the spec) to taking the stream, so we 795 // Modified from taking the queue (as in the spec) to taking the stream, so we
796 // can modify the queue size alongside. 796 // can modify the queue size alongside.
797 // 797 //
798 798
799 // Simple queue structure. Avoids scalability issues with using
800 // InternalPackedArray directly by using multiple arrays
801 // in a linked list and keeping the size of each below 32768 elements.
802 class Queue {
803 constructor() {
804 this.front = {
805 elements: new v8.InternalPackedArray(),
806 next: undefined,
807 };
808 this.back = this.front;
809 this.size = 0;
810 }
811
812 get length() {
yhirano 2017/01/17 08:23:37 How about having |empty| predicate?
Adam Rice 2017/01/17 08:33:39 I am hoping to get feedback from someone more know
813 return this.size;
814 }
815
816 push(element) {
817 ++this.size;
818 if (this.back.elements.length === 32767) {
819 const oldBack = this.back;
820 this.back = {
821 elements: new v8.InternalPackedArray(),
822 next: undefined,
823 };
824 oldBack.next = this.back;
825 }
826 this.back.elements.push(element);
827 }
828
829 shift() {
830 --this.size;
831 const element = this.front.elements.shift();
832 if (this.front.elements.length === 0 && this.front.next !== undefined) {
833 this.front = this.front.next;
834 }
835 return element;
836 }
837 }
838
799 function DequeueValue(controller) { 839 function DequeueValue(controller) {
800 const result = controller[_queue].shift(); 840 const result = controller[_queue].shift();
801 controller[_totalQueuedSize] -= result.size; 841 controller[_totalQueuedSize] -= result.size;
802 return result.value; 842 return result.value;
803 } 843 }
804 844
805 function EnqueueValueWithSize(controller, value, size) { 845 function EnqueueValueWithSize(controller, value, size) {
806 size = Number(size); 846 size = Number(size);
807 if (Number_isNaN(size) || size === +Infinity || size < 0) { 847 if (Number_isNaN(size) || size === +Infinity || size < 0) {
808 throw new RangeError(streamErrors.invalidSize); 848 throw new RangeError(streamErrors.invalidSize);
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; 945 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; 946 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; 947 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
908 948
909 binding.createReadableStreamWithExternalController = 949 binding.createReadableStreamWithExternalController =
910 (underlyingSource, strategy) => { 950 (underlyingSource, strategy) => {
911 return new ReadableStream( 951 return new ReadableStream(
912 underlyingSource, strategy, createWithExternalControllerSentinel); 952 underlyingSource, strategy, createWithExternalControllerSentinel);
913 }; 953 };
914 }); 954 });
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698