Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1109)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 2752133003: Use SimpleQueue in WritableStream implementation (Closed)
Patch Set: Remove stale comments and correct copyright date Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « .gn ('k') | third_party/WebKit/Source/core/streams/SimpleQueue.js » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const _reader = v8.createPrivateSymbol('[[reader]]'); 8 const _reader = v8.createPrivateSymbol('[[reader]]');
9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); 9 const _storedError = v8.createPrivateSymbol('[[storedError]]');
10 const _controller = v8.createPrivateSymbol('[[controller]]'); 10 const _controller = v8.createPrivateSymbol('[[controller]]');
(...skipping 382 matching lines...) Expand 10 before | Expand all | Expand 10 after
393 } 393 }
394 394
395 if (stream[_controller] !== undefined) { 395 if (stream[_controller] !== undefined) {
396 throw new TypeError(streamErrors.illegalConstructor); 396 throw new TypeError(streamErrors.illegalConstructor);
397 } 397 }
398 398
399 this[_controlledReadableStream] = stream; 399 this[_controlledReadableStream] = stream;
400 400
401 this[_underlyingSource] = underlyingSource; 401 this[_underlyingSource] = underlyingSource;
402 402
403 this[_queue] = new Queue(); 403 this[_queue] = new binding.SimpleQueue();
404 this[_totalQueuedSize] = 0; 404 this[_totalQueuedSize] = 0;
405 405
406 this[_readableStreamDefaultControllerBits] = 0b0; 406 this[_readableStreamDefaultControllerBits] = 0b0;
407 if (isExternallyControlled === true) { 407 if (isExternallyControlled === true) {
408 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; 408 this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
409 } 409 }
410 410
411 const normalizedStrategy = 411 const normalizedStrategy =
412 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); 412 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
413 this[_strategySize] = normalizedStrategy.size; 413 this[_strategySize] = normalizedStrategy.size;
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
494 } 494 }
495 if (state === STATE_CLOSED) { 495 if (state === STATE_CLOSED) {
496 throw new TypeError(errErrorClosedStream); 496 throw new TypeError(errErrorClosedStream);
497 } 497 }
498 498
499 return ReadableStreamDefaultControllerError(this, e); 499 return ReadableStreamDefaultControllerError(this, e);
500 } 500 }
501 } 501 }
502 502
503 function ReadableStreamDefaultControllerCancel(controller, reason) { 503 function ReadableStreamDefaultControllerCancel(controller, reason) {
504 controller[_queue] = new Queue(); 504 controller[_queue] = new binding.SimpleQueue();
505 505
506 const underlyingSource = controller[_underlyingSource]; 506 const underlyingSource = controller[_underlyingSource];
507 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); 507 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
508 } 508 }
509 509
510 function ReadableStreamDefaultControllerPull(controller) { 510 function ReadableStreamDefaultControllerPull(controller) {
511 const stream = controller[_controlledReadableStream]; 511 const stream = controller[_controlledReadableStream];
512 512
513 if (controller[_queue].length > 0) { 513 if (controller[_queue].length > 0) {
514 const chunk = DequeueValue(controller); 514 const chunk = DequeueValue(controller);
(...skipping 23 matching lines...) Expand all
538 constructor(stream) { 538 constructor(stream) {
539 if (IsReadableStream(stream) === false) { 539 if (IsReadableStream(stream) === false) {
540 throw new TypeError(errReaderConstructorBadArgument); 540 throw new TypeError(errReaderConstructorBadArgument);
541 } 541 }
542 if (IsReadableStreamLocked(stream) === true) { 542 if (IsReadableStreamLocked(stream) === true) {
543 throw new TypeError(errReaderConstructorStreamAlreadyLocked); 543 throw new TypeError(errReaderConstructorStreamAlreadyLocked);
544 } 544 }
545 545
546 ReadableStreamReaderGenericInitialize(this, stream); 546 ReadableStreamReaderGenericInitialize(this, stream);
547 547
548 this[_readRequests] = new Queue(); 548 this[_readRequests] = new binding.SimpleQueue();
549 } 549 }
550 550
551 get closed() { 551 get closed() {
552 if (IsReadableStreamDefaultReader(this) === false) { 552 if (IsReadableStreamDefaultReader(this) === false) {
553 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); 553 return Promise_reject(new TypeError(streamErrors.illegalInvocation));
554 } 554 }
555 555
556 return this[_closedPromise]; 556 return this[_closedPromise];
557 } 557 }
558 558
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
681 function ReadableStreamGetState(stream) { 681 function ReadableStreamGetState(stream) {
682 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; 682 return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
683 } 683 }
684 684
685 function ReadableStreamSetState(stream, state) { 685 function ReadableStreamSetState(stream, state) {
686 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) | 686 stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) |
687 (state << STATE_BITS_OFFSET); 687 (state << STATE_BITS_OFFSET);
688 } 688 }
689 689
690 function ReadableStreamDefaultControllerError(controller, e) { 690 function ReadableStreamDefaultControllerError(controller, e) {
691 controller[_queue] = new Queue(); 691 controller[_queue] = new binding.SimpleQueue();
692 const stream = controller[_controlledReadableStream]; 692 const stream = controller[_controlledReadableStream];
693 ReadableStreamError(stream, e); 693 ReadableStreamError(stream, e);
694 } 694 }
695 695
696 function ReadableStreamError(stream, e) { 696 function ReadableStreamError(stream, e) {
697 stream[_storedError] = e; 697 stream[_storedError] = e;
698 ReadableStreamSetState(stream, STATE_ERRORED); 698 ReadableStreamSetState(stream, STATE_ERRORED);
699 699
700 const reader = stream[_reader]; 700 const reader = stream[_reader];
701 if (reader === undefined) { 701 if (reader === undefined) {
702 return undefined; 702 return undefined;
703 } 703 }
704 704
705 if (IsReadableStreamDefaultReader(reader) === true) { 705 if (IsReadableStreamDefaultReader(reader) === true) {
706 reader[_readRequests].forEach(request => v8.rejectPromise(request, e)); 706 reader[_readRequests].forEach(request => v8.rejectPromise(request, e));
707 reader[_readRequests] = new Queue(); 707 reader[_readRequests] = new binding.SimpleQueue();
708 } 708 }
709 709
710 v8.rejectPromise(reader[_closedPromise], e); 710 v8.rejectPromise(reader[_closedPromise], e);
711 v8.markPromiseAsHandled(reader[_closedPromise]); 711 v8.markPromiseAsHandled(reader[_closedPromise]);
712 } 712 }
713 713
714 function ReadableStreamClose(stream) { 714 function ReadableStreamClose(stream) {
715 ReadableStreamSetState(stream, STATE_CLOSED); 715 ReadableStreamSetState(stream, STATE_CLOSED);
716 716
717 const reader = stream[_reader]; 717 const reader = stream[_reader];
718 if (reader === undefined) { 718 if (reader === undefined) {
719 return undefined; 719 return undefined;
720 } 720 }
721 721
722 if (IsReadableStreamDefaultReader(reader) === true) { 722 if (IsReadableStreamDefaultReader(reader) === true) {
723 reader[_readRequests].forEach(request => 723 reader[_readRequests].forEach(request =>
724 v8.resolvePromise(request, CreateIterResultObject(undefined, true))); 724 v8.resolvePromise(request, CreateIterResultObject(undefined, true)));
725 reader[_readRequests] = new Queue(); 725 reader[_readRequests] = new binding.SimpleQueue();
726 } 726 }
727 727
728 v8.resolvePromise(reader[_closedPromise], undefined); 728 v8.resolvePromise(reader[_closedPromise], undefined);
729 } 729 }
730 730
731 function ReadableStreamDefaultControllerGetDesiredSize(controller) { 731 function ReadableStreamDefaultControllerGetDesiredSize(controller) {
732 const queueSize = GetTotalQueueSize(controller); 732 const queueSize = GetTotalQueueSize(controller);
733 return controller[_strategyHWM] - queueSize; 733 return controller[_strategyHWM] - queueSize;
734 } 734 }
735 735
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
984 const cancelResult = ReadableStreamCancel(stream, compositeReason); 984 const cancelResult = ReadableStreamCancel(stream, compositeReason);
985 v8.resolvePromise(promise, cancelResult); 985 v8.resolvePromise(promise, cancelResult);
986 } 986 }
987 987
988 return promise; 988 return promise;
989 } 989 }
990 } 990 }
991 991
992 // 992 //
993 // Queue-with-sizes 993 // Queue-with-sizes
994 // Modified from taking the queue (as in the spec) to taking the stream, so we
995 // can modify the queue size alongside.
996 // 994 //
997 995
998 // Simple queue structure. Avoids scalability issues with using
999 // InternalPackedArray directly by using multiple arrays
1000 // in a linked list and keeping the array size bounded.
1001 const QUEUE_MAX_ARRAY_SIZE = 16384;
1002 class Queue {
1003 constructor() {
1004 this.front = {
1005 elements: new v8.InternalPackedArray(),
1006 next: undefined,
1007 };
1008 this.back = this.front;
1009 // The cursor is used to avoid calling InternalPackedArray.shift().
1010 this.cursor = 0;
1011 this.size = 0;
1012 }
1013
1014 get length() {
1015 return this.size;
1016 }
1017
1018 push(element) {
1019 ++this.size;
1020 if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) {
1021 const oldBack = this.back;
1022 this.back = {
1023 elements: new v8.InternalPackedArray(),
1024 next: undefined,
1025 };
1026 oldBack.next = this.back;
1027 }
1028 this.back.elements.push(element);
1029 }
1030
1031 shift() {
1032 // assert(this.size > 0);
1033 --this.size;
1034 if (this.front.elements.length === this.cursor) {
1035 // assert(this.cursor === QUEUE_MAX_ARRAY_SIZE);
1036 // assert(this.front.next !== undefined);
1037 this.front = this.front.next;
1038 this.cursor = 0;
1039 }
1040 const element = this.front.elements[this.cursor];
1041 // Permit shifted element to be garbage collected.
1042 this.front.elements[this.cursor] = undefined;
1043 ++this.cursor;
1044
1045 return element;
1046 }
1047
1048 forEach(callback) {
1049 let i = this.cursor;
1050 let node = this.front;
1051 let elements = node.elements;
1052 while (i !== elements.length || node.next !== undefined) {
1053 if (i === elements.length) {
1054 // assert(node.next !== undefined);
1055 // assert(i === QUEUE_MAX_ARRAY_SIZE);
1056 node = node.next;
1057 elements = node.elements;
1058 i = 0;
1059 }
1060 callback(elements[i]);
1061 ++i;
1062 }
1063 }
1064 }
1065
1066 function DequeueValue(controller) { 996 function DequeueValue(controller) {
1067 const result = controller[_queue].shift(); 997 const result = controller[_queue].shift();
1068 controller[_totalQueuedSize] -= result.size; 998 controller[_totalQueuedSize] -= result.size;
1069 return result.value; 999 return result.value;
1070 } 1000 }
1071 1001
1072 function EnqueueValueWithSize(controller, value, size) { 1002 function EnqueueValueWithSize(controller, value, size) {
1073 size = Number(size); 1003 size = Number(size);
1074 if (Number_isNaN(size) || size === +Infinity || size < 0) { 1004 if (Number_isNaN(size) || size === +Infinity || size < 0) {
1075 throw new RangeError(streamErrors.invalidSize); 1005 throw new RangeError(streamErrors.invalidSize);
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1177 (underlyingSource, strategy) => { 1107 (underlyingSource, strategy) => {
1178 return new ReadableStream( 1108 return new ReadableStream(
1179 underlyingSource, strategy, createWithExternalControllerSentinel); 1109 underlyingSource, strategy, createWithExternalControllerSentinel);
1180 }; 1110 };
1181 1111
1182 // Temporary exports while pipeTo() and pipeThrough() are behind flags 1112 // Temporary exports while pipeTo() and pipeThrough() are behind flags
1183 binding.ReadableStream_prototype_pipeThrough = 1113 binding.ReadableStream_prototype_pipeThrough =
1184 ReadableStream_prototype_pipeThrough; 1114 ReadableStream_prototype_pipeThrough;
1185 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; 1115 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo;
1186 }); 1116 });
OLDNEW
« no previous file with comments | « .gn ('k') | third_party/WebKit/Source/core/streams/SimpleQueue.js » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698