Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(370)

Side by Side Diff: lib/src/util/forkable_stream.dart

Issue 1262623006: Temporarily bring in code from the async package. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
6 // lands.
7 library test.util.forkable_stream;
8
9 import 'dart:async';
10
11 import 'package:async/async.dart' hide ForkableStream;
12
13 /// A single-subscription stream from which other streams may be forked off at
14 /// the current position.
15 ///
16 /// This adds an operation, [fork], which produces a new stream that
17 /// independently emits the same events as this stream. Unlike the branches
18 /// produced by [StreamSplitter], a fork only emits events that arrive *after*
19 /// the call to [fork].
20 ///
21 /// Each fork can be paused or canceled independently of one another and of this
22 /// stream. The underlying stream will be listened to once any branch is
23 /// listened to. It will be paused when all branches are paused or not yet
24 /// listened to. It will be canceled when all branches have been listened to and
25 /// then canceled.
26 class ForkableStream<T> extends StreamView<T> {
27 /// The underlying stream.
28 final Stream _sourceStream;
29
30 /// The subscription to [_sourceStream].
31 ///
32 /// This will be `null` until this stream or any of its forks are listened to.
33 StreamSubscription _subscription;
34
35 /// Whether this has been cancelled and no more forks may be created.
36 bool _isCanceled = false;
37
38 /// The controllers for any branches that have not yet been canceled.
39 ///
40 /// This includes a controller for this stream, until that has been cancelled.
41 final _controllers = new Set<StreamController<T>>();
42
43 /// Creates a new forkable stream wrapping [sourceStream].
44 ForkableStream(Stream sourceStream)
45 // Use a completer here so that we can provide its stream to the
46 // superclass constructor while also adding the stream controller to
47 // [_controllers].
48 : this._(sourceStream, new StreamCompleter());
49
50 ForkableStream._(this._sourceStream, StreamCompleter completer)
51 : super(completer.stream) {
52 completer.setSourceStream(_fork(primary: true));
53 }
54
55 /// Creates a new fork of this stream.
56 ///
57 /// From this point forward, the fork will emit the same events as this
58 /// stream. It will *not* emit any events that have already been emitted by
59 /// this stream. The fork is independent of this stream, which means each one
60 /// may be paused or canceled without affecting the other.
61 ///
62 /// If this stream is done or its subscription has been canceled, this returns
63 /// an empty stream.
64 Stream<T> fork() => _fork(primary: false);
65
66 /// Creates a stream forwarding [_sourceStream].
67 ///
68 /// If [primary] is true, this is the stream underlying this object;
69 /// otherwise, it's a fork. The only difference is that when the primary
70 /// stream is canceled, [fork] starts throwing [StateError]s.
71 Stream<T> _fork({bool primary: false}) {
72 if (_isCanceled) {
73 var controller = new StreamController<T>()..close();
74 return controller.stream;
75 }
76
77 var controller;
78 controller = new StreamController<T>(
79 onListen: () => _onListenOrResume(controller),
80 onCancel: () => _onCancel(controller, primary: primary),
81 onPause: () => _onPause(controller),
82 onResume: () => _onListenOrResume(controller),
83 sync: true);
84
85 _controllers.add(controller);
86
87 return controller.stream;
88 }
89
90 /// The callback called when `onListen` or `onResume` is called for the branch
91 /// managed by [controller].
92 ///
93 /// This ensures that we're subscribed to [_sourceStream] and that the
94 /// subscription isn't paused.
95 void _onListenOrResume(StreamController<T> controller) {
96 if (controller.isClosed) return;
97 if (_subscription == null) {
98 _subscription =
99 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
100 } else {
101 _subscription.resume();
102 }
103 }
104
105 /// The callback called when `onCancel` is called for the branch managed by
106 /// [controller].
107 ///
108 /// This cancels or pauses the underlying subscription as necessary. If
109 /// [primary] is true, it also ensures that future calls to [fork] throw
110 /// [StateError]s.
111 Future _onCancel(StreamController<T> controller, {bool primary: false}) {
112 if (primary) _isCanceled = true;
113
114 if (controller.isClosed) return null;
115 _controllers.remove(controller);
116
117 if (_controllers.isEmpty) return _subscription.cancel();
118
119 _onPause(controller);
120 return null;
121 }
122
123 /// The callback called when `onPause` is called for the branch managed by
124 /// [controller].
125 ///
126 /// This pauses the underlying subscription if necessary.
127 void _onPause(StreamController<T> controller) {
128 if (controller.isClosed) return;
129 if (_subscription.isPaused) return;
130 if (_controllers.any((controller) =>
131 controller.hasListener && !controller.isPaused)) {
132 return;
133 }
134
135 _subscription.pause();
136 }
137
138 /// Forwards data events to all branches.
139 void _onData(value) {
140 // Don't iterate directly over the set because [controller.add] might cause
141 // it to be modified synchronously.
142 for (var controller in _controllers.toList()) {
143 controller.add(value);
144 }
145 }
146
147 /// Forwards error events to all branches.
148 void _onError(error, StackTrace stackTrace) {
149 // Don't iterate directly over the set because [controller.addError] might
150 // cause it to be modified synchronously.
151 for (var controller in _controllers.toList()) {
152 controller.addError(error, stackTrace);
153 }
154 }
155
156 /// Forwards close events to all branches.
157 void _onDone() {
158 _isCanceled = true;
159
160 // Don't iterate directly over the set because [controller.close] might
161 // cause it to be modified synchronously.
162 for (var controller in _controllers.toList()) {
163 controller.close();
164 }
165 _controllers.clear();
166 }
167 }
168
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698