| Index: sdk/lib/async/merge_stream.dart
|
| diff --git a/sdk/lib/async/merge_stream.dart b/sdk/lib/async/merge_stream.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..2a9d273e2cd76dfac4312efc91e4581bf38dd841
|
| --- /dev/null
|
| +++ b/sdk/lib/async/merge_stream.dart
|
| @@ -0,0 +1,282 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// part of dart.async;
|
| +
|
| +class _SupercedeEntry<T> {
|
| + final SupercedeStream stream;
|
| + Stream<T> source;
|
| + StreamSubscription subscription = null;
|
| + _SupercedeEntry next;
|
| +
|
| + _SupercedeEntry(this.stream, this.source, this.next);
|
| +
|
| + // Whether the source stream is complete.
|
| + bool get isDone => source == null;
|
| +
|
| + void onData(T data) {
|
| + // Stop all lower-priority sources.
|
| + stream._setData(this, data);
|
| + }
|
| +
|
| + void onError(AsyncError error) {
|
| + stream._signalError(error);
|
| + }
|
| +
|
| + void onDone() {
|
| + subscription = null;
|
| + source = null;
|
| + stream._setDone(this);
|
| + }
|
| +
|
| + void start() {
|
| + assert(subscription == null);
|
| + if (!isDone) {
|
| + subscription =
|
| + source.listen(onData, onError: onError, onDone: onDone);
|
| + }
|
| + }
|
| +
|
| + void stop() {
|
| + if (!isDone) {
|
| + subscription.cancel();
|
| + subscription = null;
|
| + }
|
| + }
|
| +
|
| + void pause() {
|
| + if (!isDone) subscription.pause();
|
| + }
|
| +
|
| + void resume() {
|
| + if (!isDone) subscription.resume();
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * [Stream] that forwards data from its active source with greatest priority.
|
| + *
|
| + * The [SupercedeStream] gets data from some source [Stream]s which
|
| + * are ordered in order of increasing priority.
|
| + * When a higher priority stream provides data, all lower priority streams
|
| + * are dropped.
|
| + *
|
| + * Errors from all (undropped) streams are forwarded.
|
| + */
|
| +class SupercedeStream<T> extends _MultiStreamImpl<T> {
|
| + _SupercedeEntry _entries = null;
|
| +
|
| + /**
|
| + * Create [SupercedeStream] from the given [sources].
|
| + *
|
| + * The [sources] are iterated in order of increasing priority.
|
| + */
|
| + SupercedeStream(Iterable<Stream<T>> sources) {
|
| + // Set up linked list of sources in decreasing priority order.
|
| + // The order allows us to drop all lower priority streams when a higher
|
| + // priority stream provides a value.
|
| + for (Stream<T> stream in sources) {
|
| + _entries = new _SupercedeEntry(this, stream, _entries);
|
| + }
|
| + }
|
| +
|
| + void _onSubscriptionStateChange() {
|
| + if (_hasSubscribers) {
|
| + for (_SupercedeEntry entry = _entries;
|
| + entry != null;
|
| + entry = entry.next) {
|
| + entry.start();
|
| + }
|
| + } else {
|
| + for (_SupercedeEntry entry = _entries;
|
| + entry != null;
|
| + entry = entry.next) {
|
| + entry.stop();
|
| + }
|
| + }
|
| + }
|
| +
|
| + void _onPauseStateChange() {
|
| + if (_isPaused) {
|
| + for (_SupercedeEntry entry = _entries;
|
| + entry != null;
|
| + entry = entry.next) {
|
| + entry.pause();
|
| + }
|
| + } else {
|
| + for (_SupercedeEntry entry = _entries;
|
| + entry != null;
|
| + entry = entry.next) {
|
| + entry.resume();
|
| + }
|
| + }
|
| + }
|
| +
|
| + void _setData(_SupercedeEntry entry, T data) {
|
| + while (entry.next != null) {
|
| + _SupercedeEntry nextEntry = entry.next;
|
| + entry.next = null;
|
| + nextEntry.stop();
|
| + entry = nextEntry;
|
| + }
|
| + _add(data);
|
| + }
|
| +
|
| + void _setDone(_SupercedeEntry entry) {
|
| + if (identical(_entries, entry)) {
|
| + // Remove the leading completed streams. These are streams
|
| + // the completed without ever providing data.
|
| + while (_entries.isDone) {
|
| + _entries = _entries.next;
|
| + if (_entries == null) {
|
| + _close();
|
| + return;
|
| + }
|
| + }
|
| + }
|
| + // Otherwise we leave the completed entry in the list and
|
| + // remove it when a higher priority stream provides data or
|
| + // all higher priority streams have completed.
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Helper class for [CyclicScheduleStream].
|
| + *
|
| + * Used to maintain a list of source streams which are activated in cyclic
|
| + * order.
|
| + *
|
| + * The stream is either unsubscribed, paused or active. Only one stream
|
| + * will be active at a time. A source is not subscribed until it's first
|
| + * activated.
|
| + *
|
| + * If the source completes, the entry is removed from [stream].
|
| + */
|
| +class _CycleEntry<T> {
|
| + final CyclicScheduleStream stream;
|
| + /** A single source stream for the [CyclicScheduleStream]. */
|
| + Stream source;
|
| + /** The active subscription, if any. */
|
| + StreamSubscription subscription = null;
|
| + /** Next entry in a linked list of entries. */
|
| + _CycleEntry next;
|
| +
|
| + _CycleEntry(this.stream, this.source);
|
| +
|
| + void cancel() {
|
| + // This method may be called event if this entry has never been activated.
|
| + if (subscription != null) {
|
| + subscription.cancel();
|
| + subscription = null;
|
| + }
|
| + }
|
| +
|
| + void pause() {
|
| + ensureSubscribed();
|
| + if (!subscription.isPaused) {
|
| + subscription.pause();
|
| + }
|
| + }
|
| +
|
| + void activate() {
|
| + ensureSubscribed();
|
| + if (subscription.isPaused) {
|
| + subscription.resume();
|
| + }
|
| + }
|
| +
|
| + void ensureSubscribed() {
|
| + if (subscription == null) {
|
| + subscription =
|
| + source.listen(stream._onData,
|
| + onError: stream._signalError,
|
| + onDone: stream._onDone);
|
| + }
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * [Stream] that schedules events from multiple sources in cyclic order.
|
| + *
|
| + * The source streams are activated and paused so that only one data event
|
| + * is generated at a time, and those data events are output on this stream.
|
| + *
|
| + * Error events from the currently active stream are forwarded without
|
| + * changing the schedule. When a source stream ends, it is removed from
|
| + * the schedule.
|
| + */
|
| +class CyclicScheduleStream<T> extends _MultiStreamImpl<T> {
|
| + _CycleEntry _currentEntry = null;
|
| + _CycleEntry _lastEntry = null;
|
| +
|
| + /**
|
| + * Create a [Stream] that provides data from [sources] one event at a time.
|
| + *
|
| + * The data are provided as one event from each stream in the order they are
|
| + * given by the [Iterable], and then cycling as long as there are data.
|
| + */
|
| + CyclicScheduleStream(Iterable<Stream<T>> sources) {
|
| + _CycleEntry entry = null;
|
| + for (Stream<T> source in sources) {
|
| + _CycleEntry newEntry = new _CycleEntry(this, source);
|
| + if (_lastEntry == null) {
|
| + _currentEntry = _lastEntry = newEntry;
|
| + } else {
|
| + _lastEntry = _lastEntry.next = newEntry;
|
| + }
|
| + }
|
| + if (_currentEntry == null) {
|
| + _close();
|
| + }
|
| + }
|
| +
|
| + void _onSubscriptionStateChange() {
|
| + if (_hasSubscribers) {
|
| + _currentEntry.activate();
|
| + for (_CycleEntry entry = _currentEntry.next;
|
| + entry != null;
|
| + entry = entry.next) {
|
| + entry.pause();
|
| + }
|
| + return;
|
| + }
|
| + for (_CycleEntry entry = _currentEntry; entry != null; entry = entry.next) {
|
| + entry.cancel();
|
| + }
|
| + }
|
| +
|
| + void _onPauseStateChange() {
|
| + if (_isPaused) {
|
| + _currentEntry.pause();
|
| + } else {
|
| + _currentEntry.activate();
|
| + }
|
| + }
|
| +
|
| + void _onData(T data) {
|
| + if (_currentEntry.next != null) {
|
| + _currentEntry.pause();
|
| + _add(data);
|
| + // Move the current entry to the end of the list.
|
| + _lastEntry = _lastEntry.next = _currentEntry;
|
| + _currentEntry = _currentEntry.next;
|
| + _lastEntry.next = null;
|
| + _currentEntry.activate();
|
| + } else {
|
| + // No pausing with only one entry left.
|
| + _add(data);
|
| + }
|
| + }
|
| +
|
| + void _onDone() {
|
| + if (_currentEntry.next == null) {
|
| + _close();
|
| + _currentEntry = _lastEntry = null;
|
| + } else {
|
| + // Remove the current entry from the list now that it's complete.
|
| + _currentEntry = _currentEntry.next;
|
| + _currentEntry.activate();
|
| + }
|
| + }
|
| +}
|
|
|