OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library barback.utils.stream_replayer; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:collection'; |
| 9 |
| 10 import '../utils.dart'; |
| 11 |
| 12 /// Records the values and errors that are sent through a stream and allows them |
| 13 /// to be replayed arbitrarily many times. |
| 14 /// |
| 15 /// This only listens to the wrapped stream when a replayed stream gets a |
| 16 /// listener. |
| 17 class StreamReplayer<T> { |
| 18 /// The wrapped stream. |
| 19 final Stream<T> _stream; |
| 20 |
| 21 /// Whether or not [this] has started listening to [_stream]. |
| 22 bool _isSubscribed = false; |
| 23 |
| 24 /// Whether or not [_stream] has been closed. |
| 25 bool _isClosed = false; |
| 26 |
| 27 /// The buffer of events or errors that have already been emitted by |
| 28 /// [_stream]. |
| 29 /// |
| 30 /// Each element is a [Fallible] that's either a value or an error sent |
| 31 /// through the stream. |
| 32 final _buffer = new Queue<Fallible<T>>(); |
| 33 |
| 34 /// The controllers that are listening for future events from [_stream]. |
| 35 final _controllers = new Set<StreamController<T>>(); |
| 36 |
| 37 StreamReplayer(this._stream); |
| 38 |
| 39 /// Returns a stream that replays the values and errors of the input stream. |
| 40 /// |
| 41 /// This stream is a buffered stream. |
| 42 Stream<T> getReplay() { |
| 43 var controller = new StreamController<T>(onListen: _subscribe); |
| 44 |
| 45 for (var eventOrError in _buffer) { |
| 46 if (eventOrError.hasValue) { |
| 47 controller.add(eventOrError.value); |
| 48 } else { |
| 49 controller.addError(eventOrError.error, eventOrError.stackTrace); |
| 50 } |
| 51 } |
| 52 if (_isClosed) { |
| 53 controller.close(); |
| 54 } else { |
| 55 _controllers.add(controller); |
| 56 } |
| 57 return controller.stream; |
| 58 } |
| 59 |
| 60 /// Subscribe to [_stream] if we haven't yet done so. |
| 61 void _subscribe() { |
| 62 if (_isSubscribed || _isClosed) return; |
| 63 _isSubscribed = true; |
| 64 |
| 65 _stream.listen((data) { |
| 66 _buffer.add(new Fallible<T>.withValue(data)); |
| 67 for (var controller in _controllers) { |
| 68 controller.add(data); |
| 69 } |
| 70 }, onError: (error, [stackTrace]) { |
| 71 _buffer.add(new Fallible<T>.withError(error, stackTrace)); |
| 72 for (var controller in _controllers) { |
| 73 controller.addError(error, stackTrace); |
| 74 } |
| 75 }, onDone: () { |
| 76 _isClosed = true; |
| 77 for (var controller in _controllers) { |
| 78 controller.close(); |
| 79 } |
| 80 _controllers.clear(); |
| 81 }); |
| 82 } |
| 83 } |
OLD | NEW |