Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(437)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 1902673003: Reflect recent spec changes to V8 Extra ReadableStream impl (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Addressed #18 Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
9 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
8 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); 10 const readableStreamController = v8.createPrivateSymbol('[[controller]]');
9 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]');
10 const readableStreamQueueSize =
11 v8.createPrivateSymbol('[[queue]] total size');
12 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
13 const readableStreamState = v8.createPrivateSymbol('[[state]]');
14 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
15 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]');
16 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
17 const readableStreamUnderlyingSource =
18 v8.createPrivateSymbol('[[underlyingSource]]');
19
20 const readableStreamControllerControlledReadableStream =
21 v8.createPrivateSymbol('[[controlledReadableStream]]');
22 11
23 const readableStreamReaderClosedPromise = 12 const readableStreamReaderClosedPromise =
24 v8.createPrivateSymbol('[[closedPromise]]'); 13 v8.createPrivateSymbol('[[closedPromise]]');
25 const readableStreamReaderOwnerReadableStream = 14 const readableStreamReaderOwnerReadableStream =
26 v8.createPrivateSymbol('[[ownerReadableStream]]'); 15 v8.createPrivateSymbol('[[ownerReadableStream]]');
27 const readableStreamReaderReadRequests = 16
17 const readableStreamDefaultReaderReadRequests =
28 v8.createPrivateSymbol('[[readRequests]]'); 18 v8.createPrivateSymbol('[[readRequests]]');
29 19
30 const createWithExternalControllerSentinel = 20 const createWithExternalControllerSentinel =
31 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); 21 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
32 22
23 const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]');
24 const DISTURBED = 0b1;
25 // The 2nd and 3rd bit are for [[state]].
26 const STATE_MASK = 0b110;
27 const STATE_BITS_OFFSET = 1;
33 const STATE_READABLE = 0; 28 const STATE_READABLE = 0;
34 const STATE_CLOSED = 1; 29 const STATE_CLOSED = 1;
35 const STATE_ERRORED = 2; 30 const STATE_ERRORED = 2;
36 31
37 const readableStreamBits = v8.createPrivateSymbol( 32 const readableStreamDefaultControllerUnderlyingSource =
38 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] , [[disturbed]]'); 33 v8.createPrivateSymbol('[[underlyingSource]]');
34 const readableStreamDefaultControllerControlledReadableStream =
35 v8.createPrivateSymbol('[[controlledReadableStream]]');
36 const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]] ');
37 const readableStreamDefaultControllerQueueSize =
38 v8.createPrivateSymbol('[[queue]] total size');
39 const readableStreamDefaultControllerStrategySize =
40 v8.createPrivateSymbol('[[strategySize]]');
41 const readableStreamDefaultControllerStrategyHWM =
42 v8.createPrivateSymbol('[[strategyHWM]]');
43
44 const readableStreamDefaultControllerBits = v8.createPrivateSymbol(
45 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] ');
39 const STARTED = 0b1; 46 const STARTED = 0b1;
40 const CLOSE_REQUESTED = 0b10; 47 const CLOSE_REQUESTED = 0b10;
41 const PULLING = 0b100; 48 const PULLING = 0b100;
42 const PULL_AGAIN = 0b1000; 49 const PULL_AGAIN = 0b1000;
43 const DISTURBED = 0b10000; 50 const EXTERNALLY_CONTROLLED = 0b10000;
51
52 const readableStreamControllerCancel =
53 v8.createPrivateSymbol('[[InternalCancel]]');
54 const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]' );
44 55
45 const undefined = global.undefined; 56 const undefined = global.undefined;
46 const Infinity = global.Infinity; 57 const Infinity = global.Infinity;
47 58
48 const defineProperty = global.Object.defineProperty; 59 const defineProperty = global.Object.defineProperty;
49 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 60 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
50 const callFunction = v8.uncurryThis(global.Function.prototype.call); 61 const callFunction = v8.uncurryThis(global.Function.prototype.call);
51 62
52 const TypeError = global.TypeError; 63 const TypeError = global.TypeError;
53 const RangeError = global.RangeError; 64 const RangeError = global.RangeError;
54 65
55 const Number = global.Number; 66 const Number = global.Number;
56 const Number_isNaN = Number.isNaN; 67 const Number_isNaN = Number.isNaN;
57 const Number_isFinite = Number.isFinite; 68 const Number_isFinite = Number.isFinite;
58 69
59 const Promise = global.Promise; 70 const Promise = global.Promise;
60 const thenPromise = v8.uncurryThis(Promise.prototype.then); 71 const thenPromise = v8.uncurryThis(Promise.prototype.then);
61 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); 72 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
62 const Promise_reject = v8.simpleBind(Promise.reject, Promise); 73 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
63 74
64 const errIllegalInvocation = 'Illegal invocation'; 75 const errIllegalInvocation = 'Illegal invocation';
65 const errIllegalConstructor = 'Illegal constructor'; 76 const errIllegalConstructor = 'Illegal constructor';
66 const errCancelLockedStream = 77 const errCancelLockedStream =
67 'Cannot cancel a readable stream that is locked to a reader'; 78 'Cannot cancel a readable stream that is locked to a reader';
68 const errEnqueueInCloseRequestedStream = 79 const errEnqueueCloseRequestedStream =
69 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; 80 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed';
70 const errCancelReleasedReader = 81 const errCancelReleasedReader =
71 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; 82 'This readable stream reader has been released and cannot be used to cance l its previous owner stream';
72 const errReadReleasedReader = 83 const errReadReleasedReader =
73 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; 84 'This readable stream reader has been released and cannot be used to read from its previous owner stream';
74 const errCloseCloseRequestedStream = 85 const errCloseCloseRequestedStream =
75 'Cannot close a readable stream that has already been requested to be clos ed'; 86 'Cannot close a readable stream that has already been requested to be clos ed';
87 const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable stream';
88 const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readab le stream';
89 const errCloseClosedStream = 'Cannot close a closed readable stream';
76 const errCloseErroredStream = 'Cannot close an errored readable stream'; 90 const errCloseErroredStream = 'Cannot close an errored readable stream';
77 const errErrorClosedStream = 'Cannot error a close readable stream'; 91 const errErrorClosedStream = 'Cannot error a close readable stream';
78 const errErrorErroredStream = 92 const errErrorErroredStream =
79 'Cannot error a readable stream that is already errored'; 93 'Cannot error a readable stream that is already errored';
94 const errGetReaderNotByteStream = 'This readable stream does not support BYOB readers';
95 const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or "byob"';
80 const errReaderConstructorBadArgument = 96 const errReaderConstructorBadArgument =
81 'ReadableStreamReader constructor argument is not a readable stream'; 97 'ReadableStreamReader constructor argument is not a readable stream';
82 const errReaderConstructorStreamAlreadyLocked = 98 const errReaderConstructorStreamAlreadyLocked =
83 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 99 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
84 const errReleaseReaderWithPendingRead = 100 const errReleaseReaderWithPendingRead =
85 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 101 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
86 const errReleasedReaderClosedPromise = 102 const errReleasedReaderClosedPromise =
87 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 103 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
88 const errInvalidSize = 104 const errInvalidSize =
89 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; 105 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
90 const errSizeNotAFunction = 106 const errSizeNotAFunction =
91 'A queuing strategy\'s size property must be a function'; 107 'A queuing strategy\'s size property must be a function';
92 const errInvalidHWM = 108 const errInvalidHWM =
93 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; 109 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number';
94 const errTmplMustBeFunctionOrUndefined = name => 110 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 111 `${name} must be a function or undefined`;
96 112
97 class ReadableStream { 113 class ReadableStream {
98 constructor() { 114 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 115 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 116 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 117 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 118 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 119 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 120 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 121 if (highWaterMark === undefined) {
106 highWaterMark = 1; 122 highWaterMark = 1;
107 } 123 }
108 124
109 const normalizedStrategy =
110 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
111
112 this[readableStreamUnderlyingSource] = underlyingSource;
113
114 this[readableStreamQueue] = new v8.InternalPackedArray();
115 this[readableStreamQueueSize] = 0;
116
117 this[readableStreamState] = STATE_READABLE;
118 this[readableStreamBits] = 0b0; 125 this[readableStreamBits] = 0b0;
126 ReadableStreamSetState(this, STATE_READABLE);
119 this[readableStreamReader] = undefined; 127 this[readableStreamReader] = undefined;
120 this[readableStreamStoredError] = undefined; 128 this[readableStreamStoredError] = undefined;
121 129
122 this[readableStreamStrategySize] = normalizedStrategy.size;
123 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
124
125 // Avoid allocating the controller if the stream is going to be controlled 130 // Avoid allocating the controller if the stream is going to be controlled
126 // externally (i.e. from C++) anyway. All calls to underlyingSource 131 // externally (i.e. from C++) anyway. All calls to underlyingSource
127 // methods will disregard their controller argument in such situations 132 // methods will disregard their controller argument in such situations
128 // (but see below). 133 // (but see below).
129 134
130 const isControlledExternally = 135 this[readableStreamController] = undefined;
131 arguments[2] === createWithExternalControllerSentinel;
132 const controller =
133 isControlledExternally ? null : new ReadableStreamController(this);
134 this[readableStreamController] = controller;
135 136
136 // We need to pass ourself to the underlyingSource start method for 137 const type = underlyingSource.type;
137 // externally-controlled streams. We use the now-useless controller 138 const typeString = String(type);
138 // argument to do so. 139 if (typeString === 'bytes') {
139 const argToStart = isControlledExternally ? this : controller; 140 throw new RangeError('bytes type is not yet implemented');
141 } else if (type !== undefined) {
142 throw new RangeError('Invalid type is specified');
143 }
140 144
141 const startResult = CallOrNoop( 145 this[readableStreamController] =
142 underlyingSource, 'start', argToStart, 'underlyingSource.start'); 146 new ReadableStreamDefaultController(this, underlyingSource, size, high WaterMark, arguments[2] === createWithExternalControllerSentinel);
143 thenPromise(Promise_resolve(startResult),
144 () => {
145 this[readableStreamBits] |= STARTED;
146 RequestReadableStreamPull(this);
147 },
148 r => {
149 if (this[readableStreamState] === STATE_READABLE) {
150 return ErrorReadableStream(this, r);
151 }
152 });
153 } 147 }
154 148
155 get locked() { 149 get locked() {
156 if (IsReadableStream(this) === false) { 150 if (IsReadableStream(this) === false) {
157 throw new TypeError(errIllegalInvocation); 151 throw new TypeError(errIllegalInvocation);
158 } 152 }
159 153
160 return IsReadableStreamLocked(this); 154 return IsReadableStreamLocked(this);
161 } 155 }
162 156
163 cancel(reason) { 157 cancel(reason) {
164 if (IsReadableStream(this) === false) { 158 if (IsReadableStream(this) === false) {
165 return Promise_reject(new TypeError(errIllegalInvocation)); 159 return Promise_reject(new TypeError(errIllegalInvocation));
166 } 160 }
167 161
168 if (IsReadableStreamLocked(this) === true) { 162 if (IsReadableStreamLocked(this) === true) {
169 return Promise_reject(new TypeError(errCancelLockedStream)); 163 return Promise_reject(new TypeError(errCancelLockedStream));
170 } 164 }
171 165
172 return CancelReadableStream(this, reason); 166 return ReadableStreamCancel(this, reason);
173 } 167 }
174 168
175 getReader() { 169 getReader({ mode } = {}) {
176 if (IsReadableStream(this) === false) { 170 if (IsReadableStream(this) === false) {
177 throw new TypeError(errIllegalInvocation); 171 throw new TypeError(errIllegalInvocation);
178 } 172 }
179 173
180 return AcquireReadableStreamReader(this); 174 if (mode === 'byob') {
175 if (IsReadableByteStreamDefaultController(this[readableStreamController] ) === false) {
176 throw new TypeError(errGetReaderNotByteStream);
177 }
178
179 return AcquireReadableStreamBYOBReader(this);
180 }
181
182 if (mode === undefined) {
183 return AcquireReadableStreamDefaultReader(this);
184 }
185
186 throw new RangeError(errGetReaderBadMode);
181 } 187 }
182 188
183 tee() { 189 tee() {
184 if (IsReadableStream(this) === false) { 190 if (IsReadableStream(this) === false) {
185 throw new TypeError(errIllegalInvocation); 191 throw new TypeError(errIllegalInvocation);
186 } 192 }
187 193
188 return TeeReadableStream(this); 194 return ReadableStreamTee(this);
189 } 195 }
190 } 196 }
191 197
192 class ReadableStreamController { 198 class ReadableStreamDefaultController {
193 constructor(stream) { 199 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
194 if (IsReadableStream(stream) === false) { 200 if (IsReadableStream(stream) === false) {
195 throw new TypeError(errIllegalConstructor); 201 throw new TypeError(errIllegalConstructor);
196 } 202 }
197 203
198 if (stream[readableStreamController] !== undefined) { 204 if (stream[readableStreamController] !== undefined) {
199 throw new TypeError(errIllegalConstructor); 205 throw new TypeError(errIllegalConstructor);
200 } 206 }
201 207
202 this[readableStreamControllerControlledReadableStream] = stream; 208 this[readableStreamDefaultControllerControlledReadableStream] = stream;
209
210 this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource;
211
212 this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
213 this[readableStreamDefaultControllerQueueSize] = 0;
214
215 this[readableStreamDefaultControllerBits] = 0b0;
216 if (isExternallyControlled === true) {
217 this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
218 }
219
220 const normalizedStrategy =
221 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
222 this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.siz e;
223 this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.high WaterMark;
224
225 const controller = this;
226
227 const startResult = CallOrNoop(
228 underlyingSource, 'start', this, 'underlyingSource.start');
229 thenPromise(Promise_resolve(startResult),
230 () => {
231 controller[readableStreamDefaultControllerBits] |= STARTED;
232 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
233 },
234 r => {
235 if (ReadableStreamGetState(stream) === STATE_READABLE) {
236 ReadableStreamDefaultControllerError(controller, r);
237 }
238 });
203 } 239 }
204 240
205 get desiredSize() { 241 get desiredSize() {
206 if (IsReadableStreamController(this) === false) { 242 if (IsReadableStreamDefaultController(this) === false) {
207 throw new TypeError(errIllegalInvocation); 243 throw new TypeError(errIllegalInvocation);
208 } 244 }
209 245
210 return GetReadableStreamDesiredSize( 246 return ReadableStreamDefaultControllerGetDesiredSize(this);
211 this[readableStreamControllerControlledReadableStream]);
212 } 247 }
213 248
214 close() { 249 close() {
215 if (IsReadableStreamController(this) === false) { 250 if (IsReadableStreamDefaultController(this) === false) {
216 throw new TypeError(errIllegalInvocation); 251 throw new TypeError(errIllegalInvocation);
217 } 252 }
218 253
219 const stream = this[readableStreamControllerControlledReadableStream]; 254 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
220 255
221 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 256 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
222 throw new TypeError(errCloseCloseRequestedStream); 257 throw new TypeError(errCloseCloseRequestedStream);
223 } 258 }
224 if (stream[readableStreamState] === STATE_ERRORED) { 259
260 const state = ReadableStreamGetState(stream);
261 if (state === STATE_ERRORED) {
225 throw new TypeError(errCloseErroredStream); 262 throw new TypeError(errCloseErroredStream);
226 } 263 }
264 if (state === STATE_CLOSED) {
265 throw new TypeError(errCloseClosedStream);
266 }
227 267
228 return CloseReadableStream(stream); 268 return ReadableStreamDefaultControllerClose(this);
229 } 269 }
230 270
231 enqueue(chunk) { 271 enqueue(chunk) {
232 if (IsReadableStreamController(this) === false) { 272 if (IsReadableStreamDefaultController(this) === false) {
233 throw new TypeError(errIllegalInvocation); 273 throw new TypeError(errIllegalInvocation);
234 } 274 }
235 275
236 const stream = this[readableStreamControllerControlledReadableStream]; 276 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
237 277
238 if (stream[readableStreamState] === STATE_ERRORED) { 278 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
239 throw stream[readableStreamStoredError]; 279 throw new TypeError(errEnqueueCloseRequestedStream);
240 } 280 }
241 281
242 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 282 const state = ReadableStreamGetState(stream);
243 throw new TypeError(errEnqueueInCloseRequestedStream); 283 if (state === STATE_ERRORED) {
284 throw new TypeError(errEnqueueErroredStream);
285 }
286 if (state === STATE_CLOSED) {
287 throw new TypeError(errEnqueueClosedStream);
244 } 288 }
245 289
246 return EnqueueInReadableStream(stream, chunk); 290 return ReadableStreamDefaultControllerEnqueue(this, chunk);
247 } 291 }
248 292
249 error(e) { 293 error(e) {
250 if (IsReadableStreamController(this) === false) { 294 if (IsReadableStreamDefaultController(this) === false) {
251 throw new TypeError(errIllegalInvocation); 295 throw new TypeError(errIllegalInvocation);
252 } 296 }
253 297
254 const stream = this[readableStreamControllerControlledReadableStream]; 298 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
255 299
256 const state = stream[readableStreamState]; 300 const state = ReadableStreamGetState(stream);
257 if (state !== STATE_READABLE) { 301 if (state === STATE_ERRORED) {
258 if (state === STATE_ERRORED) { 302 throw new TypeError(errErrorErroredStream);
259 throw new TypeError(errErrorErroredStream); 303 }
260 } 304 if (state === STATE_CLOSED) {
261 if (state === STATE_CLOSED) { 305 throw new TypeError(errErrorClosedStream);
262 throw new TypeError(errErrorClosedStream);
263 }
264 } 306 }
265 307
266 return ErrorReadableStream(stream, e); 308 return ReadableStreamDefaultControllerError(this, e);
267 } 309 }
268 } 310 }
269 311
270 class ReadableStreamReader { 312 function ReadableStreamDefaultControllerCancel(controller, reason) {
313 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
314
315 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
316 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
317 }
318
319 function ReadableStreamDefaultControllerPull(controller) {
320 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
321
322 if (controller[readableStreamDefaultControllerQueue].length > 0) {
323 const chunk = DequeueValue(controller);
324
325 if ((controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) &&
326 controller[readableStreamDefaultControllerQueue].length === 0) {
327 ReadableStreamClose(stream);
328 } else {
329 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
330 }
331
332 return Promise_resolve(CreateIterResultObject(chunk, false));
333 }
334
335 const pendingPromise = ReadableStreamAddReadRequest(stream);
336 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
337 return pendingPromise;
338 }
339
340 function ReadableStreamAddReadRequest(stream) {
341 const promise = v8.createPromise();
342 stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(p romise);
343 return promise;
344 }
345
346 class ReadableStreamDefaultReader {
271 constructor(stream) { 347 constructor(stream) {
272 if (IsReadableStream(stream) === false) { 348 if (IsReadableStream(stream) === false) {
273 throw new TypeError(errReaderConstructorBadArgument); 349 throw new TypeError(errReaderConstructorBadArgument);
274 } 350 }
275 if (IsReadableStreamLocked(stream) === true) { 351 if (IsReadableStreamLocked(stream) === true) {
276 throw new TypeError(errReaderConstructorStreamAlreadyLocked); 352 throw new TypeError(errReaderConstructorStreamAlreadyLocked);
277 } 353 }
278 354
279 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 355 ReadableStreamReaderGenericInitialize(this, stream);
280 // blink::UnderlyingSourceBase.
281 if (stream[readableStreamController] === null) {
282 // The stream is created with an external controller (i.e. made in
283 // Blink).
284 const underlyingSource = stream[readableStreamUnderlyingSource];
285 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
286 }
287 356
288 this[readableStreamReaderOwnerReadableStream] = stream; 357 this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray ();
289 stream[readableStreamReader] = this;
290
291 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
292
293 switch (stream[readableStreamState]) {
294 case STATE_READABLE:
295 this[readableStreamReaderClosedPromise] = v8.createPromise();
296 break;
297 case STATE_CLOSED:
298 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
299 break;
300 case STATE_ERRORED:
301 this[readableStreamReaderClosedPromise] =
302 Promise_reject(stream[readableStreamStoredError]);
303 break;
304 }
305 } 358 }
306 359
307 get closed() { 360 get closed() {
308 if (IsReadableStreamReader(this) === false) { 361 if (IsReadableStreamDefaultReader(this) === false) {
309 return Promise_reject(new TypeError(errIllegalInvocation)); 362 return Promise_reject(new TypeError(errIllegalInvocation));
310 } 363 }
311 364
312 return this[readableStreamReaderClosedPromise]; 365 return this[readableStreamReaderClosedPromise];
313 } 366 }
314 367
315 cancel(reason) { 368 cancel(reason) {
316 if (IsReadableStreamReader(this) === false) { 369 if (IsReadableStreamDefaultReader(this) === false) {
317 return Promise_reject(new TypeError(errIllegalInvocation)); 370 return Promise_reject(new TypeError(errIllegalInvocation));
318 } 371 }
319 372
320 const stream = this[readableStreamReaderOwnerReadableStream]; 373 const stream = this[readableStreamReaderOwnerReadableStream];
321 if (stream === undefined) { 374 if (stream === undefined) {
322 return Promise_reject(new TypeError(errCancelReleasedReader)); 375 return Promise_reject(new TypeError(errCancelReleasedReader));
323 } 376 }
324 377
325 return CancelReadableStream(stream, reason); 378 return ReadableStreamReaderGenericCancel(this, reason);
yhirano 2016/05/06 11:26:58 [optional]: the spec uses ReadableStreamCancel dir
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Looks the spec is broken. I'll fix.
326 } 379 }
327 380
328 read() { 381 read() {
329 if (IsReadableStreamReader(this) === false) { 382 if (IsReadableStreamDefaultReader(this) === false) {
330 return Promise_reject(new TypeError(errIllegalInvocation)); 383 return Promise_reject(new TypeError(errIllegalInvocation));
331 } 384 }
332 385
333 if (this[readableStreamReaderOwnerReadableStream] === undefined) { 386 if (this[readableStreamReaderOwnerReadableStream] === undefined) {
334 return Promise_reject(new TypeError(errReadReleasedReader)); 387 return Promise_reject(new TypeError(errReadReleasedReader));
335 } 388 }
336 389
337 return ReadFromReadableStreamReader(this); 390 return ReadableStreamDefaultReaderRead(this);
338 } 391 }
339 392
340 releaseLock() { 393 releaseLock() {
341 if (IsReadableStreamReader(this) === false) { 394 if (IsReadableStreamDefaultReader(this) === false) {
342 throw new TypeError(errIllegalInvocation); 395 throw new TypeError(errIllegalInvocation);
343 } 396 }
344 397
345 const stream = this[readableStreamReaderOwnerReadableStream]; 398 const stream = this[readableStreamReaderOwnerReadableStream];
346 if (stream === undefined) { 399 if (stream === undefined) {
347 return undefined; 400 return undefined;
348 } 401 }
349 402
350 if (this[readableStreamReaderReadRequests].length > 0) { 403 if (this[readableStreamDefaultReaderReadRequests].length > 0) {
351 throw new TypeError(errReleaseReaderWithPendingRead); 404 throw new TypeError(errReleaseReaderWithPendingRead);
352 } 405 }
353 406
354 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 407 ReadableStreamReaderGenericRelease(this);
355 // blink::UnderlyingSourceBase. 408 }
356 if (stream[readableStreamController] === null) { 409 }
357 // The stream is created with an external controller (i.e. made in
358 // Blink).
359 const underlyingSource = stream[readableStreamUnderlyingSource];
360 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
361 }
362 410
363 if (stream[readableStreamState] === STATE_READABLE) { 411 function ReadableStreamReaderGenericCancel(reader, reason) {
364 v8.rejectPromise(this[readableStreamReaderClosedPromise], 412 return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason);
365 new TypeError(errReleasedReaderClosedPromise));
366 } else {
367 this[readableStreamReaderClosedPromise] =
368 Promise_reject(new TypeError(errReleasedReaderClosedPromise));
369 }
370
371 this[readableStreamReaderOwnerReadableStream][readableStreamReader] =
372 undefined;
373 this[readableStreamReaderOwnerReadableStream] = undefined;
374 }
375 } 413 }
376 414
377 // 415 //
378 // Readable stream abstract operations 416 // Readable stream abstract operations
379 // 417 //
380 418
381 function AcquireReadableStreamReader(stream) { 419 function AcquireReadableStreamDefaultReader(stream) {
382 return new ReadableStreamReader(stream); 420 return new ReadableStreamDefaultReader(stream);
383 } 421 }
384 422
385 function CancelReadableStream(stream, reason) { 423 function ReadableStreamCancel(stream, reason) {
386 stream[readableStreamBits] |= DISTURBED; 424 stream[readableStreamBits] |= DISTURBED;
387 425
388 const state = stream[readableStreamState]; 426 const state = ReadableStreamGetState(stream);
389 if (state === STATE_CLOSED) { 427 if (state === STATE_CLOSED) {
390 return Promise_resolve(undefined); 428 return Promise_resolve(undefined);
391 } 429 }
392 if (state === STATE_ERRORED) { 430 if (state === STATE_ERRORED) {
393 return Promise_reject(stream[readableStreamStoredError]); 431 return Promise_reject(stream[readableStreamStoredError]);
394 } 432 }
395 433
396 stream[readableStreamQueue] = new v8.InternalPackedArray(); 434 ReadableStreamClose(stream);
397 FinishClosingReadableStream(stream);
398 435
399 const underlyingSource = stream[readableStreamUnderlyingSource]; 436 const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[rea dableStreamController], reason);
400 const sourceCancelPromise = PromiseCallOrNoop(
401 underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
402 return thenPromise(sourceCancelPromise, () => undefined); 437 return thenPromise(sourceCancelPromise, () => undefined);
403 } 438 }
404 439
405 function CloseReadableStream(stream) { 440 function ReadableStreamDefaultControllerClose(controller) {
406 if (stream[readableStreamState] === STATE_CLOSED) { 441 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
407 return undefined;
408 }
409 442
410 stream[readableStreamBits] |= CLOSE_REQUESTED; 443 controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
411 444
412 if (stream[readableStreamQueue].length === 0) { 445 if (controller[readableStreamDefaultControllerQueue].length === 0) {
413 return FinishClosingReadableStream(stream); 446 ReadableStreamClose(stream);
414 } 447 }
415 } 448 }
416 449
417 function EnqueueInReadableStream(stream, chunk) { 450 function ReadableStreamFulfillReadRequest(stream, chunk, done) {
418 if (stream[readableStreamState] === STATE_CLOSED) { 451 const reader = stream[readableStreamReader];
419 return undefined;
420 }
421 452
422 if (IsReadableStreamLocked(stream) === true && 453 const readRequest =
423 stream[readableStreamReader][readableStreamReaderReadRequests].length > 454 stream[readableStreamReader][readableStreamDefaultReaderReadRequests]
424 0) { 455 .shift();
425 const readRequest = 456 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done));
426 stream[readableStreamReader][readableStreamReaderReadRequests] 457 }
427 .shift(); 458
428 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); 459 function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
460 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
461
462 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
463 ReadableStreamFulfillReadRequest(stream, chunk, false);
429 } else { 464 } else {
430 let chunkSize = 1; 465 let chunkSize = 1;
431 466
432 const strategySize = stream[readableStreamStrategySize]; 467 const strategySize = controller[readableStreamDefaultControllerStrategySiz e];
433 if (strategySize !== undefined) { 468 if (strategySize !== undefined) {
434 try { 469 try {
435 chunkSize = strategySize(chunk); 470 chunkSize = strategySize(chunk);
436 } catch (chunkSizeE) { 471 } catch (chunkSizeE) {
437 if (stream[readableStreamState] === STATE_READABLE) { 472 if (ReadableStreamGetState(stream) === STATE_READABLE) {
438 ErrorReadableStream(stream, chunkSizeE); 473 ReadableStreamDefaultControllerError(controller, chunkSizeE);
439 } 474 }
440 throw chunkSizeE; 475 throw chunkSizeE;
441 } 476 }
442 } 477 }
443 478
444 try { 479 try {
445 EnqueueValueWithSize(stream, chunk, chunkSize); 480 EnqueueValueWithSize(controller, chunk, chunkSize);
446 } catch (enqueueE) { 481 } catch (enqueueE) {
447 if (stream[readableStreamState] === STATE_READABLE) { 482 if (ReadableStreamGetState(stream) === STATE_READABLE) {
448 ErrorReadableStream(stream, enqueueE); 483 ReadableStreamDefaultControllerError(controller, enqueueE);
449 } 484 }
450 throw enqueueE; 485 throw enqueueE;
451 } 486 }
452 } 487 }
453 488
454 RequestReadableStreamPull(stream); 489 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
455 } 490 }
456 491
457 function ErrorReadableStream(stream, e) { 492 function ReadableStreamGetState(stream) {
458 stream[readableStreamQueue] = new v8.InternalPackedArray(); 493 return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
494 }
495
496 function ReadableStreamSetState(stream, state) {
497 stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) |
498 (state << STATE_BITS_OFFSET);
499 }
500
501 function ReadableStreamDefaultControllerError(controller, e) {
502 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
503 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
504 ReadableStreamError(stream, e);
505 }
506
507 function ReadableStreamError(stream, e) {
459 stream[readableStreamStoredError] = e; 508 stream[readableStreamStoredError] = e;
460 stream[readableStreamState] = STATE_ERRORED; 509 ReadableStreamSetState(stream, STATE_ERRORED);
461 510
462 const reader = stream[readableStreamReader]; 511 const reader = stream[readableStreamReader];
463 if (reader === undefined) { 512 if (reader === undefined) {
464 return undefined; 513 return undefined;
465 } 514 }
466 515
467 const readRequests = reader[readableStreamReaderReadRequests]; 516 if (IsReadableStreamDefaultReader(reader) === true) {
468 for (let i = 0; i < readRequests.length; ++i) { 517 const readRequests = reader[readableStreamDefaultReaderReadRequests];
469 v8.rejectPromise(readRequests[i], e); 518 for (let i = 0; i < readRequests.length; ++i) {
yhirano 2016/05/06 11:26:58 [optional] You can use |for...of|.
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Hmm, tried but it doesn't work. Maybe v8.InternalP
519 v8.rejectPromise(readRequests[i], e);
520 }
521 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
522 } else {
523 const readIntoRequests = reader[readableStreamReaderReadIntoRequests];
yhirano 2016/05/06 11:26:58 Non-default reader appears only here.
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 removed
524 for (let i = 0; i < readIntoRequests.length; ++i) {
yhirano 2016/05/06 11:26:58 ditto
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Done.
525 v8.rejectPromise(readIntoRequests[i], e);
526 }
527 reader[readableStreamReaderReadIntoRequests] = new v8.InternalPackedArray( );
470 } 528 }
471 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
472 529
473 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); 530 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e);
474 } 531 }
475 532
476 function FinishClosingReadableStream(stream) { 533 function ReadableStreamClose(stream) {
477 stream[readableStreamState] = STATE_CLOSED; 534 ReadableStreamSetState(stream, STATE_CLOSED);
478 535
479 const reader = stream[readableStreamReader]; 536 const reader = stream[readableStreamReader];
480 if (reader === undefined) { 537 if (reader === undefined) {
481 return undefined; 538 return undefined;
482 } 539 }
483 540
484 541 if (IsReadableStreamDefaultReader(reader) === true) {
485 const readRequests = reader[readableStreamReaderReadRequests]; 542 const readRequests = reader[readableStreamDefaultReaderReadRequests];
486 for (let i = 0; i < readRequests.length; ++i) { 543 for (let i = 0; i < readRequests.length; ++i) {
yhirano 2016/05/06 11:26:58 [optional] You can use |for...of|.
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Done.
487 v8.resolvePromise( 544 v8.resolvePromise(
488 readRequests[i], CreateIterResultObject(undefined, true)); 545 readRequests[i], CreateIterResultObject(undefined, true));
546 }
547 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
489 } 548 }
490 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
491 549
492 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); 550 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined);
493 } 551 }
494 552
495 function GetReadableStreamDesiredSize(stream) { 553 function ReadableStreamDefaultControllerGetDesiredSize(controller) {
496 const queueSize = GetTotalQueueSize(stream); 554 const queueSize = GetTotalQueueSize(controller);
497 return stream[readableStreamStrategyHWM] - queueSize; 555 return controller[readableStreamDefaultControllerStrategyHWM] - queueSize;
498 } 556 }
499 557
500 function IsReadableStream(x) { 558 function IsReadableStream(x) {
501 return hasOwnProperty(x, readableStreamUnderlyingSource); 559 return hasOwnProperty(x, readableStreamController);
502 } 560 }
503 561
504 function IsReadableStreamDisturbed(stream) { 562 function IsReadableStreamDisturbed(stream) {
505 return stream[readableStreamBits] & DISTURBED; 563 return stream[readableStreamBits] & DISTURBED;
506 } 564 }
507 565
508 function SetReadableStreamDisturbed(stream) { 566 function SetReadableStreamDisturbed(stream) {
509 return stream[readableStreamBits] |= DISTURBED; 567 return stream[readableStreamBits] |= DISTURBED;
510 } 568 }
511 569
512 function IsReadableStreamLocked(stream) { 570 function IsReadableStreamLocked(stream) {
513 return stream[readableStreamReader] !== undefined; 571 return stream[readableStreamReader] !== undefined;
514 } 572 }
515 573
516 function IsReadableStreamController(x) { 574 function IsReadableStreamDefaultController(x) {
517 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); 575 return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableSt ream);
576 }
577
578 function IsReadableStreamDefaultReader(x) {
579 return hasOwnProperty(x, readableStreamDefaultReaderReadRequests);
518 } 580 }
519 581
520 function IsReadableStreamReadable(stream) { 582 function IsReadableStreamReadable(stream) {
521 return stream[readableStreamState] === STATE_READABLE; 583 return ReadableStreamGetState(stream) === STATE_READABLE;
522 } 584 }
523 585
524 function IsReadableStreamClosed(stream) { 586 function IsReadableStreamClosed(stream) {
525 return stream[readableStreamState] === STATE_CLOSED; 587 return ReadableStreamGetState(stream) === STATE_CLOSED;
526 } 588 }
527 589
528 function IsReadableStreamErrored(stream) { 590 function IsReadableStreamErrored(stream) {
529 return stream[readableStreamState] === STATE_ERRORED; 591 return ReadableStreamGetState(stream) === STATE_ERRORED;
530 } 592 }
531 593
532 function IsReadableStreamReader(x) { 594 function ReadableStreamReaderGenericInitialize(reader, stream) {
533 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); 595 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
596 // blink::UnderlyingSourceBase.
597 const controller = stream[readableStreamController];
598 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
599 // The stream is created with an external controller (i.e. made in
600 // Blink).
601 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
602 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
603 }
604
605 reader[readableStreamReaderOwnerReadableStream] = stream;
606 stream[readableStreamReader] = reader;
607
608 switch (ReadableStreamGetState(stream)) {
609 case STATE_READABLE:
610 reader[readableStreamReaderClosedPromise] = v8.createPromise();
611 break;
612 case STATE_CLOSED:
613 reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
614 break;
615 case STATE_ERRORED:
616 reader[readableStreamReaderClosedPromise] =
617 Promise_reject(stream[readableStreamStoredError]);
618 break;
619 }
534 } 620 }
535 621
536 function ReadFromReadableStreamReader(reader) { 622 function ReadableStreamReaderGenericRelease(reader) {
623 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
624 // blink::UnderlyingSourceBase.
625 const controller = reader[readableStreamReaderOwnerReadableStream][readableS treamController];
626 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
627 // The stream is created with an external controller (i.e. made in
628 // Blink).
629 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
630 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
631 }
632
633 if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream]) === STATE_READABLE) {
634 v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError( errReleasedReaderClosedPromise));
635 } else {
636 reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(e rrReleasedReaderClosedPromise));
637 }
638
639 reader[readableStreamReaderOwnerReadableStream][readableStreamReader] =
640 undefined;
641 reader[readableStreamReaderOwnerReadableStream] = undefined;
642 }
643
644 function ReadableStreamDefaultReaderRead(reader) {
537 const stream = reader[readableStreamReaderOwnerReadableStream]; 645 const stream = reader[readableStreamReaderOwnerReadableStream];
538 stream[readableStreamBits] |= DISTURBED; 646 stream[readableStreamBits] |= DISTURBED;
539 647
540 if (stream[readableStreamState] === STATE_CLOSED) { 648 if (ReadableStreamGetState(stream) === STATE_CLOSED) {
541 return Promise_resolve(CreateIterResultObject(undefined, true)); 649 return Promise_resolve(CreateIterResultObject(undefined, true));
542 } 650 }
543 651
544 if (stream[readableStreamState] === STATE_ERRORED) { 652 if (ReadableStreamGetState(stream) === STATE_ERRORED) {
545 return Promise_reject(stream[readableStreamStoredError]); 653 return Promise_reject(stream[readableStreamStoredError]);
546 } 654 }
547 655
548 const queue = stream[readableStreamQueue]; 656 return ReadableStreamDefaultControllerPull(stream[readableStreamController]) ;
549 if (queue.length > 0) {
550 const chunk = DequeueValue(stream);
551
552 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) {
553 FinishClosingReadableStream(stream);
554 } else {
555 RequestReadableStreamPull(stream);
556 }
557
558 return Promise_resolve(CreateIterResultObject(chunk, false));
559 } else {
560 const readRequest = v8.createPromise();
561
562 reader[readableStreamReaderReadRequests].push(readRequest);
563 RequestReadableStreamPull(stream);
564 return readRequest;
565 }
566 } 657 }
567 658
568 function RequestReadableStreamPull(stream) { 659 function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
569 const shouldPull = ShouldReadableStreamPull(stream); 660 const shouldPull = ReadableStreamDefaultControllerShouldPull(controller);
570 if (shouldPull === false) { 661 if (shouldPull === false) {
571 return undefined; 662 return undefined;
572 } 663 }
573 664
574 if (stream[readableStreamBits] & PULLING) { 665 if (controller[readableStreamDefaultControllerBits] & PULLING) {
575 stream[readableStreamBits] |= PULL_AGAIN; 666 controller[readableStreamDefaultControllerBits] |= PULL_AGAIN;
576 return undefined; 667 return undefined;
577 } 668 }
578 669
579 stream[readableStreamBits] |= PULLING; 670 controller[readableStreamDefaultControllerBits] |= PULLING;
580 671
581 const underlyingSource = stream[readableStreamUnderlyingSource]; 672 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
582 const controller = stream[readableStreamController];
583 const pullPromise = PromiseCallOrNoop( 673 const pullPromise = PromiseCallOrNoop(
584 underlyingSource, 'pull', controller, 'underlyingSource.pull'); 674 underlyingSource, 'pull', controller, 'underlyingSource.pull');
585 675
586 thenPromise(pullPromise, 676 thenPromise(pullPromise,
587 () => { 677 () => {
588 stream[readableStreamBits] &= ~PULLING; 678 controller[readableStreamDefaultControllerBits] &= ~PULLING;
589 679
590 if (stream[readableStreamBits] & PULL_AGAIN) { 680 if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) {
591 stream[readableStreamBits] &= ~PULL_AGAIN; 681 controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
592 return RequestReadableStreamPull(stream); 682 return ReadableStreamDefaultControllerCallPullIfNeeded(controller);
yhirano 2016/05/06 11:26:58 [optional] Is |return| needed?
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Done.
593 } 683 }
594 }, 684 },
595 e => { 685 e => {
596 if (stream[readableStreamState] === STATE_READABLE) { 686 if (ReadableStreamGetState(controller[readableStreamDefaultControllerC ontrolledReadableStream]) === STATE_READABLE) {
597 return ErrorReadableStream(stream, e); 687 return ReadableStreamDefaultControllerError(controller, e);
yhirano 2016/05/06 11:26:58 ditto
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Done.
598 } 688 }
599 }); 689 });
600 } 690 }
601 691
602 function ShouldReadableStreamPull(stream) { 692 function ReadableStreamDefaultControllerShouldPull(controller) {
yhirano 2016/05/06 11:26:58 [optional] Its name is ReadableStreamDefaultContro
tyoshino (SeeGerritForStatus) 2016/05/06 12:40:48 Added Call
603 const state = stream[readableStreamState]; 693 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
694
695 const state = ReadableStreamGetState(stream);
604 if (state === STATE_CLOSED || state === STATE_ERRORED) { 696 if (state === STATE_CLOSED || state === STATE_ERRORED) {
605 return false; 697 return false;
606 } 698 }
607 699
608 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 700 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
609 return false; 701 return false;
610 } 702 }
611 703
612 if (!(stream[readableStreamBits] & STARTED)) { 704 if (!(controller[readableStreamDefaultControllerBits] & STARTED)) {
613 return false; 705 return false;
614 } 706 }
615 707
616 if (IsReadableStreamLocked(stream) === true) { 708 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
617 const reader = stream[readableStreamReader]; 709 return true;
618 const readRequests = reader[readableStreamReaderReadRequests];
619 if (readRequests.length > 0) {
620 return true;
621 }
622 } 710 }
623 711
624 const desiredSize = GetReadableStreamDesiredSize(stream); 712 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller );
625 if (desiredSize > 0) { 713 if (desiredSize > 0) {
626 return true; 714 return true;
627 } 715 }
628 716
629 return false; 717 return false;
630 } 718 }
631 719
720 function ReadableStreamGetNumReadRequests(stream) {
721 const reader = stream[readableStreamReader];
722 const readRequests = reader[readableStreamDefaultReaderReadRequests];
723 return readRequests.length;
724 }
725
632 // Potential future optimization: use class instances for the underlying 726 // Potential future optimization: use class instances for the underlying
633 // sources, so that we don't re-create 727 // sources, so that we don't re-create
634 // closures every time. 728 // closures every time.
635 729
636 // TODO(domenic): shouldClone argument from spec not supported yet 730 // TODO(domenic): shouldClone argument from spec not supported yet
637 function TeeReadableStream(stream) { 731 function ReadableStreamTee(stream) {
638 const reader = AcquireReadableStreamReader(stream); 732 const reader = AcquireReadableStreamDefaultReader(stream);
639 733
640 let closedOrErrored = false; 734 let closedOrErrored = false;
641 let canceled1 = false; 735 let canceled1 = false;
642 let canceled2 = false; 736 let canceled2 = false;
643 let reason1; 737 let reason1;
644 let reason2; 738 let reason2;
645 let promise = v8.createPromise(); 739 let promise = v8.createPromise();
646 740
647 const branch1 = new ReadableStream({pull, cancel: cancel1}); 741 const branch1Stream = new ReadableStream({pull, cancel: cancel1});
648 742
649 const branch2 = new ReadableStream({pull, cancel: cancel2}); 743 const branch2Stream = new ReadableStream({pull, cancel: cancel2});
744
745 const branch1 = branch1Stream[readableStreamController];
746 const branch2 = branch2Stream[readableStreamController];
650 747
651 thenPromise( 748 thenPromise(
652 reader[readableStreamReaderClosedPromise], undefined, function(r) { 749 reader[readableStreamReaderClosedPromise], undefined, function(r) {
653 if (closedOrErrored === true) { 750 if (closedOrErrored === true) {
654 return; 751 return;
655 } 752 }
656 753
657 ErrorReadableStream(branch1, r); 754 ReadableStreamDefaultControllerError(branch1, r);
658 ErrorReadableStream(branch2, r); 755 ReadableStreamDefaultControllerError(branch2, r);
659 closedOrErrored = true; 756 closedOrErrored = true;
660 }); 757 });
661 758
662 return [branch1, branch2]; 759 return [branch1Stream, branch2Stream];
663
664 760
665 function pull() { 761 function pull() {
666 return thenPromise( 762 return thenPromise(
667 ReadFromReadableStreamReader(reader), function(result) { 763 ReadableStreamDefaultReaderRead(reader), function(result) {
668 const value = result.value; 764 const value = result.value;
669 const done = result.done; 765 const done = result.done;
670 766
671 if (done === true && closedOrErrored === false) { 767 if (done === true && closedOrErrored === false) {
672 CloseReadableStream(branch1); 768 if (canceled1 === false) {
673 CloseReadableStream(branch2); 769 ReadableStreamDefaultControllerClose(branch1);
770 }
771 if (canceled2 === false) {
772 ReadableStreamDefaultControllerClose(branch2);
773 }
674 closedOrErrored = true; 774 closedOrErrored = true;
675 } 775 }
676 776
677 if (closedOrErrored === true) { 777 if (closedOrErrored === true) {
678 return; 778 return;
679 } 779 }
680 780
681 if (canceled1 === false) { 781 if (canceled1 === false) {
682 EnqueueInReadableStream(branch1, value); 782 ReadableStreamDefaultControllerEnqueue(branch1, value);
683 } 783 }
684 784
685 if (canceled2 === false) { 785 if (canceled2 === false) {
686 EnqueueInReadableStream(branch2, value); 786 ReadableStreamDefaultControllerEnqueue(branch2, value);
687 } 787 }
688 }); 788 });
689 } 789 }
690 790
691 function cancel1(reason) { 791 function cancel1(reason) {
692 canceled1 = true; 792 canceled1 = true;
693 reason1 = reason; 793 reason1 = reason;
694 794
695 if (canceled2 === true) { 795 if (canceled2 === true) {
696 const compositeReason = [reason1, reason2]; 796 const compositeReason = [reason1, reason2];
697 const cancelResult = CancelReadableStream(stream, compositeReason); 797 const cancelResult = ReadableStreamCancel(stream, compositeReason);
698 v8.resolvePromise(promise, cancelResult); 798 v8.resolvePromise(promise, cancelResult);
699 } 799 }
700 800
701 return promise; 801 return promise;
702 } 802 }
703 803
704 function cancel2(reason) { 804 function cancel2(reason) {
705 canceled2 = true; 805 canceled2 = true;
706 reason2 = reason; 806 reason2 = reason;
707 807
708 if (canceled1 === true) { 808 if (canceled1 === true) {
709 const compositeReason = [reason1, reason2]; 809 const compositeReason = [reason1, reason2];
710 const cancelResult = CancelReadableStream(stream, compositeReason); 810 const cancelResult = ReadableStreamCancel(stream, compositeReason);
711 v8.resolvePromise(promise, cancelResult); 811 v8.resolvePromise(promise, cancelResult);
712 } 812 }
713 813
714 return promise; 814 return promise;
715 } 815 }
716 } 816 }
717 817
718 // 818 //
719 // Queue-with-sizes 819 // Queue-with-sizes
720 // Modified from taking the queue (as in the spec) to taking the stream, so we 820 // Modified from taking the queue (as in the spec) to taking the stream, so we
721 // can modify the queue size alongside. 821 // can modify the queue size alongside.
722 // 822 //
723 823
724 function DequeueValue(stream) { 824 function DequeueValue(controller) {
725 const result = stream[readableStreamQueue].shift(); 825 const result = controller[readableStreamDefaultControllerQueue].shift();
726 stream[readableStreamQueueSize] -= result.size; 826 controller[readableStreamDefaultControllerQueueSize] -= result.size;
727 return result.value; 827 return result.value;
728 } 828 }
729 829
730 function EnqueueValueWithSize(stream, value, size) { 830 function EnqueueValueWithSize(controller, value, size) {
731 size = Number(size); 831 size = Number(size);
732 if (Number_isNaN(size) || size === +Infinity || size < 0) { 832 if (Number_isNaN(size) || size === +Infinity || size < 0) {
733 throw new RangeError(errInvalidSize); 833 throw new RangeError(errInvalidSize);
734 } 834 }
735 835
736 stream[readableStreamQueueSize] += size; 836 controller[readableStreamDefaultControllerQueueSize] += size;
737 stream[readableStreamQueue].push({value, size}); 837 controller[readableStreamDefaultControllerQueue].push({value, size});
738 } 838 }
739 839
740 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } 840 function GetTotalQueueSize(controller) { return controller[readableStreamDefau ltControllerQueueSize]; }
741 841
742 // 842 //
743 // Other helpers 843 // Other helpers
744 // 844 //
745 845
746 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { 846 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
747 if (size !== undefined && typeof size !== 'function') { 847 if (size !== undefined && typeof size !== 'function') {
748 throw new TypeError(errSizeNotAFunction); 848 throw new TypeError(errSizeNotAFunction);
749 } 849 }
750 850
(...skipping 29 matching lines...) Expand all
780 method = O[P]; 880 method = O[P];
781 } catch (methodE) { 881 } catch (methodE) {
782 return Promise_reject(methodE); 882 return Promise_reject(methodE);
783 } 883 }
784 884
785 if (method === undefined) { 885 if (method === undefined) {
786 return Promise_resolve(undefined); 886 return Promise_resolve(undefined);
787 } 887 }
788 888
789 if (typeof method !== 'function') { 889 if (typeof method !== 'function') {
790 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); 890 return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameF orError)));
791 } 891 }
792 892
793 try { 893 try {
794 return Promise_resolve(callFunction(method, O, arg)); 894 return Promise_resolve(callFunction(method, O, arg));
795 } catch (e) { 895 } catch (e) {
796 return Promise_reject(e); 896 return Promise_reject(e);
797 } 897 }
798 } 898 }
799 899
800 function CreateIterResultObject(value, done) { return {value, done}; } 900 function CreateIterResultObject(value, done) { return {value, done}; }
801 901
802 902
803 // 903 //
804 // Additions to the global 904 // Additions to the global
805 // 905 //
806 906
807 defineProperty(global, 'ReadableStream', { 907 defineProperty(global, 'ReadableStream', {
808 value: ReadableStream, 908 value: ReadableStream,
809 enumerable: false, 909 enumerable: false,
810 configurable: true, 910 configurable: true,
811 writable: true 911 writable: true
812 }); 912 });
813 913
814 // 914 //
815 // Exports to Blink 915 // Exports to Blink
816 // 916 //
817 917
818 binding.AcquireReadableStreamReader = AcquireReadableStreamReader; 918 binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReade r;
819 binding.IsReadableStream = IsReadableStream; 919 binding.IsReadableStream = IsReadableStream;
820 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; 920 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
821 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed; 921 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed;
822 binding.IsReadableStreamLocked = IsReadableStreamLocked; 922 binding.IsReadableStreamLocked = IsReadableStreamLocked;
823 binding.IsReadableStreamReadable = IsReadableStreamReadable; 923 binding.IsReadableStreamReadable = IsReadableStreamReadable;
824 binding.IsReadableStreamClosed = IsReadableStreamClosed; 924 binding.IsReadableStreamClosed = IsReadableStreamClosed;
825 binding.IsReadableStreamErrored = IsReadableStreamErrored; 925 binding.IsReadableStreamErrored = IsReadableStreamErrored;
826 binding.IsReadableStreamReader = IsReadableStreamReader; 926 binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader;
827 binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; 927 binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead;
828 928
829 binding.CloseReadableStream = CloseReadableStream; 929 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
830 binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; 930 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
831 binding.EnqueueInReadableStream = EnqueueInReadableStream; 931 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
832 binding.ErrorReadableStream = ErrorReadableStream; 932 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
833 933
834 binding.createReadableStreamWithExternalController = 934 binding.createReadableStreamWithExternalController =
835 (underlyingSource, strategy) => { 935 (underlyingSource, strategy) => {
836 return new ReadableStream( 936 return new ReadableStream(
837 underlyingSource, strategy, createWithExternalControllerSentinel); 937 underlyingSource, strategy, createWithExternalControllerSentinel);
838 }; 938 };
839 }); 939 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698