Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(850)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 1902673003: Reflect recent spec changes to V8 Extra ReadableStream impl (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fixed build Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
9 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
8 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); 10 const readableStreamController = v8.createPrivateSymbol('[[controller]]');
9 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]');
10 const readableStreamQueueSize =
11 v8.createPrivateSymbol('[[queue]] total size');
12 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
13 const readableStreamState = v8.createPrivateSymbol('[[state]]');
14 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
15 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]');
16 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
17 const readableStreamUnderlyingSource =
18 v8.createPrivateSymbol('[[underlyingSource]]');
19
20 const readableStreamControllerControlledReadableStream =
21 v8.createPrivateSymbol('[[controlledReadableStream]]');
22 11
23 const readableStreamReaderClosedPromise = 12 const readableStreamReaderClosedPromise =
24 v8.createPrivateSymbol('[[closedPromise]]'); 13 v8.createPrivateSymbol('[[closedPromise]]');
25 const readableStreamReaderOwnerReadableStream = 14 const readableStreamReaderOwnerReadableStream =
26 v8.createPrivateSymbol('[[ownerReadableStream]]'); 15 v8.createPrivateSymbol('[[ownerReadableStream]]');
27 const readableStreamReaderReadRequests = 16
17 const readableStreamDefaultReaderReadRequests =
28 v8.createPrivateSymbol('[[readRequests]]'); 18 v8.createPrivateSymbol('[[readRequests]]');
29 19
30 const createWithExternalControllerSentinel = 20 const createWithExternalControllerSentinel =
31 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); 21 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
32 22
23 const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]');
24 const DISTURBED = 0b1;
25 // The 2nd and 3rd bit are for [[state]].
26 const STATE_MASK = 0b110;
27 const STATE_BITS_OFFSET = 1;
33 const STATE_READABLE = 0; 28 const STATE_READABLE = 0;
34 const STATE_CLOSED = 1; 29 const STATE_CLOSED = 1;
35 const STATE_ERRORED = 2; 30 const STATE_ERRORED = 2;
36 31
37 const readableStreamBits = v8.createPrivateSymbol( 32 const readableStreamDefaultControllerUnderlyingSource =
38 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] , [[disturbed]]'); 33 v8.createPrivateSymbol('[[underlyingSource]]');
34 const readableStreamDefaultControllerControlledReadableStream =
35 v8.createPrivateSymbol('[[controlledReadableStream]]');
36 const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]] ');
37 const readableStreamDefaultControllerQueueSize =
38 v8.createPrivateSymbol('[[queue]] total size');
39 const readableStreamDefaultControllerStrategySize =
40 v8.createPrivateSymbol('[[strategySize]]');
41 const readableStreamDefaultControllerStrategyHWM =
42 v8.createPrivateSymbol('[[strategyHWM]]');
43
44 const readableStreamDefaultControllerBits = v8.createPrivateSymbol(
45 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] ');
39 const STARTED = 0b1; 46 const STARTED = 0b1;
40 const CLOSE_REQUESTED = 0b10; 47 const CLOSE_REQUESTED = 0b10;
41 const PULLING = 0b100; 48 const PULLING = 0b100;
42 const PULL_AGAIN = 0b1000; 49 const PULL_AGAIN = 0b1000;
43 const DISTURBED = 0b10000; 50 const EXTERNALLY_CONTROLLED = 0b10000;
51
52 const readableStreamControllerCancel =
53 v8.createPrivateSymbol('[[InternalCancel]]');
54 const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]' );
44 55
45 const undefined = global.undefined; 56 const undefined = global.undefined;
46 const Infinity = global.Infinity; 57 const Infinity = global.Infinity;
47 58
48 const defineProperty = global.Object.defineProperty; 59 const defineProperty = global.Object.defineProperty;
49 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 60 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
50 const callFunction = v8.uncurryThis(global.Function.prototype.call); 61 const callFunction = v8.uncurryThis(global.Function.prototype.call);
51 62
52 const TypeError = global.TypeError; 63 const TypeError = global.TypeError;
53 const RangeError = global.RangeError; 64 const RangeError = global.RangeError;
54 65
55 const Number = global.Number; 66 const Number = global.Number;
56 const Number_isNaN = Number.isNaN; 67 const Number_isNaN = Number.isNaN;
57 const Number_isFinite = Number.isFinite; 68 const Number_isFinite = Number.isFinite;
58 69
59 const Promise = global.Promise; 70 const Promise = global.Promise;
60 const thenPromise = v8.uncurryThis(Promise.prototype.then); 71 const thenPromise = v8.uncurryThis(Promise.prototype.then);
61 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); 72 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
62 const Promise_reject = v8.simpleBind(Promise.reject, Promise); 73 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
63 74
64 const errIllegalInvocation = 'Illegal invocation'; 75 const errIllegalInvocation = 'Illegal invocation';
65 const errIllegalConstructor = 'Illegal constructor'; 76 const errIllegalConstructor = 'Illegal constructor';
66 const errCancelLockedStream = 77 const errCancelLockedStream =
67 'Cannot cancel a readable stream that is locked to a reader'; 78 'Cannot cancel a readable stream that is locked to a reader';
68 const errEnqueueInCloseRequestedStream = 79 const errEnqueueCloseRequestedStream =
69 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; 80 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed';
70 const errCancelReleasedReader = 81 const errCancelReleasedReader =
71 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; 82 'This readable stream reader has been released and cannot be used to cance l its previous owner stream';
72 const errReadReleasedReader = 83 const errReadReleasedReader =
73 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; 84 'This readable stream reader has been released and cannot be used to read from its previous owner stream';
74 const errCloseCloseRequestedStream = 85 const errCloseCloseRequestedStream =
75 'Cannot close a readable stream that has already been requested to be clos ed'; 86 'Cannot close a readable stream that has already been requested to be clos ed';
87 const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable stream';
88 const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readab le stream';
89 const errCloseClosedStream = 'Cannot close a closed readable stream';
76 const errCloseErroredStream = 'Cannot close an errored readable stream'; 90 const errCloseErroredStream = 'Cannot close an errored readable stream';
77 const errErrorClosedStream = 'Cannot error a close readable stream'; 91 const errErrorClosedStream = 'Cannot error a close readable stream';
78 const errErrorErroredStream = 92 const errErrorErroredStream =
79 'Cannot error a readable stream that is already errored'; 93 'Cannot error a readable stream that is already errored';
94 const errGetReaderNotByteStream = 'This readable stream does not support BYOB readers';
95 const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or "byob"';
80 const errReaderConstructorBadArgument = 96 const errReaderConstructorBadArgument =
81 'ReadableStreamReader constructor argument is not a readable stream'; 97 'ReadableStreamReader constructor argument is not a readable stream';
82 const errReaderConstructorStreamAlreadyLocked = 98 const errReaderConstructorStreamAlreadyLocked =
83 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 99 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
84 const errReleaseReaderWithPendingRead = 100 const errReleaseReaderWithPendingRead =
85 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 101 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
86 const errReleasedReaderClosedPromise = 102 const errReleasedReaderClosedPromise =
87 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 103 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
88 const errInvalidSize = 104 const errInvalidSize =
89 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; 105 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
90 const errSizeNotAFunction = 106 const errSizeNotAFunction =
91 'A queuing strategy\'s size property must be a function'; 107 'A queuing strategy\'s size property must be a function';
92 const errInvalidHWM = 108 const errInvalidHWM =
93 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; 109 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number';
94 const errTmplMustBeFunctionOrUndefined = name => 110 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 111 `${name} must be a function or undefined`;
96 112
97 class ReadableStream { 113 class ReadableStream {
98 constructor() { 114 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 115 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 116 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 117 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 118 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 119 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 120 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 121 if (highWaterMark === undefined) {
106 highWaterMark = 1; 122 highWaterMark = 1;
107 } 123 }
108 124
109 const normalizedStrategy =
110 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
111
112 this[readableStreamUnderlyingSource] = underlyingSource;
113
114 this[readableStreamQueue] = new v8.InternalPackedArray();
115 this[readableStreamQueueSize] = 0;
116
117 this[readableStreamState] = STATE_READABLE;
118 this[readableStreamBits] = 0b0; 125 this[readableStreamBits] = 0b0;
126 ReadableStreamSetState(this, STATE_READABLE);
119 this[readableStreamReader] = undefined; 127 this[readableStreamReader] = undefined;
120 this[readableStreamStoredError] = undefined; 128 this[readableStreamStoredError] = undefined;
121 129
122 this[readableStreamStrategySize] = normalizedStrategy.size;
123 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
124
125 // Avoid allocating the controller if the stream is going to be controlled 130 // Avoid allocating the controller if the stream is going to be controlled
126 // externally (i.e. from C++) anyway. All calls to underlyingSource 131 // externally (i.e. from C++) anyway. All calls to underlyingSource
127 // methods will disregard their controller argument in such situations 132 // methods will disregard their controller argument in such situations
128 // (but see below). 133 // (but see below).
129 134
130 const isControlledExternally = 135 this[readableStreamController] = undefined;
131 arguments[2] === createWithExternalControllerSentinel;
132 const controller =
133 isControlledExternally ? null : new ReadableStreamController(this);
134 this[readableStreamController] = controller;
135 136
136 // We need to pass ourself to the underlyingSource start method for 137 const type = underlyingSource.type;
137 // externally-controlled streams. We use the now-useless controller 138 const typeString = String(type);
138 // argument to do so. 139 if (typeString === 'bytes') {
139 const argToStart = isControlledExternally ? this : controller; 140 throw new RangeError('bytes type is not yet implemented');
141 } else if (type !== undefined) {
142 throw new RangeError('Invalid type is specified');
143 }
140 144
141 const startResult = CallOrNoop( 145 this[readableStreamController] =
142 underlyingSource, 'start', argToStart, 'underlyingSource.start'); 146 new ReadableStreamDefaultController(this, underlyingSource, size, high WaterMark, arguments[2] === createWithExternalControllerSentinel);
143 thenPromise(Promise_resolve(startResult),
144 () => {
145 this[readableStreamBits] |= STARTED;
146 RequestReadableStreamPull(this);
147 },
148 r => {
149 if (this[readableStreamState] === STATE_READABLE) {
150 return ErrorReadableStream(this, r);
151 }
152 });
153 } 147 }
154 148
155 get locked() { 149 get locked() {
156 if (IsReadableStream(this) === false) { 150 if (IsReadableStream(this) === false) {
157 throw new TypeError(errIllegalInvocation); 151 throw new TypeError(errIllegalInvocation);
158 } 152 }
159 153
160 return IsReadableStreamLocked(this); 154 return IsReadableStreamLocked(this);
161 } 155 }
162 156
163 cancel(reason) { 157 cancel(reason) {
164 if (IsReadableStream(this) === false) { 158 if (IsReadableStream(this) === false) {
165 return Promise_reject(new TypeError(errIllegalInvocation)); 159 return Promise_reject(new TypeError(errIllegalInvocation));
166 } 160 }
167 161
168 if (IsReadableStreamLocked(this) === true) { 162 if (IsReadableStreamLocked(this) === true) {
169 return Promise_reject(new TypeError(errCancelLockedStream)); 163 return Promise_reject(new TypeError(errCancelLockedStream));
170 } 164 }
171 165
172 return CancelReadableStream(this, reason); 166 return ReadableStreamCancel(this, reason);
173 } 167 }
174 168
175 getReader() { 169 getReader({ mode } = {}) {
176 if (IsReadableStream(this) === false) { 170 if (IsReadableStream(this) === false) {
177 throw new TypeError(errIllegalInvocation); 171 throw new TypeError(errIllegalInvocation);
178 } 172 }
179 173
180 return AcquireReadableStreamReader(this); 174 if (mode === 'byob') {
175 if (IsReadableByteStreamDefaultController(this[readableStreamController] ) === false) {
176 throw new TypeError(errGetReaderNotByteStream);
177 }
178
179 return AcquireReadableStreamBYOBReader(this);
180 }
181
182 if (mode === undefined) {
183 return AcquireReadableStreamDefaultReader(this);
184 }
185
186 throw new RangeError(errGetReaderBadMode);
181 } 187 }
182 188
183 tee() { 189 tee() {
184 if (IsReadableStream(this) === false) { 190 if (IsReadableStream(this) === false) {
185 throw new TypeError(errIllegalInvocation); 191 throw new TypeError(errIllegalInvocation);
186 } 192 }
187 193
188 return TeeReadableStream(this); 194 return ReadableStreamTee(this);
189 } 195 }
190 } 196 }
191 197
192 class ReadableStreamController { 198 class ReadableStreamDefaultController {
193 constructor(stream) { 199 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
194 if (IsReadableStream(stream) === false) { 200 if (IsReadableStream(stream) === false) {
195 throw new TypeError(errIllegalConstructor); 201 throw new TypeError(errIllegalConstructor);
196 } 202 }
197 203
198 if (stream[readableStreamController] !== undefined) { 204 if (stream[readableStreamController] !== undefined) {
199 throw new TypeError(errIllegalConstructor); 205 throw new TypeError(errIllegalConstructor);
200 } 206 }
201 207
202 this[readableStreamControllerControlledReadableStream] = stream; 208 this[readableStreamDefaultControllerControlledReadableStream] = stream;
209
210 this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource;
211
212 this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
213 this[readableStreamDefaultControllerQueueSize] = 0;
214
215 this[readableStreamDefaultControllerBits] = 0b0;
216 if (isExternallyControlled === true) {
217 this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
218 }
219
220 const normalizedStrategy =
221 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
222 this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.siz e;
223 this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.high WaterMark;
224
225 const controller = this;
226
227 const startResult = CallOrNoop(
228 underlyingSource, 'start', this, 'underlyingSource.start');
229 thenPromise(Promise_resolve(startResult),
230 () => {
231 controller[readableStreamDefaultControllerBits] |= STARTED;
232 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
233 },
234 r => {
235 if (ReadableStreamGetState(stream) === STATE_READABLE) {
236 ReadableStreamDefaultControllerError(controller, r);
237 }
238 });
203 } 239 }
204 240
205 get desiredSize() { 241 get desiredSize() {
206 if (IsReadableStreamController(this) === false) { 242 if (IsReadableStreamDefaultController(this) === false) {
207 throw new TypeError(errIllegalInvocation); 243 throw new TypeError(errIllegalInvocation);
208 } 244 }
209 245
210 return GetReadableStreamDesiredSize( 246 return ReadableStreamDefaultControllerGetDesiredSize(this);
211 this[readableStreamControllerControlledReadableStream]);
212 } 247 }
213 248
214 close() { 249 close() {
215 if (IsReadableStreamController(this) === false) { 250 if (IsReadableStreamDefaultController(this) === false) {
216 throw new TypeError(errIllegalInvocation); 251 throw new TypeError(errIllegalInvocation);
217 } 252 }
218 253
219 const stream = this[readableStreamControllerControlledReadableStream]; 254 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
220 255
221 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 256 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
222 throw new TypeError(errCloseCloseRequestedStream); 257 throw new TypeError(errCloseCloseRequestedStream);
223 } 258 }
224 if (stream[readableStreamState] === STATE_ERRORED) { 259
260 const state = ReadableStreamGetState(stream);
261 if (state === STATE_ERRORED) {
225 throw new TypeError(errCloseErroredStream); 262 throw new TypeError(errCloseErroredStream);
226 } 263 }
264 if (state === STATE_CLOSED) {
265 throw new TypeError(errCloseClosedStream);
266 }
227 267
228 return CloseReadableStream(stream); 268 return ReadableStreamDefaultControllerClose(this);
229 } 269 }
230 270
231 enqueue(chunk) { 271 enqueue(chunk) {
232 if (IsReadableStreamController(this) === false) { 272 if (IsReadableStreamDefaultController(this) === false) {
233 throw new TypeError(errIllegalInvocation); 273 throw new TypeError(errIllegalInvocation);
234 } 274 }
235 275
236 const stream = this[readableStreamControllerControlledReadableStream]; 276 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
237 277
238 if (stream[readableStreamState] === STATE_ERRORED) { 278 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
239 throw stream[readableStreamStoredError]; 279 throw new TypeError(errEnqueueCloseRequestedStream);
240 } 280 }
241 281
242 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 282 const state = ReadableStreamGetState(stream);
243 throw new TypeError(errEnqueueInCloseRequestedStream); 283 if (state === STATE_ERRORED) {
284 throw new TypeError(errEnqueueErroredStream);
285 }
286 if (state === STATE_CLOSED) {
287 throw new TypeError(errEnqueueClosedStream);
244 } 288 }
245 289
246 return EnqueueInReadableStream(stream, chunk); 290 return ReadableStreamDefaultControllerEnqueue(this, chunk);
247 } 291 }
248 292
249 error(e) { 293 error(e) {
250 if (IsReadableStreamController(this) === false) { 294 if (IsReadableStreamDefaultController(this) === false) {
251 throw new TypeError(errIllegalInvocation); 295 throw new TypeError(errIllegalInvocation);
252 } 296 }
253 297
254 const stream = this[readableStreamControllerControlledReadableStream]; 298 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
255 299
256 const state = stream[readableStreamState]; 300 const state = ReadableStreamGetState(stream);
257 if (state !== STATE_READABLE) { 301 if (state === STATE_ERRORED) {
258 if (state === STATE_ERRORED) { 302 throw new TypeError(errErrorErroredStream);
259 throw new TypeError(errErrorErroredStream); 303 }
260 } 304 if (state === STATE_CLOSED) {
261 if (state === STATE_CLOSED) { 305 throw new TypeError(errErrorClosedStream);
262 throw new TypeError(errErrorClosedStream);
263 }
264 } 306 }
265 307
266 return ErrorReadableStream(stream, e); 308 return ReadableStreamDefaultControllerError(this, e);
267 } 309 }
268 } 310 }
269 311
270 class ReadableStreamReader { 312 function ReadableStreamDefaultControllerCancel(controller, reason) {
313 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
314
315 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
316 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
317 }
318
319 function ReadableStreamDefaultControllerPull(controller) {
320 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
321
322 if (controller[readableStreamDefaultControllerQueue].length > 0) {
323 const chunk = DequeueValue(controller);
324
325 if ((controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) &&
326 controller[readableStreamDefaultControllerQueue].length === 0) {
327 ReadableStreamClose(stream);
328 } else {
329 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
330 }
331
332 return Promise_resolve(CreateIterResultObject(chunk, false));
333 }
334
335 const pendingPromise = ReadableStreamAddReadRequest(stream);
336 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
337 return pendingPromise;
338 }
339
340 function ReadableStreamAddReadRequest(stream) {
341 const promise = v8.createPromise();
342 stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(p romise);
343 return promise;
344 }
345
346 class ReadableStreamDefaultReader {
271 constructor(stream) { 347 constructor(stream) {
272 if (IsReadableStream(stream) === false) { 348 if (IsReadableStream(stream) === false) {
273 throw new TypeError(errReaderConstructorBadArgument); 349 throw new TypeError(errReaderConstructorBadArgument);
274 } 350 }
275 if (IsReadableStreamLocked(stream) === true) { 351 if (IsReadableStreamLocked(stream) === true) {
276 throw new TypeError(errReaderConstructorStreamAlreadyLocked); 352 throw new TypeError(errReaderConstructorStreamAlreadyLocked);
277 } 353 }
278 354
279 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 355 ReadableStreamReaderGenericInitialize(this, stream);
280 // blink::UnderlyingSourceBase.
281 if (stream[readableStreamController] === null) {
282 // The stream is created with an external controller (i.e. made in
283 // Blink).
284 const underlyingSource = stream[readableStreamUnderlyingSource];
285 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
286 }
287 356
288 this[readableStreamReaderOwnerReadableStream] = stream; 357 this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray ();
289 stream[readableStreamReader] = this;
290
291 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
292
293 switch (stream[readableStreamState]) {
294 case STATE_READABLE:
295 this[readableStreamReaderClosedPromise] = v8.createPromise();
296 break;
297 case STATE_CLOSED:
298 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
299 break;
300 case STATE_ERRORED:
301 this[readableStreamReaderClosedPromise] =
302 Promise_reject(stream[readableStreamStoredError]);
303 break;
304 }
305 } 358 }
306 359
307 get closed() { 360 get closed() {
308 if (IsReadableStreamReader(this) === false) { 361 if (IsReadableStreamDefaultReader(this) === false) {
309 return Promise_reject(new TypeError(errIllegalInvocation)); 362 return Promise_reject(new TypeError(errIllegalInvocation));
310 } 363 }
311 364
312 return this[readableStreamReaderClosedPromise]; 365 return this[readableStreamReaderClosedPromise];
313 } 366 }
314 367
315 cancel(reason) { 368 cancel(reason) {
316 if (IsReadableStreamReader(this) === false) { 369 if (IsReadableStreamDefaultReader(this) === false) {
317 return Promise_reject(new TypeError(errIllegalInvocation)); 370 return Promise_reject(new TypeError(errIllegalInvocation));
318 } 371 }
319 372
320 const stream = this[readableStreamReaderOwnerReadableStream]; 373 const stream = this[readableStreamReaderOwnerReadableStream];
321 if (stream === undefined) { 374 if (stream === undefined) {
322 return Promise_reject(new TypeError(errCancelReleasedReader)); 375 return Promise_reject(new TypeError(errCancelReleasedReader));
323 } 376 }
324 377
325 return CancelReadableStream(stream, reason); 378 return ReadableStreamReaderGenericCancel(this, reason);
326 } 379 }
327 380
328 read() { 381 read() {
329 if (IsReadableStreamReader(this) === false) { 382 if (IsReadableStreamDefaultReader(this) === false) {
330 return Promise_reject(new TypeError(errIllegalInvocation)); 383 return Promise_reject(new TypeError(errIllegalInvocation));
331 } 384 }
332 385
333 if (this[readableStreamReaderOwnerReadableStream] === undefined) { 386 if (this[readableStreamReaderOwnerReadableStream] === undefined) {
334 return Promise_reject(new TypeError(errReadReleasedReader)); 387 return Promise_reject(new TypeError(errReadReleasedReader));
335 } 388 }
336 389
337 return ReadFromReadableStreamReader(this); 390 return ReadableStreamDefaultReaderRead(this);
338 } 391 }
339 392
340 releaseLock() { 393 releaseLock() {
341 if (IsReadableStreamReader(this) === false) { 394 if (IsReadableStreamDefaultReader(this) === false) {
342 throw new TypeError(errIllegalInvocation); 395 throw new TypeError(errIllegalInvocation);
343 } 396 }
344 397
345 const stream = this[readableStreamReaderOwnerReadableStream]; 398 const stream = this[readableStreamReaderOwnerReadableStream];
346 if (stream === undefined) { 399 if (stream === undefined) {
347 return undefined; 400 return undefined;
348 } 401 }
349 402
350 if (this[readableStreamReaderReadRequests].length > 0) { 403 if (this[readableStreamDefaultReaderReadRequests].length > 0) {
351 throw new TypeError(errReleaseReaderWithPendingRead); 404 throw new TypeError(errReleaseReaderWithPendingRead);
352 } 405 }
353 406
354 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 407 ReadableStreamReaderGenericRelease(this);
355 // blink::UnderlyingSourceBase. 408 }
356 if (stream[readableStreamController] === null) { 409 }
357 // The stream is created with an external controller (i.e. made in
358 // Blink).
359 const underlyingSource = stream[readableStreamUnderlyingSource];
360 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
361 }
362 410
363 if (stream[readableStreamState] === STATE_READABLE) { 411 function ReadableStreamReaderGenericCancel(reader, reason) {
364 v8.rejectPromise(this[readableStreamReaderClosedPromise], 412 return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason);
365 new TypeError(errReleasedReaderClosedPromise));
366 } else {
367 this[readableStreamReaderClosedPromise] =
368 Promise_reject(new TypeError(errReleasedReaderClosedPromise));
369 }
370
371 this[readableStreamReaderOwnerReadableStream][readableStreamReader] =
372 undefined;
373 this[readableStreamReaderOwnerReadableStream] = undefined;
374 }
375 } 413 }
376 414
377 // 415 //
378 // Readable stream abstract operations 416 // Readable stream abstract operations
379 // 417 //
380 418
381 function AcquireReadableStreamReader(stream) { 419 function AcquireReadableStreamDefaultReader(stream) {
382 return new ReadableStreamReader(stream); 420 return new ReadableStreamDefaultReader(stream);
383 } 421 }
384 422
385 function CancelReadableStream(stream, reason) { 423 function ReadableStreamCancel(stream, reason) {
386 stream[readableStreamBits] |= DISTURBED; 424 stream[readableStreamBits] |= DISTURBED;
387 425
388 const state = stream[readableStreamState]; 426 const state = ReadableStreamGetState(stream);
389 if (state === STATE_CLOSED) { 427 if (state === STATE_CLOSED) {
390 return Promise_resolve(undefined); 428 return Promise_resolve(undefined);
391 } 429 }
392 if (state === STATE_ERRORED) { 430 if (state === STATE_ERRORED) {
393 return Promise_reject(stream[readableStreamStoredError]); 431 return Promise_reject(stream[readableStreamStoredError]);
394 } 432 }
395 433
396 stream[readableStreamQueue] = new v8.InternalPackedArray(); 434 ReadableStreamClose(stream);
397 FinishClosingReadableStream(stream);
398 435
399 const underlyingSource = stream[readableStreamUnderlyingSource]; 436 const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[rea dableStreamController], reason);
400 const sourceCancelPromise = PromiseCallOrNoop(
401 underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
402 return thenPromise(sourceCancelPromise, () => undefined); 437 return thenPromise(sourceCancelPromise, () => undefined);
403 } 438 }
404 439
405 function CloseReadableStream(stream) { 440 function ReadableStreamDefaultControllerClose(controller) {
406 if (stream[readableStreamState] === STATE_CLOSED) { 441 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
407 return undefined;
408 }
409 442
410 stream[readableStreamBits] |= CLOSE_REQUESTED; 443 controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
411 444
412 if (stream[readableStreamQueue].length === 0) { 445 if (controller[readableStreamDefaultControllerQueue].length === 0) {
413 return FinishClosingReadableStream(stream); 446 ReadableStreamClose(stream);
414 } 447 }
415 } 448 }
416 449
417 function EnqueueInReadableStream(stream, chunk) { 450 function ReadableStreamFulfillReadRequest(stream, chunk, done) {
418 if (stream[readableStreamState] === STATE_CLOSED) { 451 const reader = stream[readableStreamReader];
419 return undefined;
420 }
421 452
422 if (IsReadableStreamLocked(stream) === true && 453 const readRequest =
423 stream[readableStreamReader][readableStreamReaderReadRequests].length > 454 stream[readableStreamReader][readableStreamDefaultReaderReadRequests]
424 0) { 455 .shift();
425 const readRequest = 456 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done));
426 stream[readableStreamReader][readableStreamReaderReadRequests] 457 }
427 .shift(); 458
428 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); 459 function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
460 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
461
462 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
463 ReadableStreamFulfillReadRequest(stream, chunk, false);
429 } else { 464 } else {
430 let chunkSize = 1; 465 let chunkSize = 1;
431 466
432 const strategySize = stream[readableStreamStrategySize]; 467 const strategySize = controller[readableStreamDefaultControllerStrategySiz e];
433 if (strategySize !== undefined) { 468 if (strategySize !== undefined) {
434 try { 469 try {
435 chunkSize = strategySize(chunk); 470 chunkSize = strategySize(chunk);
436 } catch (chunkSizeE) { 471 } catch (chunkSizeE) {
437 if (stream[readableStreamState] === STATE_READABLE) { 472 if (ReadableStreamGetState(stream) === STATE_READABLE) {
438 ErrorReadableStream(stream, chunkSizeE); 473 ReadableStreamDefaultControllerError(controller, chunkSizeE);
439 } 474 }
440 throw chunkSizeE; 475 throw chunkSizeE;
441 } 476 }
442 } 477 }
443 478
444 try { 479 try {
445 EnqueueValueWithSize(stream, chunk, chunkSize); 480 EnqueueValueWithSize(controller, chunk, chunkSize);
446 } catch (enqueueE) { 481 } catch (enqueueE) {
447 if (stream[readableStreamState] === STATE_READABLE) { 482 if (ReadableStreamGetState(stream) === STATE_READABLE) {
448 ErrorReadableStream(stream, enqueueE); 483 ReadableStreamDefaultControllerError(controller, enqueueE);
449 } 484 }
450 throw enqueueE; 485 throw enqueueE;
451 } 486 }
452 } 487 }
453 488
454 RequestReadableStreamPull(stream); 489 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
455 } 490 }
456 491
457 function ErrorReadableStream(stream, e) { 492 function ReadableStreamGetState(stream) {
458 stream[readableStreamQueue] = new v8.InternalPackedArray(); 493 return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
494 }
495
496 function ReadableStreamSetState(stream, state) {
497 stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) |
498 (state << STATE_BITS_OFFSET);
499 }
500
501 function ReadableStreamDefaultControllerError(controller, e) {
502 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
503 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
504 ReadableStreamError(stream, e);
505 }
506
507 function ReadableStreamError(stream, e) {
459 stream[readableStreamStoredError] = e; 508 stream[readableStreamStoredError] = e;
460 stream[readableStreamState] = STATE_ERRORED; 509 ReadableStreamSetState(stream, STATE_ERRORED);
461 510
462 const reader = stream[readableStreamReader]; 511 const reader = stream[readableStreamReader];
463 if (reader === undefined) { 512 if (reader === undefined) {
464 return undefined; 513 return undefined;
465 } 514 }
466 515
467 const readRequests = reader[readableStreamReaderReadRequests]; 516 if (IsReadableStreamDefaultReader(reader) === true) {
468 for (let i = 0; i < readRequests.length; ++i) { 517 const readRequests = reader[readableStreamDefaultReaderReadRequests];
469 v8.rejectPromise(readRequests[i], e); 518 for (let i = 0; i < readRequests.length; i++) {
519 v8.rejectPromise(readRequests[i], e);
520 }
521 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
470 } 522 }
471 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
472 523
473 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); 524 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e);
474 } 525 }
475 526
476 function FinishClosingReadableStream(stream) { 527 function ReadableStreamClose(stream) {
477 stream[readableStreamState] = STATE_CLOSED; 528 ReadableStreamSetState(stream, STATE_CLOSED);
478 529
479 const reader = stream[readableStreamReader]; 530 const reader = stream[readableStreamReader];
480 if (reader === undefined) { 531 if (reader === undefined) {
481 return undefined; 532 return undefined;
482 } 533 }
483 534
484 535 if (IsReadableStreamDefaultReader(reader) === true) {
485 const readRequests = reader[readableStreamReaderReadRequests]; 536 const readRequests = reader[readableStreamDefaultReaderReadRequests];
486 for (let i = 0; i < readRequests.length; ++i) { 537 for (let i = 0; i < readRequests.length; i++) {
487 v8.resolvePromise( 538 v8.resolvePromise(
488 readRequests[i], CreateIterResultObject(undefined, true)); 539 readRequests[i], CreateIterResultObject(undefined, true));
540 }
541 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
489 } 542 }
490 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
491 543
492 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); 544 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined);
493 } 545 }
494 546
495 function GetReadableStreamDesiredSize(stream) { 547 function ReadableStreamDefaultControllerGetDesiredSize(controller) {
496 const queueSize = GetTotalQueueSize(stream); 548 const queueSize = GetTotalQueueSize(controller);
497 return stream[readableStreamStrategyHWM] - queueSize; 549 return controller[readableStreamDefaultControllerStrategyHWM] - queueSize;
498 } 550 }
499 551
500 function IsReadableStream(x) { 552 function IsReadableStream(x) {
501 return hasOwnProperty(x, readableStreamUnderlyingSource); 553 return hasOwnProperty(x, readableStreamController);
502 } 554 }
503 555
504 function IsReadableStreamDisturbed(stream) { 556 function IsReadableStreamDisturbed(stream) {
505 return stream[readableStreamBits] & DISTURBED; 557 return stream[readableStreamBits] & DISTURBED;
506 } 558 }
507 559
508 function IsReadableStreamLocked(stream) { 560 function IsReadableStreamLocked(stream) {
509 return stream[readableStreamReader] !== undefined; 561 return stream[readableStreamReader] !== undefined;
510 } 562 }
511 563
512 function IsReadableStreamController(x) { 564 function IsReadableStreamDefaultController(x) {
513 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); 565 return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableSt ream);
566 }
567
568 function IsReadableStreamDefaultReader(x) {
569 return hasOwnProperty(x, readableStreamDefaultReaderReadRequests);
514 } 570 }
515 571
516 function IsReadableStreamReadable(stream) { 572 function IsReadableStreamReadable(stream) {
517 return stream[readableStreamState] === STATE_READABLE; 573 return ReadableStreamGetState(stream) === STATE_READABLE;
518 } 574 }
519 575
520 function IsReadableStreamClosed(stream) { 576 function IsReadableStreamClosed(stream) {
521 return stream[readableStreamState] === STATE_CLOSED; 577 return ReadableStreamGetState(stream) === STATE_CLOSED;
522 } 578 }
523 579
524 function IsReadableStreamErrored(stream) { 580 function IsReadableStreamErrored(stream) {
525 return stream[readableStreamState] === STATE_ERRORED; 581 return ReadableStreamGetState(stream) === STATE_ERRORED;
526 } 582 }
527 583
528 function IsReadableStreamReader(x) { 584 function ReadableStreamReaderGenericInitialize(reader, stream) {
529 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); 585 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
586 // blink::UnderlyingSourceBase.
587 const controller = stream[readableStreamController];
588 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
589 // The stream is created with an external controller (i.e. made in
590 // Blink).
591 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
592 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
593 }
594
595 reader[readableStreamReaderOwnerReadableStream] = stream;
596 stream[readableStreamReader] = reader;
597
598 switch (ReadableStreamGetState(stream)) {
599 case STATE_READABLE:
600 reader[readableStreamReaderClosedPromise] = v8.createPromise();
601 break;
602 case STATE_CLOSED:
603 reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
604 break;
605 case STATE_ERRORED:
606 reader[readableStreamReaderClosedPromise] =
607 Promise_reject(stream[readableStreamStoredError]);
608 break;
609 }
530 } 610 }
531 611
532 function ReadFromReadableStreamReader(reader) { 612 function ReadableStreamReaderGenericRelease(reader) {
613 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
614 // blink::UnderlyingSourceBase.
615 const controller = reader[readableStreamReaderOwnerReadableStream][readableS treamController];
616 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
617 // The stream is created with an external controller (i.e. made in
618 // Blink).
619 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
620 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
621 }
622
623 if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream]) === STATE_READABLE) {
624 v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError( errReleasedReaderClosedPromise));
625 } else {
626 reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(e rrReleasedReaderClosedPromise));
627 }
628
629 reader[readableStreamReaderOwnerReadableStream][readableStreamReader] =
630 undefined;
631 reader[readableStreamReaderOwnerReadableStream] = undefined;
632 }
633
634 function ReadableStreamDefaultReaderRead(reader) {
533 const stream = reader[readableStreamReaderOwnerReadableStream]; 635 const stream = reader[readableStreamReaderOwnerReadableStream];
534 stream[readableStreamBits] |= DISTURBED; 636 stream[readableStreamBits] |= DISTURBED;
535 637
536 if (stream[readableStreamState] === STATE_CLOSED) { 638 if (ReadableStreamGetState(stream) === STATE_CLOSED) {
537 return Promise_resolve(CreateIterResultObject(undefined, true)); 639 return Promise_resolve(CreateIterResultObject(undefined, true));
538 } 640 }
539 641
540 if (stream[readableStreamState] === STATE_ERRORED) { 642 if (ReadableStreamGetState(stream) === STATE_ERRORED) {
541 return Promise_reject(stream[readableStreamStoredError]); 643 return Promise_reject(stream[readableStreamStoredError]);
542 } 644 }
543 645
544 const queue = stream[readableStreamQueue]; 646 return ReadableStreamDefaultControllerPull(stream[readableStreamController]) ;
545 if (queue.length > 0) {
546 const chunk = DequeueValue(stream);
547
548 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) {
549 FinishClosingReadableStream(stream);
550 } else {
551 RequestReadableStreamPull(stream);
552 }
553
554 return Promise_resolve(CreateIterResultObject(chunk, false));
555 } else {
556 const readRequest = v8.createPromise();
557
558 reader[readableStreamReaderReadRequests].push(readRequest);
559 RequestReadableStreamPull(stream);
560 return readRequest;
561 }
562 } 647 }
563 648
564 function RequestReadableStreamPull(stream) { 649 function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
565 const shouldPull = ShouldReadableStreamPull(stream); 650 const shouldPull = ReadableStreamDefaultControllerShouldCallPull(controller) ;
566 if (shouldPull === false) { 651 if (shouldPull === false) {
567 return undefined; 652 return undefined;
568 } 653 }
569 654
570 if (stream[readableStreamBits] & PULLING) { 655 if (controller[readableStreamDefaultControllerBits] & PULLING) {
571 stream[readableStreamBits] |= PULL_AGAIN; 656 controller[readableStreamDefaultControllerBits] |= PULL_AGAIN;
572 return undefined; 657 return undefined;
573 } 658 }
574 659
575 stream[readableStreamBits] |= PULLING; 660 controller[readableStreamDefaultControllerBits] |= PULLING;
576 661
577 const underlyingSource = stream[readableStreamUnderlyingSource]; 662 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
578 const controller = stream[readableStreamController];
579 const pullPromise = PromiseCallOrNoop( 663 const pullPromise = PromiseCallOrNoop(
580 underlyingSource, 'pull', controller, 'underlyingSource.pull'); 664 underlyingSource, 'pull', controller, 'underlyingSource.pull');
581 665
582 thenPromise(pullPromise, 666 thenPromise(pullPromise,
583 () => { 667 () => {
584 stream[readableStreamBits] &= ~PULLING; 668 controller[readableStreamDefaultControllerBits] &= ~PULLING;
585 669
586 if (stream[readableStreamBits] & PULL_AGAIN) { 670 if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) {
587 stream[readableStreamBits] &= ~PULL_AGAIN; 671 controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
588 return RequestReadableStreamPull(stream); 672 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
589 } 673 }
590 }, 674 },
591 e => { 675 e => {
592 if (stream[readableStreamState] === STATE_READABLE) { 676 if (ReadableStreamGetState(controller[readableStreamDefaultControllerC ontrolledReadableStream]) === STATE_READABLE) {
593 return ErrorReadableStream(stream, e); 677 ReadableStreamDefaultControllerError(controller, e);
594 } 678 }
595 }); 679 });
596 } 680 }
597 681
598 function ShouldReadableStreamPull(stream) { 682 function ReadableStreamDefaultControllerShouldCallPull(controller) {
599 const state = stream[readableStreamState]; 683 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
684
685 const state = ReadableStreamGetState(stream);
600 if (state === STATE_CLOSED || state === STATE_ERRORED) { 686 if (state === STATE_CLOSED || state === STATE_ERRORED) {
601 return false; 687 return false;
602 } 688 }
603 689
604 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 690 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
605 return false; 691 return false;
606 } 692 }
607 693
608 if (!(stream[readableStreamBits] & STARTED)) { 694 if (!(controller[readableStreamDefaultControllerBits] & STARTED)) {
609 return false; 695 return false;
610 } 696 }
611 697
612 if (IsReadableStreamLocked(stream) === true) { 698 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
613 const reader = stream[readableStreamReader]; 699 return true;
614 const readRequests = reader[readableStreamReaderReadRequests];
615 if (readRequests.length > 0) {
616 return true;
617 }
618 } 700 }
619 701
620 const desiredSize = GetReadableStreamDesiredSize(stream); 702 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller );
621 if (desiredSize > 0) { 703 if (desiredSize > 0) {
622 return true; 704 return true;
623 } 705 }
624 706
625 return false; 707 return false;
626 } 708 }
627 709
710 function ReadableStreamGetNumReadRequests(stream) {
711 const reader = stream[readableStreamReader];
712 const readRequests = reader[readableStreamDefaultReaderReadRequests];
713 return readRequests.length;
714 }
715
628 // Potential future optimization: use class instances for the underlying 716 // Potential future optimization: use class instances for the underlying
629 // sources, so that we don't re-create 717 // sources, so that we don't re-create
630 // closures every time. 718 // closures every time.
631 719
632 // TODO(domenic): shouldClone argument from spec not supported yet 720 // TODO(domenic): shouldClone argument from spec not supported yet
633 function TeeReadableStream(stream) { 721 function ReadableStreamTee(stream) {
634 const reader = AcquireReadableStreamReader(stream); 722 const reader = AcquireReadableStreamDefaultReader(stream);
635 723
636 let closedOrErrored = false; 724 let closedOrErrored = false;
637 let canceled1 = false; 725 let canceled1 = false;
638 let canceled2 = false; 726 let canceled2 = false;
639 let reason1; 727 let reason1;
640 let reason2; 728 let reason2;
641 let promise = v8.createPromise(); 729 let promise = v8.createPromise();
642 730
643 const branch1 = new ReadableStream({pull, cancel: cancel1}); 731 const branch1Stream = new ReadableStream({pull, cancel: cancel1});
644 732
645 const branch2 = new ReadableStream({pull, cancel: cancel2}); 733 const branch2Stream = new ReadableStream({pull, cancel: cancel2});
734
735 const branch1 = branch1Stream[readableStreamController];
736 const branch2 = branch2Stream[readableStreamController];
646 737
647 thenPromise( 738 thenPromise(
648 reader[readableStreamReaderClosedPromise], undefined, function(r) { 739 reader[readableStreamReaderClosedPromise], undefined, function(r) {
649 if (closedOrErrored === true) { 740 if (closedOrErrored === true) {
650 return; 741 return;
651 } 742 }
652 743
653 ErrorReadableStream(branch1, r); 744 ReadableStreamDefaultControllerError(branch1, r);
654 ErrorReadableStream(branch2, r); 745 ReadableStreamDefaultControllerError(branch2, r);
655 closedOrErrored = true; 746 closedOrErrored = true;
656 }); 747 });
657 748
658 return [branch1, branch2]; 749 return [branch1Stream, branch2Stream];
659
660 750
661 function pull() { 751 function pull() {
662 return thenPromise( 752 return thenPromise(
663 ReadFromReadableStreamReader(reader), function(result) { 753 ReadableStreamDefaultReaderRead(reader), function(result) {
664 const value = result.value; 754 const value = result.value;
665 const done = result.done; 755 const done = result.done;
666 756
667 if (done === true && closedOrErrored === false) { 757 if (done === true && closedOrErrored === false) {
668 CloseReadableStream(branch1); 758 if (canceled1 === false) {
669 CloseReadableStream(branch2); 759 ReadableStreamDefaultControllerClose(branch1);
760 }
761 if (canceled2 === false) {
762 ReadableStreamDefaultControllerClose(branch2);
763 }
670 closedOrErrored = true; 764 closedOrErrored = true;
671 } 765 }
672 766
673 if (closedOrErrored === true) { 767 if (closedOrErrored === true) {
674 return; 768 return;
675 } 769 }
676 770
677 if (canceled1 === false) { 771 if (canceled1 === false) {
678 EnqueueInReadableStream(branch1, value); 772 ReadableStreamDefaultControllerEnqueue(branch1, value);
679 } 773 }
680 774
681 if (canceled2 === false) { 775 if (canceled2 === false) {
682 EnqueueInReadableStream(branch2, value); 776 ReadableStreamDefaultControllerEnqueue(branch2, value);
683 } 777 }
684 }); 778 });
685 } 779 }
686 780
687 function cancel1(reason) { 781 function cancel1(reason) {
688 canceled1 = true; 782 canceled1 = true;
689 reason1 = reason; 783 reason1 = reason;
690 784
691 if (canceled2 === true) { 785 if (canceled2 === true) {
692 const compositeReason = [reason1, reason2]; 786 const compositeReason = [reason1, reason2];
693 const cancelResult = CancelReadableStream(stream, compositeReason); 787 const cancelResult = ReadableStreamCancel(stream, compositeReason);
694 v8.resolvePromise(promise, cancelResult); 788 v8.resolvePromise(promise, cancelResult);
695 } 789 }
696 790
697 return promise; 791 return promise;
698 } 792 }
699 793
700 function cancel2(reason) { 794 function cancel2(reason) {
701 canceled2 = true; 795 canceled2 = true;
702 reason2 = reason; 796 reason2 = reason;
703 797
704 if (canceled1 === true) { 798 if (canceled1 === true) {
705 const compositeReason = [reason1, reason2]; 799 const compositeReason = [reason1, reason2];
706 const cancelResult = CancelReadableStream(stream, compositeReason); 800 const cancelResult = ReadableStreamCancel(stream, compositeReason);
707 v8.resolvePromise(promise, cancelResult); 801 v8.resolvePromise(promise, cancelResult);
708 } 802 }
709 803
710 return promise; 804 return promise;
711 } 805 }
712 } 806 }
713 807
714 // 808 //
715 // Queue-with-sizes 809 // Queue-with-sizes
716 // Modified from taking the queue (as in the spec) to taking the stream, so we 810 // Modified from taking the queue (as in the spec) to taking the stream, so we
717 // can modify the queue size alongside. 811 // can modify the queue size alongside.
718 // 812 //
719 813
720 function DequeueValue(stream) { 814 function DequeueValue(controller) {
721 const result = stream[readableStreamQueue].shift(); 815 const result = controller[readableStreamDefaultControllerQueue].shift();
722 stream[readableStreamQueueSize] -= result.size; 816 controller[readableStreamDefaultControllerQueueSize] -= result.size;
723 return result.value; 817 return result.value;
724 } 818 }
725 819
726 function EnqueueValueWithSize(stream, value, size) { 820 function EnqueueValueWithSize(controller, value, size) {
727 size = Number(size); 821 size = Number(size);
728 if (Number_isNaN(size) || size === +Infinity || size < 0) { 822 if (Number_isNaN(size) || size === +Infinity || size < 0) {
729 throw new RangeError(errInvalidSize); 823 throw new RangeError(errInvalidSize);
730 } 824 }
731 825
732 stream[readableStreamQueueSize] += size; 826 controller[readableStreamDefaultControllerQueueSize] += size;
733 stream[readableStreamQueue].push({value, size}); 827 controller[readableStreamDefaultControllerQueue].push({value, size});
734 } 828 }
735 829
736 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } 830 function GetTotalQueueSize(controller) { return controller[readableStreamDefau ltControllerQueueSize]; }
737 831
738 // 832 //
739 // Other helpers 833 // Other helpers
740 // 834 //
741 835
742 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { 836 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
743 if (size !== undefined && typeof size !== 'function') { 837 if (size !== undefined && typeof size !== 'function') {
744 throw new TypeError(errSizeNotAFunction); 838 throw new TypeError(errSizeNotAFunction);
745 } 839 }
746 840
(...skipping 29 matching lines...) Expand all
776 method = O[P]; 870 method = O[P];
777 } catch (methodE) { 871 } catch (methodE) {
778 return Promise_reject(methodE); 872 return Promise_reject(methodE);
779 } 873 }
780 874
781 if (method === undefined) { 875 if (method === undefined) {
782 return Promise_resolve(undefined); 876 return Promise_resolve(undefined);
783 } 877 }
784 878
785 if (typeof method !== 'function') { 879 if (typeof method !== 'function') {
786 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); 880 return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameF orError)));
787 } 881 }
788 882
789 try { 883 try {
790 return Promise_resolve(callFunction(method, O, arg)); 884 return Promise_resolve(callFunction(method, O, arg));
791 } catch (e) { 885 } catch (e) {
792 return Promise_reject(e); 886 return Promise_reject(e);
793 } 887 }
794 } 888 }
795 889
796 function CreateIterResultObject(value, done) { return {value, done}; } 890 function CreateIterResultObject(value, done) { return {value, done}; }
797 891
798 892
799 // 893 //
800 // Additions to the global 894 // Additions to the global
801 // 895 //
802 896
803 defineProperty(global, 'ReadableStream', { 897 defineProperty(global, 'ReadableStream', {
804 value: ReadableStream, 898 value: ReadableStream,
805 enumerable: false, 899 enumerable: false,
806 configurable: true, 900 configurable: true,
807 writable: true 901 writable: true
808 }); 902 });
809 903
810 // 904 //
811 // Exports to Blink 905 // Exports to Blink
812 // 906 //
813 907
814 binding.AcquireReadableStreamReader = AcquireReadableStreamReader; 908 binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReade r;
815 binding.IsReadableStream = IsReadableStream; 909 binding.IsReadableStream = IsReadableStream;
816 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; 910 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
817 binding.IsReadableStreamLocked = IsReadableStreamLocked; 911 binding.IsReadableStreamLocked = IsReadableStreamLocked;
818 binding.IsReadableStreamReadable = IsReadableStreamReadable; 912 binding.IsReadableStreamReadable = IsReadableStreamReadable;
819 binding.IsReadableStreamClosed = IsReadableStreamClosed; 913 binding.IsReadableStreamClosed = IsReadableStreamClosed;
820 binding.IsReadableStreamErrored = IsReadableStreamErrored; 914 binding.IsReadableStreamErrored = IsReadableStreamErrored;
821 binding.IsReadableStreamReader = IsReadableStreamReader; 915 binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader;
822 binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; 916 binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead;
823 917
824 binding.CloseReadableStream = CloseReadableStream; 918 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
825 binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; 919 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
826 binding.EnqueueInReadableStream = EnqueueInReadableStream; 920 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
827 binding.ErrorReadableStream = ErrorReadableStream; 921 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
828 922
829 binding.createReadableStreamWithExternalController = 923 binding.createReadableStreamWithExternalController =
830 (underlyingSource, strategy) => { 924 (underlyingSource, strategy) => {
831 return new ReadableStream( 925 return new ReadableStream(
832 underlyingSource, strategy, createWithExternalControllerSentinel); 926 underlyingSource, strategy, createWithExternalControllerSentinel);
833 }; 927 };
834 }); 928 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698