| OLD | NEW | 
|---|
|  | (Empty) | 
| 1 // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file |  | 
| 2 // for details. All rights reserved. Use of this source code is governed by a |  | 
| 3 // BSD-style license that can be found in the LICENSE file. |  | 
| 4 |  | 
| 5 part of dart.async; |  | 
| 6 |  | 
| 7 class _BroadcastStream<T> extends _ControllerStream<T> { |  | 
| 8   _BroadcastStream(_StreamControllerLifecycle<T> controller) |  | 
| 9       : super(controller); |  | 
| 10 |  | 
| 11   bool get isBroadcast => true; |  | 
| 12 } |  | 
| 13 |  | 
| 14 class _BroadcastSubscription<T> extends _ControllerSubscription<T> { |  | 
| 15   static const int _STATE_EVENT_ID = 1; |  | 
| 16   static const int _STATE_FIRING = 2; |  | 
| 17   static const int _STATE_REMOVE_AFTER_FIRING = 4; |  | 
| 18   // TODO(lrn): Use the _state field on _ControllerSubscription to |  | 
| 19   // also store this state. Requires that the subscription implementation |  | 
| 20   // does not assume that it's use of the state integer is the only use. |  | 
| 21   int _eventState = 0;  // Initialized to help dart2js type inference. |  | 
| 22 |  | 
| 23   _BroadcastSubscription<T> _next; |  | 
| 24   _BroadcastSubscription<T> _previous; |  | 
| 25 |  | 
| 26   _BroadcastSubscription(_StreamControllerLifecycle<T> controller, |  | 
| 27                          void onData(T data), |  | 
| 28                          Function onError, |  | 
| 29                          void onDone(), |  | 
| 30                          bool cancelOnError) |  | 
| 31       : super(controller, onData, onError, onDone, cancelOnError) { |  | 
| 32     _next = _previous = this; |  | 
| 33   } |  | 
| 34 |  | 
| 35   bool _expectsEvent(int eventId) => |  | 
| 36       (_eventState & _STATE_EVENT_ID) == eventId; |  | 
| 37 |  | 
| 38   void _toggleEventId() { |  | 
| 39     _eventState ^= _STATE_EVENT_ID; |  | 
| 40   } |  | 
| 41 |  | 
| 42   bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |  | 
| 43 |  | 
| 44   void _setRemoveAfterFiring() { |  | 
| 45     assert(_isFiring); |  | 
| 46     _eventState |= _STATE_REMOVE_AFTER_FIRING; |  | 
| 47   } |  | 
| 48 |  | 
| 49   bool get _removeAfterFiring => |  | 
| 50       (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |  | 
| 51 |  | 
| 52   // The controller._recordPause doesn't do anything for a broadcast controller, |  | 
| 53   // so we don't bother calling it. |  | 
| 54   void _onPause() { } |  | 
| 55 |  | 
| 56   // The controller._recordResume doesn't do anything for a broadcast |  | 
| 57   // controller, so we don't bother calling it. |  | 
| 58   void _onResume() { } |  | 
| 59 |  | 
| 60   // _onCancel is inherited. |  | 
| 61 } |  | 
| 62 |  | 
| 63 abstract class _BroadcastStreamController<T> |  | 
| 64     implements StreamController<T>, |  | 
| 65                _StreamControllerLifecycle<T>, |  | 
| 66                _EventSink<T>, |  | 
| 67                _EventDispatch<T> { |  | 
| 68   static const int _STATE_INITIAL = 0; |  | 
| 69   static const int _STATE_EVENT_ID = 1; |  | 
| 70   static const int _STATE_FIRING = 2; |  | 
| 71   static const int _STATE_CLOSED = 4; |  | 
| 72   static const int _STATE_ADDSTREAM = 8; |  | 
| 73 |  | 
| 74   ControllerCallback onListen; |  | 
| 75   ControllerCancelCallback onCancel; |  | 
| 76 |  | 
| 77   // State of the controller. |  | 
| 78   int _state; |  | 
| 79 |  | 
| 80   // Double-linked list of active listeners. |  | 
| 81   _BroadcastSubscription<T> _firstSubscription; |  | 
| 82   _BroadcastSubscription<T> _lastSubscription; |  | 
| 83 |  | 
| 84   // Extra state used during an [addStream] call. |  | 
| 85   _AddStreamState<T> _addStreamState; |  | 
| 86 |  | 
| 87   /** |  | 
| 88    * Future returned by [close] and [done]. |  | 
| 89    * |  | 
| 90    * The future is completed whenever the done event has been sent to all |  | 
| 91    * relevant listeners. |  | 
| 92    * The relevant listeners are the ones that were listening when [close] was |  | 
| 93    * called. When all of these have been canceled (sending the done event makes |  | 
| 94    * them cancel, but they can also be canceled before sending the event), |  | 
| 95    * this future completes. |  | 
| 96    * |  | 
| 97    * Any attempt to listen after calling [close] will throw, so there won't |  | 
| 98    * be any further listeners. |  | 
| 99    */ |  | 
| 100   _Future _doneFuture; |  | 
| 101 |  | 
| 102   _BroadcastStreamController(this.onListen, this.onCancel) |  | 
| 103       : _state = _STATE_INITIAL; |  | 
| 104 |  | 
| 105   ControllerCallback get onPause { |  | 
| 106     throw new UnsupportedError( |  | 
| 107         "Broadcast stream controllers do not support pause callbacks"); |  | 
| 108   } |  | 
| 109 |  | 
| 110   void set onPause(void onPauseHandler()) { |  | 
| 111     throw new UnsupportedError( |  | 
| 112         "Broadcast stream controllers do not support pause callbacks"); |  | 
| 113   } |  | 
| 114 |  | 
| 115   ControllerCallback get onResume { |  | 
| 116     throw new UnsupportedError( |  | 
| 117         "Broadcast stream controllers do not support pause callbacks"); |  | 
| 118   } |  | 
| 119 |  | 
| 120   void set onResume(void onResumeHandler())  { |  | 
| 121     throw new UnsupportedError( |  | 
| 122         "Broadcast stream controllers do not support pause callbacks"); |  | 
| 123   } |  | 
| 124 |  | 
| 125   // StreamController interface. |  | 
| 126 |  | 
| 127   Stream<T> get stream => new _BroadcastStream<T>(this); |  | 
| 128 |  | 
| 129   StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |  | 
| 130 |  | 
| 131   bool get isClosed => (_state & _STATE_CLOSED) != 0; |  | 
| 132 |  | 
| 133   /** |  | 
| 134    * A broadcast controller is never paused. |  | 
| 135    * |  | 
| 136    * Each receiving stream may be paused individually, and they handle their |  | 
| 137    * own buffering. |  | 
| 138    */ |  | 
| 139   bool get isPaused => false; |  | 
| 140 |  | 
| 141   /** Whether there are currently one or more subscribers. */ |  | 
| 142   bool get hasListener => !_isEmpty; |  | 
| 143 |  | 
| 144   /** |  | 
| 145    * Test whether the stream has exactly one listener. |  | 
| 146    * |  | 
| 147    * Assumes that the stream has a listener (not [_isEmpty]). |  | 
| 148    */ |  | 
| 149   bool get _hasOneListener { |  | 
| 150     assert(!_isEmpty); |  | 
| 151     return identical(_firstSubscription, _lastSubscription); |  | 
| 152   } |  | 
| 153 |  | 
| 154   /** Whether an event is being fired (sent to some, but not all, listeners). */ |  | 
| 155   bool get _isFiring => (_state & _STATE_FIRING) != 0; |  | 
| 156 |  | 
| 157   bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |  | 
| 158 |  | 
| 159   bool get _mayAddEvent => (_state < _STATE_CLOSED); |  | 
| 160 |  | 
| 161   _Future _ensureDoneFuture() { |  | 
| 162     if (_doneFuture != null) return _doneFuture; |  | 
| 163     return _doneFuture = new _Future(); |  | 
| 164   } |  | 
| 165 |  | 
| 166   // Linked list helpers |  | 
| 167 |  | 
| 168   bool get _isEmpty => _firstSubscription == null; |  | 
| 169 |  | 
| 170   /** Adds subscription to linked list of active listeners. */ |  | 
| 171   void _addListener(_BroadcastSubscription<T> subscription) { |  | 
| 172     assert(identical(subscription._next, subscription)); |  | 
| 173     subscription._eventState = (_state & _STATE_EVENT_ID); |  | 
| 174     // Insert in linked list as last subscription. |  | 
| 175     _BroadcastSubscription<T> oldLast = _lastSubscription; |  | 
| 176     _lastSubscription = subscription; |  | 
| 177     subscription._next = null; |  | 
| 178     subscription._previous = oldLast; |  | 
| 179     if (oldLast == null) { |  | 
| 180       _firstSubscription = subscription; |  | 
| 181     } else { |  | 
| 182       oldLast._next = subscription; |  | 
| 183     } |  | 
| 184   } |  | 
| 185 |  | 
| 186   void _removeListener(_BroadcastSubscription<T> subscription) { |  | 
| 187     assert(identical(subscription._controller, this)); |  | 
| 188     assert(!identical(subscription._next, subscription)); |  | 
| 189     _BroadcastSubscription<T> previous = subscription._previous; |  | 
| 190     _BroadcastSubscription<T> next = subscription._next; |  | 
| 191     if (previous == null) { |  | 
| 192       // This was the first subscription. |  | 
| 193       _firstSubscription = next; |  | 
| 194     } else { |  | 
| 195       previous._next = next; |  | 
| 196     } |  | 
| 197     if (next == null) { |  | 
| 198       // This was the last subscription. |  | 
| 199       _lastSubscription = previous; |  | 
| 200     } else { |  | 
| 201       next._previous = previous; |  | 
| 202     } |  | 
| 203 |  | 
| 204     subscription._next = subscription._previous = subscription; |  | 
| 205   } |  | 
| 206 |  | 
| 207   // _StreamControllerLifecycle interface. |  | 
| 208 |  | 
| 209   StreamSubscription<T> _subscribe( |  | 
| 210       void onData(T data), |  | 
| 211       Function onError, |  | 
| 212       void onDone(), |  | 
| 213       bool cancelOnError) { |  | 
| 214     if (isClosed) { |  | 
| 215       if (onDone == null) onDone = _nullDoneHandler; |  | 
| 216       return new _DoneStreamSubscription<T>(onDone); |  | 
| 217     } |  | 
| 218     StreamSubscription<T> subscription = |  | 
| 219         new _BroadcastSubscription<T>(this, onData, onError, onDone, |  | 
| 220                                       cancelOnError); |  | 
| 221     _addListener(subscription); |  | 
| 222     if (identical(_firstSubscription, _lastSubscription)) { |  | 
| 223       // Only one listener, so it must be the first listener. |  | 
| 224       _runGuarded(onListen); |  | 
| 225     } |  | 
| 226     return subscription; |  | 
| 227   } |  | 
| 228 |  | 
| 229   Future _recordCancel(StreamSubscription<T> sub) { |  | 
| 230     _BroadcastSubscription<T> subscription = sub; |  | 
| 231     // If already removed by the stream, don't remove it again. |  | 
| 232     if (identical(subscription._next, subscription)) return null; |  | 
| 233     if (subscription._isFiring) { |  | 
| 234       subscription._setRemoveAfterFiring(); |  | 
| 235     } else { |  | 
| 236       _removeListener(subscription); |  | 
| 237       // If we are currently firing an event, the empty-check is performed at |  | 
| 238       // the end of the listener loop instead of here. |  | 
| 239       if (!_isFiring && _isEmpty) { |  | 
| 240         _callOnCancel(); |  | 
| 241       } |  | 
| 242     } |  | 
| 243     return null; |  | 
| 244   } |  | 
| 245 |  | 
| 246   void _recordPause(StreamSubscription<T> subscription) {} |  | 
| 247   void _recordResume(StreamSubscription<T> subscription) {} |  | 
| 248 |  | 
| 249   // EventSink interface. |  | 
| 250 |  | 
| 251   Error _addEventError() { |  | 
| 252     if (isClosed) { |  | 
| 253       return new StateError("Cannot add new events after calling close"); |  | 
| 254     } |  | 
| 255     assert(_isAddingStream); |  | 
| 256     return new StateError("Cannot add new events while doing an addStream"); |  | 
| 257   } |  | 
| 258 |  | 
| 259   void add(T data) { |  | 
| 260     if (!_mayAddEvent) throw _addEventError(); |  | 
| 261     _sendData(data); |  | 
| 262   } |  | 
| 263 |  | 
| 264   void addError(Object error, [StackTrace stackTrace]) { |  | 
| 265     error = _nonNullError(error); |  | 
| 266     if (!_mayAddEvent) throw _addEventError(); |  | 
| 267     AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |  | 
| 268     if (replacement != null) { |  | 
| 269       error = _nonNullError(replacement.error); |  | 
| 270       stackTrace = replacement.stackTrace; |  | 
| 271     } |  | 
| 272     _sendError(error, stackTrace); |  | 
| 273   } |  | 
| 274 |  | 
| 275   Future close() { |  | 
| 276     if (isClosed) { |  | 
| 277       assert(_doneFuture != null); |  | 
| 278       return _doneFuture; |  | 
| 279     } |  | 
| 280     if (!_mayAddEvent) throw _addEventError(); |  | 
| 281     _state |= _STATE_CLOSED; |  | 
| 282     Future doneFuture = _ensureDoneFuture(); |  | 
| 283     _sendDone(); |  | 
| 284     return doneFuture; |  | 
| 285   } |  | 
| 286 |  | 
| 287   Future get done => _ensureDoneFuture(); |  | 
| 288 |  | 
| 289   Future addStream(Stream<T> stream, {bool cancelOnError: true}) { |  | 
| 290     if (!_mayAddEvent) throw _addEventError(); |  | 
| 291     _state |= _STATE_ADDSTREAM; |  | 
| 292     _addStreamState = new _AddStreamState(this, stream, cancelOnError); |  | 
| 293     return _addStreamState.addStreamFuture; |  | 
| 294   } |  | 
| 295 |  | 
| 296   // _EventSink interface, called from AddStreamState. |  | 
| 297   void _add(T data) { |  | 
| 298     _sendData(data); |  | 
| 299   } |  | 
| 300 |  | 
| 301   void _addError(Object error, StackTrace stackTrace) { |  | 
| 302     _sendError(error, stackTrace); |  | 
| 303   } |  | 
| 304 |  | 
| 305   void _close() { |  | 
| 306     assert(_isAddingStream); |  | 
| 307     _AddStreamState addState = _addStreamState; |  | 
| 308     _addStreamState = null; |  | 
| 309     _state &= ~_STATE_ADDSTREAM; |  | 
| 310     addState.complete(); |  | 
| 311   } |  | 
| 312 |  | 
| 313   // Event handling. |  | 
| 314   void _forEachListener( |  | 
| 315       void action(_BufferingStreamSubscription<T> subscription)) { |  | 
| 316     if (_isFiring) { |  | 
| 317       throw new StateError( |  | 
| 318           "Cannot fire new event. Controller is already firing an event"); |  | 
| 319     } |  | 
| 320     if (_isEmpty) return; |  | 
| 321 |  | 
| 322     // Get event id of this event. |  | 
| 323     int id = (_state & _STATE_EVENT_ID); |  | 
| 324     // Start firing (set the _STATE_FIRING bit). We don't do [onCancel] |  | 
| 325     // callbacks while firing, and we prevent reentrancy of this function. |  | 
| 326     // |  | 
| 327     // Set [_state]'s event id to the next event's id. |  | 
| 328     // Any listeners added while firing this event will expect the next event, |  | 
| 329     // not this one, and won't get notified. |  | 
| 330     _state ^= _STATE_EVENT_ID | _STATE_FIRING; |  | 
| 331     _BroadcastSubscription<T> subscription = _firstSubscription; |  | 
| 332     while (subscription != null) { |  | 
| 333       if (subscription._expectsEvent(id)) { |  | 
| 334         subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |  | 
| 335         action(subscription); |  | 
| 336         subscription._toggleEventId(); |  | 
| 337         _BroadcastSubscription<T> next = subscription._next; |  | 
| 338         if (subscription._removeAfterFiring) { |  | 
| 339           _removeListener(subscription); |  | 
| 340         } |  | 
| 341         subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |  | 
| 342         subscription = next; |  | 
| 343       } else { |  | 
| 344         subscription = subscription._next; |  | 
| 345       } |  | 
| 346     } |  | 
| 347     _state &= ~_STATE_FIRING; |  | 
| 348 |  | 
| 349     if (_isEmpty) { |  | 
| 350       _callOnCancel(); |  | 
| 351     } |  | 
| 352   } |  | 
| 353 |  | 
| 354   void _callOnCancel() { |  | 
| 355     assert(_isEmpty); |  | 
| 356     if (isClosed && _doneFuture._mayComplete) { |  | 
| 357       // When closed, _doneFuture is not null. |  | 
| 358       _doneFuture._asyncComplete(null); |  | 
| 359     } |  | 
| 360     _runGuarded(onCancel); |  | 
| 361   } |  | 
| 362 } |  | 
| 363 |  | 
| 364 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> |  | 
| 365                                      implements SynchronousStreamController<T> { |  | 
| 366   _SyncBroadcastStreamController(void onListen(), void onCancel()) |  | 
| 367       : super(onListen, onCancel); |  | 
| 368 |  | 
| 369   // EventDispatch interface. |  | 
| 370 |  | 
| 371   bool get _mayAddEvent => super._mayAddEvent && !_isFiring; |  | 
| 372 |  | 
| 373   _addEventError() { |  | 
| 374     if (_isFiring) { |  | 
| 375       return new StateError( |  | 
| 376           "Cannot fire new event. Controller is already firing an event"); |  | 
| 377     } |  | 
| 378     return super._addEventError(); |  | 
| 379   } |  | 
| 380 |  | 
| 381   void _sendData(T data) { |  | 
| 382     if (_isEmpty) return; |  | 
| 383     if (_hasOneListener) { |  | 
| 384       _state |= _BroadcastStreamController._STATE_FIRING; |  | 
| 385       _BroadcastSubscription<T> subscription = _firstSubscription; |  | 
| 386       subscription._add(data); |  | 
| 387       _state &= ~_BroadcastStreamController._STATE_FIRING; |  | 
| 388       if (_isEmpty) { |  | 
| 389         _callOnCancel(); |  | 
| 390       } |  | 
| 391       return; |  | 
| 392     } |  | 
| 393     _forEachListener((_BufferingStreamSubscription<T> subscription) { |  | 
| 394       subscription._add(data); |  | 
| 395     }); |  | 
| 396   } |  | 
| 397 |  | 
| 398   void _sendError(Object error, StackTrace stackTrace) { |  | 
| 399     if (_isEmpty) return; |  | 
| 400     _forEachListener((_BufferingStreamSubscription<T> subscription) { |  | 
| 401       subscription._addError(error, stackTrace); |  | 
| 402     }); |  | 
| 403   } |  | 
| 404 |  | 
| 405   void _sendDone() { |  | 
| 406     if (!_isEmpty) { |  | 
| 407       _forEachListener((_BufferingStreamSubscription<T> subscription) { |  | 
| 408         subscription._close(); |  | 
| 409       }); |  | 
| 410     } else { |  | 
| 411       assert(_doneFuture != null); |  | 
| 412       assert(_doneFuture._mayComplete); |  | 
| 413       _doneFuture._asyncComplete(null); |  | 
| 414     } |  | 
| 415   } |  | 
| 416 } |  | 
| 417 |  | 
| 418 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |  | 
| 419   _AsyncBroadcastStreamController(void onListen(), void onCancel()) |  | 
| 420       : super(onListen, onCancel); |  | 
| 421 |  | 
| 422   // EventDispatch interface. |  | 
| 423 |  | 
| 424   void _sendData(T data) { |  | 
| 425     for (_BroadcastSubscription<T> subscription = _firstSubscription; |  | 
| 426          subscription != null; |  | 
| 427          subscription = subscription._next) { |  | 
| 428       subscription._addPending(new _DelayedData<T>(data)); |  | 
| 429     } |  | 
| 430   } |  | 
| 431 |  | 
| 432   void _sendError(Object error, StackTrace stackTrace) { |  | 
| 433     for (_BroadcastSubscription<T> subscription = _firstSubscription; |  | 
| 434          subscription != null; |  | 
| 435          subscription = subscription._next) { |  | 
| 436       subscription._addPending(new _DelayedError(error, stackTrace)); |  | 
| 437     } |  | 
| 438   } |  | 
| 439 |  | 
| 440   void _sendDone() { |  | 
| 441     if (!_isEmpty) { |  | 
| 442       for (_BroadcastSubscription<T> subscription = _firstSubscription; |  | 
| 443            subscription != null; |  | 
| 444            subscription = subscription._next) { |  | 
| 445         subscription._addPending(const _DelayedDone()); |  | 
| 446       } |  | 
| 447     } else { |  | 
| 448       assert(_doneFuture != null); |  | 
| 449       assert(_doneFuture._mayComplete); |  | 
| 450       _doneFuture._asyncComplete(null); |  | 
| 451     } |  | 
| 452   } |  | 
| 453 } |  | 
| 454 |  | 
| 455 /** |  | 
| 456  * Stream controller that is used by [Stream.asBroadcastStream]. |  | 
| 457  * |  | 
| 458  * This stream controller allows incoming events while it is firing |  | 
| 459  * other events. This is handled by delaying the events until the |  | 
| 460  * current event is done firing, and then fire the pending events. |  | 
| 461  * |  | 
| 462  * This class extends [_SyncBroadcastStreamController]. Events of |  | 
| 463  * an "asBroadcastStream" stream are always initiated by events |  | 
| 464  * on another stream, and it is fine to forward them synchronously. |  | 
| 465  */ |  | 
| 466 class _AsBroadcastStreamController<T> |  | 
| 467     extends _SyncBroadcastStreamController<T> |  | 
| 468     implements _EventDispatch<T> { |  | 
| 469   _StreamImplEvents<T> _pending; |  | 
| 470 |  | 
| 471   _AsBroadcastStreamController(void onListen(), void onCancel()) |  | 
| 472       : super(onListen, onCancel); |  | 
| 473 |  | 
| 474   bool get _hasPending => _pending != null && ! _pending.isEmpty; |  | 
| 475 |  | 
| 476   void _addPendingEvent(_DelayedEvent event) { |  | 
| 477     if (_pending == null) { |  | 
| 478       _pending = new _StreamImplEvents<T>(); |  | 
| 479     } |  | 
| 480     _pending.add(event); |  | 
| 481   } |  | 
| 482 |  | 
| 483   void add(T data) { |  | 
| 484     if (!isClosed && _isFiring) { |  | 
| 485       _addPendingEvent(new _DelayedData<T>(data)); |  | 
| 486       return; |  | 
| 487     } |  | 
| 488     super.add(data); |  | 
| 489     while (_hasPending) { |  | 
| 490       _pending.handleNext(this); |  | 
| 491     } |  | 
| 492   } |  | 
| 493 |  | 
| 494   void addError(Object error, [StackTrace stackTrace]) { |  | 
| 495     if (!isClosed && _isFiring) { |  | 
| 496       _addPendingEvent(new _DelayedError(error, stackTrace)); |  | 
| 497       return; |  | 
| 498     } |  | 
| 499     if (!_mayAddEvent) throw _addEventError(); |  | 
| 500     _sendError(error, stackTrace); |  | 
| 501     while (_hasPending) { |  | 
| 502       _pending.handleNext(this); |  | 
| 503     } |  | 
| 504   } |  | 
| 505 |  | 
| 506   Future close() { |  | 
| 507     if (!isClosed && _isFiring) { |  | 
| 508       _addPendingEvent(const _DelayedDone()); |  | 
| 509       _state |= _BroadcastStreamController._STATE_CLOSED; |  | 
| 510       return super.done; |  | 
| 511     } |  | 
| 512     Future result = super.close(); |  | 
| 513     assert(!_hasPending); |  | 
| 514     return result; |  | 
| 515   } |  | 
| 516 |  | 
| 517   void _callOnCancel() { |  | 
| 518     if (_hasPending) { |  | 
| 519       _pending.clear(); |  | 
| 520       _pending = null; |  | 
| 521     } |  | 
| 522     super._callOnCancel(); |  | 
| 523   } |  | 
| 524 } |  | 
| 525 |  | 
| 526 // A subscription that never receives any events. |  | 
| 527 // It can simulate pauses, but otherwise does nothing. |  | 
| 528 class _DoneSubscription<T> implements StreamSubscription<T> { |  | 
| 529   int _pauseCount = 0; |  | 
| 530   void onData(void handleData(T data)) {} |  | 
| 531   void onError(Function handleError) {} |  | 
| 532   void onDone(void handleDone()) {} |  | 
| 533   void pause([Future resumeSignal]) { |  | 
| 534     if (resumeSignal != null) resumeSignal.then(_resume); |  | 
| 535     _pauseCount++; |  | 
| 536   } |  | 
| 537   void resume() { _resume(null); } |  | 
| 538   void _resume(_) { |  | 
| 539     if (_pauseCount > 0) _pauseCount--; |  | 
| 540   } |  | 
| 541   Future cancel() { return new _Future.immediate(null); } |  | 
| 542   bool get isPaused => _pauseCount > 0; |  | 
| 543   Future/*<E>*/ asFuture/*<E>*/([Object/*=E*/ value]) => new _Future/*<E>*/(); |  | 
| 544 } |  | 
| OLD | NEW | 
|---|