Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 85aa0833b6f0233cf7399ffb256e21aee5261169..a006059c442e404928652a42a820bdbbe8ddf6d5 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -696,13 +696,22 @@ class _BroadcastLinkedList { |
| } |
| } |
| +typedef void _broadcastCallback(StreamSubscription subscription); |
| + |
| class _AsBroadcastStream<T> extends Stream<T> { |
| final Stream<T> _source; |
| + final _broadcastCallback _onListenHandler; |
| + final _broadcastCallback _onCancelHandler; |
| + final _Zone _zone; |
| + |
| _AsBroadcastStreamController<T> _controller; |
| StreamSubscription<T> _subscription; |
| - _AsBroadcastStream(this._source) { |
| - _controller = new _AsBroadcastStreamController<T>(null, _onCancel); |
| + _AsBroadcastStream(this._source, |
| + this._onListenHandler, |
| + this._onCancelHandler) |
| + : _zone = _Zone.current { |
| + _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
| } |
| bool get isBroadcast => true; |
| @@ -724,14 +733,103 @@ class _AsBroadcastStream<T> extends Stream<T> { |
| } |
| void _onCancel() { |
| + if (_onCancelHandler != null) { |
| + _zone.executeCallback(() { |
|
floitsch
2013/06/20 14:28:30
I'm not sure if these should be run in the zone: t
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Zone has been removed from the callback.
Uses curr
|
| + _onCancelHandler(new _BroadcastSubscriptionWrapper(this)); |
| + }); |
| + } |
| + } |
| + |
| + void _onListen() { |
| + if (_onListenHandler != null) { |
| + _zone.executeCallback(() { |
|
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Done.
|
| + _onListenHandler(new _BroadcastSubscriptionWrapper(this)); |
| + }); |
| + } |
| + } |
| + |
| + // Methods called from _BroadcastSubscriptionWrapper. |
| + void _cancelSubscription() { |
| + if (_subscription == null) return; |
| // Called by [_controller] when it has no subscribers left. |
| StreamSubscription subscription = _subscription; |
| _subscription = null; |
| _controller = null; // Marks the stream as no longer listenable. |
| subscription.cancel(); |
| } |
| + |
| + void _pauseSubscription(Future resumeSignal) { |
| + if (_subscription == null) return; |
| + _subscription.pause(resumeSignal); |
| + } |
| + |
| + void _resumeSubscription() { |
| + if (_subscription == null) return; |
| + _subscription.resume(); |
| + } |
| + |
| + bool get _isSubscriptionPaused { |
| + if (_subscription == null) return false; |
| + return _subscription.isPaused; |
| + } |
| +} |
| + |
| +/** |
| + * Wrapper for subscription that disallows changing handlers. |
| + * |
| + * Then wrapper is only valid while a callback using it is running. After |
| + * that it is invalidated, so a user can't hand on to it and change the |
|
floitsch
2013/06/20 14:28:30
Description seems to be wrong. Pause and resume ke
Lasse Reichstein Nielsen
2013/06/21 09:49:11
ACK, changed that but forgot the doc.
It stays val
|
| + * underlying subscription at inopportune moments. |
| + */ |
| +class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
| + _AsBroadcastStream _stream; |
| + |
| + _BroadcastSubscriptionWrapper(this._stream); |
| + |
| + void onData(void handleData(T data)) { |
| + throw new UnsupportedError( |
| + "Cannot change handlers of asBroadcastStream source subscription."); |
| + } |
| + |
| + void onError(void handleError(Object data)) { |
| + throw new UnsupportedError( |
| + "Cannot change handlers of asBroadcastStream source subscription."); |
| + } |
| + |
| + void onDone(void handleDone()) { |
| + throw new UnsupportedError( |
| + "Cannot change handlers of asBroadcastStream source subscription."); |
| + } |
| + |
| + void pause([Future resumeSignal]) { |
| + if (_stream == null) return; |
|
floitsch
2013/06/20 14:28:30
throw.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
actually, I removed the line, and just call throug
|
| + _stream._pauseSubscription(resumeSignal); |
|
floitsch
2013/06/20 14:28:30
This is a little bit strange: we allow a resume-si
Lasse Reichstein Nielsen
2013/06/21 09:49:11
We do allow calling resume.
|
| + } |
| + |
| + void resume() { |
| + if (_stream == null) return; |
|
floitsch
2013/06/20 14:28:30
ditto (throw).
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Ditto, removed.
|
| + _stream._resumeSubscription(); |
| + } |
| + |
| + void cancel() { |
| + if (_stream == null) return; |
|
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
removed.
|
| + _AsBroadcastStream stream = _stream; |
| + _stream = null; |
| + stream._cancelSubscription(); |
| + } |
| + |
| + bool get isPaused { |
| + if (_stream == null) return false; |
|
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Removed.
|
| + return _stream._isSubscriptionPaused; |
| + } |
| + |
| + Future asFuture([var futureValue]) { |
| + throw new UnsupportedError( |
| + "Cannot change handlers of asBroadcastStream source subscription."); |
| + } |
| } |
| + |
| /** |
| * Simple implementation of [StreamIterator]. |
| */ |