Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(369)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2637863002: Fix ReadableStream scalability issue (Closed)
Patch Set: Remove unreachable code and use constant Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after
188 } 188 }
189 189
190 if (stream[_controller] !== undefined) { 190 if (stream[_controller] !== undefined) {
191 throw new TypeError(streamErrors.illegalConstructor); 191 throw new TypeError(streamErrors.illegalConstructor);
192 } 192 }
193 193
194 this[_controlledReadableStream] = stream; 194 this[_controlledReadableStream] = stream;
195 195
196 this[_underlyingSource] = underlyingSource; 196 this[_underlyingSource] = underlyingSource;
197 197
198 this[_queue] = new v8.InternalPackedArray(); 198 this[_queue] = new Queue();
199 this[_totalQueuedSize] = 0; 199 this[_totalQueuedSize] = 0;
200 200
201 this[_readableStreamDefaultControllerBits] = 0b0; 201 this[_readableStreamDefaultControllerBits] = 0b0;
202 if (isExternallyControlled === true) { 202 if (isExternallyControlled === true) {
203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; 203 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
204 } 204 }
205 205
206 const normalizedStrategy = 206 const normalizedStrategy =
207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); 207 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
208 this[_strategySize] = normalizedStrategy.size; 208 this[_strategySize] = normalizedStrategy.size;
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 } 289 }
290 if (state === STATE_CLOSED) { 290 if (state === STATE_CLOSED) {
291 throw new TypeError(errErrorClosedStream); 291 throw new TypeError(errErrorClosedStream);
292 } 292 }
293 293
294 return ReadableStreamDefaultControllerError(this, e); 294 return ReadableStreamDefaultControllerError(this, e);
295 } 295 }
296 } 296 }
297 297
298 function ReadableStreamDefaultControllerCancel(controller, reason) { 298 function ReadableStreamDefaultControllerCancel(controller, reason) {
299 controller[_queue] = new v8.InternalPackedArray(); 299 controller[_queue] = new Queue();
300 300
301 const underlyingSource = controller[_underlyingSource]; 301 const underlyingSource = controller[_underlyingSource];
302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); 302 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
303 } 303 }
304 304
305 function ReadableStreamDefaultControllerPull(controller) { 305 function ReadableStreamDefaultControllerPull(controller) {
306 const stream = controller[_controlledReadableStream]; 306 const stream = controller[_controlledReadableStream];
307 307
308 if (controller[_queue].length > 0) { 308 if (controller[_queue].length > 0) {
309 const chunk = DequeueValue(controller); 309 const chunk = DequeueValue(controller);
(...skipping 23 matching lines...) Expand all
333 constructor(stream) { 333 constructor(stream) {
334 if (IsReadableStream(stream) === false) { 334 if (IsReadableStream(stream) === false) {
335 throw new TypeError(errReaderConstructorBadArgument); 335 throw new TypeError(errReaderConstructorBadArgument);
336 } 336 }
337 if (IsReadableStreamLocked(stream) === true) { 337 if (IsReadableStreamLocked(stream) === true) {
338 throw new TypeError(errReaderConstructorStreamAlreadyLocked); 338 throw new TypeError(errReaderConstructorStreamAlreadyLocked);
339 } 339 }
340 340
341 ReadableStreamReaderGenericInitialize(this, stream); 341 ReadableStreamReaderGenericInitialize(this, stream);
342 342
343 this[_readRequests] = new v8.InternalPackedArray(); 343 this[_readRequests] = new Queue();
344 } 344 }
345 345
346 get closed() { 346 get closed() {
347 if (IsReadableStreamDefaultReader(this) === false) { 347 if (IsReadableStreamDefaultReader(this) === false) {
348 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); 348 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
349 } 349 }
350 350
351 return this[_closedPromise]; 351 return this[_closedPromise];
352 } 352 }
353 353
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
476 function ReadableStreamGetState(stream) { 476 function ReadableStreamGetState(stream) {
477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; 477 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
478 } 478 }
479 479
480 function ReadableStreamSetState(stream, state) { 480 function ReadableStreamSetState(stream, state) {
481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | 481 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) |
482 (state << STATE_BITS_OFFSET); 482 (state << STATE_BITS_OFFSET);
483 } 483 }
484 484
485 function ReadableStreamDefaultControllerError(controller, e) { 485 function ReadableStreamDefaultControllerError(controller, e) {
486 controller[_queue] = new v8.InternalPackedArray(); 486 controller[_queue] = new Queue();
487 const stream = controller[_controlledReadableStream]; 487 const stream = controller[_controlledReadableStream];
488 ReadableStreamError(stream, e); 488 ReadableStreamError(stream, e);
489 } 489 }
490 490
491 function ReadableStreamError(stream, e) { 491 function ReadableStreamError(stream, e) {
492 stream[_storedError] = e; 492 stream[_storedError] = e;
493 ReadableStreamSetState(stream, STATE_ERRORED); 493 ReadableStreamSetState(stream, STATE_ERRORED);
494 494
495 const reader = stream[_reader]; 495 const reader = stream[_reader];
496 if (reader === undefined) { 496 if (reader === undefined) {
497 return undefined; 497 return undefined;
498 } 498 }
499 499
500 if (IsReadableStreamDefaultReader(reader) === true) { 500 if (IsReadableStreamDefaultReader(reader) === true) {
501 const readRequests = reader[_readRequests]; 501 reader[_readRequests].forEach(request => v8.rejectPromise(request, e));
502 for (let i = 0; i < readRequests.length; i++) { 502 reader[_readRequests] = new Queue();
503 v8.rejectPromise(readRequests[i], e);
504 }
505 reader[_readRequests] = new v8.InternalPackedArray();
506 } 503 }
507 504
508 v8.rejectPromise(reader[_closedPromise], e); 505 v8.rejectPromise(reader[_closedPromise], e);
509 v8.markPromiseAsHandled(reader[_closedPromise]); 506 v8.markPromiseAsHandled(reader[_closedPromise]);
510 } 507 }
511 508
512 function ReadableStreamClose(stream) { 509 function ReadableStreamClose(stream) {
513 ReadableStreamSetState(stream, STATE_CLOSED); 510 ReadableStreamSetState(stream, STATE_CLOSED);
514 511
515 const reader = stream[_reader]; 512 const reader = stream[_reader];
516 if (reader === undefined) { 513 if (reader === undefined) {
517 return undefined; 514 return undefined;
518 } 515 }
519 516
520 if (IsReadableStreamDefaultReader(reader) === true) { 517 if (IsReadableStreamDefaultReader(reader) === true) {
521 const readRequests = reader[_readRequests]; 518 reader[_readRequests].forEach(request =>
522 for (let i = 0; i < readRequests.length; i++) { 519 v8.resolvePromise(request, CreateIterResultObject(undefined, true)));
523 v8.resolvePromise( 520 reader[_readRequests] = new Queue();
524 readRequests[i], CreateIterResultObject(undefined, true));
525 }
526 reader[_readRequests] = new v8.InternalPackedArray();
527 } 521 }
528 522
529 v8.resolvePromise(reader[_closedPromise], undefined); 523 v8.resolvePromise(reader[_closedPromise], undefined);
530 } 524 }
531 525
532 function ReadableStreamDefaultControllerGetDesiredSize(controller) { 526 function ReadableStreamDefaultControllerGetDesiredSize(controller) {
533 const queueSize = GetTotalQueueSize(controller); 527 const queueSize = GetTotalQueueSize(controller);
534 return controller[_strategyHWM] - queueSize; 528 return controller[_strategyHWM] - queueSize;
535 } 529 }
536 530
(...skipping 252 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 return promise; 783 return promise;
790 } 784 }
791 } 785 }
792 786
793 // 787 //
794 // Queue-with-sizes 788 // Queue-with-sizes
795 // Modified from taking the queue (as in the spec) to taking the stream, so we 789 // Modified from taking the queue (as in the spec) to taking the stream, so we
796 // can modify the queue size alongside. 790 // can modify the queue size alongside.
797 // 791 //
798 792
793 // Simple queue structure. Avoids scalability issues with using
794 // InternalPackedArray directly by using multiple arrays
795 // in a linked list and keeping the array size bounded.
796 const QUEUE_MAX_ARRAY_SIZE = 16384;
797 class Queue {
798 constructor() {
799 this.front = {
800 elements: new v8.InternalPackedArray(),
801 next: undefined,
802 };
803 this.back = this.front;
804 // The cursor is used to avoid calling InternalPackedArray.shift().
805 this.cursor = 0;
806 this.size = 0;
807 }
808
809 get length() {
810 return this.size;
811 }
812
813 push(element) {
814 ++this.size;
815 if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) {
816 const oldBack = this.back;
817 this.back = {
818 elements: new v8.InternalPackedArray(),
819 next: undefined,
820 };
821 oldBack.next = this.back;
822 }
823 this.back.elements.push(element);
824 }
825
826 shift() {
827 // assert(this.size > 0);
828 --this.size;
829 if (this.front.elements.length === this.cursor) {
830 // assert(this.cursor === QUEUE_MAX_ARRAY_SIZE);
831 // assert(this.front.next !== undefined);
832 this.front = this.front.next;
833 this.cursor = 0;
834 }
835 const element = this.front.elements[this.cursor];
836 // Permit shifted element to be garbage collected.
837 this.front.elements[this.cursor] = undefined;
838 ++this.cursor;
839
840 return element;
841 }
842
843 forEach(callback) {
844 let i = this.cursor;
845 let node = this.front;
846 let elements = node.elements;
847 while (i !== elements.length || node.next !== undefined) {
848 if (i === elements.length) {
849 // assert(node.next !== undefined);
850 // assert(i === QUEUE_MAX_ARRAY_SIZE);
851 node = node.next;
852 elements = node.elements;
853 i = 0;
854 }
855 callback(elements[i]);
856 ++i;
857 }
858 }
859 }
860
799 function DequeueValue(controller) { 861 function DequeueValue(controller) {
800 const result = controller[_queue].shift(); 862 const result = controller[_queue].shift();
801 controller[_totalQueuedSize] -= result.size; 863 controller[_totalQueuedSize] -= result.size;
802 return result.value; 864 return result.value;
803 } 865 }
804 866
805 function EnqueueValueWithSize(controller, value, size) { 867 function EnqueueValueWithSize(controller, value, size) {
806 size = Number(size); 868 size = Number(size);
807 if (Number_isNaN(size) || size === +Infinity || size < 0) { 869 if (Number_isNaN(size) || size === +Infinity || size < 0) {
808 throw new RangeError(streamErrors.invalidSize); 870 throw new RangeError(streamErrors.invalidSize);
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; 967 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; 968 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; 969 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
908 970
909 binding.createReadableStreamWithExternalController = 971 binding.createReadableStreamWithExternalController =
910 (underlyingSource, strategy) => { 972 (underlyingSource, strategy) => {
911 return new ReadableStream( 973 return new ReadableStream(
912 underlyingSource, strategy, createWithExternalControllerSentinel); 974 underlyingSource, strategy, createWithExternalControllerSentinel);
913 }; 975 };
914 }); 976 });
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698