Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(653)

Unified Diff: sdk/lib/async/merge_stream.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/signal.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/merge_stream.dart
diff --git a/sdk/lib/async/merge_stream.dart b/sdk/lib/async/merge_stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..2a9d273e2cd76dfac4312efc91e4581bf38dd841
--- /dev/null
+++ b/sdk/lib/async/merge_stream.dart
@@ -0,0 +1,282 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// part of dart.async;
+
+class _SupercedeEntry<T> {
+ final SupercedeStream stream;
+ Stream<T> source;
+ StreamSubscription subscription = null;
+ _SupercedeEntry next;
+
+ _SupercedeEntry(this.stream, this.source, this.next);
+
+ // Whether the source stream is complete.
+ bool get isDone => source == null;
+
+ void onData(T data) {
+ // Stop all lower-priority sources.
+ stream._setData(this, data);
+ }
+
+ void onError(AsyncError error) {
+ stream._signalError(error);
+ }
+
+ void onDone() {
+ subscription = null;
+ source = null;
+ stream._setDone(this);
+ }
+
+ void start() {
+ assert(subscription == null);
+ if (!isDone) {
+ subscription =
+ source.listen(onData, onError: onError, onDone: onDone);
+ }
+ }
+
+ void stop() {
+ if (!isDone) {
+ subscription.cancel();
+ subscription = null;
+ }
+ }
+
+ void pause() {
+ if (!isDone) subscription.pause();
+ }
+
+ void resume() {
+ if (!isDone) subscription.resume();
+ }
+}
+
+/**
+ * [Stream] that forwards data from its active source with greatest priority.
+ *
+ * The [SupercedeStream] gets data from some source [Stream]s which
+ * are ordered in order of increasing priority.
+ * When a higher priority stream provides data, all lower priority streams
+ * are dropped.
+ *
+ * Errors from all (undropped) streams are forwarded.
+ */
+class SupercedeStream<T> extends _MultiStreamImpl<T> {
+ _SupercedeEntry _entries = null;
+
+ /**
+ * Create [SupercedeStream] from the given [sources].
+ *
+ * The [sources] are iterated in order of increasing priority.
+ */
+ SupercedeStream(Iterable<Stream<T>> sources) {
+ // Set up linked list of sources in decreasing priority order.
+ // The order allows us to drop all lower priority streams when a higher
+ // priority stream provides a value.
+ for (Stream<T> stream in sources) {
+ _entries = new _SupercedeEntry(this, stream, _entries);
+ }
+ }
+
+ void _onSubscriptionStateChange() {
+ if (_hasSubscribers) {
+ for (_SupercedeEntry entry = _entries;
+ entry != null;
+ entry = entry.next) {
+ entry.start();
+ }
+ } else {
+ for (_SupercedeEntry entry = _entries;
+ entry != null;
+ entry = entry.next) {
+ entry.stop();
+ }
+ }
+ }
+
+ void _onPauseStateChange() {
+ if (_isPaused) {
+ for (_SupercedeEntry entry = _entries;
+ entry != null;
+ entry = entry.next) {
+ entry.pause();
+ }
+ } else {
+ for (_SupercedeEntry entry = _entries;
+ entry != null;
+ entry = entry.next) {
+ entry.resume();
+ }
+ }
+ }
+
+ void _setData(_SupercedeEntry entry, T data) {
+ while (entry.next != null) {
+ _SupercedeEntry nextEntry = entry.next;
+ entry.next = null;
+ nextEntry.stop();
+ entry = nextEntry;
+ }
+ _add(data);
+ }
+
+ void _setDone(_SupercedeEntry entry) {
+ if (identical(_entries, entry)) {
+ // Remove the leading completed streams. These are streams
+ // the completed without ever providing data.
+ while (_entries.isDone) {
+ _entries = _entries.next;
+ if (_entries == null) {
+ _close();
+ return;
+ }
+ }
+ }
+ // Otherwise we leave the completed entry in the list and
+ // remove it when a higher priority stream provides data or
+ // all higher priority streams have completed.
+ }
+}
+
+/**
+ * Helper class for [CyclicScheduleStream].
+ *
+ * Used to maintain a list of source streams which are activated in cyclic
+ * order.
+ *
+ * The stream is either unsubscribed, paused or active. Only one stream
+ * will be active at a time. A source is not subscribed until it's first
+ * activated.
+ *
+ * If the source completes, the entry is removed from [stream].
+ */
+class _CycleEntry<T> {
+ final CyclicScheduleStream stream;
+ /** A single source stream for the [CyclicScheduleStream]. */
+ Stream source;
+ /** The active subscription, if any. */
+ StreamSubscription subscription = null;
+ /** Next entry in a linked list of entries. */
+ _CycleEntry next;
+
+ _CycleEntry(this.stream, this.source);
+
+ void cancel() {
+ // This method may be called event if this entry has never been activated.
+ if (subscription != null) {
+ subscription.cancel();
+ subscription = null;
+ }
+ }
+
+ void pause() {
+ ensureSubscribed();
+ if (!subscription.isPaused) {
+ subscription.pause();
+ }
+ }
+
+ void activate() {
+ ensureSubscribed();
+ if (subscription.isPaused) {
+ subscription.resume();
+ }
+ }
+
+ void ensureSubscribed() {
+ if (subscription == null) {
+ subscription =
+ source.listen(stream._onData,
+ onError: stream._signalError,
+ onDone: stream._onDone);
+ }
+ }
+}
+
+/**
+ * [Stream] that schedules events from multiple sources in cyclic order.
+ *
+ * The source streams are activated and paused so that only one data event
+ * is generated at a time, and those data events are output on this stream.
+ *
+ * Error events from the currently active stream are forwarded without
+ * changing the schedule. When a source stream ends, it is removed from
+ * the schedule.
+ */
+class CyclicScheduleStream<T> extends _MultiStreamImpl<T> {
+ _CycleEntry _currentEntry = null;
+ _CycleEntry _lastEntry = null;
+
+ /**
+ * Create a [Stream] that provides data from [sources] one event at a time.
+ *
+ * The data are provided as one event from each stream in the order they are
+ * given by the [Iterable], and then cycling as long as there are data.
+ */
+ CyclicScheduleStream(Iterable<Stream<T>> sources) {
+ _CycleEntry entry = null;
+ for (Stream<T> source in sources) {
+ _CycleEntry newEntry = new _CycleEntry(this, source);
+ if (_lastEntry == null) {
+ _currentEntry = _lastEntry = newEntry;
+ } else {
+ _lastEntry = _lastEntry.next = newEntry;
+ }
+ }
+ if (_currentEntry == null) {
+ _close();
+ }
+ }
+
+ void _onSubscriptionStateChange() {
+ if (_hasSubscribers) {
+ _currentEntry.activate();
+ for (_CycleEntry entry = _currentEntry.next;
+ entry != null;
+ entry = entry.next) {
+ entry.pause();
+ }
+ return;
+ }
+ for (_CycleEntry entry = _currentEntry; entry != null; entry = entry.next) {
+ entry.cancel();
+ }
+ }
+
+ void _onPauseStateChange() {
+ if (_isPaused) {
+ _currentEntry.pause();
+ } else {
+ _currentEntry.activate();
+ }
+ }
+
+ void _onData(T data) {
+ if (_currentEntry.next != null) {
+ _currentEntry.pause();
+ _add(data);
+ // Move the current entry to the end of the list.
+ _lastEntry = _lastEntry.next = _currentEntry;
+ _currentEntry = _currentEntry.next;
+ _lastEntry.next = null;
+ _currentEntry.activate();
+ } else {
+ // No pausing with only one entry left.
+ _add(data);
+ }
+ }
+
+ void _onDone() {
+ if (_currentEntry.next == null) {
+ _close();
+ _currentEntry = _lastEntry = null;
+ } else {
+ // Remove the current entry from the list now that it's complete.
+ _currentEntry = _currentEntry.next;
+ _currentEntry.activate();
+ }
+ }
+}
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/signal.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698