OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // part of dart.async; |
| 6 |
| 7 class _SupercedeEntry<T> { |
| 8 final SupercedeStream stream; |
| 9 Stream<T> source; |
| 10 StreamSubscription subscription = null; |
| 11 _SupercedeEntry next; |
| 12 |
| 13 _SupercedeEntry(this.stream, this.source, this.next); |
| 14 |
| 15 // Whether the source stream is complete. |
| 16 bool get isDone => source == null; |
| 17 |
| 18 void onData(T data) { |
| 19 // Stop all lower-priority sources. |
| 20 stream._setData(this, data); |
| 21 } |
| 22 |
| 23 void onError(AsyncError error) { |
| 24 stream._signalError(error); |
| 25 } |
| 26 |
| 27 void onDone() { |
| 28 subscription = null; |
| 29 source = null; |
| 30 stream._setDone(this); |
| 31 } |
| 32 |
| 33 void start() { |
| 34 assert(subscription == null); |
| 35 if (!isDone) { |
| 36 subscription = |
| 37 source.listen(onData, onError: onError, onDone: onDone); |
| 38 } |
| 39 } |
| 40 |
| 41 void stop() { |
| 42 if (!isDone) { |
| 43 subscription.cancel(); |
| 44 subscription = null; |
| 45 } |
| 46 } |
| 47 |
| 48 void pause() { |
| 49 if (!isDone) subscription.pause(); |
| 50 } |
| 51 |
| 52 void resume() { |
| 53 if (!isDone) subscription.resume(); |
| 54 } |
| 55 } |
| 56 |
| 57 /** |
| 58 * [Stream] that forwards data from its active source with greatest priority. |
| 59 * |
| 60 * The [SupercedeStream] gets data from some source [Stream]s which |
| 61 * are ordered in order of increasing priority. |
| 62 * When a higher priority stream provides data, all lower priority streams |
| 63 * are dropped. |
| 64 * |
| 65 * Errors from all (undropped) streams are forwarded. |
| 66 */ |
| 67 class SupercedeStream<T> extends _MultiStreamImpl<T> { |
| 68 _SupercedeEntry _entries = null; |
| 69 |
| 70 /** |
| 71 * Create [SupercedeStream] from the given [sources]. |
| 72 * |
| 73 * The [sources] are iterated in order of increasing priority. |
| 74 */ |
| 75 SupercedeStream(Iterable<Stream<T>> sources) { |
| 76 // Set up linked list of sources in decreasing priority order. |
| 77 // The order allows us to drop all lower priority streams when a higher |
| 78 // priority stream provides a value. |
| 79 for (Stream<T> stream in sources) { |
| 80 _entries = new _SupercedeEntry(this, stream, _entries); |
| 81 } |
| 82 } |
| 83 |
| 84 void _onSubscriptionStateChange() { |
| 85 if (_hasSubscribers) { |
| 86 for (_SupercedeEntry entry = _entries; |
| 87 entry != null; |
| 88 entry = entry.next) { |
| 89 entry.start(); |
| 90 } |
| 91 } else { |
| 92 for (_SupercedeEntry entry = _entries; |
| 93 entry != null; |
| 94 entry = entry.next) { |
| 95 entry.stop(); |
| 96 } |
| 97 } |
| 98 } |
| 99 |
| 100 void _onPauseStateChange() { |
| 101 if (_isPaused) { |
| 102 for (_SupercedeEntry entry = _entries; |
| 103 entry != null; |
| 104 entry = entry.next) { |
| 105 entry.pause(); |
| 106 } |
| 107 } else { |
| 108 for (_SupercedeEntry entry = _entries; |
| 109 entry != null; |
| 110 entry = entry.next) { |
| 111 entry.resume(); |
| 112 } |
| 113 } |
| 114 } |
| 115 |
| 116 void _setData(_SupercedeEntry entry, T data) { |
| 117 while (entry.next != null) { |
| 118 _SupercedeEntry nextEntry = entry.next; |
| 119 entry.next = null; |
| 120 nextEntry.stop(); |
| 121 entry = nextEntry; |
| 122 } |
| 123 _add(data); |
| 124 } |
| 125 |
| 126 void _setDone(_SupercedeEntry entry) { |
| 127 if (identical(_entries, entry)) { |
| 128 // Remove the leading completed streams. These are streams |
| 129 // the completed without ever providing data. |
| 130 while (_entries.isDone) { |
| 131 _entries = _entries.next; |
| 132 if (_entries == null) { |
| 133 _close(); |
| 134 return; |
| 135 } |
| 136 } |
| 137 } |
| 138 // Otherwise we leave the completed entry in the list and |
| 139 // remove it when a higher priority stream provides data or |
| 140 // all higher priority streams have completed. |
| 141 } |
| 142 } |
| 143 |
| 144 /** |
| 145 * Helper class for [CyclicScheduleStream]. |
| 146 * |
| 147 * Used to maintain a list of source streams which are activated in cyclic |
| 148 * order. |
| 149 * |
| 150 * The stream is either unsubscribed, paused or active. Only one stream |
| 151 * will be active at a time. A source is not subscribed until it's first |
| 152 * activated. |
| 153 * |
| 154 * If the source completes, the entry is removed from [stream]. |
| 155 */ |
| 156 class _CycleEntry<T> { |
| 157 final CyclicScheduleStream stream; |
| 158 /** A single source stream for the [CyclicScheduleStream]. */ |
| 159 Stream source; |
| 160 /** The active subscription, if any. */ |
| 161 StreamSubscription subscription = null; |
| 162 /** Next entry in a linked list of entries. */ |
| 163 _CycleEntry next; |
| 164 |
| 165 _CycleEntry(this.stream, this.source); |
| 166 |
| 167 void cancel() { |
| 168 // This method may be called event if this entry has never been activated. |
| 169 if (subscription != null) { |
| 170 subscription.cancel(); |
| 171 subscription = null; |
| 172 } |
| 173 } |
| 174 |
| 175 void pause() { |
| 176 ensureSubscribed(); |
| 177 if (!subscription.isPaused) { |
| 178 subscription.pause(); |
| 179 } |
| 180 } |
| 181 |
| 182 void activate() { |
| 183 ensureSubscribed(); |
| 184 if (subscription.isPaused) { |
| 185 subscription.resume(); |
| 186 } |
| 187 } |
| 188 |
| 189 void ensureSubscribed() { |
| 190 if (subscription == null) { |
| 191 subscription = |
| 192 source.listen(stream._onData, |
| 193 onError: stream._signalError, |
| 194 onDone: stream._onDone); |
| 195 } |
| 196 } |
| 197 } |
| 198 |
| 199 /** |
| 200 * [Stream] that schedules events from multiple sources in cyclic order. |
| 201 * |
| 202 * The source streams are activated and paused so that only one data event |
| 203 * is generated at a time, and those data events are output on this stream. |
| 204 * |
| 205 * Error events from the currently active stream are forwarded without |
| 206 * changing the schedule. When a source stream ends, it is removed from |
| 207 * the schedule. |
| 208 */ |
| 209 class CyclicScheduleStream<T> extends _MultiStreamImpl<T> { |
| 210 _CycleEntry _currentEntry = null; |
| 211 _CycleEntry _lastEntry = null; |
| 212 |
| 213 /** |
| 214 * Create a [Stream] that provides data from [sources] one event at a time. |
| 215 * |
| 216 * The data are provided as one event from each stream in the order they are |
| 217 * given by the [Iterable], and then cycling as long as there are data. |
| 218 */ |
| 219 CyclicScheduleStream(Iterable<Stream<T>> sources) { |
| 220 _CycleEntry entry = null; |
| 221 for (Stream<T> source in sources) { |
| 222 _CycleEntry newEntry = new _CycleEntry(this, source); |
| 223 if (_lastEntry == null) { |
| 224 _currentEntry = _lastEntry = newEntry; |
| 225 } else { |
| 226 _lastEntry = _lastEntry.next = newEntry; |
| 227 } |
| 228 } |
| 229 if (_currentEntry == null) { |
| 230 _close(); |
| 231 } |
| 232 } |
| 233 |
| 234 void _onSubscriptionStateChange() { |
| 235 if (_hasSubscribers) { |
| 236 _currentEntry.activate(); |
| 237 for (_CycleEntry entry = _currentEntry.next; |
| 238 entry != null; |
| 239 entry = entry.next) { |
| 240 entry.pause(); |
| 241 } |
| 242 return; |
| 243 } |
| 244 for (_CycleEntry entry = _currentEntry; entry != null; entry = entry.next) { |
| 245 entry.cancel(); |
| 246 } |
| 247 } |
| 248 |
| 249 void _onPauseStateChange() { |
| 250 if (_isPaused) { |
| 251 _currentEntry.pause(); |
| 252 } else { |
| 253 _currentEntry.activate(); |
| 254 } |
| 255 } |
| 256 |
| 257 void _onData(T data) { |
| 258 if (_currentEntry.next != null) { |
| 259 _currentEntry.pause(); |
| 260 _add(data); |
| 261 // Move the current entry to the end of the list. |
| 262 _lastEntry = _lastEntry.next = _currentEntry; |
| 263 _currentEntry = _currentEntry.next; |
| 264 _lastEntry.next = null; |
| 265 _currentEntry.activate(); |
| 266 } else { |
| 267 // No pausing with only one entry left. |
| 268 _add(data); |
| 269 } |
| 270 } |
| 271 |
| 272 void _onDone() { |
| 273 if (_currentEntry.next == null) { |
| 274 _close(); |
| 275 _currentEntry = _lastEntry = null; |
| 276 } else { |
| 277 // Remove the current entry from the list now that it's complete. |
| 278 _currentEntry = _currentEntry.next; |
| 279 _currentEntry.activate(); |
| 280 } |
| 281 } |
| 282 } |
OLD | NEW |