Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 85aa0833b6f0233cf7399ffb256e21aee5261169..a006059c442e404928652a42a820bdbbe8ddf6d5 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -696,13 +696,22 @@ class _BroadcastLinkedList { |
} |
} |
+typedef void _broadcastCallback(StreamSubscription subscription); |
+ |
class _AsBroadcastStream<T> extends Stream<T> { |
final Stream<T> _source; |
+ final _broadcastCallback _onListenHandler; |
+ final _broadcastCallback _onCancelHandler; |
+ final _Zone _zone; |
+ |
_AsBroadcastStreamController<T> _controller; |
StreamSubscription<T> _subscription; |
- _AsBroadcastStream(this._source) { |
- _controller = new _AsBroadcastStreamController<T>(null, _onCancel); |
+ _AsBroadcastStream(this._source, |
+ this._onListenHandler, |
+ this._onCancelHandler) |
+ : _zone = _Zone.current { |
+ _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
} |
bool get isBroadcast => true; |
@@ -724,14 +733,103 @@ class _AsBroadcastStream<T> extends Stream<T> { |
} |
void _onCancel() { |
+ if (_onCancelHandler != null) { |
+ _zone.executeCallback(() { |
floitsch
2013/06/20 14:28:30
I'm not sure if these should be run in the zone: t
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Zone has been removed from the callback.
Uses curr
|
+ _onCancelHandler(new _BroadcastSubscriptionWrapper(this)); |
+ }); |
+ } |
+ } |
+ |
+ void _onListen() { |
+ if (_onListenHandler != null) { |
+ _zone.executeCallback(() { |
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Done.
|
+ _onListenHandler(new _BroadcastSubscriptionWrapper(this)); |
+ }); |
+ } |
+ } |
+ |
+ // Methods called from _BroadcastSubscriptionWrapper. |
+ void _cancelSubscription() { |
+ if (_subscription == null) return; |
// Called by [_controller] when it has no subscribers left. |
StreamSubscription subscription = _subscription; |
_subscription = null; |
_controller = null; // Marks the stream as no longer listenable. |
subscription.cancel(); |
} |
+ |
+ void _pauseSubscription(Future resumeSignal) { |
+ if (_subscription == null) return; |
+ _subscription.pause(resumeSignal); |
+ } |
+ |
+ void _resumeSubscription() { |
+ if (_subscription == null) return; |
+ _subscription.resume(); |
+ } |
+ |
+ bool get _isSubscriptionPaused { |
+ if (_subscription == null) return false; |
+ return _subscription.isPaused; |
+ } |
+} |
+ |
+/** |
+ * Wrapper for subscription that disallows changing handlers. |
+ * |
+ * Then wrapper is only valid while a callback using it is running. After |
+ * that it is invalidated, so a user can't hand on to it and change the |
floitsch
2013/06/20 14:28:30
Description seems to be wrong. Pause and resume ke
Lasse Reichstein Nielsen
2013/06/21 09:49:11
ACK, changed that but forgot the doc.
It stays val
|
+ * underlying subscription at inopportune moments. |
+ */ |
+class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
+ _AsBroadcastStream _stream; |
+ |
+ _BroadcastSubscriptionWrapper(this._stream); |
+ |
+ void onData(void handleData(T data)) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
+ |
+ void onError(void handleError(Object data)) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
+ |
+ void pause([Future resumeSignal]) { |
+ if (_stream == null) return; |
floitsch
2013/06/20 14:28:30
throw.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
actually, I removed the line, and just call throug
|
+ _stream._pauseSubscription(resumeSignal); |
floitsch
2013/06/20 14:28:30
This is a little bit strange: we allow a resume-si
Lasse Reichstein Nielsen
2013/06/21 09:49:11
We do allow calling resume.
|
+ } |
+ |
+ void resume() { |
+ if (_stream == null) return; |
floitsch
2013/06/20 14:28:30
ditto (throw).
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Ditto, removed.
|
+ _stream._resumeSubscription(); |
+ } |
+ |
+ void cancel() { |
+ if (_stream == null) return; |
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
removed.
|
+ _AsBroadcastStream stream = _stream; |
+ _stream = null; |
+ stream._cancelSubscription(); |
+ } |
+ |
+ bool get isPaused { |
+ if (_stream == null) return false; |
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Removed.
|
+ return _stream._isSubscriptionPaused; |
+ } |
+ |
+ Future asFuture([var futureValue]) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
} |
+ |
/** |
* Simple implementation of [StreamIterator]. |
*/ |