| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library barback.utils.stream_replayer; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 import 'dart:collection'; | |
| 9 | |
| 10 import '../utils.dart'; | |
| 11 | |
| 12 /// Records the values and errors that are sent through a stream and allows them | |
| 13 /// to be replayed arbitrarily many times. | |
| 14 /// | |
| 15 /// This only listens to the wrapped stream when a replayed stream gets a | |
| 16 /// listener. | |
| 17 class StreamReplayer<T> { | |
| 18 /// The wrapped stream. | |
| 19 final Stream<T> _stream; | |
| 20 | |
| 21 /// Whether or not [this] has started listening to [_stream]. | |
| 22 bool _isSubscribed = false; | |
| 23 | |
| 24 /// Whether or not [_stream] has been closed. | |
| 25 bool _isClosed = false; | |
| 26 | |
| 27 /// The buffer of events or errors that have already been emitted by | |
| 28 /// [_stream]. | |
| 29 /// | |
| 30 /// Each element is a [Fallible] that's either a value or an error sent | |
| 31 /// through the stream. | |
| 32 final _buffer = new Queue<Fallible<T>>(); | |
| 33 | |
| 34 /// The controllers that are listening for future events from [_stream]. | |
| 35 final _controllers = new Set<StreamController<T>>(); | |
| 36 | |
| 37 StreamReplayer(this._stream); | |
| 38 | |
| 39 /// Returns a stream that replays the values and errors of the input stream. | |
| 40 /// | |
| 41 /// This stream is a buffered stream. | |
| 42 Stream<T> getReplay() { | |
| 43 var controller = new StreamController<T>(onListen: _subscribe); | |
| 44 | |
| 45 for (var eventOrError in _buffer) { | |
| 46 if (eventOrError.hasValue) { | |
| 47 controller.add(eventOrError.value); | |
| 48 } else { | |
| 49 controller.addError(eventOrError.error, eventOrError.stackTrace); | |
| 50 } | |
| 51 } | |
| 52 if (_isClosed) { | |
| 53 controller.close(); | |
| 54 } else { | |
| 55 _controllers.add(controller); | |
| 56 } | |
| 57 return controller.stream; | |
| 58 } | |
| 59 | |
| 60 /// Subscribe to [_stream] if we haven't yet done so. | |
| 61 void _subscribe() { | |
| 62 if (_isSubscribed || _isClosed) return; | |
| 63 _isSubscribed = true; | |
| 64 | |
| 65 _stream.listen((data) { | |
| 66 _buffer.add(new Fallible<T>.withValue(data)); | |
| 67 for (var controller in _controllers) { | |
| 68 controller.add(data); | |
| 69 } | |
| 70 }, onError: (error, [stackTrace]) { | |
| 71 _buffer.add(new Fallible<T>.withError(error, stackTrace)); | |
| 72 for (var controller in _controllers) { | |
| 73 controller.addError(error, stackTrace); | |
| 74 } | |
| 75 }, onDone: () { | |
| 76 _isClosed = true; | |
| 77 for (var controller in _controllers) { | |
| 78 controller.close(); | |
| 79 } | |
| 80 _controllers.clear(); | |
| 81 }); | |
| 82 } | |
| 83 } | |
| OLD | NEW |