Index: third_party/WebKit/Source/core/streams/ReadableStream.js |
diff --git a/third_party/WebKit/Source/core/streams/ReadableStream.js b/third_party/WebKit/Source/core/streams/ReadableStream.js |
index 1586b0e61500db517bc361c1b456f8867c0db6a7..b8d33caf451f90ce1511a4da69284ec61148c0c1 100644 |
--- a/third_party/WebKit/Source/core/streams/ReadableStream.js |
+++ b/third_party/WebKit/Source/core/streams/ReadableStream.js |
@@ -195,7 +195,7 @@ |
this[_underlyingSource] = underlyingSource; |
- this[_queue] = new v8.InternalPackedArray(); |
+ this[_queue] = new Queue(); |
this[_totalQueuedSize] = 0; |
this[_readableStreamDefaultControllerBits] = 0b0; |
@@ -296,7 +296,7 @@ |
} |
function ReadableStreamDefaultControllerCancel(controller, reason) { |
- controller[_queue] = new v8.InternalPackedArray(); |
+ controller[_queue] = new Queue(); |
const underlyingSource = controller[_underlyingSource]; |
return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); |
@@ -340,7 +340,7 @@ |
ReadableStreamReaderGenericInitialize(this, stream); |
- this[_readRequests] = new v8.InternalPackedArray(); |
+ this[_readRequests] = new Queue(); |
} |
get closed() { |
@@ -483,7 +483,7 @@ |
} |
function ReadableStreamDefaultControllerError(controller, e) { |
- controller[_queue] = new v8.InternalPackedArray(); |
+ controller[_queue] = new Queue(); |
const stream = controller[_controlledReadableStream]; |
ReadableStreamError(stream, e); |
} |
@@ -498,11 +498,8 @@ |
} |
if (IsReadableStreamDefaultReader(reader) === true) { |
- const readRequests = reader[_readRequests]; |
- for (let i = 0; i < readRequests.length; i++) { |
- v8.rejectPromise(readRequests[i], e); |
- } |
- reader[_readRequests] = new v8.InternalPackedArray(); |
+ reader[_readRequests].forEach(request => v8.rejectPromise(request, e)); |
+ reader[_readRequests] = new Queue(); |
} |
v8.rejectPromise(reader[_closedPromise], e); |
@@ -518,12 +515,9 @@ |
} |
if (IsReadableStreamDefaultReader(reader) === true) { |
- const readRequests = reader[_readRequests]; |
- for (let i = 0; i < readRequests.length; i++) { |
- v8.resolvePromise( |
- readRequests[i], CreateIterResultObject(undefined, true)); |
- } |
- reader[_readRequests] = new v8.InternalPackedArray(); |
+ reader[_readRequests].forEach(request => |
+ v8.resolvePromise(request, CreateIterResultObject(undefined, true))); |
+ reader[_readRequests] = new Queue(); |
} |
v8.resolvePromise(reader[_closedPromise], undefined); |
@@ -796,6 +790,74 @@ |
// can modify the queue size alongside. |
// |
+ // Simple queue structure. Avoids scalability issues with using |
+ // InternalPackedArray directly by using multiple arrays |
+ // in a linked list and keeping the array size bounded. |
+ const QUEUE_MAX_ARRAY_SIZE = 16384; |
+ class Queue { |
+ constructor() { |
+ this.front = { |
+ elements: new v8.InternalPackedArray(), |
+ next: undefined, |
+ }; |
+ this.back = this.front; |
+ // The cursor is used to avoid calling InternalPackedArray.shift(). |
+ this.cursor = 0; |
+ this.size = 0; |
+ } |
+ |
+ get length() { |
+ return this.size; |
+ } |
+ |
+ push(element) { |
+ ++this.size; |
+ if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) { |
+ const oldBack = this.back; |
+ this.back = { |
+ elements: new v8.InternalPackedArray(), |
+ next: undefined, |
+ }; |
+ oldBack.next = this.back; |
+ } |
+ this.back.elements.push(element); |
+ } |
+ |
+ shift() { |
+ // assert(this.size > 0); |
+ --this.size; |
+ if (this.front.elements.length === this.cursor) { |
+ // assert(this.cursor === QUEUE_MAX_ARRAY_SIZE); |
+ // assert(this.front.next !== undefined); |
+ this.front = this.front.next; |
+ this.cursor = 0; |
+ } |
+ const element = this.front.elements[this.cursor]; |
+ // Permit shifted element to be garbage collected. |
+ this.front.elements[this.cursor] = undefined; |
+ ++this.cursor; |
+ |
+ return element; |
+ } |
+ |
+ forEach(callback) { |
+ let i = this.cursor; |
+ let node = this.front; |
+ let elements = node.elements; |
+ while (i !== elements.length || node.next !== undefined) { |
+ if (i === elements.length) { |
+ // assert(node.next !== undefined); |
+ // assert(i === QUEUE_MAX_ARRAY_SIZE); |
+ node = node.next; |
+ elements = node.elements; |
+ i = 0; |
+ } |
+ callback(elements[i]); |
+ ++i; |
+ } |
+ } |
+ } |
+ |
function DequeueValue(controller) { |
const result = controller[_queue].shift(); |
controller[_totalQueuedSize] -= result.size; |