OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
8 | 8 |
9 /// Initial and default state where the stream can receive and send events. | 9 /// Initial and default state where the stream can receive and send events. |
10 const int _STREAM_OPEN = 0; | 10 const int _STREAM_OPEN = 0; |
(...skipping 403 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
414 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 414 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
415 * * [_isPaused]: Test whether the stream is currently paused. | 415 * * [_isPaused]: Test whether the stream is currently paused. |
416 * The user should not add new events while the stream is paused, but if it | 416 * The user should not add new events while the stream is paused, but if it |
417 * happens anyway, the stream will enqueue the events just as when new events | 417 * happens anyway, the stream will enqueue the events just as when new events |
418 * arrive while still firing an old event. | 418 * arrive while still firing an old event. |
419 */ | 419 */ |
420 class _SingleStreamImpl<T> extends _StreamImpl<T> { | 420 class _SingleStreamImpl<T> extends _StreamImpl<T> { |
421 _StreamListener _subscriber = null; | 421 _StreamListener _subscriber = null; |
422 | 422 |
423 Stream<T> asMultiSubscriberStream() { | 423 Stream<T> asMultiSubscriberStream() { |
424 return new _ForwardingMultiStream<T, T>().._source = this; | 424 return new _SingleStreamMultiplexer<T>(this); |
425 } | 425 } |
426 | 426 |
427 bool get isSingleSubscription => true; | |
428 | |
427 /** Whether one or more active subscribers have requested a pause. */ | 429 /** Whether one or more active subscribers have requested a pause. */ |
428 bool get _isPaused => !_hasSubscribers || super._isPaused; | 430 bool get _isPaused => !_hasSubscribers || super._isPaused; |
429 | 431 |
430 /** Whether there is currently a subscriber on this [Stream]. */ | 432 /** Whether there is currently a subscriber on this [Stream]. */ |
431 bool get _hasSubscribers => _subscriber != null; | 433 bool get _hasSubscribers => _subscriber != null; |
432 | 434 |
433 // ------------------------------------------------------------------- | 435 // ------------------------------------------------------------------- |
434 // Internal implementation. | 436 // Internal implementation. |
435 | 437 |
436 /** | 438 /** |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
534 | 536 |
535 _MultiStreamImpl() { | 537 _MultiStreamImpl() { |
536 _nextLink = _previousLink = this; | 538 _nextLink = _previousLink = this; |
537 } | 539 } |
538 | 540 |
539 Stream<T> asMultiSubscriberStream() => this; | 541 Stream<T> asMultiSubscriberStream() => this; |
540 | 542 |
541 // ------------------------------------------------------------------ | 543 // ------------------------------------------------------------------ |
542 // Helper functions that can be overridden in subclasses. | 544 // Helper functions that can be overridden in subclasses. |
543 | 545 |
546 bool get isSingleSubscription => false; | |
floitsch
2013/01/14 16:09:09
I don't think anybody wants to override this metho
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
| |
547 | |
544 /** Whether there are currently any subscribers on this [Stream]. */ | 548 /** Whether there are currently any subscribers on this [Stream]. */ |
545 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); | 549 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); |
546 | 550 |
547 /** | 551 /** |
548 * Create the new subscription object. | 552 * Create the new subscription object. |
549 */ | 553 */ |
550 _StreamListener<T> _createSubscription( | 554 _StreamListener<T> _createSubscription( |
551 void onData(T data), | 555 void onData(T data), |
552 void onError(AsyncError error), | 556 void onError(AsyncError error), |
553 void onDone(), | 557 void onDone(), |
(...skipping 532 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1086 if (_isComplete) { | 1090 if (_isComplete) { |
1087 throw new StateError("Subscription has been canceled."); | 1091 throw new StateError("Subscription has been canceled."); |
1088 } | 1092 } |
1089 if (_timer != null) { | 1093 if (_timer != null) { |
1090 _timer.cancel(); | 1094 _timer.cancel(); |
1091 _timer = null; | 1095 _timer = null; |
1092 } | 1096 } |
1093 _pauseCount = 0; | 1097 _pauseCount = 0; |
1094 } | 1098 } |
1095 } | 1099 } |
1100 | |
1101 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { | |
1102 final _SingleStreamImpl<T> _source; | |
1103 StreamSubscription<T> _subscription; | |
1104 | |
1105 _SingleStreamMultiplexer(this._source); | |
1106 | |
1107 void _onPauseStateChange() { | |
1108 if (_isPaused) { | |
1109 if (_subscription != null) { | |
1110 _subscription.pause(); | |
1111 } | |
1112 } else { | |
1113 if (_subscription != null) { | |
1114 _subscription.resume(); | |
1115 } | |
1116 } | |
1117 } | |
1118 | |
1119 /** | |
1120 * Subscribe or unsubscribe on [_source] depending on whether | |
1121 * [_stream] has subscribers. | |
1122 */ | |
1123 void _onSubscriptionStateChange() { | |
1124 if (_hasSubscribers) { | |
1125 assert(_subscription == null); | |
1126 _subscription = _source.listen(this._add, | |
1127 onError: this._signalError, | |
1128 onDone: this._close); | |
1129 } else { | |
1130 // TODO(lrn): Check why this can happen. | |
1131 if (_subscription == null) return; | |
1132 _subscription.cancel(); | |
1133 _subscription = null; | |
1134 } | |
1135 } | |
1136 } | |
OLD | NEW |