Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 15989006: Revert until Windows crash is debugged. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 28 matching lines...) Expand all
39 * Whether to invoke a callback depends only on the state before and after 39 * Whether to invoke a callback depends only on the state before and after
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 abstract class StreamController<T> implements EventSink<T> { 49 class StreamController<T> extends EventSink<T> {
50 /** The stream that this controller is controlling. */ 50 final _StreamImpl<T> stream;
51 Stream<T> get stream;
52 51
53 /** 52 /**
54 * A controller with a [stream] that supports only one single subscriber. 53 * A controller with a [stream] that supports only one single subscriber.
55 * 54 *
56 * The controller will buffer all incoming events until the subscriber is 55 * The controller will buffer all incoming events until the subscriber is
57 * registered. 56 * registered.
58 * 57 *
59 * The [onPause] function is called when the stream becomes 58 * The [onPause] function is called when the stream becomes
60 * paused. [onResume] is called when the stream resumed. 59 * paused. [onResume] is called when the stream resumed.
61 * 60 *
62 * The [onListen] callback is called when the stream 61 * The [onListen] callback is called when the stream
63 * receives its listener and [onCancel] when the listener ends 62 * receives its listener. [onCancel] when the listener cancels
64 * its subscription. 63 * its subscription.
65 * 64 *
66 * If the stream is canceled before the controller needs new data the 65 * If the stream is canceled before the controller needs new data the
67 * [onResume] call might not be executed. 66 * [onResume] call might not be executed.
68 */ 67 */
69 factory StreamController({void onListen(), 68 StreamController({void onListen(),
70 void onPause(), 69 void onPause(),
71 void onResume(), 70 void onResume(),
72 void onCancel()}) 71 void onCancel()})
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); 72 : stream = new _SingleControllerStream<T>(
74 73 onListen, onPause, onResume, onCancel);
75 /**
76 * A controller where [stream] creates new stream each time it is read.
77 *
78 * The controller distributes any events to all currently subscribed streams.
79 *
80 * The [onListen] callback is called when the first listener is subscribed,
81 * and the [onCancel] is called when there is no longer any active listeners.
82 * If a listener is added again later, after the [onCancel] was called,
83 * the [onListen] will be called again.
84 */
85 factory StreamController.multiplex({void onListen(), void onCancel()}) {
86 return new _MultiplexStreamController<T>(onListen, onCancel);
87 }
88 74
89 /** 75 /**
90 * Returns a view of this object that only exposes the [EventSink] interface. 76 * Returns a view of this object that only exposes the [EventSink] interface.
91 */ 77 */
92 EventSink<T> get sink; 78 EventSink<T> get sink => new _EventSinkView<T>(this);
93 79
94 /** 80 /**
95 * Whether the stream is closed for adding more events. 81 * Whether the stream is closed for adding more events.
96 * 82 *
97 * If true, the "done" event might not have fired yet, but it has been 83 * If true, the "done" event might not have fired yet, but it has been
98 * scheduled, and it is too late to add more events. 84 * scheduled, and it is too late to add more events.
99 */ 85 */
100 bool get isClosed; 86 bool get isClosed => stream._isClosed;
101 87
102 /** Whether the subscription is active and paused. */ 88 /** Whether one or more active subscribers have requested a pause. */
103 bool get isPaused; 89 bool get isPaused => stream._isInputPaused;
104 90
105 /** Whether there is a subscriber on the [Stream]. */ 91 /** Whether there are currently any subscribers on this [Stream]. */
106 bool get hasListener; 92 bool get hasListener => stream._hasListener;
93
94 /**
95 * Send or queue a data event.
96 */
97 void add(T value) => stream._add(value);
107 98
108 /** 99 /**
109 * Send or enqueue an error event. 100 * Send or enqueue an error event.
110 * 101 *
111 * Also allows an objection stack trace object, on top of what [EventSink] 102 * If a subscription has requested to be unsubscribed on errors,
112 * allows. 103 * it will be unsubscribed after receiving this event.
113 */
114 void addError(Object error, [Object stackTrace]);
115 }
116
117
118 abstract class _StreamControllerLifecycle<T> {
119 void _recordListen(StreamSubscription<T> subscription) {}
120 void _recordPause(StreamSubscription<T> subscription) {}
121 void _recordResume(StreamSubscription<T> subscription) {}
122 void _recordCancel(StreamSubscription<T> subscription) {}
123 }
124
125 /**
126 * Default implementation of [StreamController].
127 *
128 * Controls a stream that only supports a single controller.
129 */
130 class _StreamControllerImpl<T> implements StreamController<T>,
131 _StreamControllerLifecycle<T> {
132 static const int _STATE_OPEN = 0;
133 static const int _STATE_CANCELLED = 1;
134 static const int _STATE_CLOSED = 2;
135
136 final _NotificationHandler _onListen;
137 final _NotificationHandler _onPause;
138 final _NotificationHandler _onResume;
139 final _NotificationHandler _onCancel;
140 _StreamImpl<T> _stream;
141
142 // An active subscription on the stream, or null if no subscripton is active.
143 _ControllerSubscription<T> _subscription;
144
145 // Whether we have sent a "done" event.
146 int _state = _STATE_OPEN;
147
148 // Events added to the stream before it has an active subscription.
149 _PendingEvents _pendingEvents = null;
150
151 _StreamControllerImpl(this._onListen,
152 this._onPause,
153 this._onResume,
154 this._onCancel) {
155 _stream = new _ControllerStream<T>(this);
156 }
157
158 Stream<T> get stream => _stream;
159
160 /**
161 * Returns a view of this object that only exposes the [EventSink] interface.
162 */
163 EventSink<T> get sink => new _EventSinkView<T>(this);
164
165 /**
166 * Whether a listener has existed and been cancelled.
167 *
168 * After this, adding more events will be ignored.
169 */
170 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
171
172 bool get isClosed => (_state & _STATE_CLOSED) != 0;
173
174 bool get isPaused => _subscription != null && _subscription._isInputPaused;
175
176 bool get hasListener => _subscription != null;
177
178 /**
179 * Send or queue a data event.
180 */
181 void add(T value) {
182 if (isClosed) throw new StateError("Adding event after close");
183 if (_subscription != null) {
184 _subscription._add(value);
185 } else if (!_isCancelled) {
186 _addPendingEvent(new _DelayedData<T>(value));
187 }
188 }
189
190 /**
191 * Send or enqueue an error event.
192 */ 104 */
193 void addError(Object error, [Object stackTrace]) { 105 void addError(Object error, [Object stackTrace]) {
194 if (isClosed) throw new StateError("Adding event after close");
195 if (stackTrace != null) { 106 if (stackTrace != null) {
196 // Force stack trace overwrite. Even if the error already contained 107 // Force stack trace overwrite. Even if the error already contained
197 // a stack trace. 108 // a stack trace.
198 _attachStackTrace(error, stackTrace); 109 _attachStackTrace(error, stackTrace);
199 } 110 }
200 if (_subscription != null) { 111 stream._addError(error);
201 _subscription._addError(error);
202 } else if (!_isCancelled) {
203 _addPendingEvent(new _DelayedError(error));
204 }
205 } 112 }
206 113
207 /** 114 /**
208 * Closes this controller. 115 * Send or enqueue a "done" message.
209 * 116 *
210 * After closing, no further events may be added using [add] or [addError]. 117 * The "done" message should be sent at most once by a stream, and it
211 * 118 * should be the last message sent.
212 * You are allowed to close the controller more than once, but only the first
213 * call has any effect.
214 *
215 * The first time a controller is closed, a "done" event is sent to its
216 * stream.
217 */ 119 */
218 void close() { 120 void close() { stream._close(); }
219 if (isClosed) return;
220 _state |= _STATE_CLOSED;
221 if (_subscription != null) {
222 _subscription._close();
223 } else if (!_isCancelled) {
224 _addPendingEvent(const _DelayedDone());
225 }
226 }
227
228 void _addPendingEvent(_DelayedEvent event) {
229 if (_isCancelled) return;
230 _StreamImplEvents events = _pendingEvents;
231 if (events == null) {
232 events = _pendingEvents = new _StreamImplEvents();
233 }
234 events.add(event);
235 }
236
237 void _recordListen(_BufferingStreamSubscription<T> subscription) {
238 assert(_subscription == null);
239 _subscription = subscription;
240 subscription._setPendingEvents(_pendingEvents);
241 _pendingEvents = null;
242 subscription._guardCallback(() {
243 _runGuarded(_onListen);
244 });
245 }
246
247 void _recordCancel(StreamSubscription<T> subscription) {
248 assert(identical(_subscription, subscription));
249 _subscription = null;
250 _state |= _STATE_CANCELLED;
251 _runGuarded(_onCancel);
252 }
253
254 void _recordPause(StreamSubscription<T> subscription) {
255 _runGuarded(_onPause);
256 }
257
258 void _recordResume(StreamSubscription<T> subscription) {
259 _runGuarded(_onResume);
260 }
261 } 121 }
262 122
263 typedef void _NotificationHandler(); 123 typedef void _NotificationHandler();
264 124
265 void _runGuarded(_NotificationHandler notificationHandler) { 125 class _SingleControllerStream<T> extends _SingleStreamImpl<T> {
266 if (notificationHandler == null) return; 126 _NotificationHandler _onListen;
267 try { 127 _NotificationHandler _onPause;
268 notificationHandler(); 128 _NotificationHandler _onResume;
269 } catch (e, s) { 129 _NotificationHandler _onCancel;
270 _throwDelayed(e, s);
271 }
272 }
273 130
274 class _ControllerStream<T> extends _StreamImpl<T> { 131 // TODO(floitsch): share this code with _MultiControllerStream.
275 _StreamControllerLifecycle<T> _controller; 132 _runGuarded(_NotificationHandler notificationHandler) {
276 bool _hasListener = false; 133 if (notificationHandler == null) return;
277 134 try {
278 _ControllerStream(this._controller); 135 notificationHandler();
279 136 } catch (e, s) {
280 StreamSubscription<T> _createSubscription( 137 _throwDelayed(e, s);
281 void onData(T data),
282 void onError(Object error),
283 void onDone(),
284 bool cancelOnError) {
285 if (_hasListener) {
286 throw new StateError("The stream has already been listened to.");
287 }
288 _hasListener = true;
289 return new _ControllerSubscription<T>(
290 _controller, onData, onError, onDone, cancelOnError);
291 }
292
293 void _onListen(_BufferingStreamSubscription subscription) {
294 _controller._recordListen(subscription);
295 }
296 }
297
298 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
299 final _StreamControllerLifecycle<T> _controller;
300
301 _ControllerSubscription(this._controller,
302 void onData(T data),
303 void onError(Object error),
304 void onDone(),
305 bool cancelOnError)
306 : super(onData, onError, onDone, cancelOnError);
307
308 void _onCancel() {
309 _controller._recordCancel(this);
310 }
311
312 void _onPause() {
313 _controller._recordPause(this);
314 }
315
316 void _onResume() {
317 _controller._recordResume(this);
318 }
319 }
320
321 class _MultiplexStreamController<T> implements StreamController<T>,
322 _StreamControllerLifecycle<T> {
323 final _NotificationHandler _onListen;
324 final _NotificationHandler _onCancel;
325 /** Set when the [close] method is called. */
326 bool _isClosed = false;
327
328 // TODO(lrn): Make a more efficient implementation of these subscriptions,
329 // e.g., the traditional double-linked list with concurrent add and remove
330 // while firing.
331 Set<_BufferingStreamSubscription<T>> _streams;
332
333 _MultiplexStreamController(this._onListen, this._onCancel)
334 : _streams = new Set<_BufferingStreamSubscription<T>>();
335
336 // StreamController interface.
337
338 Stream<T> get stream => new _ControllerStream<T>(this);
339
340 EventSink<T> get sink => new _EventSinkView<T>(this);
341
342 bool get isClosed => _isClosed;
343
344 /**
345 * A multiplex controller is never paused.
346 *
347 * Each receiving stream may be paused individually, and they handle their
348 * own buffering.
349 */
350 bool get isPaused => false;
351
352 /** Whether there are currently a subscriber on the [Stream]. */
353 bool get hasListener => !_streams.isEmpty;
354
355 // _StreamControllerLifecycle interface.
356
357 void _recordListen(_BufferingStreamSubscription<T> subscription) {
358 bool isFirst = _streams.isEmpty;
359 _streams.add(subscription);
360 if (isFirst) {
361 _runGuarded(_onListen);
362 } 138 }
363 } 139 }
364 140
365 void _recordCancel(_BufferingStreamSubscription<T> subscription) { 141 _SingleControllerStream(this._onListen,
366 _streams.remove(subscription); 142 this._onPause,
367 if (_streams.isEmpty) { 143 this._onResume,
368 _runGuarded(_onCancel); 144 this._onCancel);
369 } 145
146 void _onSubscriptionStateChange() {
147 _runGuarded(_hasListener ? _onListen : _onCancel);
370 } 148 }
371 149
372 void _recordPause(StreamSubscription<T> subscription) {} 150 void _onPauseStateChange() {
373 void _recordResume(StreamSubscription<T> subscription) {} 151 _runGuarded(_isPaused ? _onPause : _onResume);
374
375 // EventSink interface.
376
377 void add(T data) {
378 if (_streams.isEmpty) return;
379 _forEachListener((_BufferingStreamSubscription<T> subscription) {
380 subscription._add(data);
381 });
382 }
383
384 void addError(Object error, [Object stackTrace]) {
385 if (_streams.isEmpty) return;
386 _forEachListener((_BufferingStreamSubscription<T> subscription) {
387 subscription._addError(error);
388 });
389 }
390
391 void close() {
392 _isClosed = true;
393 if (_streams.isEmpty) return;
394 _forEachListener((_BufferingStreamSubscription<T> subscription) {
395 _streams.remove(subscription);
396 subscription._close();
397 });
398 }
399
400 void _forEachListener(
401 void action(_BufferingStreamSubscription<T> subscription)) {
402 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList();
403 for (_BufferingStreamSubscription<T> subscription in subscriptions) {
404 if (_streams.contains(subscription)) {
405 action(subscription);
406 }
407 }
408 } 152 }
409 } 153 }
410
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698