Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(324)

Side by Side Diff: pkg/barback/lib/src/stream_replayer.dart

Issue 261823008: Reorganize barback's source files. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: re-add barback/lib/src/internal_asset.dart Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/barback/lib/src/stream_pool.dart ('k') | pkg/barback/lib/src/transform.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library barback.stream_replayer;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import 'utils.dart';
11
12 /// Records the values and errors that are sent through a stream and allows them
13 /// to be replayed arbitrarily many times.
14 ///
15 /// This only listens to the wrapped stream when a replayed stream gets a
16 /// listener.
17 class StreamReplayer<T> {
18 /// The wrapped stream.
19 final Stream<T> _stream;
20
21 /// Whether or not [this] has started listening to [_stream].
22 bool _isSubscribed = false;
23
24 /// Whether or not [_stream] has been closed.
25 bool _isClosed = false;
26
27 /// The buffer of events or errors that have already been emitted by
28 /// [_stream].
29 ///
30 /// Each element is a [Fallible] that's either a value or an error sent
31 /// through the stream.
32 final _buffer = new Queue<Fallible<T>>();
33
34 /// The controllers that are listening for future events from [_stream].
35 final _controllers = new Set<StreamController<T>>();
36
37 StreamReplayer(this._stream);
38
39 /// Returns a stream that replays the values and errors of the input stream.
40 ///
41 /// This stream is a buffered stream.
42 Stream<T> getReplay() {
43 var controller = new StreamController<T>(onListen: _subscribe);
44
45 for (var eventOrError in _buffer) {
46 if (eventOrError.hasValue) {
47 controller.add(eventOrError.value);
48 } else {
49 controller.addError(eventOrError.error, eventOrError.stackTrace);
50 }
51 }
52 if (_isClosed) {
53 controller.close();
54 } else {
55 _controllers.add(controller);
56 }
57 return controller.stream;
58 }
59
60 /// Subscribe to [_stream] if we haven't yet done so.
61 void _subscribe() {
62 if (_isSubscribed || _isClosed) return;
63 _isSubscribed = true;
64
65 _stream.listen((data) {
66 _buffer.add(new Fallible<T>.withValue(data));
67 for (var controller in _controllers) {
68 controller.add(data);
69 }
70 }, onError: (error, [stackTrace]) {
71 _buffer.add(new Fallible<T>.withError(error, stackTrace));
72 for (var controller in _controllers) {
73 controller.addError(error, stackTrace);
74 }
75 }, onDone: () {
76 _isClosed = true;
77 for (var controller in _controllers) {
78 controller.close();
79 }
80 _controllers.clear();
81 });
82 }
83 }
OLDNEW
« no previous file with comments | « pkg/barback/lib/src/stream_pool.dart ('k') | pkg/barback/lib/src/transform.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698