Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(261)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 1902673003: Reflect recent spec changes to V8 Extra ReadableStream impl (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Updated UnderlyingSourceBase Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
9 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
8 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); 10 const readableStreamController = v8.createPrivateSymbol('[[controller]]');
9 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]');
10 const readableStreamQueueSize =
11 v8.createPrivateSymbol('[[queue]] total size');
12 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
13 const readableStreamState = v8.createPrivateSymbol('[[state]]');
14 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
15 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]');
16 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
17 const readableStreamUnderlyingSource =
18 v8.createPrivateSymbol('[[underlyingSource]]');
19
20 const readableStreamControllerControlledReadableStream =
21 v8.createPrivateSymbol('[[controlledReadableStream]]');
22 11
23 const readableStreamReaderClosedPromise = 12 const readableStreamReaderClosedPromise =
24 v8.createPrivateSymbol('[[closedPromise]]'); 13 v8.createPrivateSymbol('[[closedPromise]]');
25 const readableStreamReaderOwnerReadableStream = 14 const readableStreamReaderOwnerReadableStream =
26 v8.createPrivateSymbol('[[ownerReadableStream]]'); 15 v8.createPrivateSymbol('[[ownerReadableStream]]');
27 const readableStreamReaderReadRequests = 16
17 const readableStreamDefaultReaderReadRequests =
28 v8.createPrivateSymbol('[[readRequests]]'); 18 v8.createPrivateSymbol('[[readRequests]]');
29 19
30 const createWithExternalControllerSentinel = 20 const createWithExternalControllerSentinel =
31 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); 21 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
32 22
23 const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]');
24 const DISTURBED = 0b1;
25 // The 2nd and 3rd bit are for [[state]].
26 const STATE_MASK = 0b110;
27 const STATE_BITS_OFFSET = 1;
33 const STATE_READABLE = 0; 28 const STATE_READABLE = 0;
34 const STATE_CLOSED = 1; 29 const STATE_CLOSED = 1;
35 const STATE_ERRORED = 2; 30 const STATE_ERRORED = 2;
36 31
37 const readableStreamBits = v8.createPrivateSymbol( 32 const readableStreamDefaultControllerUnderlyingSource =
38 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] , [[disturbed]]'); 33 v8.createPrivateSymbol('[[underlyingSource]]');
34 const readableStreamDefaultControllerControlledReadableStream =
35 v8.createPrivateSymbol('[[controlledReadableStream]]');
36 const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]] ');
37 const readableStreamDefaultControllerQueueSize =
38 v8.createPrivateSymbol('[[queue]] total size');
39 const readableStreamDefaultControllerStrategySize =
40 v8.createPrivateSymbol('[[strategySize]]');
41 const readableStreamDefaultControllerStrategyHWM =
42 v8.createPrivateSymbol('[[strategyHWM]]');
43
44 const readableStreamDefaultControllerBits = v8.createPrivateSymbol(
45 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] ');
39 const STARTED = 0b1; 46 const STARTED = 0b1;
40 const CLOSE_REQUESTED = 0b10; 47 const CLOSE_REQUESTED = 0b10;
41 const PULLING = 0b100; 48 const PULLING = 0b100;
42 const PULL_AGAIN = 0b1000; 49 const PULL_AGAIN = 0b1000;
43 const DISTURBED = 0b10000; 50 const EXTERNALLY_CONTROLLED = 0b10000;
51
52 const readableStreamControllerCancel =
53 v8.createPrivateSymbol('[[InternalCancel]]');
54 const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]' );
44 55
45 const undefined = global.undefined; 56 const undefined = global.undefined;
46 const Infinity = global.Infinity; 57 const Infinity = global.Infinity;
47 58
48 const defineProperty = global.Object.defineProperty; 59 const defineProperty = global.Object.defineProperty;
49 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 60 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
50 const callFunction = v8.uncurryThis(global.Function.prototype.call); 61 const callFunction = v8.uncurryThis(global.Function.prototype.call);
51 62
52 const TypeError = global.TypeError; 63 const TypeError = global.TypeError;
53 const RangeError = global.RangeError; 64 const RangeError = global.RangeError;
54 65
55 const Number = global.Number; 66 const Number = global.Number;
56 const Number_isNaN = Number.isNaN; 67 const Number_isNaN = Number.isNaN;
57 const Number_isFinite = Number.isFinite; 68 const Number_isFinite = Number.isFinite;
58 69
59 const Promise = global.Promise; 70 const Promise = global.Promise;
60 const thenPromise = v8.uncurryThis(Promise.prototype.then); 71 const thenPromise = v8.uncurryThis(Promise.prototype.then);
61 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); 72 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
62 const Promise_reject = v8.simpleBind(Promise.reject, Promise); 73 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
63 74
64 const errIllegalInvocation = 'Illegal invocation'; 75 const errIllegalInvocation = 'Illegal invocation';
65 const errIllegalConstructor = 'Illegal constructor'; 76 const errIllegalConstructor = 'Illegal constructor';
66 const errCancelLockedStream = 77 const errCancelLockedStream =
67 'Cannot cancel a readable stream that is locked to a reader'; 78 'Cannot cancel a readable stream that is locked to a reader';
68 const errEnqueueInCloseRequestedStream = 79 const errEnqueueCloseRequestedStream =
69 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; 80 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed';
70 const errCancelReleasedReader = 81 const errCancelReleasedReader =
71 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; 82 'This readable stream reader has been released and cannot be used to cance l its previous owner stream';
72 const errReadReleasedReader = 83 const errReadReleasedReader =
73 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; 84 'This readable stream reader has been released and cannot be used to read from its previous owner stream';
74 const errCloseCloseRequestedStream = 85 const errCloseCloseRequestedStream =
75 'Cannot close a readable stream that has already been requested to be clos ed'; 86 'Cannot close a readable stream that has already been requested to be clos ed';
87 const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable stream';
88 const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readab le stream';
89 const errCloseClosedStream = 'Cannot close a closed readable stream';
76 const errCloseErroredStream = 'Cannot close an errored readable stream'; 90 const errCloseErroredStream = 'Cannot close an errored readable stream';
77 const errErrorClosedStream = 'Cannot error a close readable stream'; 91 const errErrorClosedStream = 'Cannot error a close readable stream';
78 const errErrorErroredStream = 92 const errErrorErroredStream =
79 'Cannot error a readable stream that is already errored'; 93 'Cannot error a readable stream that is already errored';
94 const errGetReaderNotByteStream = 'This readable stream does not support BYOB readers';
95 const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or "byob"';
80 const errReaderConstructorBadArgument = 96 const errReaderConstructorBadArgument =
81 'ReadableStreamReader constructor argument is not a readable stream'; 97 'ReadableStreamReader constructor argument is not a readable stream';
82 const errReaderConstructorStreamAlreadyLocked = 98 const errReaderConstructorStreamAlreadyLocked =
83 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 99 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
84 const errReleaseReaderWithPendingRead = 100 const errReleaseReaderWithPendingRead =
85 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 101 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
86 const errReleasedReaderClosedPromise = 102 const errReleasedReaderClosedPromise =
87 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 103 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
88 const errInvalidSize = 104 const errInvalidSize =
89 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; 105 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
90 const errSizeNotAFunction = 106 const errSizeNotAFunction =
91 'A queuing strategy\'s size property must be a function'; 107 'A queuing strategy\'s size property must be a function';
92 const errInvalidHWM = 108 const errInvalidHWM =
93 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; 109 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number';
94 const errTmplMustBeFunctionOrUndefined = name => 110 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 111 `${name} must be a function or undefined`;
96 112
97 class ReadableStream { 113 class ReadableStream {
98 constructor() { 114 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 115 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 116 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 117 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 118 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 119 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 120 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 121 if (highWaterMark === undefined) {
106 highWaterMark = 1; 122 highWaterMark = 1;
107 } 123 }
108 124
109 const normalizedStrategy = 125 this[readableStreamBits] = 0b0 | STATE_READABLE;
yhirano 2016/05/02 13:17:03 STATE_READBLE << OFFSET?
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:57 Changed to use ReadableStreamSetState()
110 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
111
112 this[readableStreamUnderlyingSource] = underlyingSource;
113
114 this[readableStreamQueue] = new v8.InternalPackedArray();
115 this[readableStreamQueueSize] = 0;
116
117 this[readableStreamState] = STATE_READABLE;
118 this[readableStreamBits] = 0b0;
119 this[readableStreamReader] = undefined; 126 this[readableStreamReader] = undefined;
120 this[readableStreamStoredError] = undefined; 127 this[readableStreamStoredError] = undefined;
121 128
122 this[readableStreamStrategySize] = normalizedStrategy.size;
123 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
124
125 // Avoid allocating the controller if the stream is going to be controlled 129 // Avoid allocating the controller if the stream is going to be controlled
126 // externally (i.e. from C++) anyway. All calls to underlyingSource 130 // externally (i.e. from C++) anyway. All calls to underlyingSource
127 // methods will disregard their controller argument in such situations 131 // methods will disregard their controller argument in such situations
128 // (but see below). 132 // (but see below).
129 133
130 const isControlledExternally = 134 this[readableStreamController] = undefined;
131 arguments[2] === createWithExternalControllerSentinel;
132 const controller =
133 isControlledExternally ? null : new ReadableStreamController(this);
134 this[readableStreamController] = controller;
135 135
136 // We need to pass ourself to the underlyingSource start method for 136 const type = underlyingSource.type;
137 // externally-controlled streams. We use the now-useless controller 137 const typeString = String(type);
138 // argument to do so. 138 if (typeString === 'bytes') {
139 const argToStart = isControlledExternally ? this : controller; 139 throw new RangeError('bytes type is not yet implemented');
140 } else if (type !== undefined) {
141 throw new RangeError('Invalid type is specified');
142 }
140 143
141 const startResult = CallOrNoop( 144 this[readableStreamController] =
142 underlyingSource, 'start', argToStart, 'underlyingSource.start'); 145 new ReadableStreamDefaultController(this, underlyingSource, size, high WaterMark, arguments[2] === createWithExternalControllerSentinel);
143 thenPromise(Promise_resolve(startResult),
144 () => {
145 this[readableStreamBits] |= STARTED;
146 RequestReadableStreamPull(this);
147 },
148 r => {
149 if (this[readableStreamState] === STATE_READABLE) {
150 return ErrorReadableStream(this, r);
151 }
152 });
153 } 146 }
154 147
155 get locked() { 148 get locked() {
156 if (IsReadableStream(this) === false) { 149 if (IsReadableStream(this) === false) {
157 throw new TypeError(errIllegalInvocation); 150 throw new TypeError(errIllegalInvocation);
158 } 151 }
159 152
160 return IsReadableStreamLocked(this); 153 return IsReadableStreamLocked(this);
161 } 154 }
162 155
163 cancel(reason) { 156 cancel(reason) {
164 if (IsReadableStream(this) === false) { 157 if (IsReadableStream(this) === false) {
165 return Promise_reject(new TypeError(errIllegalInvocation)); 158 return Promise_reject(new TypeError(errIllegalInvocation));
166 } 159 }
167 160
168 if (IsReadableStreamLocked(this) === true) { 161 if (IsReadableStreamLocked(this) === true) {
169 return Promise_reject(new TypeError(errCancelLockedStream)); 162 return Promise_reject(new TypeError(errCancelLockedStream));
170 } 163 }
171 164
172 return CancelReadableStream(this, reason); 165 return ReadableStreamCancel(this, reason);
173 } 166 }
174 167
175 getReader() { 168 getReader({ mode } = {}) {
176 if (IsReadableStream(this) === false) { 169 if (IsReadableStream(this) === false) {
177 throw new TypeError(errIllegalInvocation); 170 throw new TypeError(errIllegalInvocation);
178 } 171 }
179 172
180 return AcquireReadableStreamReader(this); 173 if (mode === 'byob') {
174 if (IsReadableByteStreamDefaultController(this[readableStreamController] ) === false) {
175 throw new TypeError(errGetReaderNotByteStream);
176 }
177
178 return AcquireReadableStreamBYOBReader(this);
179 }
180
181 if (mode === undefined) {
182 return AcquireReadableStreamDefaultReader(this);
183 }
184
185 throw new RangeError(errGetReaderBadMode);;
yhirano 2016/05/02 13:17:03 ;;
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:57 Done.
181 } 186 }
182 187
183 tee() { 188 tee() {
184 if (IsReadableStream(this) === false) { 189 if (IsReadableStream(this) === false) {
185 throw new TypeError(errIllegalInvocation); 190 throw new TypeError(errIllegalInvocation);
186 } 191 }
187 192
188 return TeeReadableStream(this); 193 return ReadableStreamTee(this);
189 } 194 }
190 } 195 }
191 196
192 class ReadableStreamController { 197 class ReadableStreamDefaultController {
193 constructor(stream) { 198 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
194 if (IsReadableStream(stream) === false) { 199 if (IsReadableStream(stream) === false) {
195 throw new TypeError(errIllegalConstructor); 200 throw new TypeError(errIllegalConstructor);
196 } 201 }
197 202
198 if (stream[readableStreamController] !== undefined) { 203 if (stream[readableStreamController] !== undefined) {
199 throw new TypeError(errIllegalConstructor); 204 throw new TypeError(errIllegalConstructor);
200 } 205 }
201 206
202 this[readableStreamControllerControlledReadableStream] = stream; 207 this[readableStreamDefaultControllerControlledReadableStream] = stream;
208
209 this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource;
210
211 this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
212 this[readableStreamDefaultControllerQueueSize] = 0;
213
214 this[readableStreamDefaultControllerBits] = 0b0;
215 if (isExternallyControlled === true) {
216 this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
217 }
218
219 const normalizedStrategy =
220 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
221 this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.siz e;
222 this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.high WaterMark;
223
224 const controller = this;
225
226 const startResult = CallOrNoop(
227 underlyingSource, 'start', this, 'underlyingSource.start');
228 thenPromise(Promise_resolve(startResult),
229 () => {
230 controller[readableStreamDefaultControllerBits] |= STARTED;
231 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
232 },
233 r => {
234 if ((stream[readableStreamBits] & STATE_MASK) >> 1 === STATE_READABL E) {
yhirano 2016/05/02 13:17:03 ReadableStreamGetState(this) or s/1/STATE_BITS_OFF
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:56 Replaced with ReadableStreamGetState() call.
235 ReadableStreamDefaultControllerError(controller, r);
236 }
237 });
203 } 238 }
204 239
205 get desiredSize() { 240 get desiredSize() {
206 if (IsReadableStreamController(this) === false) { 241 if (IsReadableStreamDefaultController(this) === false) {
207 throw new TypeError(errIllegalInvocation); 242 throw new TypeError(errIllegalInvocation);
208 } 243 }
209 244
210 return GetReadableStreamDesiredSize( 245 return ReadableStreamDefaultControllerGetDesiredSize(this);
211 this[readableStreamControllerControlledReadableStream]);
212 } 246 }
213 247
214 close() { 248 close() {
215 if (IsReadableStreamController(this) === false) { 249 if (IsReadableStreamDefaultController(this) === false) {
216 throw new TypeError(errIllegalInvocation); 250 throw new TypeError(errIllegalInvocation);
217 } 251 }
218 252
219 const stream = this[readableStreamControllerControlledReadableStream]; 253 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
220 254
221 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 255 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
222 throw new TypeError(errCloseCloseRequestedStream); 256 throw new TypeError(errCloseCloseRequestedStream);
223 } 257 }
224 if (stream[readableStreamState] === STATE_ERRORED) { 258
259 const state = (stream[readableStreamBits] & STATE_MASK) >> 1;
yhirano 2016/05/02 13:17:04 ditto
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:56 Replaced with ReadableStreamGetState() call.
260 if (state === STATE_ERRORED) {
225 throw new TypeError(errCloseErroredStream); 261 throw new TypeError(errCloseErroredStream);
226 } 262 }
263 if (state === STATE_CLOSED) {
264 throw new TypeError(errCloseClosedStream);
265 }
227 266
228 return CloseReadableStream(stream); 267 return ReadableStreamDefaultControllerClose(this);
229 } 268 }
230 269
231 enqueue(chunk) { 270 enqueue(chunk) {
232 if (IsReadableStreamController(this) === false) { 271 if (IsReadableStreamDefaultController(this) === false) {
233 throw new TypeError(errIllegalInvocation); 272 throw new TypeError(errIllegalInvocation);
234 } 273 }
235 274
236 const stream = this[readableStreamControllerControlledReadableStream]; 275 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
237 276
238 if (stream[readableStreamState] === STATE_ERRORED) { 277 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
239 throw stream[readableStreamStoredError]; 278 throw new TypeError(errEnqueueCloseRequestedStream);
240 } 279 }
241 280
242 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 281 const state = (stream[readableStreamBits] & STATE_MASK) >> 1;
yhirano 2016/05/02 13:17:04 ditto
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:57 Replaced with ReadableStreamGetState() call.
243 throw new TypeError(errEnqueueInCloseRequestedStream); 282 if (state === STATE_ERRORED) {
283 throw new TypeError(errEnqueueErroredStream);
284 }
285 if (state === STATE_CLOSED) {
286 throw new TypeError(errEnqueueClosedStream);
244 } 287 }
245 288
246 return EnqueueInReadableStream(stream, chunk); 289 return ReadableStreamDefaultControllerEnqueue(this, chunk);
247 } 290 }
248 291
249 error(e) { 292 error(e) {
250 if (IsReadableStreamController(this) === false) { 293 if (IsReadableStreamDefaultController(this) === false) {
251 throw new TypeError(errIllegalInvocation); 294 throw new TypeError(errIllegalInvocation);
252 } 295 }
253 296
254 const stream = this[readableStreamControllerControlledReadableStream]; 297 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
255 298
256 const state = stream[readableStreamState]; 299 const state = (stream[readableStreamBits] & STATE_MASK) >> 1;
yhirano 2016/05/02 13:17:04 ditto
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:57 Replaced with ReadableStreamGetState() call.
257 if (state !== STATE_READABLE) { 300 if (state === STATE_ERRORED) {
258 if (state === STATE_ERRORED) { 301 throw new TypeError(errErrorErroredStream);
259 throw new TypeError(errErrorErroredStream); 302 }
260 } 303 if (state === STATE_CLOSED) {
261 if (state === STATE_CLOSED) { 304 throw new TypeError(errErrorClosedStream);
262 throw new TypeError(errErrorClosedStream);
263 }
264 } 305 }
265 306
266 return ErrorReadableStream(stream, e); 307 return ReadableStreamDefaultControllerError(this, e);
267 } 308 }
268 } 309 }
269 310
270 class ReadableStreamReader { 311 function ReadableStreamDefaultControllerCancel(controller, reason) {
312 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
313
314 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
315 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
316 }
317
318 function ReadableStreamDefaultControllerPull(controller) {
319 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
320
321 if (controller[readableStreamDefaultControllerQueue].length > 0) {
322 const chunk = DequeueValue(controller);
323
324 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED && c ontroller[readableStreamDefaultControllerQueue].length === 0) {
yhirano 2016/05/02 13:17:04 Can you add a pair of parenthesis around (controll
tyoshino (SeeGerritForStatus) 2016/05/06 06:24:57 Done.
325 ReadableStreamClose(stream);
326 } else {
327 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
328 }
329
330 return Promise_resolve(CreateIterResultObject(chunk, false));
331 }
332
333 const pendingPromise = ReadableStreamAddReadRequest(stream);
334 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
335 return pendingPromise;
336 }
337
338 function ReadableStreamAddReadRequest(stream) {
339 const promise = v8.createPromise();
340 stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(p romise);
341 return promise;
342 }
343
344 class ReadableStreamDefaultReader {
271 constructor(stream) { 345 constructor(stream) {
272 if (IsReadableStream(stream) === false) { 346 if (IsReadableStream(stream) === false) {
273 throw new TypeError(errReaderConstructorBadArgument); 347 throw new TypeError(errReaderConstructorBadArgument);
274 } 348 }
275 if (IsReadableStreamLocked(stream) === true) { 349 if (IsReadableStreamLocked(stream) === true) {
276 throw new TypeError(errReaderConstructorStreamAlreadyLocked); 350 throw new TypeError(errReaderConstructorStreamAlreadyLocked);
277 } 351 }
278 352
279 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 353 ReadableStreamReaderGenericInitialize(this, stream);
280 // blink::UnderlyingSourceBase.
281 if (stream[readableStreamController] === null) {
282 // The stream is created with an external controller (i.e. made in
283 // Blink).
284 const underlyingSource = stream[readableStreamUnderlyingSource];
285 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
286 }
287 354
288 this[readableStreamReaderOwnerReadableStream] = stream; 355 this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray ();
289 stream[readableStreamReader] = this;
290
291 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
292
293 switch (stream[readableStreamState]) {
294 case STATE_READABLE:
295 this[readableStreamReaderClosedPromise] = v8.createPromise();
296 break;
297 case STATE_CLOSED:
298 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
299 break;
300 case STATE_ERRORED:
301 this[readableStreamReaderClosedPromise] =
302 Promise_reject(stream[readableStreamStoredError]);
303 break;
304 }
305 } 356 }
306 357
307 get closed() { 358 get closed() {
308 if (IsReadableStreamReader(this) === false) { 359 if (IsReadableStreamDefaultReader(this) === false) {
309 return Promise_reject(new TypeError(errIllegalInvocation)); 360 return Promise_reject(new TypeError(errIllegalInvocation));
310 } 361 }
311 362
312 return this[readableStreamReaderClosedPromise]; 363 return this[readableStreamReaderClosedPromise];
313 } 364 }
314 365
315 cancel(reason) { 366 cancel(reason) {
316 if (IsReadableStreamReader(this) === false) { 367 if (IsReadableStreamDefaultReader(this) === false) {
317 return Promise_reject(new TypeError(errIllegalInvocation)); 368 return Promise_reject(new TypeError(errIllegalInvocation));
318 } 369 }
319 370
320 const stream = this[readableStreamReaderOwnerReadableStream]; 371 const stream = this[readableStreamReaderOwnerReadableStream];
321 if (stream === undefined) { 372 if (stream === undefined) {
322 return Promise_reject(new TypeError(errCancelReleasedReader)); 373 return Promise_reject(new TypeError(errCancelReleasedReader));
323 } 374 }
324 375
325 return CancelReadableStream(stream, reason); 376 return ReadableStreamReaderGenericCancel(this, reason);
326 } 377 }
327 378
328 read() { 379 read() {
329 if (IsReadableStreamReader(this) === false) { 380 if (IsReadableStreamDefaultReader(this) === false) {
330 return Promise_reject(new TypeError(errIllegalInvocation)); 381 return Promise_reject(new TypeError(errIllegalInvocation));
331 } 382 }
332 383
333 if (this[readableStreamReaderOwnerReadableStream] === undefined) { 384 if (this[readableStreamReaderOwnerReadableStream] === undefined) {
334 return Promise_reject(new TypeError(errReadReleasedReader)); 385 return Promise_reject(new TypeError(errReadReleasedReader));
335 } 386 }
336 387
337 return ReadFromReadableStreamReader(this); 388 return ReadableStreamDefaultReaderRead(this);
338 } 389 }
339 390
340 releaseLock() { 391 releaseLock() {
341 if (IsReadableStreamReader(this) === false) { 392 if (IsReadableStreamDefaultReader(this) === false) {
342 throw new TypeError(errIllegalInvocation); 393 throw new TypeError(errIllegalInvocation);
343 } 394 }
344 395
345 const stream = this[readableStreamReaderOwnerReadableStream]; 396 const stream = this[readableStreamReaderOwnerReadableStream];
346 if (stream === undefined) { 397 if (stream === undefined) {
347 return undefined; 398 return undefined;
348 } 399 }
349 400
350 if (this[readableStreamReaderReadRequests].length > 0) { 401 if (this[readableStreamDefaultReaderReadRequests].length > 0) {
351 throw new TypeError(errReleaseReaderWithPendingRead); 402 throw new TypeError(errReleaseReaderWithPendingRead);
352 } 403 }
353 404
354 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 405 ReadableStreamReaderGenericRelease(this);
355 // blink::UnderlyingSourceBase. 406 }
356 if (stream[readableStreamController] === null) { 407 }
357 // The stream is created with an external controller (i.e. made in
358 // Blink).
359 const underlyingSource = stream[readableStreamUnderlyingSource];
360 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
361 }
362 408
363 if (stream[readableStreamState] === STATE_READABLE) { 409 function ReadableStreamReaderGenericCancel(reader, reason) {
364 v8.rejectPromise(this[readableStreamReaderClosedPromise], 410 return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason);
365 new TypeError(errReleasedReaderClosedPromise));
366 } else {
367 this[readableStreamReaderClosedPromise] =
368 Promise_reject(new TypeError(errReleasedReaderClosedPromise));
369 }
370
371 this[readableStreamReaderOwnerReadableStream][readableStreamReader] =
372 undefined;
373 this[readableStreamReaderOwnerReadableStream] = undefined;
374 }
375 } 411 }
376 412
377 // 413 //
378 // Readable stream abstract operations 414 // Readable stream abstract operations
379 // 415 //
380 416
381 function AcquireReadableStreamReader(stream) { 417 function AcquireReadableStreamDefaultReader(stream) {
382 return new ReadableStreamReader(stream); 418 return new ReadableStreamDefaultReader(stream);
383 } 419 }
384 420
385 function CancelReadableStream(stream, reason) { 421 function ReadableStreamCancel(stream, reason) {
386 stream[readableStreamBits] |= DISTURBED; 422 stream[readableStreamBits] |= DISTURBED;
387 423
388 const state = stream[readableStreamState]; 424 const state = ReadableStreamGetState(stream);
389 if (state === STATE_CLOSED) { 425 if (state === STATE_CLOSED) {
390 return Promise_resolve(undefined); 426 return Promise_resolve(undefined);
391 } 427 }
392 if (state === STATE_ERRORED) { 428 if (state === STATE_ERRORED) {
393 return Promise_reject(stream[readableStreamStoredError]); 429 return Promise_reject(stream[readableStreamStoredError]);
394 } 430 }
395 431
396 stream[readableStreamQueue] = new v8.InternalPackedArray(); 432 ReadableStreamClose(stream);
397 FinishClosingReadableStream(stream);
398 433
399 const underlyingSource = stream[readableStreamUnderlyingSource]; 434 const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[rea dableStreamController], reason);
400 const sourceCancelPromise = PromiseCallOrNoop(
401 underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
402 return thenPromise(sourceCancelPromise, () => undefined); 435 return thenPromise(sourceCancelPromise, () => undefined);
403 } 436 }
404 437
405 function CloseReadableStream(stream) { 438 function ReadableStreamDefaultControllerClose(controller) {
406 if (stream[readableStreamState] === STATE_CLOSED) { 439 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
407 return undefined;
408 }
409 440
410 stream[readableStreamBits] |= CLOSE_REQUESTED; 441 controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
411 442
412 if (stream[readableStreamQueue].length === 0) { 443 if (controller[readableStreamDefaultControllerQueue].length === 0) {
413 return FinishClosingReadableStream(stream); 444 ReadableStreamClose(stream);
414 } 445 }
415 } 446 }
416 447
417 function EnqueueInReadableStream(stream, chunk) { 448 function ReadableStreamFulfillReadRequest(stream, chunk, done) {
418 if (stream[readableStreamState] === STATE_CLOSED) { 449 const reader = stream[readableStreamReader];
419 return undefined;
420 }
421 450
422 if (IsReadableStreamLocked(stream) === true && 451 const readRequest =
423 stream[readableStreamReader][readableStreamReaderReadRequests].length > 452 stream[readableStreamReader][readableStreamDefaultReaderReadRequests]
424 0) { 453 .shift();
425 const readRequest = 454 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done));
426 stream[readableStreamReader][readableStreamReaderReadRequests] 455 }
427 .shift(); 456
428 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); 457 function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
458 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
459
460 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
461 ReadableStreamFulfillReadRequest(stream, chunk, false);
429 } else { 462 } else {
430 let chunkSize = 1; 463 let chunkSize = 1;
431 464
432 const strategySize = stream[readableStreamStrategySize]; 465 const strategySize = controller[readableStreamDefaultControllerStrategySiz e];
433 if (strategySize !== undefined) { 466 if (strategySize !== undefined) {
434 try { 467 try {
435 chunkSize = strategySize(chunk); 468 chunkSize = strategySize(chunk);
436 } catch (chunkSizeE) { 469 } catch (chunkSizeE) {
437 if (stream[readableStreamState] === STATE_READABLE) { 470 if (ReadableStreamGetState(stream) === STATE_READABLE) {
438 ErrorReadableStream(stream, chunkSizeE); 471 ReadableStreamDefaultControllerError(controller, chunkSizeE);
439 } 472 }
440 throw chunkSizeE; 473 throw chunkSizeE;
441 } 474 }
442 } 475 }
443 476
444 try { 477 try {
445 EnqueueValueWithSize(stream, chunk, chunkSize); 478 EnqueueValueWithSize(controller, chunk, chunkSize);
446 } catch (enqueueE) { 479 } catch (enqueueE) {
447 if (stream[readableStreamState] === STATE_READABLE) { 480 if (ReadableStreamGetState(stream) === STATE_READABLE) {
448 ErrorReadableStream(stream, enqueueE); 481 ReadableStreamDefaultControllerError(controller, enqueueE);
449 } 482 }
450 throw enqueueE; 483 throw enqueueE;
451 } 484 }
452 } 485 }
453 486
454 RequestReadableStreamPull(stream); 487 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
455 } 488 }
456 489
457 function ErrorReadableStream(stream, e) { 490 function ReadableStreamGetState(stream) {
458 stream[readableStreamQueue] = new v8.InternalPackedArray(); 491 return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
492 }
493
494 function ReadableStreamSetState(stream, state) {
495 stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) |
496 (state << STATE_BITS_OFFSET);
497 }
498
499 function ReadableStreamDefaultControllerError(controller, e) {
500 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
501 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
502 ReadableStreamError(stream, e);
503 }
504
505 function ReadableStreamError(stream, e) {
459 stream[readableStreamStoredError] = e; 506 stream[readableStreamStoredError] = e;
460 stream[readableStreamState] = STATE_ERRORED; 507 ReadableStreamSetState(stream, STATE_ERRORED);
461 508
462 const reader = stream[readableStreamReader]; 509 const reader = stream[readableStreamReader];
463 if (reader === undefined) { 510 if (reader === undefined) {
464 return undefined; 511 return undefined;
465 } 512 }
466 513
467 const readRequests = reader[readableStreamReaderReadRequests]; 514 if (IsReadableStreamDefaultReader(reader) === true) {
468 for (let i = 0; i < readRequests.length; ++i) { 515 const readRequests = reader[readableStreamDefaultReaderReadRequests];
469 v8.rejectPromise(readRequests[i], e); 516 for (let i = 0; i < readRequests.length; ++i) {
517 v8.rejectPromise(readRequests[i], e);
518 }
519 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
520 } else {
521 const readIntoRequests = reader[readableStreamReaderReadIntoRequests];
522 for (let i = 0; i < readIntoRequests.length; ++i) {
523 v8.rejectPromise(readIntoRequests[i], e);
524 }
525 reader[readableStreamReaderReadIntoRequests] = new v8.InternalPackedArray( );
470 } 526 }
471 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
472 527
473 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); 528 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e);
474 } 529 }
475 530
476 function FinishClosingReadableStream(stream) { 531 function ReadableStreamClose(stream) {
477 stream[readableStreamState] = STATE_CLOSED; 532 ReadableStreamSetState(stream, STATE_CLOSED);
478 533
479 const reader = stream[readableStreamReader]; 534 const reader = stream[readableStreamReader];
480 if (reader === undefined) { 535 if (reader === undefined) {
481 return undefined; 536 return undefined;
482 } 537 }
483 538
484 539 if (IsReadableStreamDefaultReader(reader) === true) {
485 const readRequests = reader[readableStreamReaderReadRequests]; 540 const readRequests = reader[readableStreamDefaultReaderReadRequests];
486 for (let i = 0; i < readRequests.length; ++i) { 541 for (let i = 0; i < readRequests.length; ++i) {
487 v8.resolvePromise( 542 v8.resolvePromise(
488 readRequests[i], CreateIterResultObject(undefined, true)); 543 readRequests[i], CreateIterResultObject(undefined, true));
544 }
545 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
489 } 546 }
490 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
491 547
492 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); 548 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined);
493 } 549 }
494 550
495 function GetReadableStreamDesiredSize(stream) { 551 function ReadableStreamDefaultControllerGetDesiredSize(controller) {
496 const queueSize = GetTotalQueueSize(stream); 552 const queueSize = GetTotalQueueSize(controller);
497 return stream[readableStreamStrategyHWM] - queueSize; 553 return controller[readableStreamDefaultControllerStrategyHWM] - queueSize;
498 } 554 }
499 555
500 function IsReadableStream(x) { 556 function IsReadableStream(x) {
501 return hasOwnProperty(x, readableStreamUnderlyingSource); 557 return hasOwnProperty(x, readableStreamController);
502 } 558 }
503 559
504 function IsReadableStreamDisturbed(stream) { 560 function IsReadableStreamDisturbed(stream) {
505 return stream[readableStreamBits] & DISTURBED; 561 return stream[readableStreamBits] & DISTURBED;
506 } 562 }
507 563
508 function SetReadableStreamDisturbed(stream) { 564 function SetReadableStreamDisturbed(stream) {
509 return stream[readableStreamBits] |= DISTURBED; 565 return stream[readableStreamBits] |= DISTURBED;
510 } 566 }
511 567
512 function IsReadableStreamLocked(stream) { 568 function IsReadableStreamLocked(stream) {
513 return stream[readableStreamReader] !== undefined; 569 return stream[readableStreamReader] !== undefined;
514 } 570 }
515 571
516 function IsReadableStreamController(x) { 572 function IsReadableStreamDefaultController(x) {
517 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); 573 return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableSt ream);
574 }
575
576 function IsReadableStreamDefaultReader(x) {
577 return hasOwnProperty(x, readableStreamDefaultReaderReadRequests);
518 } 578 }
519 579
520 function IsReadableStreamReadable(stream) { 580 function IsReadableStreamReadable(stream) {
521 return stream[readableStreamState] === STATE_READABLE; 581 return ReadableStreamGetState(stream) === STATE_READABLE;
522 } 582 }
523 583
524 function IsReadableStreamClosed(stream) { 584 function IsReadableStreamClosed(stream) {
525 return stream[readableStreamState] === STATE_CLOSED; 585 return ReadableStreamGetState(stream) === STATE_CLOSED;
526 } 586 }
527 587
528 function IsReadableStreamErrored(stream) { 588 function IsReadableStreamErrored(stream) {
529 return stream[readableStreamState] === STATE_ERRORED; 589 return ReadableStreamGetState(stream) === STATE_ERRORED;
530 } 590 }
531 591
532 function IsReadableStreamReader(x) { 592 function ReadableStreamReaderGenericInitialize(reader, stream) {
533 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); 593 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
594 // blink::UnderlyingSourceBase.
595 const controller = stream[readableStreamController];
596 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
597 // The stream is created with an external controller (i.e. made in
598 // Blink).
599 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
600 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
601 }
602
603 reader[readableStreamReaderOwnerReadableStream] = stream;
604 stream[readableStreamReader] = reader;
605
606 switch (ReadableStreamGetState(stream)) {
607 case STATE_READABLE:
608 reader[readableStreamReaderClosedPromise] = v8.createPromise();
609 break;
610 case STATE_CLOSED:
611 reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
612 break;
613 case STATE_ERRORED:
614 reader[readableStreamReaderClosedPromise] =
615 Promise_reject(stream[readableStreamStoredError]);
616 break;
617 }
534 } 618 }
535 619
536 function ReadFromReadableStreamReader(reader) { 620 function ReadableStreamReaderGenericRelease(reader) {
621 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
622 // blink::UnderlyingSourceBase.
623 const controller = reader[readableStreamReaderOwnerReadableStream][readableS treamController];
624 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
625 // The stream is created with an external controller (i.e. made in
626 // Blink).
627 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
628 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
629 }
630
631 if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream]) === STATE_READABLE) {
632 v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError( errReleasedReaderClosedPromise));
633 } else {
634 reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(e rrReleasedReaderClosedPromise));
635 }
636
637 reader[readableStreamReaderOwnerReadableStream][readableStreamReader] =
638 undefined;
639 reader[readableStreamReaderOwnerReadableStream] = undefined;
640 }
641
642 function ReadableStreamDefaultReaderRead(reader) {
537 const stream = reader[readableStreamReaderOwnerReadableStream]; 643 const stream = reader[readableStreamReaderOwnerReadableStream];
538 stream[readableStreamBits] |= DISTURBED; 644 stream[readableStreamBits] |= DISTURBED;
539 645
540 if (stream[readableStreamState] === STATE_CLOSED) { 646 if (ReadableStreamGetState(stream) === STATE_CLOSED) {
541 return Promise_resolve(CreateIterResultObject(undefined, true)); 647 return Promise_resolve(CreateIterResultObject(undefined, true));
542 } 648 }
543 649
544 if (stream[readableStreamState] === STATE_ERRORED) { 650 if (ReadableStreamGetState(stream) === STATE_ERRORED) {
545 return Promise_reject(stream[readableStreamStoredError]); 651 return Promise_reject(stream[readableStreamStoredError]);
546 } 652 }
547 653
548 const queue = stream[readableStreamQueue]; 654 return ReadableStreamDefaultControllerPull(stream[readableStreamController]) ;
549 if (queue.length > 0) {
550 const chunk = DequeueValue(stream);
551
552 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) {
553 FinishClosingReadableStream(stream);
554 } else {
555 RequestReadableStreamPull(stream);
556 }
557
558 return Promise_resolve(CreateIterResultObject(chunk, false));
559 } else {
560 const readRequest = v8.createPromise();
561
562 reader[readableStreamReaderReadRequests].push(readRequest);
563 RequestReadableStreamPull(stream);
564 return readRequest;
565 }
566 } 655 }
567 656
568 function RequestReadableStreamPull(stream) { 657 function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
569 const shouldPull = ShouldReadableStreamPull(stream); 658 const shouldPull = ReadableStreamDefaultControllerShouldPull(controller);
570 if (shouldPull === false) { 659 if (shouldPull === false) {
571 return undefined; 660 return undefined;
572 } 661 }
573 662
574 if (stream[readableStreamBits] & PULLING) { 663 if (controller[readableStreamDefaultControllerBits] & PULLING) {
575 stream[readableStreamBits] |= PULL_AGAIN; 664 controller[readableStreamDefaultControllerBits] |= PULL_AGAIN;
576 return undefined; 665 return undefined;
577 } 666 }
578 667
579 stream[readableStreamBits] |= PULLING; 668 controller[readableStreamDefaultControllerBits] |= PULLING;
580 669
581 const underlyingSource = stream[readableStreamUnderlyingSource]; 670 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
582 const controller = stream[readableStreamController];
583 const pullPromise = PromiseCallOrNoop( 671 const pullPromise = PromiseCallOrNoop(
584 underlyingSource, 'pull', controller, 'underlyingSource.pull'); 672 underlyingSource, 'pull', controller, 'underlyingSource.pull');
585 673
586 thenPromise(pullPromise, 674 thenPromise(pullPromise,
587 () => { 675 () => {
588 stream[readableStreamBits] &= ~PULLING; 676 controller[readableStreamDefaultControllerBits] &= ~PULLING;
589 677
590 if (stream[readableStreamBits] & PULL_AGAIN) { 678 if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) {
591 stream[readableStreamBits] &= ~PULL_AGAIN; 679 controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
592 return RequestReadableStreamPull(stream); 680 return ReadableStreamDefaultControllerCallPullIfNeeded(controller);
593 } 681 }
594 }, 682 },
595 e => { 683 e => {
596 if (stream[readableStreamState] === STATE_READABLE) { 684 if (ReadableStreamGetState(controller[readableStreamDefaultControllerC ontrolledReadableStream]) === STATE_READABLE) {
597 return ErrorReadableStream(stream, e); 685 return ReadableStreamDefaultControllerError(controller, e);
598 } 686 }
599 }); 687 });
600 } 688 }
601 689
602 function ShouldReadableStreamPull(stream) { 690 function ReadableStreamDefaultControllerShouldPull(controller) {
603 const state = stream[readableStreamState]; 691 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
692
693 const state = ReadableStreamGetState(stream);
604 if (state === STATE_CLOSED || state === STATE_ERRORED) { 694 if (state === STATE_CLOSED || state === STATE_ERRORED) {
605 return false; 695 return false;
606 } 696 }
607 697
608 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 698 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
609 return false; 699 return false;
610 } 700 }
611 701
612 if (!(stream[readableStreamBits] & STARTED)) { 702 if (!(controller[readableStreamDefaultControllerBits] & STARTED)) {
613 return false; 703 return false;
614 } 704 }
615 705
616 if (IsReadableStreamLocked(stream) === true) { 706 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
617 const reader = stream[readableStreamReader]; 707 return true;
618 const readRequests = reader[readableStreamReaderReadRequests];
619 if (readRequests.length > 0) {
620 return true;
621 }
622 } 708 }
623 709
624 const desiredSize = GetReadableStreamDesiredSize(stream); 710 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller );
625 if (desiredSize > 0) { 711 if (desiredSize > 0) {
626 return true; 712 return true;
627 } 713 }
628 714
629 return false; 715 return false;
630 } 716 }
631 717
718 function ReadableStreamGetNumReadRequests(stream) {
719 const reader = stream[readableStreamReader];
720 const readRequests = reader[readableStreamDefaultReaderReadRequests];
721 return readRequests.length;
722 }
723
632 // Potential future optimization: use class instances for the underlying 724 // Potential future optimization: use class instances for the underlying
633 // sources, so that we don't re-create 725 // sources, so that we don't re-create
634 // closures every time. 726 // closures every time.
635 727
636 // TODO(domenic): shouldClone argument from spec not supported yet 728 // TODO(domenic): shouldClone argument from spec not supported yet
637 function TeeReadableStream(stream) { 729 function ReadableStreamTee(stream) {
638 const reader = AcquireReadableStreamReader(stream); 730 const reader = AcquireReadableStreamDefaultReader(stream);
639 731
640 let closedOrErrored = false; 732 let closedOrErrored = false;
641 let canceled1 = false; 733 let canceled1 = false;
642 let canceled2 = false; 734 let canceled2 = false;
643 let reason1; 735 let reason1;
644 let reason2; 736 let reason2;
645 let promise = v8.createPromise(); 737 let promise = v8.createPromise();
646 738
647 const branch1 = new ReadableStream({pull, cancel: cancel1}); 739 const branch1Stream = new ReadableStream({pull, cancel: cancel1});
648 740
649 const branch2 = new ReadableStream({pull, cancel: cancel2}); 741 const branch2Stream = new ReadableStream({pull, cancel: cancel2});
742
743 const branch1 = branch1Stream[readableStreamController];
744 const branch2 = branch2Stream[readableStreamController];
650 745
651 thenPromise( 746 thenPromise(
652 reader[readableStreamReaderClosedPromise], undefined, function(r) { 747 reader[readableStreamReaderClosedPromise], undefined, function(r) {
653 if (closedOrErrored === true) { 748 if (closedOrErrored === true) {
654 return; 749 return;
655 } 750 }
656 751
657 ErrorReadableStream(branch1, r); 752 ReadableStreamDefaultControllerError(branch1, r);
658 ErrorReadableStream(branch2, r); 753 ReadableStreamDefaultControllerError(branch2, r);
659 closedOrErrored = true; 754 closedOrErrored = true;
660 }); 755 });
661 756
662 return [branch1, branch2]; 757 return [branch1Stream, branch2Stream];
663
664 758
665 function pull() { 759 function pull() {
666 return thenPromise( 760 return thenPromise(
667 ReadFromReadableStreamReader(reader), function(result) { 761 ReadableStreamDefaultReaderRead(reader), function(result) {
668 const value = result.value; 762 const value = result.value;
669 const done = result.done; 763 const done = result.done;
670 764
671 if (done === true && closedOrErrored === false) { 765 if (done === true && closedOrErrored === false) {
672 CloseReadableStream(branch1); 766 if (canceled1 === false) {
673 CloseReadableStream(branch2); 767 ReadableStreamDefaultControllerClose(branch1);
768 }
769 if (canceled2 === false) {
770 ReadableStreamDefaultControllerClose(branch2);
771 }
674 closedOrErrored = true; 772 closedOrErrored = true;
675 } 773 }
676 774
677 if (closedOrErrored === true) { 775 if (closedOrErrored === true) {
678 return; 776 return;
679 } 777 }
680 778
681 if (canceled1 === false) { 779 if (canceled1 === false) {
682 EnqueueInReadableStream(branch1, value); 780 ReadableStreamDefaultControllerEnqueue(branch1, value);
683 } 781 }
684 782
685 if (canceled2 === false) { 783 if (canceled2 === false) {
686 EnqueueInReadableStream(branch2, value); 784 ReadableStreamDefaultControllerEnqueue(branch2, value);
687 } 785 }
688 }); 786 });
689 } 787 }
690 788
691 function cancel1(reason) { 789 function cancel1(reason) {
692 canceled1 = true; 790 canceled1 = true;
693 reason1 = reason; 791 reason1 = reason;
694 792
695 if (canceled2 === true) { 793 if (canceled2 === true) {
696 const compositeReason = [reason1, reason2]; 794 const compositeReason = [reason1, reason2];
697 const cancelResult = CancelReadableStream(stream, compositeReason); 795 const cancelResult = ReadableStreamCancel(stream, compositeReason);
698 v8.resolvePromise(promise, cancelResult); 796 v8.resolvePromise(promise, cancelResult);
699 } 797 }
700 798
701 return promise; 799 return promise;
702 } 800 }
703 801
704 function cancel2(reason) { 802 function cancel2(reason) {
705 canceled2 = true; 803 canceled2 = true;
706 reason2 = reason; 804 reason2 = reason;
707 805
708 if (canceled1 === true) { 806 if (canceled1 === true) {
709 const compositeReason = [reason1, reason2]; 807 const compositeReason = [reason1, reason2];
710 const cancelResult = CancelReadableStream(stream, compositeReason); 808 const cancelResult = ReadableStreamCancel(stream, compositeReason);
711 v8.resolvePromise(promise, cancelResult); 809 v8.resolvePromise(promise, cancelResult);
712 } 810 }
713 811
714 return promise; 812 return promise;
715 } 813 }
716 } 814 }
717 815
718 // 816 //
719 // Queue-with-sizes 817 // Queue-with-sizes
720 // Modified from taking the queue (as in the spec) to taking the stream, so we 818 // Modified from taking the queue (as in the spec) to taking the stream, so we
721 // can modify the queue size alongside. 819 // can modify the queue size alongside.
722 // 820 //
723 821
724 function DequeueValue(stream) { 822 function DequeueValue(controller) {
725 const result = stream[readableStreamQueue].shift(); 823 const result = controller[readableStreamDefaultControllerQueue].shift();
726 stream[readableStreamQueueSize] -= result.size; 824 controller[readableStreamDefaultControllerQueueSize] -= result.size;
727 return result.value; 825 return result.value;
728 } 826 }
729 827
730 function EnqueueValueWithSize(stream, value, size) { 828 function EnqueueValueWithSize(controller, value, size) {
731 size = Number(size); 829 size = Number(size);
732 if (Number_isNaN(size) || size === +Infinity || size < 0) { 830 if (Number_isNaN(size) || size === +Infinity || size < 0) {
733 throw new RangeError(errInvalidSize); 831 throw new RangeError(errInvalidSize);
734 } 832 }
735 833
736 stream[readableStreamQueueSize] += size; 834 controller[readableStreamDefaultControllerQueueSize] += size;
737 stream[readableStreamQueue].push({value, size}); 835 controller[readableStreamDefaultControllerQueue].push({value, size});
738 } 836 }
739 837
740 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } 838 function GetTotalQueueSize(controller) { return controller[readableStreamDefau ltControllerQueueSize]; }
741 839
742 // 840 //
743 // Other helpers 841 // Other helpers
744 // 842 //
745 843
746 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { 844 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
747 if (size !== undefined && typeof size !== 'function') { 845 if (size !== undefined && typeof size !== 'function') {
748 throw new TypeError(errSizeNotAFunction); 846 throw new TypeError(errSizeNotAFunction);
749 } 847 }
750 848
(...skipping 29 matching lines...) Expand all
780 method = O[P]; 878 method = O[P];
781 } catch (methodE) { 879 } catch (methodE) {
782 return Promise_reject(methodE); 880 return Promise_reject(methodE);
783 } 881 }
784 882
785 if (method === undefined) { 883 if (method === undefined) {
786 return Promise_resolve(undefined); 884 return Promise_resolve(undefined);
787 } 885 }
788 886
789 if (typeof method !== 'function') { 887 if (typeof method !== 'function') {
790 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); 888 return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameF orError)));
791 } 889 }
792 890
793 try { 891 try {
794 return Promise_resolve(callFunction(method, O, arg)); 892 return Promise_resolve(callFunction(method, O, arg));
795 } catch (e) { 893 } catch (e) {
796 return Promise_reject(e); 894 return Promise_reject(e);
797 } 895 }
798 } 896 }
799 897
800 function CreateIterResultObject(value, done) { return {value, done}; } 898 function CreateIterResultObject(value, done) { return {value, done}; }
801 899
802 900
803 // 901 //
804 // Additions to the global 902 // Additions to the global
805 // 903 //
806 904
807 defineProperty(global, 'ReadableStream', { 905 defineProperty(global, 'ReadableStream', {
808 value: ReadableStream, 906 value: ReadableStream,
809 enumerable: false, 907 enumerable: false,
810 configurable: true, 908 configurable: true,
811 writable: true 909 writable: true
812 }); 910 });
813 911
814 // 912 //
815 // Exports to Blink 913 // Exports to Blink
816 // 914 //
817 915
818 binding.AcquireReadableStreamReader = AcquireReadableStreamReader; 916 binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReade r;
819 binding.IsReadableStream = IsReadableStream; 917 binding.IsReadableStream = IsReadableStream;
820 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; 918 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
821 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed; 919 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed;
822 binding.IsReadableStreamLocked = IsReadableStreamLocked; 920 binding.IsReadableStreamLocked = IsReadableStreamLocked;
823 binding.IsReadableStreamReadable = IsReadableStreamReadable; 921 binding.IsReadableStreamReadable = IsReadableStreamReadable;
824 binding.IsReadableStreamClosed = IsReadableStreamClosed; 922 binding.IsReadableStreamClosed = IsReadableStreamClosed;
825 binding.IsReadableStreamErrored = IsReadableStreamErrored; 923 binding.IsReadableStreamErrored = IsReadableStreamErrored;
826 binding.IsReadableStreamReader = IsReadableStreamReader; 924 binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader;
827 binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; 925 binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead;
828 926
829 binding.CloseReadableStream = CloseReadableStream; 927 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
830 binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; 928 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
831 binding.EnqueueInReadableStream = EnqueueInReadableStream; 929 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
832 binding.ErrorReadableStream = ErrorReadableStream; 930 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
833 931
834 binding.createReadableStreamWithExternalController = 932 binding.createReadableStreamWithExternalController =
835 (underlyingSource, strategy) => { 933 (underlyingSource, strategy) => {
836 return new ReadableStream( 934 return new ReadableStream(
837 underlyingSource, strategy, createWithExternalControllerSentinel); 935 underlyingSource, strategy, createWithExternalControllerSentinel);
838 }; 936 };
839 }); 937 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698