OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library barback.utils.stream_replayer; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 import '../utils.dart'; | |
11 | |
12 /// Records the values and errors that are sent through a stream and allows them | |
13 /// to be replayed arbitrarily many times. | |
14 /// | |
15 /// This only listens to the wrapped stream when a replayed stream gets a | |
16 /// listener. | |
17 class StreamReplayer<T> { | |
18 /// The wrapped stream. | |
19 final Stream<T> _stream; | |
20 | |
21 /// Whether or not [this] has started listening to [_stream]. | |
22 bool _isSubscribed = false; | |
23 | |
24 /// Whether or not [_stream] has been closed. | |
25 bool _isClosed = false; | |
26 | |
27 /// The buffer of events or errors that have already been emitted by | |
28 /// [_stream]. | |
29 /// | |
30 /// Each element is a [Fallible] that's either a value or an error sent | |
31 /// through the stream. | |
32 final _buffer = new Queue<Fallible<T>>(); | |
33 | |
34 /// The controllers that are listening for future events from [_stream]. | |
35 final _controllers = new Set<StreamController<T>>(); | |
36 | |
37 StreamReplayer(this._stream); | |
38 | |
39 /// Returns a stream that replays the values and errors of the input stream. | |
40 /// | |
41 /// This stream is a buffered stream. | |
42 Stream<T> getReplay() { | |
43 var controller = new StreamController<T>(onListen: _subscribe); | |
44 | |
45 for (var eventOrError in _buffer) { | |
46 if (eventOrError.hasValue) { | |
47 controller.add(eventOrError.value); | |
48 } else { | |
49 controller.addError(eventOrError.error, eventOrError.stackTrace); | |
50 } | |
51 } | |
52 if (_isClosed) { | |
53 controller.close(); | |
54 } else { | |
55 _controllers.add(controller); | |
56 } | |
57 return controller.stream; | |
58 } | |
59 | |
60 /// Subscribe to [_stream] if we haven't yet done so. | |
61 void _subscribe() { | |
62 if (_isSubscribed || _isClosed) return; | |
63 _isSubscribed = true; | |
64 | |
65 _stream.listen((data) { | |
66 _buffer.add(new Fallible<T>.withValue(data)); | |
67 for (var controller in _controllers) { | |
68 controller.add(data); | |
69 } | |
70 }, onError: (error, [stackTrace]) { | |
71 _buffer.add(new Fallible<T>.withError(error, stackTrace)); | |
72 for (var controller in _controllers) { | |
73 controller.addError(error, stackTrace); | |
74 } | |
75 }, onDone: () { | |
76 _isClosed = true; | |
77 for (var controller in _controllers) { | |
78 controller.close(); | |
79 } | |
80 _controllers.clear(); | |
81 }); | |
82 } | |
83 } | |
OLD | NEW |