Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(214)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // part of dart.async;
6
7 // -------------------------------------------------------------------
8 // Default implementation of a stream with a controller for adding
9 // events to the stream.
10 // -------------------------------------------------------------------
11
12 /**
13 * A controller and the stream it controls.
14 *
15 * This controller allows sending data, error and done events on
16 * its [stream].
17 * This class can be used to create a simple stream that others
18 * can listen on, and to push events to that stream.
19 * For more specialized streams, the [createStream] method can be
20 * overridden to return a specialization of [ControllerStream], and
21 * other public methods can be overridden too (but it's recommended
22 * that the overriding method calls its super method).
23 *
24 * A [StreamController] may have zero or more subscribers.
25 *
26 * If it has subscribers, it may also be paused by any number of its
27 * subscribers. When paused, all incoming events are queued. It is the
28 * responsibility of the user of this stream to prevent incoming events when
29 * the controller is paused. When there are no pausing subscriptions left,
30 * either due to them resuming, or due to the pausing subscriptions
31 * unsubscribing, events are resumed.
32 *
33 * When "close" is invoked (but not necessarily when the done event is fired,
34 * depending on pause state) the stream controller is closed.
35 * When the done event is fired to a subscriber, the subscriber is automatically
36 * unsubscribed.
37 */
38 class StreamController<T> extends Stream<T> implements StreamSink<T> {
39 _StreamImpl<T> _stream;
40 Stream<T> get stream => _stream;
41
42 /**
43 * A controller with a [stream] that supports multiple subscribers.
44 */
45 StreamController() {
46 _stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
47 onPauseStateChange);
48 }
49 /**
50 * A controller with a [stream] that supports only one single subscriber.
51 * The controller will buffer all incoming events until the subscriber is
52 * registered.
53 */
54 StreamController.singleSubscription() {
55 _stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
56 onPauseStateChange);
57 }
58
59 StreamSubscription listen(void onData(T data),
60 { void onError(AsyncError error),
61 void onDone(),
62 bool unsubscribeOnError}) {
63 return _stream.listen(onData,
64 onError: onError,
65 onDone: onDone,
66 unsubscribeOnError: unsubscribeOnError);
67 }
68
69 /**
70 * Returns a view of this object that only exposes the [StreamSink] interface.
71 */
72 StreamSink<T> get sink => new StreamSinkView<T>(this);
73
74 /** Whether one or more active subscribers have requested a pause. */
75 bool get isPaused => _stream._isPaused;
76
77 /** Whether there are currently any subscribers on this [Stream]. */
78 bool get hasSubscribers => _stream._hasSubscribers;
79
80 /**
81 * Send or queue a data event.
82 */
83 Signal add(T value) => _stream._add(value);
84
85 /**
86 * Send or enqueue an error event.
87 *
88 * If a subscription has requested to be unsubscribed on errors,
89 * it will be unsubscribed after receiving this event.
90 */
91 void signalError(AsyncError error) { _stream._signalError(error); }
92
93 /**
94 * Send or enqueue a "done" message.
95 *
96 * The "done" message should be sent at most once by a stream, and it
97 * should be the last message sent.
98 */
99 void close() { _stream._close(); }
100
101 /**
102 * Called when the first subscriber requests a pause or the last a resume.
103 *
104 * Read [isPaused] to see the new state.
105 */
106 void onPauseStateChange() {}
107
108 /**
109 * Called when the first listener subscribes or the last unsubscribes.
110 *
111 * Read [hasSubscribers] to see what the new state is.
112 */
113 void onSubscriptionStateChange() {}
114
115 void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) {
116 _stream._forEachSubscriber(() {
117 try {
118 action();
119 } catch (e, s) {
120 new AsyncError(e, s).throwDelayed();
121 }
122 });
123 }
124 }
125
126 typedef void _NotificationHandler();
127
128 class _MultiControllerStream<T> extends _MultiStreamImpl<T> {
129 _NotificationHandler _subscriptionHandler;
130 _NotificationHandler _pauseHandler;
131
132 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler);
133
134 void _onSubscriptionStateChange() {
135 _subscriptionHandler();
136 }
137
138 void _onPauseStateChange() {
139 _pauseHandler();
140 }
141 }
142
143 class _SingleControllerStream<T> extends _SingleStreamImpl<T> {
144 _NotificationHandler _subscriptionHandler;
145 _NotificationHandler _pauseHandler;
146
147 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler);
148
149 void _onSubscriptionStateChange() {
150 _subscriptionHandler();
151 }
152
153 void _onPauseStateChange() {
154 _pauseHandler();
155 }
156 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698