| Index: mojo/public/dart/third_party/barback/lib/src/utils/stream_replayer.dart
|
| diff --git a/mojo/public/dart/third_party/barback/lib/src/utils/stream_replayer.dart b/mojo/public/dart/third_party/barback/lib/src/utils/stream_replayer.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..4a2508596b0000348858ddc9be8c09129f698c21
|
| --- /dev/null
|
| +++ b/mojo/public/dart/third_party/barback/lib/src/utils/stream_replayer.dart
|
| @@ -0,0 +1,83 @@
|
| +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library barback.utils.stream_replayer;
|
| +
|
| +import 'dart:async';
|
| +import 'dart:collection';
|
| +
|
| +import '../utils.dart';
|
| +
|
| +/// Records the values and errors that are sent through a stream and allows them
|
| +/// to be replayed arbitrarily many times.
|
| +///
|
| +/// This only listens to the wrapped stream when a replayed stream gets a
|
| +/// listener.
|
| +class StreamReplayer<T> {
|
| + /// The wrapped stream.
|
| + final Stream<T> _stream;
|
| +
|
| + /// Whether or not [this] has started listening to [_stream].
|
| + bool _isSubscribed = false;
|
| +
|
| + /// Whether or not [_stream] has been closed.
|
| + bool _isClosed = false;
|
| +
|
| + /// The buffer of events or errors that have already been emitted by
|
| + /// [_stream].
|
| + ///
|
| + /// Each element is a [Fallible] that's either a value or an error sent
|
| + /// through the stream.
|
| + final _buffer = new Queue<Fallible<T>>();
|
| +
|
| + /// The controllers that are listening for future events from [_stream].
|
| + final _controllers = new Set<StreamController<T>>();
|
| +
|
| + StreamReplayer(this._stream);
|
| +
|
| + /// Returns a stream that replays the values and errors of the input stream.
|
| + ///
|
| + /// This stream is a buffered stream.
|
| + Stream<T> getReplay() {
|
| + var controller = new StreamController<T>(onListen: _subscribe);
|
| +
|
| + for (var eventOrError in _buffer) {
|
| + if (eventOrError.hasValue) {
|
| + controller.add(eventOrError.value);
|
| + } else {
|
| + controller.addError(eventOrError.error, eventOrError.stackTrace);
|
| + }
|
| + }
|
| + if (_isClosed) {
|
| + controller.close();
|
| + } else {
|
| + _controllers.add(controller);
|
| + }
|
| + return controller.stream;
|
| + }
|
| +
|
| + /// Subscribe to [_stream] if we haven't yet done so.
|
| + void _subscribe() {
|
| + if (_isSubscribed || _isClosed) return;
|
| + _isSubscribed = true;
|
| +
|
| + _stream.listen((data) {
|
| + _buffer.add(new Fallible<T>.withValue(data));
|
| + for (var controller in _controllers) {
|
| + controller.add(data);
|
| + }
|
| + }, onError: (error, [stackTrace]) {
|
| + _buffer.add(new Fallible<T>.withError(error, stackTrace));
|
| + for (var controller in _controllers) {
|
| + controller.addError(error, stackTrace);
|
| + }
|
| + }, onDone: () {
|
| + _isClosed = true;
|
| + for (var controller in _controllers) {
|
| + controller.close();
|
| + }
|
| + _controllers.clear();
|
| + });
|
| + }
|
| +}
|
|
|