Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: third_party/WebKit/Source/core/streams/ReadableStream.js

Issue 1902673003: Reflect recent spec changes to V8 Extra ReadableStream impl (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Some reordering Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 (function(global, binding, v8) { 5 (function(global, binding, v8) {
6 'use strict'; 6 'use strict';
7 7
8 const readableStreamController = v8.createPrivateSymbol('[[controller]]');
9 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]');
10 const readableStreamQueueSize =
11 v8.createPrivateSymbol('[[queue]] total size');
12 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); 8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]');
13 const readableStreamState = v8.createPrivateSymbol('[[state]]'); 9 const readableStreamState = v8.createPrivateSymbol('[[state]]');
14 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); 10 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]');
15 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); 11 const readableStreamController = v8.createPrivateSymbol('[[controller]]');
16 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
17 const readableStreamUnderlyingSource =
18 v8.createPrivateSymbol('[[underlyingSource]]');
19
20 const readableStreamControllerControlledReadableStream =
21 v8.createPrivateSymbol('[[controlledReadableStream]]');
22 12
23 const readableStreamReaderClosedPromise = 13 const readableStreamReaderClosedPromise =
24 v8.createPrivateSymbol('[[closedPromise]]'); 14 v8.createPrivateSymbol('[[closedPromise]]');
25 const readableStreamReaderOwnerReadableStream = 15 const readableStreamReaderOwnerReadableStream =
26 v8.createPrivateSymbol('[[ownerReadableStream]]'); 16 v8.createPrivateSymbol('[[ownerReadableStream]]');
27 const readableStreamReaderReadRequests = 17
18 const readableStreamDefaultReaderReadRequests =
28 v8.createPrivateSymbol('[[readRequests]]'); 19 v8.createPrivateSymbol('[[readRequests]]');
29 20
30 const createWithExternalControllerSentinel = 21 const createWithExternalControllerSentinel =
31 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); 22 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
32 23
33 const STATE_READABLE = 0; 24 const STATE_READABLE = 0;
34 const STATE_CLOSED = 1; 25 const STATE_CLOSED = 1;
35 const STATE_ERRORED = 2; 26 const STATE_ERRORED = 2;
36 27
37 const readableStreamBits = v8.createPrivateSymbol( 28 const readableStreamBits = v8.createPrivateSymbol('bit field for [[disturbed]] ');
domenic 2016/04/27 21:12:23 It's a bit unfortunate that we need a whole new 64
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Oh, yes. Combined with [[state]] instead.
38 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] , [[disturbed]]'); 29 const DISTURBED = 0b1;
30
31 const readableStreamDefaultControllerUnderlyingSource =
32 v8.createPrivateSymbol('[[underlyingSource]]');
33 const readableStreamDefaultControllerControlledReadableStream =
34 v8.createPrivateSymbol('[[controlledReadableStream]]');
35 const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]] ');
36 const readableStreamDefaultControllerQueueSize =
37 v8.createPrivateSymbol('[[queue]] total size');
38 const readableStreamDefaultControllerStrategySize =
39 v8.createPrivateSymbol('[[strategySize]]');
40 const readableStreamDefaultControllerStrategyHWM =
41 v8.createPrivateSymbol('[[strategyHWM]]');
42
43 const readableStreamDefaultControllerBits = v8.createPrivateSymbol(
44 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] ');
39 const STARTED = 0b1; 45 const STARTED = 0b1;
40 const CLOSE_REQUESTED = 0b10; 46 const CLOSE_REQUESTED = 0b10;
41 const PULLING = 0b100; 47 const PULLING = 0b100;
42 const PULL_AGAIN = 0b1000; 48 const PULL_AGAIN = 0b1000;
43 const DISTURBED = 0b10000; 49 const EXTERNALLY_CONTROLLED = 0b10000;
50
51 const readableStreamControllerCancel =
52 v8.createPrivateSymbol('[[InternalCancel]]');
53 const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]' );
44 54
45 const undefined = global.undefined; 55 const undefined = global.undefined;
46 const Infinity = global.Infinity; 56 const Infinity = global.Infinity;
47 57
48 const defineProperty = global.Object.defineProperty; 58 const defineProperty = global.Object.defineProperty;
49 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); 59 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty);
50 const callFunction = v8.uncurryThis(global.Function.prototype.call); 60 const callFunction = v8.uncurryThis(global.Function.prototype.call);
51 61
52 const TypeError = global.TypeError; 62 const TypeError = global.TypeError;
53 const RangeError = global.RangeError; 63 const RangeError = global.RangeError;
54 64
55 const Number = global.Number; 65 const Number = global.Number;
56 const Number_isNaN = Number.isNaN; 66 const Number_isNaN = Number.isNaN;
57 const Number_isFinite = Number.isFinite; 67 const Number_isFinite = Number.isFinite;
58 68
59 const Promise = global.Promise; 69 const Promise = global.Promise;
60 const thenPromise = v8.uncurryThis(Promise.prototype.then); 70 const thenPromise = v8.uncurryThis(Promise.prototype.then);
61 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); 71 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise);
62 const Promise_reject = v8.simpleBind(Promise.reject, Promise); 72 const Promise_reject = v8.simpleBind(Promise.reject, Promise);
63 73
64 const errIllegalInvocation = 'Illegal invocation'; 74 const errIllegalInvocation = 'Illegal invocation';
65 const errIllegalConstructor = 'Illegal constructor'; 75 const errIllegalConstructor = 'Illegal constructor';
66 const errCancelLockedStream = 76 const errCancelLockedStream =
67 'Cannot cancel a readable stream that is locked to a reader'; 77 'Cannot cancel a readable stream that is locked to a reader';
68 const errEnqueueInCloseRequestedStream = 78 const errEnqueueCloseRequestedStream =
69 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; 79 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed';
70 const errCancelReleasedReader = 80 const errCancelReleasedReader =
71 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; 81 'This readable stream reader has been released and cannot be used to cance l its previous owner stream';
72 const errReadReleasedReader = 82 const errReadReleasedReader =
73 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; 83 'This readable stream reader has been released and cannot be used to read from its previous owner stream';
74 const errCloseCloseRequestedStream = 84 const errCloseCloseRequestedStream =
75 'Cannot close a readable stream that has already been requested to be clos ed'; 85 'Cannot close a readable stream that has already been requested to be clos ed';
86 const errEnqueueClosedStream = 'Cannot enqueue a closed readable stream';
domenic 2016/04/27 21:12:24 "Cannot enqueue a chunk into a closed readable str
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
87 const errEnqueueErroredStream = 'Cannot enqueue an errored readable stream';
domenic 2016/04/27 21:12:24 "Cannot enqueue a chunk into an errored readable s
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
88 const errCloseClosedStream = 'Cannot close a closed readable stream';
76 const errCloseErroredStream = 'Cannot close an errored readable stream'; 89 const errCloseErroredStream = 'Cannot close an errored readable stream';
77 const errErrorClosedStream = 'Cannot error a close readable stream'; 90 const errErrorClosedStream = 'Cannot error a close readable stream';
78 const errErrorErroredStream = 91 const errErrorErroredStream =
79 'Cannot error a readable stream that is already errored'; 92 'Cannot error a readable stream that is already errored';
93 const errGetReaderNotByteStream = 'Cannot get a ReadableStreamBYOBReader for a stream not constructed with a byte source';
domenic 2016/04/27 21:12:24 "This readable stream does not support BYOB reader
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
94 const errGetReaderBadMode = 'Invalid mode is specified';
domenic 2016/04/27 21:12:24 'Invalid reader mode given: expected undefined or
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
80 const errReaderConstructorBadArgument = 95 const errReaderConstructorBadArgument =
81 'ReadableStreamReader constructor argument is not a readable stream'; 96 'ReadableStreamReader constructor argument is not a readable stream';
82 const errReaderConstructorStreamAlreadyLocked = 97 const errReaderConstructorStreamAlreadyLocked =
83 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; 98 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader';
84 const errReleaseReaderWithPendingRead = 99 const errReleaseReaderWithPendingRead =
85 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; 100 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled';
86 const errReleasedReaderClosedPromise = 101 const errReleasedReaderClosedPromise =
87 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; 102 'This readable stream reader has been released and cannot be used to monit or the stream\'s state';
88 const errInvalidSize = 103 const errInvalidSize =
89 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; 104 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number';
90 const errSizeNotAFunction = 105 const errSizeNotAFunction =
91 'A queuing strategy\'s size property must be a function'; 106 'A queuing strategy\'s size property must be a function';
92 const errInvalidHWM = 107 const errInvalidHWM =
93 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; 108 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number';
94 const errTmplMustBeFunctionOrUndefined = name => 109 const errTmplMustBeFunctionOrUndefined = name =>
95 `${name} must be a function or undefined`; 110 `${name} must be a function or undefined`;
96 111
97 class ReadableStream { 112 class ReadableStream {
98 constructor() { 113 constructor() {
99 // TODO(domenic): when V8 gets default parameters and destructuring, all 114 // TODO(domenic): when V8 gets default parameters and destructuring, all
100 // this can be cleaned up. 115 // this can be cleaned up.
101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; 116 const underlyingSource = arguments[0] === undefined ? {} : arguments[0];
102 const strategy = arguments[1] === undefined ? {} : arguments[1]; 117 const strategy = arguments[1] === undefined ? {} : arguments[1];
103 const size = strategy.size; 118 const size = strategy.size;
104 let highWaterMark = strategy.highWaterMark; 119 let highWaterMark = strategy.highWaterMark;
105 if (highWaterMark === undefined) { 120 if (highWaterMark === undefined) {
106 highWaterMark = 1; 121 highWaterMark = 1;
107 } 122 }
108 123
109 const normalizedStrategy =
110 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
111
112 this[readableStreamUnderlyingSource] = underlyingSource;
113
114 this[readableStreamQueue] = new v8.InternalPackedArray();
115 this[readableStreamQueueSize] = 0;
116
117 this[readableStreamState] = STATE_READABLE; 124 this[readableStreamState] = STATE_READABLE;
118 this[readableStreamBits] = 0b0; 125 this[readableStreamBits] = 0b0;
119 this[readableStreamReader] = undefined; 126 this[readableStreamReader] = undefined;
120 this[readableStreamStoredError] = undefined; 127 this[readableStreamStoredError] = undefined;
121 128
122 this[readableStreamStrategySize] = normalizedStrategy.size;
123 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;
124
125 // Avoid allocating the controller if the stream is going to be controlled 129 // Avoid allocating the controller if the stream is going to be controlled
126 // externally (i.e. from C++) anyway. All calls to underlyingSource 130 // externally (i.e. from C++) anyway. All calls to underlyingSource
127 // methods will disregard their controller argument in such situations 131 // methods will disregard their controller argument in such situations
128 // (but see below). 132 // (but see below).
129 133
130 const isControlledExternally = 134 this[readableStreamController] = undefined;
131 arguments[2] === createWithExternalControllerSentinel;
132 const controller =
133 isControlledExternally ? null : new ReadableStreamController(this);
134 this[readableStreamController] = controller;
135 135
136 // We need to pass ourself to the underlyingSource start method for 136 const type = underlyingSource['type'];
domenic 2016/04/27 21:12:23 Nit: use .type notation
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
137 // externally-controlled streams. We use the now-useless controller 137 const typeString = String(type);
138 // argument to do so. 138 if (typeString === 'bytes') {
139 const argToStart = isControlledExternally ? this : controller; 139 throw new RangeError('bytes type is not yet implemented');
140 } else if (type !== undefined) {
141 throw new RangeError('Invalid type is specified');
142 }
140 143
141 const startResult = CallOrNoop( 144 this[readableStreamController] =
142 underlyingSource, 'start', argToStart, 'underlyingSource.start'); 145 new ReadableStreamDefaultController(this, underlyingSource, size, high WaterMark, arguments[2] === createWithExternalControllerSentinel);
143 thenPromise(Promise_resolve(startResult),
144 () => {
145 this[readableStreamBits] |= STARTED;
146 RequestReadableStreamPull(this);
147 },
148 r => {
149 if (this[readableStreamState] === STATE_READABLE) {
150 return ErrorReadableStream(this, r);
151 }
152 });
153 } 146 }
154 147
155 get locked() { 148 get locked() {
156 if (IsReadableStream(this) === false) { 149 if (IsReadableStream(this) === false) {
157 throw new TypeError(errIllegalInvocation); 150 throw new TypeError(errIllegalInvocation);
158 } 151 }
159 152
160 return IsReadableStreamLocked(this); 153 return IsReadableStreamLocked(this);
161 } 154 }
162 155
163 cancel(reason) { 156 cancel(reason) {
164 if (IsReadableStream(this) === false) { 157 if (IsReadableStream(this) === false) {
165 return Promise_reject(new TypeError(errIllegalInvocation)); 158 return Promise_reject(new TypeError(errIllegalInvocation));
166 } 159 }
167 160
168 if (IsReadableStreamLocked(this) === true) { 161 if (IsReadableStreamLocked(this) === true) {
169 return Promise_reject(new TypeError(errCancelLockedStream)); 162 return Promise_reject(new TypeError(errCancelLockedStream));
170 } 163 }
171 164
172 return CancelReadableStream(this, reason); 165 return ReadableStreamCancel(this, reason);
173 } 166 }
174 167
175 getReader() { 168 getReader() {
domenic 2016/04/27 21:12:23 In recent versions of Chrome we can do this as `ge
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
169 const options = arguments[0] === undefined ? {} : arguments[0];
170
176 if (IsReadableStream(this) === false) { 171 if (IsReadableStream(this) === false) {
177 throw new TypeError(errIllegalInvocation); 172 throw new TypeError(errIllegalInvocation);
178 } 173 }
179 174
180 return AcquireReadableStreamReader(this); 175 const mode = options.mode;
176 if (mode === 'byob') {
177 if (IsReadableByteStreamDefaultController(this[readableStreamController] ) === false) {
178 throw new TypeError(errGetReaderNotByteStream);
179 }
180
181 return AcquireReadableStreamBYOBReader(this);
182 }
183
184 if (mode === undefined) {
185 return AcquireReadableStreamDefaultReader(this);
186 }
187
188 throw new RangeError(errGetReaderBadMode);;
181 } 189 }
182 190
183 tee() { 191 tee() {
184 if (IsReadableStream(this) === false) { 192 if (IsReadableStream(this) === false) {
185 throw new TypeError(errIllegalInvocation); 193 throw new TypeError(errIllegalInvocation);
186 } 194 }
187 195
188 return TeeReadableStream(this); 196 return ReadableStreamTee(this);
189 } 197 }
190 } 198 }
191 199
192 class ReadableStreamController { 200 class ReadableStreamDefaultController {
193 constructor(stream) { 201 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) {
194 if (IsReadableStream(stream) === false) { 202 if (IsReadableStream(stream) === false) {
195 throw new TypeError(errIllegalConstructor); 203 throw new TypeError(errIllegalConstructor);
196 } 204 }
197 205
198 if (stream[readableStreamController] !== undefined) { 206 if (stream[readableStreamController] !== undefined) {
199 throw new TypeError(errIllegalConstructor); 207 throw new TypeError(errIllegalConstructor);
200 } 208 }
201 209
202 this[readableStreamControllerControlledReadableStream] = stream; 210 this[readableStreamDefaultControllerControlledReadableStream] = stream;
211
212 this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource;
213
214 this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray();
215 this[readableStreamDefaultControllerQueueSize] = 0;
216
217 this[readableStreamDefaultControllerBits] = 0b0;
218 if (isExternallyControlled === true) {
219 this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
220 }
221
222 const normalizedStrategy =
223 ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
224 this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.siz e;
225 this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.high WaterMark;
226
227 const controller = this;
228
229 const startResult = CallOrNoop(
230 underlyingSource, 'start', this, 'underlyingSource.start');
231 thenPromise(Promise_resolve(startResult),
232 () => {
233 controller[readableStreamDefaultControllerBits] |= STARTED;
234 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
235 },
236 r => {
237 if (stream[readableStreamState] === STATE_READABLE) {
238 return ReadableStreamDefaultControllerError(controller, r);
domenic 2016/04/27 21:12:24 Nit: no need for return here
tyoshino (SeeGerritForStatus) 2016/04/28 14:51:11 Done.
239 }
240 });
203 } 241 }
204 242
205 get desiredSize() { 243 get desiredSize() {
206 if (IsReadableStreamController(this) === false) { 244 if (IsReadableStreamDefaultController(this) === false) {
207 throw new TypeError(errIllegalInvocation); 245 throw new TypeError(errIllegalInvocation);
208 } 246 }
209 247
210 return GetReadableStreamDesiredSize( 248 return ReadableStreamDefaultControllerGetDesiredSize(this);
211 this[readableStreamControllerControlledReadableStream]);
212 } 249 }
213 250
214 close() { 251 close() {
215 if (IsReadableStreamController(this) === false) { 252 if (IsReadableStreamDefaultController(this) === false) {
216 throw new TypeError(errIllegalInvocation); 253 throw new TypeError(errIllegalInvocation);
217 } 254 }
218 255
219 const stream = this[readableStreamControllerControlledReadableStream]; 256 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
220 257
221 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 258 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
222 throw new TypeError(errCloseCloseRequestedStream); 259 throw new TypeError(errCloseCloseRequestedStream);
223 } 260 }
224 if (stream[readableStreamState] === STATE_ERRORED) { 261
262 const state = stream[readableStreamState];
263 if (state === STATE_ERRORED) {
225 throw new TypeError(errCloseErroredStream); 264 throw new TypeError(errCloseErroredStream);
226 } 265 }
266 if (state === STATE_CLOSED) {
267 throw new TypeError(errCloseClosedStream);
268 }
227 269
228 return CloseReadableStream(stream); 270 return ReadableStreamDefaultControllerClose(this);
229 } 271 }
230 272
231 enqueue(chunk) { 273 enqueue(chunk) {
232 if (IsReadableStreamController(this) === false) { 274 if (IsReadableStreamDefaultController(this) === false) {
233 throw new TypeError(errIllegalInvocation); 275 throw new TypeError(errIllegalInvocation);
234 } 276 }
235 277
236 const stream = this[readableStreamControllerControlledReadableStream]; 278 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
237 279
238 if (stream[readableStreamState] === STATE_ERRORED) { 280 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
239 throw stream[readableStreamStoredError]; 281 throw new TypeError(errEnqueueCloseRequestedStream);
240 } 282 }
241 283
242 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 284 const state = stream[readableStreamState];
243 throw new TypeError(errEnqueueInCloseRequestedStream); 285 if (state === STATE_ERRORED) {
286 throw new TypeError(errEnqueueErroredStream);
287 }
288 if (state === STATE_CLOSED) {
289 throw new TypeError(errEnqueueClosedStream);
244 } 290 }
245 291
246 return EnqueueInReadableStream(stream, chunk); 292 return ReadableStreamDefaultControllerEnqueue(this, chunk);
247 } 293 }
248 294
249 error(e) { 295 error(e) {
250 if (IsReadableStreamController(this) === false) { 296 if (IsReadableStreamDefaultController(this) === false) {
251 throw new TypeError(errIllegalInvocation); 297 throw new TypeError(errIllegalInvocation);
252 } 298 }
253 299
254 const stream = this[readableStreamControllerControlledReadableStream]; 300 const stream = this[readableStreamDefaultControllerControlledReadableStrea m];
255 301
256 const state = stream[readableStreamState]; 302 const state = stream[readableStreamState];
257 if (state !== STATE_READABLE) { 303 if (state === STATE_ERRORED) {
258 if (state === STATE_ERRORED) { 304 throw new TypeError(errErrorErroredStream);
259 throw new TypeError(errErrorErroredStream); 305 }
260 } 306 if (state === STATE_CLOSED) {
261 if (state === STATE_CLOSED) { 307 throw new TypeError(errErrorClosedStream);
262 throw new TypeError(errErrorClosedStream);
263 }
264 } 308 }
265 309
266 return ErrorReadableStream(stream, e); 310 return ReadableStreamDefaultControllerError(this, e);
267 } 311 }
268 } 312 }
269 313
270 class ReadableStreamReader { 314 function ReadableStreamDefaultControllerCancel(controller, reason) {
315 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
316
317 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
318 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel');
319 }
320
321 function ReadableStreamDefaultControllerPull(controller) {
322 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
323
324 if (controller[readableStreamDefaultControllerQueue].length > 0) {
325 const chunk = DequeueValue(controller);
326
327 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED && c ontroller[readableStreamDefaultControllerQueue].length === 0) {
328 ReadableStreamClose(stream);
329 } else {
330 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
331 }
332
333 return Promise_resolve(CreateIterResultObject(chunk, false));
334 }
335
336 const pendingPromise = ReadableStreamAddReadRequest(stream);
337 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
338 return pendingPromise;
339 }
340
341 function ReadableStreamAddReadRequest(stream) {
342 const promise = v8.createPromise();
343 stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(p romise);
344 return promise;
345 }
346
347 class ReadableStreamDefaultReader {
271 constructor(stream) { 348 constructor(stream) {
272 if (IsReadableStream(stream) === false) { 349 if (IsReadableStream(stream) === false) {
273 throw new TypeError(errReaderConstructorBadArgument); 350 throw new TypeError(errReaderConstructorBadArgument);
274 } 351 }
275 if (IsReadableStreamLocked(stream) === true) { 352 if (IsReadableStreamLocked(stream) === true) {
276 throw new TypeError(errReaderConstructorStreamAlreadyLocked); 353 throw new TypeError(errReaderConstructorStreamAlreadyLocked);
277 } 354 }
278 355
279 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 356 ReadableStreamReaderGenericInitialize(this, stream);
280 // blink::UnderlyingSourceBase.
281 if (stream[readableStreamController] === null) {
282 // The stream is created with an external controller (i.e. made in
283 // Blink).
284 const underlyingSource = stream[readableStreamUnderlyingSource];
285 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
286 }
287 357
288 this[readableStreamReaderOwnerReadableStream] = stream; 358 this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray ();
289 stream[readableStreamReader] = this;
290
291 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
292
293 switch (stream[readableStreamState]) {
294 case STATE_READABLE:
295 this[readableStreamReaderClosedPromise] = v8.createPromise();
296 break;
297 case STATE_CLOSED:
298 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
299 break;
300 case STATE_ERRORED:
301 this[readableStreamReaderClosedPromise] =
302 Promise_reject(stream[readableStreamStoredError]);
303 break;
304 }
305 } 359 }
306 360
307 get closed() { 361 get closed() {
308 if (IsReadableStreamReader(this) === false) { 362 if (IsReadableStreamDefaultReader(this) === false) {
309 return Promise_reject(new TypeError(errIllegalInvocation)); 363 return Promise_reject(new TypeError(errIllegalInvocation));
310 } 364 }
311 365
312 return this[readableStreamReaderClosedPromise]; 366 return this[readableStreamReaderClosedPromise];
313 } 367 }
314 368
315 cancel(reason) { 369 cancel(reason) {
316 if (IsReadableStreamReader(this) === false) { 370 if (IsReadableStreamDefaultReader(this) === false) {
317 return Promise_reject(new TypeError(errIllegalInvocation)); 371 return Promise_reject(new TypeError(errIllegalInvocation));
318 } 372 }
319 373
320 const stream = this[readableStreamReaderOwnerReadableStream]; 374 const stream = this[readableStreamReaderOwnerReadableStream];
321 if (stream === undefined) { 375 if (stream === undefined) {
322 return Promise_reject(new TypeError(errCancelReleasedReader)); 376 return Promise_reject(new TypeError(errCancelReleasedReader));
323 } 377 }
324 378
325 return CancelReadableStream(stream, reason); 379 return ReadableStreamReaderGenericCancel(this, reason);
326 } 380 }
327 381
328 read() { 382 read() {
329 if (IsReadableStreamReader(this) === false) { 383 if (IsReadableStreamDefaultReader(this) === false) {
330 return Promise_reject(new TypeError(errIllegalInvocation)); 384 return Promise_reject(new TypeError(errIllegalInvocation));
331 } 385 }
332 386
333 if (this[readableStreamReaderOwnerReadableStream] === undefined) { 387 if (this[readableStreamReaderOwnerReadableStream] === undefined) {
334 return Promise_reject(new TypeError(errReadReleasedReader)); 388 return Promise_reject(new TypeError(errReadReleasedReader));
335 } 389 }
336 390
337 return ReadFromReadableStreamReader(this); 391 return ReadableStreamDefaultReaderRead(this);
338 } 392 }
339 393
340 releaseLock() { 394 releaseLock() {
341 if (IsReadableStreamReader(this) === false) { 395 if (IsReadableStreamDefaultReader(this) === false) {
342 throw new TypeError(errIllegalInvocation); 396 throw new TypeError(errIllegalInvocation);
343 } 397 }
344 398
345 const stream = this[readableStreamReaderOwnerReadableStream]; 399 const stream = this[readableStreamReaderOwnerReadableStream];
346 if (stream === undefined) { 400 if (stream === undefined) {
347 return undefined; 401 return undefined;
348 } 402 }
349 403
350 if (this[readableStreamReaderReadRequests].length > 0) { 404 if (this[readableStreamDefaultReaderReadRequests].length > 0) {
351 throw new TypeError(errReleaseReaderWithPendingRead); 405 throw new TypeError(errReleaseReaderWithPendingRead);
352 } 406 }
353 407
354 // TODO(yhirano): Remove this when we don't need hasPendingActivity in 408 ReadableStreamReaderGenericRelease(this);
355 // blink::UnderlyingSourceBase. 409 }
356 if (stream[readableStreamController] === null) { 410 }
357 // The stream is created with an external controller (i.e. made in
358 // Blink).
359 const underlyingSource = stream[readableStreamUnderlyingSource];
360 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
361 }
362 411
363 if (stream[readableStreamState] === STATE_READABLE) { 412 function ReadableStreamReaderGenericCancel(reader, reason) {
364 v8.rejectPromise(this[readableStreamReaderClosedPromise], 413 return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason);
365 new TypeError(errReleasedReaderClosedPromise));
366 } else {
367 this[readableStreamReaderClosedPromise] =
368 Promise_reject(new TypeError(errReleasedReaderClosedPromise));
369 }
370
371 this[readableStreamReaderOwnerReadableStream][readableStreamReader] =
372 undefined;
373 this[readableStreamReaderOwnerReadableStream] = undefined;
374 }
375 } 414 }
376 415
377 // 416 //
378 // Readable stream abstract operations 417 // Readable stream abstract operations
379 // 418 //
380 419
381 function AcquireReadableStreamReader(stream) { 420 function AcquireReadableStreamDefaultReader(stream) {
382 return new ReadableStreamReader(stream); 421 return new ReadableStreamDefaultReader(stream);
383 } 422 }
384 423
385 function CancelReadableStream(stream, reason) { 424 function ReadableStreamCancel(stream, reason) {
386 stream[readableStreamBits] |= DISTURBED; 425 stream[readableStreamBits] |= DISTURBED;
387 426
388 const state = stream[readableStreamState]; 427 const state = stream[readableStreamState];
389 if (state === STATE_CLOSED) { 428 if (state === STATE_CLOSED) {
390 return Promise_resolve(undefined); 429 return Promise_resolve(undefined);
391 } 430 }
392 if (state === STATE_ERRORED) { 431 if (state === STATE_ERRORED) {
393 return Promise_reject(stream[readableStreamStoredError]); 432 return Promise_reject(stream[readableStreamStoredError]);
394 } 433 }
395 434
396 stream[readableStreamQueue] = new v8.InternalPackedArray(); 435 ReadableStreamClose(stream);
397 FinishClosingReadableStream(stream);
398 436
399 const underlyingSource = stream[readableStreamUnderlyingSource]; 437 const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[rea dableStreamController], reason);
400 const sourceCancelPromise = PromiseCallOrNoop(
401 underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
402 return thenPromise(sourceCancelPromise, () => undefined); 438 return thenPromise(sourceCancelPromise, () => undefined);
403 } 439 }
404 440
405 function CloseReadableStream(stream) { 441 function ReadableStreamDefaultControllerClose(controller) {
406 if (stream[readableStreamState] === STATE_CLOSED) { 442 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
407 return undefined;
408 }
409 443
410 stream[readableStreamBits] |= CLOSE_REQUESTED; 444 controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
411 445
412 if (stream[readableStreamQueue].length === 0) { 446 if (controller[readableStreamDefaultControllerQueue].length === 0) {
413 return FinishClosingReadableStream(stream); 447 ReadableStreamClose(stream);
414 } 448 }
415 } 449 }
416 450
417 function EnqueueInReadableStream(stream, chunk) { 451 function ReadableStreamFulfillReadRequest(stream, chunk, done) {
418 if (stream[readableStreamState] === STATE_CLOSED) { 452 const reader = stream[readableStreamReader];
419 return undefined;
420 }
421 453
422 if (IsReadableStreamLocked(stream) === true && 454 const readRequest =
423 stream[readableStreamReader][readableStreamReaderReadRequests].length > 455 stream[readableStreamReader][readableStreamDefaultReaderReadRequests]
424 0) { 456 .shift();
425 const readRequest = 457 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done));
426 stream[readableStreamReader][readableStreamReaderReadRequests] 458 }
427 .shift(); 459
428 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); 460 function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
461 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
462
463 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
464 ReadableStreamFulfillReadRequest(stream, chunk, false);
429 } else { 465 } else {
430 let chunkSize = 1; 466 let chunkSize = 1;
431 467
432 const strategySize = stream[readableStreamStrategySize]; 468 const strategySize = controller[readableStreamDefaultControllerStrategySiz e];
433 if (strategySize !== undefined) { 469 if (strategySize !== undefined) {
434 try { 470 try {
435 chunkSize = strategySize(chunk); 471 chunkSize = strategySize(chunk);
436 } catch (chunkSizeE) { 472 } catch (chunkSizeE) {
437 if (stream[readableStreamState] === STATE_READABLE) { 473 if (stream[readableStreamState] === STATE_READABLE) {
438 ErrorReadableStream(stream, chunkSizeE); 474 ReadableStreamDefaultControllerError(controller, chunkSizeE);
439 } 475 }
440 throw chunkSizeE; 476 throw chunkSizeE;
441 } 477 }
442 } 478 }
443 479
444 try { 480 try {
445 EnqueueValueWithSize(stream, chunk, chunkSize); 481 EnqueueValueWithSize(controller, chunk, chunkSize);
446 } catch (enqueueE) { 482 } catch (enqueueE) {
447 if (stream[readableStreamState] === STATE_READABLE) { 483 if (stream[readableStreamState] === STATE_READABLE) {
448 ErrorReadableStream(stream, enqueueE); 484 ReadableStreamDefaultControllerError(controller, enqueueE);
449 } 485 }
450 throw enqueueE; 486 throw enqueueE;
451 } 487 }
452 } 488 }
453 489
454 RequestReadableStreamPull(stream); 490 ReadableStreamDefaultControllerCallPullIfNeeded(controller);
455 } 491 }
456 492
457 function ErrorReadableStream(stream, e) { 493 function ReadableStreamDefaultControllerError(controller, e) {
458 stream[readableStreamQueue] = new v8.InternalPackedArray(); 494 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y();
495 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
496 ReadableStreamError(stream, e);
497 }
498
499 function ReadableStreamError(stream, e) {
459 stream[readableStreamStoredError] = e; 500 stream[readableStreamStoredError] = e;
460 stream[readableStreamState] = STATE_ERRORED; 501 stream[readableStreamState] = STATE_ERRORED;
461 502
462 const reader = stream[readableStreamReader]; 503 const reader = stream[readableStreamReader];
463 if (reader === undefined) { 504 if (reader === undefined) {
464 return undefined; 505 return undefined;
465 } 506 }
466 507
467 const readRequests = reader[readableStreamReaderReadRequests]; 508 if (IsReadableStreamDefaultReader(reader) === true) {
468 for (let i = 0; i < readRequests.length; ++i) { 509 const readRequests = reader[readableStreamDefaultReaderReadRequests];
469 v8.rejectPromise(readRequests[i], e); 510 for (let i = 0; i < readRequests.length; ++i) {
511 v8.rejectPromise(readRequests[i], e);
512 }
513 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
514 } else {
515 const readIntoRequests = reader[readableStreamReaderReadIntoRequests];
516 for (let i = 0; i < readIntoRequests.length; ++i) {
517 v8.rejectPromise(readIntoRequests[i], e);
518 }
519 reader[readableStreamReaderReadIntoRequests] = new v8.InternalPackedArray( );
470 } 520 }
471 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
472 521
473 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); 522 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e);
474 } 523 }
475 524
476 function FinishClosingReadableStream(stream) { 525 function ReadableStreamClose(stream) {
477 stream[readableStreamState] = STATE_CLOSED; 526 stream[readableStreamState] = STATE_CLOSED;
478 527
479 const reader = stream[readableStreamReader]; 528 const reader = stream[readableStreamReader];
480 if (reader === undefined) { 529 if (reader === undefined) {
481 return undefined; 530 return undefined;
482 } 531 }
483 532
484 533 if (IsReadableStreamDefaultReader(reader) === true) {
485 const readRequests = reader[readableStreamReaderReadRequests]; 534 const readRequests = reader[readableStreamDefaultReaderReadRequests];
486 for (let i = 0; i < readRequests.length; ++i) { 535 for (let i = 0; i < readRequests.length; ++i) {
487 v8.resolvePromise( 536 v8.resolvePromise(
488 readRequests[i], CreateIterResultObject(undefined, true)); 537 readRequests[i], CreateIterResultObject(undefined, true));
538 }
539 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay();
489 } 540 }
490 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray();
491 541
492 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); 542 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined);
493 } 543 }
494 544
495 function GetReadableStreamDesiredSize(stream) { 545 function ReadableStreamDefaultControllerGetDesiredSize(controller) {
496 const queueSize = GetTotalQueueSize(stream); 546 const queueSize = GetTotalQueueSize(controller);
497 return stream[readableStreamStrategyHWM] - queueSize; 547 return controller[readableStreamDefaultControllerStrategyHWM] - queueSize;
498 } 548 }
499 549
500 function IsReadableStream(x) { 550 function IsReadableStream(x) {
501 return hasOwnProperty(x, readableStreamUnderlyingSource); 551 return hasOwnProperty(x, readableStreamController);
502 } 552 }
503 553
504 function IsReadableStreamDisturbed(stream) { 554 function IsReadableStreamDisturbed(stream) {
505 return stream[readableStreamBits] & DISTURBED; 555 return stream[readableStreamBits] & DISTURBED;
506 } 556 }
507 557
508 function SetReadableStreamDisturbed(stream) { 558 function SetReadableStreamDisturbed(stream) {
509 return stream[readableStreamBits] |= DISTURBED; 559 return stream[readableStreamBits] |= DISTURBED;
510 } 560 }
511 561
512 function IsReadableStreamLocked(stream) { 562 function IsReadableStreamLocked(stream) {
513 return stream[readableStreamReader] !== undefined; 563 return stream[readableStreamReader] !== undefined;
514 } 564 }
515 565
516 function IsReadableStreamController(x) { 566 function IsReadableStreamDefaultController(x) {
517 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); 567 return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableSt ream);
568 }
569
570 function IsReadableStreamDefaultReader(x) {
571 return hasOwnProperty(x, readableStreamDefaultReaderReadRequests);
518 } 572 }
519 573
520 function IsReadableStreamReadable(stream) { 574 function IsReadableStreamReadable(stream) {
521 return stream[readableStreamState] === STATE_READABLE; 575 return stream[readableStreamState] === STATE_READABLE;
522 } 576 }
523 577
524 function IsReadableStreamClosed(stream) { 578 function IsReadableStreamClosed(stream) {
525 return stream[readableStreamState] === STATE_CLOSED; 579 return stream[readableStreamState] === STATE_CLOSED;
526 } 580 }
527 581
528 function IsReadableStreamErrored(stream) { 582 function IsReadableStreamErrored(stream) {
529 return stream[readableStreamState] === STATE_ERRORED; 583 return stream[readableStreamState] === STATE_ERRORED;
530 } 584 }
531 585
532 function IsReadableStreamReader(x) { 586 function ReadableStreamReaderGenericInitialize(reader, stream) {
533 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); 587 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
588 // blink::UnderlyingSourceBase.
589 const controller = stream[readableStreamController];
590 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
591 // The stream is created with an external controller (i.e. made in
592 // Blink).
593 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
594 callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
595 }
596
597 reader[readableStreamReaderOwnerReadableStream] = stream;
598 stream[readableStreamReader] = reader;
599
600 switch (stream[readableStreamState]) {
601 case STATE_READABLE:
602 reader[readableStreamReaderClosedPromise] = v8.createPromise();
603 break;
604 case STATE_CLOSED:
605 reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined);
606 break;
607 case STATE_ERRORED:
608 reader[readableStreamReaderClosedPromise] =
609 Promise_reject(stream[readableStreamStoredError]);
610 break;
611 }
534 } 612 }
535 613
536 function ReadFromReadableStreamReader(reader) { 614 function ReadableStreamReaderGenericRelease(reader) {
615 // TODO(yhirano): Remove this when we don't need hasPendingActivity in
616 // blink::UnderlyingSourceBase.
617 const controller = reader[readableStreamReaderOwnerReadableStream][readableS treamController];
618 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) {
619 // The stream is created with an external controller (i.e. made in
620 // Blink).
621 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource];
622 callFunction(underlyingSource.notifyLockReleased, underlyingSource);
623 }
624
625 if (reader[readableStreamReaderOwnerReadableStream][readableStreamState] === STATE_READABLE) {
626 v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError( errReleasedReaderClosedPromise));
627 } else {
628 reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(e rrReleasedReaderClosedPromise));
629 }
630
631 reader[readableStreamReaderOwnerReadableStream][readableStreamReader] =
632 undefined;
633 reader[readableStreamReaderOwnerReadableStream] = undefined;
634 }
635
636 function ReadableStreamDefaultReaderRead(reader) {
537 const stream = reader[readableStreamReaderOwnerReadableStream]; 637 const stream = reader[readableStreamReaderOwnerReadableStream];
538 stream[readableStreamBits] |= DISTURBED; 638 stream[readableStreamBits] |= DISTURBED;
539 639
540 if (stream[readableStreamState] === STATE_CLOSED) { 640 if (stream[readableStreamState] === STATE_CLOSED) {
541 return Promise_resolve(CreateIterResultObject(undefined, true)); 641 return Promise_resolve(CreateIterResultObject(undefined, true));
542 } 642 }
543 643
544 if (stream[readableStreamState] === STATE_ERRORED) { 644 if (stream[readableStreamState] === STATE_ERRORED) {
545 return Promise_reject(stream[readableStreamStoredError]); 645 return Promise_reject(stream[readableStreamStoredError]);
546 } 646 }
547 647
548 const queue = stream[readableStreamQueue]; 648 return ReadableStreamDefaultControllerPull(stream[readableStreamController]) ;
549 if (queue.length > 0) {
550 const chunk = DequeueValue(stream);
551
552 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) {
553 FinishClosingReadableStream(stream);
554 } else {
555 RequestReadableStreamPull(stream);
556 }
557
558 return Promise_resolve(CreateIterResultObject(chunk, false));
559 } else {
560 const readRequest = v8.createPromise();
561
562 reader[readableStreamReaderReadRequests].push(readRequest);
563 RequestReadableStreamPull(stream);
564 return readRequest;
565 }
566 } 649 }
567 650
568 function RequestReadableStreamPull(stream) { 651 function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
569 const shouldPull = ShouldReadableStreamPull(stream); 652 const shouldPull = ReadableStreamDefaultControllerShouldPull(controller);
570 if (shouldPull === false) { 653 if (shouldPull === false) {
571 return undefined; 654 return undefined;
572 } 655 }
573 656
574 if (stream[readableStreamBits] & PULLING) { 657 if (controller[readableStreamDefaultControllerBits] & PULLING) {
575 stream[readableStreamBits] |= PULL_AGAIN; 658 controller[readableStreamDefaultControllerBits] |= PULL_AGAIN;
576 return undefined; 659 return undefined;
577 } 660 }
578 661
579 stream[readableStreamBits] |= PULLING; 662 controller[readableStreamDefaultControllerBits] |= PULLING;
580 663
581 const underlyingSource = stream[readableStreamUnderlyingSource]; 664 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource];
582 const controller = stream[readableStreamController];
583 const pullPromise = PromiseCallOrNoop( 665 const pullPromise = PromiseCallOrNoop(
584 underlyingSource, 'pull', controller, 'underlyingSource.pull'); 666 underlyingSource, 'pull', controller, 'underlyingSource.pull');
585 667
586 thenPromise(pullPromise, 668 thenPromise(pullPromise,
587 () => { 669 () => {
588 stream[readableStreamBits] &= ~PULLING; 670 controller[readableStreamDefaultControllerBits] &= ~PULLING;
589 671
590 if (stream[readableStreamBits] & PULL_AGAIN) { 672 if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) {
591 stream[readableStreamBits] &= ~PULL_AGAIN; 673 controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
592 return RequestReadableStreamPull(stream); 674 return ReadableStreamDefaultControllerCallPullIfNeeded(controller);
593 } 675 }
594 }, 676 },
595 e => { 677 e => {
596 if (stream[readableStreamState] === STATE_READABLE) { 678 if (controller[readableStreamDefaultControllerControlledReadableStream ][readableStreamState] === STATE_READABLE) {
597 return ErrorReadableStream(stream, e); 679 return ReadableStreamDefaultControllerError(controller, e);
598 } 680 }
599 }); 681 });
600 } 682 }
601 683
602 function ShouldReadableStreamPull(stream) { 684 function ReadableStreamDefaultControllerShouldPull(controller) {
685 const stream = controller[readableStreamDefaultControllerControlledReadableS tream];
686
603 const state = stream[readableStreamState]; 687 const state = stream[readableStreamState];
604 if (state === STATE_CLOSED || state === STATE_ERRORED) { 688 if (state === STATE_CLOSED || state === STATE_ERRORED) {
605 return false; 689 return false;
606 } 690 }
607 691
608 if (stream[readableStreamBits] & CLOSE_REQUESTED) { 692 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
609 return false; 693 return false;
610 } 694 }
611 695
612 if (!(stream[readableStreamBits] & STARTED)) { 696 if (!(controller[readableStreamDefaultControllerBits] & STARTED)) {
613 return false; 697 return false;
614 } 698 }
615 699
616 if (IsReadableStreamLocked(stream) === true) { 700 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) {
617 const reader = stream[readableStreamReader]; 701 return true;
618 const readRequests = reader[readableStreamReaderReadRequests];
619 if (readRequests.length > 0) {
620 return true;
621 }
622 } 702 }
623 703
624 const desiredSize = GetReadableStreamDesiredSize(stream); 704 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller );
625 if (desiredSize > 0) { 705 if (desiredSize > 0) {
626 return true; 706 return true;
627 } 707 }
628 708
629 return false; 709 return false;
630 } 710 }
631 711
712 function ReadableStreamGetNumReadRequests(stream) {
713 const reader = stream[readableStreamReader];
714 const readRequests = reader[readableStreamDefaultReaderReadRequests];
715 return readRequests.length;
716 }
717
632 // Potential future optimization: use class instances for the underlying 718 // Potential future optimization: use class instances for the underlying
633 // sources, so that we don't re-create 719 // sources, so that we don't re-create
634 // closures every time. 720 // closures every time.
635 721
636 // TODO(domenic): shouldClone argument from spec not supported yet 722 // TODO(domenic): shouldClone argument from spec not supported yet
637 function TeeReadableStream(stream) { 723 function ReadableStreamTee(stream) {
638 const reader = AcquireReadableStreamReader(stream); 724 const reader = AcquireReadableStreamDefaultReader(stream);
639 725
640 let closedOrErrored = false; 726 let closedOrErrored = false;
641 let canceled1 = false; 727 let canceled1 = false;
642 let canceled2 = false; 728 let canceled2 = false;
643 let reason1; 729 let reason1;
644 let reason2; 730 let reason2;
645 let promise = v8.createPromise(); 731 let promise = v8.createPromise();
646 732
647 const branch1 = new ReadableStream({pull, cancel: cancel1}); 733 const branch1Stream = new ReadableStream({pull, cancel: cancel1});
648 734
649 const branch2 = new ReadableStream({pull, cancel: cancel2}); 735 const branch2Stream = new ReadableStream({pull, cancel: cancel2});
736
737 const branch1 = branch1Stream[readableStreamController];
738 const branch2 = branch2Stream[readableStreamController];
650 739
651 thenPromise( 740 thenPromise(
652 reader[readableStreamReaderClosedPromise], undefined, function(r) { 741 reader[readableStreamReaderClosedPromise], undefined, function(r) {
653 if (closedOrErrored === true) { 742 if (closedOrErrored === true) {
654 return; 743 return;
655 } 744 }
656 745
657 ErrorReadableStream(branch1, r); 746 ReadableStreamDefaultControllerError(branch1, r);
658 ErrorReadableStream(branch2, r); 747 ReadableStreamDefaultControllerError(branch2, r);
659 closedOrErrored = true; 748 closedOrErrored = true;
660 }); 749 });
661 750
662 return [branch1, branch2]; 751 return [branch1Stream, branch2Stream];
663
664 752
665 function pull() { 753 function pull() {
666 return thenPromise( 754 return thenPromise(
667 ReadFromReadableStreamReader(reader), function(result) { 755 ReadableStreamDefaultReaderRead(reader), function(result) {
668 const value = result.value; 756 const value = result.value;
669 const done = result.done; 757 const done = result.done;
670 758
671 if (done === true && closedOrErrored === false) { 759 if (done === true && closedOrErrored === false) {
672 CloseReadableStream(branch1); 760 if (canceled1 === false) {
673 CloseReadableStream(branch2); 761 ReadableStreamDefaultControllerClose(branch1);
762 }
763 if (canceled2 === false) {
764 ReadableStreamDefaultControllerClose(branch2);
765 }
674 closedOrErrored = true; 766 closedOrErrored = true;
675 } 767 }
676 768
677 if (closedOrErrored === true) { 769 if (closedOrErrored === true) {
678 return; 770 return;
679 } 771 }
680 772
681 if (canceled1 === false) { 773 if (canceled1 === false) {
682 EnqueueInReadableStream(branch1, value); 774 ReadableStreamDefaultControllerEnqueue(branch1, value);
683 } 775 }
684 776
685 if (canceled2 === false) { 777 if (canceled2 === false) {
686 EnqueueInReadableStream(branch2, value); 778 ReadableStreamDefaultControllerEnqueue(branch2, value);
687 } 779 }
688 }); 780 });
689 } 781 }
690 782
691 function cancel1(reason) { 783 function cancel1(reason) {
692 canceled1 = true; 784 canceled1 = true;
693 reason1 = reason; 785 reason1 = reason;
694 786
695 if (canceled2 === true) { 787 if (canceled2 === true) {
696 const compositeReason = [reason1, reason2]; 788 const compositeReason = [reason1, reason2];
697 const cancelResult = CancelReadableStream(stream, compositeReason); 789 const cancelResult = ReadableStreamCancel(stream, compositeReason);
698 v8.resolvePromise(promise, cancelResult); 790 v8.resolvePromise(promise, cancelResult);
699 } 791 }
700 792
701 return promise; 793 return promise;
702 } 794 }
703 795
704 function cancel2(reason) { 796 function cancel2(reason) {
705 canceled2 = true; 797 canceled2 = true;
706 reason2 = reason; 798 reason2 = reason;
707 799
708 if (canceled1 === true) { 800 if (canceled1 === true) {
709 const compositeReason = [reason1, reason2]; 801 const compositeReason = [reason1, reason2];
710 const cancelResult = CancelReadableStream(stream, compositeReason); 802 const cancelResult = ReadableStreamCancel(stream, compositeReason);
711 v8.resolvePromise(promise, cancelResult); 803 v8.resolvePromise(promise, cancelResult);
712 } 804 }
713 805
714 return promise; 806 return promise;
715 } 807 }
716 } 808 }
717 809
718 // 810 //
719 // Queue-with-sizes 811 // Queue-with-sizes
720 // Modified from taking the queue (as in the spec) to taking the stream, so we 812 // Modified from taking the queue (as in the spec) to taking the stream, so we
721 // can modify the queue size alongside. 813 // can modify the queue size alongside.
722 // 814 //
723 815
724 function DequeueValue(stream) { 816 function DequeueValue(controller) {
725 const result = stream[readableStreamQueue].shift(); 817 const result = controller[readableStreamDefaultControllerQueue].shift();
726 stream[readableStreamQueueSize] -= result.size; 818 controller[readableStreamDefaultControllerQueueSize] -= result.size;
727 return result.value; 819 return result.value;
728 } 820 }
729 821
730 function EnqueueValueWithSize(stream, value, size) { 822 function EnqueueValueWithSize(controller, value, size) {
731 size = Number(size); 823 size = Number(size);
732 if (Number_isNaN(size) || size === +Infinity || size < 0) { 824 if (Number_isNaN(size) || size === +Infinity || size < 0) {
733 throw new RangeError(errInvalidSize); 825 throw new RangeError(errInvalidSize);
734 } 826 }
735 827
736 stream[readableStreamQueueSize] += size; 828 controller[readableStreamDefaultControllerQueueSize] += size;
737 stream[readableStreamQueue].push({value, size}); 829 controller[readableStreamDefaultControllerQueue].push({value, size});
738 } 830 }
739 831
740 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } 832 function GetTotalQueueSize(controller) { return controller[readableStreamDefau ltControllerQueueSize]; }
741 833
742 // 834 //
743 // Other helpers 835 // Other helpers
744 // 836 //
745 837
746 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { 838 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
747 if (size !== undefined && typeof size !== 'function') { 839 if (size !== undefined && typeof size !== 'function') {
748 throw new TypeError(errSizeNotAFunction); 840 throw new TypeError(errSizeNotAFunction);
749 } 841 }
750 842
(...skipping 29 matching lines...) Expand all
780 method = O[P]; 872 method = O[P];
781 } catch (methodE) { 873 } catch (methodE) {
782 return Promise_reject(methodE); 874 return Promise_reject(methodE);
783 } 875 }
784 876
785 if (method === undefined) { 877 if (method === undefined) {
786 return Promise_resolve(undefined); 878 return Promise_resolve(undefined);
787 } 879 }
788 880
789 if (typeof method !== 'function') { 881 if (typeof method !== 'function') {
790 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); 882 return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameF orError)));
791 } 883 }
792 884
793 try { 885 try {
794 return Promise_resolve(callFunction(method, O, arg)); 886 return Promise_resolve(callFunction(method, O, arg));
795 } catch (e) { 887 } catch (e) {
796 return Promise_reject(e); 888 return Promise_reject(e);
797 } 889 }
798 } 890 }
799 891
800 function CreateIterResultObject(value, done) { return {value, done}; } 892 function CreateIterResultObject(value, done) { return {value, done}; }
801 893
802 894
803 // 895 //
804 // Additions to the global 896 // Additions to the global
805 // 897 //
806 898
807 defineProperty(global, 'ReadableStream', { 899 defineProperty(global, 'ReadableStream', {
808 value: ReadableStream, 900 value: ReadableStream,
809 enumerable: false, 901 enumerable: false,
810 configurable: true, 902 configurable: true,
811 writable: true 903 writable: true
812 }); 904 });
813 905
814 // 906 //
815 // Exports to Blink 907 // Exports to Blink
816 // 908 //
817 909
818 binding.AcquireReadableStreamReader = AcquireReadableStreamReader; 910 binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReade r;
819 binding.IsReadableStream = IsReadableStream; 911 binding.IsReadableStream = IsReadableStream;
820 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; 912 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed;
821 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed; 913 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed;
822 binding.IsReadableStreamLocked = IsReadableStreamLocked; 914 binding.IsReadableStreamLocked = IsReadableStreamLocked;
823 binding.IsReadableStreamReadable = IsReadableStreamReadable; 915 binding.IsReadableStreamReadable = IsReadableStreamReadable;
824 binding.IsReadableStreamClosed = IsReadableStreamClosed; 916 binding.IsReadableStreamClosed = IsReadableStreamClosed;
825 binding.IsReadableStreamErrored = IsReadableStreamErrored; 917 binding.IsReadableStreamErrored = IsReadableStreamErrored;
826 binding.IsReadableStreamReader = IsReadableStreamReader; 918 binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader;
827 binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; 919 binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead;
828 920
829 binding.CloseReadableStream = CloseReadableStream; 921 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close;
830 binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; 922 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize;
831 binding.EnqueueInReadableStream = EnqueueInReadableStream; 923 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue;
832 binding.ErrorReadableStream = ErrorReadableStream; 924 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error;
833 925
834 binding.createReadableStreamWithExternalController = 926 binding.createReadableStreamWithExternalController =
835 (underlyingSource, strategy) => { 927 (underlyingSource, strategy) => {
836 return new ReadableStream( 928 return new ReadableStream(
837 underlyingSource, strategy, createWithExternalControllerSentinel); 929 underlyingSource, strategy, createWithExternalControllerSentinel);
838 }; 930 };
839 }); 931 });
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698