Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1207)

Side by Side Diff: sdk/lib/async/broadcast_stream_controller.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/_internal/pub/lib/src/utils.dart ('k') | sdk/lib/async/stream.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 class _BroadcastStream<T> extends _ControllerStream<T> { 7 class _BroadcastStream<T> extends _ControllerStream<T> {
8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller);
9 9
10 bool get isBroadcast => true; 10 bool get isBroadcast => true;
(...skipping 12 matching lines...) Expand all
23 // TODO(lrn): Use the _state field on _ControllerSubscription to 23 // TODO(lrn): Use the _state field on _ControllerSubscription to
24 // also store this state. Requires that the subscription implementation 24 // also store this state. Requires that the subscription implementation
25 // does not assume that it's use of the state integer is the only use. 25 // does not assume that it's use of the state integer is the only use.
26 int _eventState; 26 int _eventState;
27 27
28 _BroadcastSubscriptionLink _next; 28 _BroadcastSubscriptionLink _next;
29 _BroadcastSubscriptionLink _previous; 29 _BroadcastSubscriptionLink _previous;
30 30
31 _BroadcastSubscription(_StreamControllerLifecycle controller, 31 _BroadcastSubscription(_StreamControllerLifecycle controller,
32 void onData(T data), 32 void onData(T data),
33 void onError(Object error), 33 Function onError,
34 void onDone(), 34 void onDone(),
35 bool cancelOnError) 35 bool cancelOnError)
36 : super(controller, onData, onError, onDone, cancelOnError) { 36 : super(controller, onData, onError, onDone, cancelOnError) {
37 _next = _previous = this; 37 _next = _previous = this;
38 } 38 }
39 39
40 _BroadcastStreamController get _controller => super._controller; 40 _BroadcastStreamController get _controller => super._controller;
41 41
42 bool _expectsEvent(int eventId) => 42 bool _expectsEvent(int eventId) =>
43 (_eventState & _STATE_EVENT_ID) == eventId; 43 (_eventState & _STATE_EVENT_ID) == eventId;
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
166 _BroadcastSubscriptionLink previous = subscription._previous; 166 _BroadcastSubscriptionLink previous = subscription._previous;
167 _BroadcastSubscriptionLink next = subscription._next; 167 _BroadcastSubscriptionLink next = subscription._next;
168 previous._next = next; 168 previous._next = next;
169 next._previous = previous; 169 next._previous = previous;
170 subscription._next = subscription._previous = subscription; 170 subscription._next = subscription._previous = subscription;
171 } 171 }
172 172
173 // _StreamControllerLifecycle interface. 173 // _StreamControllerLifecycle interface.
174 174
175 StreamSubscription<T> _subscribe(void onData(T data), 175 StreamSubscription<T> _subscribe(void onData(T data),
176 void onError(Object error), 176 Function onError,
177 void onDone(), 177 void onDone(),
178 bool cancelOnError) { 178 bool cancelOnError) {
179 if (isClosed) { 179 if (isClosed) {
180 throw new StateError("Subscribing to closed stream"); 180 throw new StateError("Subscribing to closed stream");
181 } 181 }
182 StreamSubscription subscription = new _BroadcastSubscription<T>( 182 StreamSubscription subscription = new _BroadcastSubscription<T>(
183 this, onData, onError, onDone, cancelOnError); 183 this, onData, onError, onDone, cancelOnError);
184 _addListener(subscription); 184 _addListener(subscription);
185 if (identical(_next, _previous)) { 185 if (identical(_next, _previous)) {
186 // Only one listener, so it must be the first listener. 186 // Only one listener, so it must be the first listener.
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
220 } 220 }
221 221
222 void add(T data) { 222 void add(T data) {
223 if (!_mayAddEvent) throw _addEventError(); 223 if (!_mayAddEvent) throw _addEventError();
224 _sendData(data); 224 _sendData(data);
225 } 225 }
226 226
227 void addError(Object error, [Object stackTrace]) { 227 void addError(Object error, [Object stackTrace]) {
228 if (!_mayAddEvent) throw _addEventError(); 228 if (!_mayAddEvent) throw _addEventError();
229 if (stackTrace != null) _attachStackTrace(error, stackTrace); 229 if (stackTrace != null) _attachStackTrace(error, stackTrace);
230 _sendError(error); 230 _sendError(error, stackTrace);
231 } 231 }
232 232
233 Future close() { 233 Future close() {
234 if (isClosed) { 234 if (isClosed) {
235 assert(_doneFuture != null); 235 assert(_doneFuture != null);
236 return _doneFuture; 236 return _doneFuture;
237 } 237 }
238 if (!_mayAddEvent) throw _addEventError(); 238 if (!_mayAddEvent) throw _addEventError();
239 _state |= _STATE_CLOSED; 239 _state |= _STATE_CLOSED;
240 Future doneFuture = _ensureDoneFuture(); 240 Future doneFuture = _ensureDoneFuture();
241 _sendDone(); 241 _sendDone();
242 return doneFuture; 242 return doneFuture;
243 } 243 }
244 244
245 Future get done => _ensureDoneFuture(); 245 Future get done => _ensureDoneFuture();
246 246
247 Future addStream(Stream<T> stream) { 247 Future addStream(Stream<T> stream) {
248 if (!_mayAddEvent) throw _addEventError(); 248 if (!_mayAddEvent) throw _addEventError();
249 _state |= _STATE_ADDSTREAM; 249 _state |= _STATE_ADDSTREAM;
250 _addStreamState = new _AddStreamState(this, stream); 250 _addStreamState = new _AddStreamState(this, stream);
251 return _addStreamState.addStreamFuture; 251 return _addStreamState.addStreamFuture;
252 } 252 }
253 253
254 // _EventSink interface, called from AddStreamState. 254 // _EventSink interface, called from AddStreamState.
255 void _add(T data) { 255 void _add(T data) {
256 _sendData(data); 256 _sendData(data);
257 } 257 }
258 258
259 void _addError(Object error) { 259 void _addError(Object error, StackTrace stackTrace) {
260 assert(_isAddingStream); 260 assert(_isAddingStream);
261 _sendError(error); 261 _sendError(error, stackTrace);
262 } 262 }
263 263
264 void _close() { 264 void _close() {
265 assert(_isAddingStream); 265 assert(_isAddingStream);
266 _AddStreamState addState = _addStreamState; 266 _AddStreamState addState = _addStreamState;
267 _addStreamState = null; 267 _addStreamState = null;
268 _state &= ~_STATE_ADDSTREAM; 268 _state &= ~_STATE_ADDSTREAM;
269 addState.complete(); 269 addState.complete();
270 } 270 }
271 271
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 326
327 // EventDispatch interface. 327 // EventDispatch interface.
328 328
329 void _sendData(T data) { 329 void _sendData(T data) {
330 if (_isEmpty) return; 330 if (_isEmpty) return;
331 _forEachListener((_BufferingStreamSubscription<T> subscription) { 331 _forEachListener((_BufferingStreamSubscription<T> subscription) {
332 subscription._add(data); 332 subscription._add(data);
333 }); 333 });
334 } 334 }
335 335
336 void _sendError(Object error) { 336 void _sendError(Object error, StackTrace stackTrace) {
337 if (_isEmpty) return; 337 if (_isEmpty) return;
338 _forEachListener((_BufferingStreamSubscription<T> subscription) { 338 _forEachListener((_BufferingStreamSubscription<T> subscription) {
339 subscription._addError(error); 339 subscription._addError(error, stackTrace);
340 }); 340 });
341 } 341 }
342 342
343 void _sendDone() { 343 void _sendDone() {
344 if (!_isEmpty) { 344 if (!_isEmpty) {
345 _forEachListener((_BroadcastSubscription<T> subscription) { 345 _forEachListener((_BroadcastSubscription<T> subscription) {
346 subscription._close(); 346 subscription._close();
347 }); 347 });
348 } else { 348 } else {
349 assert(_doneFuture != null); 349 assert(_doneFuture != null);
(...skipping 11 matching lines...) Expand all
361 361
362 void _sendData(T data) { 362 void _sendData(T data) {
363 for (_BroadcastSubscriptionLink link = _next; 363 for (_BroadcastSubscriptionLink link = _next;
364 !identical(link, this); 364 !identical(link, this);
365 link = link._next) { 365 link = link._next) {
366 _BroadcastSubscription<T> subscription = link; 366 _BroadcastSubscription<T> subscription = link;
367 subscription._addPending(new _DelayedData(data)); 367 subscription._addPending(new _DelayedData(data));
368 } 368 }
369 } 369 }
370 370
371 void _sendError(Object error) { 371 void _sendError(Object error, StackTrace stackTrace) {
372 for (_BroadcastSubscriptionLink link = _next; 372 for (_BroadcastSubscriptionLink link = _next;
373 !identical(link, this); 373 !identical(link, this);
374 link = link._next) { 374 link = link._next) {
375 _BroadcastSubscription<T> subscription = link; 375 _BroadcastSubscription<T> subscription = link;
376 subscription._addPending(new _DelayedError(error)); 376 subscription._addPending(new _DelayedError(error, stackTrace));
377 } 377 }
378 } 378 }
379 379
380 void _sendDone() { 380 void _sendDone() {
381 if (!_isEmpty) { 381 if (!_isEmpty) {
382 for (_BroadcastSubscriptionLink link = _next; 382 for (_BroadcastSubscriptionLink link = _next;
383 !identical(link, this); 383 !identical(link, this);
384 link = link._next) { 384 link = link._next) {
385 _BroadcastSubscription<T> subscription = link; 385 _BroadcastSubscription<T> subscription = link;
386 subscription._addPending(const _DelayedDone()); 386 subscription._addPending(const _DelayedDone());
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
427 return; 427 return;
428 } 428 }
429 super.add(data); 429 super.add(data);
430 while (_hasPending) { 430 while (_hasPending) {
431 _pending.handleNext(this); 431 _pending.handleNext(this);
432 } 432 }
433 } 433 }
434 434
435 void addError(Object error, [StackTrace stackTrace]) { 435 void addError(Object error, [StackTrace stackTrace]) {
436 if (!isClosed && _isFiring) { 436 if (!isClosed && _isFiring) {
437 _addPendingEvent(new _DelayedError(error)); 437 _addPendingEvent(new _DelayedError(error, stackTrace));
438 return; 438 return;
439 } 439 }
440 super.addError(error, stackTrace); 440 super.addError(error, stackTrace);
441 while (_hasPending) { 441 while (_hasPending) {
442 _pending.handleNext(this); 442 _pending.handleNext(this);
443 } 443 }
444 } 444 }
445 445
446 Future close() { 446 Future close() {
447 if (!isClosed && _isFiring) { 447 if (!isClosed && _isFiring) {
(...skipping 13 matching lines...) Expand all
461 } 461 }
462 super._callOnCancel(); 462 super._callOnCancel();
463 } 463 }
464 } 464 }
465 465
466 // A subscription that never receives any events. 466 // A subscription that never receives any events.
467 // It can simulate pauses, but otherwise does nothing. 467 // It can simulate pauses, but otherwise does nothing.
468 class _DoneSubscription<T> implements StreamSubscription<T> { 468 class _DoneSubscription<T> implements StreamSubscription<T> {
469 int _pauseCount = 0; 469 int _pauseCount = 0;
470 void onData(void handleData(T data)) {} 470 void onData(void handleData(T data)) {}
471 void onError(void handleErrr(Object error)) {} 471 void onError(Function handleError) {}
472 void onDone(void handleDone()) {} 472 void onDone(void handleDone()) {}
473 void pause([Future resumeSignal]) { 473 void pause([Future resumeSignal]) {
474 if (resumeSignal != null) resumeSignal.then(_resume); 474 if (resumeSignal != null) resumeSignal.then(_resume);
475 _pauseCount++; 475 _pauseCount++;
476 } 476 }
477 void resume() { _resume(null); } 477 void resume() { _resume(null); }
478 void _resume(_) { 478 void _resume(_) {
479 if (_pauseCount > 0) _pauseCount--; 479 if (_pauseCount > 0) _pauseCount--;
480 } 480 }
481 void cancel() {} 481 void cancel() {}
482 bool get isPaused => _pauseCount > 0; 482 bool get isPaused => _pauseCount > 0;
483 Future asFuture([Object value]) => new _Future(); 483 Future asFuture([Object value]) => new _Future();
484 } 484 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/pub/lib/src/utils.dart ('k') | sdk/lib/async/stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698