Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(319)

Side by Side Diff: sdk/lib/async/merge_stream.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/signal.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // part of dart.async;
6
7 class _SupercedeEntry<T> {
8 final SupercedeStream stream;
9 Stream<T> source;
10 StreamSubscription subscription = null;
11 _SupercedeEntry next;
12
13 _SupercedeEntry(this.stream, this.source, this.next);
14
15 // Whether the source stream is complete.
16 bool get isDone => source == null;
17
18 void onData(T data) {
19 // Stop all lower-priority sources.
20 stream._setData(this, data);
21 }
22
23 void onError(AsyncError error) {
24 stream._signalError(error);
25 }
26
27 void onDone() {
28 subscription = null;
29 source = null;
30 stream._setDone(this);
31 }
32
33 void start() {
34 assert(subscription == null);
35 if (!isDone) {
36 subscription =
37 source.listen(onData, onError: onError, onDone: onDone);
38 }
39 }
40
41 void stop() {
42 if (!isDone) {
43 subscription.cancel();
44 subscription = null;
45 }
46 }
47
48 void pause() {
49 if (!isDone) subscription.pause();
50 }
51
52 void resume() {
53 if (!isDone) subscription.resume();
54 }
55 }
56
57 /**
58 * [Stream] that forwards data from its active source with greatest priority.
59 *
60 * The [SupercedeStream] gets data from some source [Stream]s which
61 * are ordered in order of increasing priority.
62 * When a higher priority stream provides data, all lower priority streams
63 * are dropped.
64 *
65 * Errors from all (undropped) streams are forwarded.
66 */
67 class SupercedeStream<T> extends _MultiStreamImpl<T> {
68 _SupercedeEntry _entries = null;
69
70 /**
71 * Create [SupercedeStream] from the given [sources].
72 *
73 * The [sources] are iterated in order of increasing priority.
74 */
75 SupercedeStream(Iterable<Stream<T>> sources) {
76 // Set up linked list of sources in decreasing priority order.
77 // The order allows us to drop all lower priority streams when a higher
78 // priority stream provides a value.
79 for (Stream<T> stream in sources) {
80 _entries = new _SupercedeEntry(this, stream, _entries);
81 }
82 }
83
84 void _onSubscriptionStateChange() {
85 if (_hasSubscribers) {
86 for (_SupercedeEntry entry = _entries;
87 entry != null;
88 entry = entry.next) {
89 entry.start();
90 }
91 } else {
92 for (_SupercedeEntry entry = _entries;
93 entry != null;
94 entry = entry.next) {
95 entry.stop();
96 }
97 }
98 }
99
100 void _onPauseStateChange() {
101 if (_isPaused) {
102 for (_SupercedeEntry entry = _entries;
103 entry != null;
104 entry = entry.next) {
105 entry.pause();
106 }
107 } else {
108 for (_SupercedeEntry entry = _entries;
109 entry != null;
110 entry = entry.next) {
111 entry.resume();
112 }
113 }
114 }
115
116 void _setData(_SupercedeEntry entry, T data) {
117 while (entry.next != null) {
118 _SupercedeEntry nextEntry = entry.next;
119 entry.next = null;
120 nextEntry.stop();
121 entry = nextEntry;
122 }
123 _add(data);
124 }
125
126 void _setDone(_SupercedeEntry entry) {
127 if (identical(_entries, entry)) {
128 // Remove the leading completed streams. These are streams
129 // the completed without ever providing data.
130 while (_entries.isDone) {
131 _entries = _entries.next;
132 if (_entries == null) {
133 _close();
134 return;
135 }
136 }
137 }
138 // Otherwise we leave the completed entry in the list and
139 // remove it when a higher priority stream provides data or
140 // all higher priority streams have completed.
141 }
142 }
143
144 /**
145 * Helper class for [CyclicScheduleStream].
146 *
147 * Used to maintain a list of source streams which are activated in cyclic
148 * order.
149 *
150 * The stream is either unsubscribed, paused or active. Only one stream
151 * will be active at a time. A source is not subscribed until it's first
152 * activated.
153 *
154 * If the source completes, the entry is removed from [stream].
155 */
156 class _CycleEntry<T> {
157 final CyclicScheduleStream stream;
158 /** A single source stream for the [CyclicScheduleStream]. */
159 Stream source;
160 /** The active subscription, if any. */
161 StreamSubscription subscription = null;
162 /** Next entry in a linked list of entries. */
163 _CycleEntry next;
164
165 _CycleEntry(this.stream, this.source);
166
167 void cancel() {
168 // This method may be called event if this entry has never been activated.
169 if (subscription != null) {
170 subscription.cancel();
171 subscription = null;
172 }
173 }
174
175 void pause() {
176 ensureSubscribed();
177 if (!subscription.isPaused) {
178 subscription.pause();
179 }
180 }
181
182 void activate() {
183 ensureSubscribed();
184 if (subscription.isPaused) {
185 subscription.resume();
186 }
187 }
188
189 void ensureSubscribed() {
190 if (subscription == null) {
191 subscription =
192 source.listen(stream._onData,
193 onError: stream._signalError,
194 onDone: stream._onDone);
195 }
196 }
197 }
198
199 /**
200 * [Stream] that schedules events from multiple sources in cyclic order.
201 *
202 * The source streams are activated and paused so that only one data event
203 * is generated at a time, and those data events are output on this stream.
204 *
205 * Error events from the currently active stream are forwarded without
206 * changing the schedule. When a source stream ends, it is removed from
207 * the schedule.
208 */
209 class CyclicScheduleStream<T> extends _MultiStreamImpl<T> {
210 _CycleEntry _currentEntry = null;
211 _CycleEntry _lastEntry = null;
212
213 /**
214 * Create a [Stream] that provides data from [sources] one event at a time.
215 *
216 * The data are provided as one event from each stream in the order they are
217 * given by the [Iterable], and then cycling as long as there are data.
218 */
219 CyclicScheduleStream(Iterable<Stream<T>> sources) {
220 _CycleEntry entry = null;
221 for (Stream<T> source in sources) {
222 _CycleEntry newEntry = new _CycleEntry(this, source);
223 if (_lastEntry == null) {
224 _currentEntry = _lastEntry = newEntry;
225 } else {
226 _lastEntry = _lastEntry.next = newEntry;
227 }
228 }
229 if (_currentEntry == null) {
230 _close();
231 }
232 }
233
234 void _onSubscriptionStateChange() {
235 if (_hasSubscribers) {
236 _currentEntry.activate();
237 for (_CycleEntry entry = _currentEntry.next;
238 entry != null;
239 entry = entry.next) {
240 entry.pause();
241 }
242 return;
243 }
244 for (_CycleEntry entry = _currentEntry; entry != null; entry = entry.next) {
245 entry.cancel();
246 }
247 }
248
249 void _onPauseStateChange() {
250 if (_isPaused) {
251 _currentEntry.pause();
252 } else {
253 _currentEntry.activate();
254 }
255 }
256
257 void _onData(T data) {
258 if (_currentEntry.next != null) {
259 _currentEntry.pause();
260 _add(data);
261 // Move the current entry to the end of the list.
262 _lastEntry = _lastEntry.next = _currentEntry;
263 _currentEntry = _currentEntry.next;
264 _lastEntry.next = null;
265 _currentEntry.activate();
266 } else {
267 // No pausing with only one entry left.
268 _add(data);
269 }
270 }
271
272 void _onDone() {
273 if (_currentEntry.next == null) {
274 _close();
275 _currentEntry = _lastEntry = null;
276 } else {
277 // Remove the current entry from the list now that it's complete.
278 _currentEntry = _currentEntry.next;
279 _currentEntry.activate();
280 }
281 }
282 }
OLDNEW
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/signal.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698