Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(365)

Side by Side Diff: lib/src/forkable_stream.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.forkable_stream;
6
7 import 'dart:async';
8
9 import 'stream_completer.dart';
10
11 /// A single-subscription stream from which other streams may be forked off at
12 /// the current position.
13 ///
14 /// This adds an operation, [fork], which produces a new stream that
15 /// independently emits the same events as this stream. Unlike the branches
16 /// produced by [StreamSplitter], a fork only emits events that arrive *after*
17 /// the call to [fork].
18 ///
19 /// Each fork can be paused or canceled independently of one another and of this
20 /// stream. The underlying stream will be listened to once any branch is
21 /// listened to. It will be paused when all branches are paused or not yet
22 /// listened to. It will be canceled when all branches have been listened to and
23 /// then canceled.
Lasse Reichstein Nielsen 2015/07/15 20:07:11 It seems it will be cancelled immediately when the
nweiz 2015/07/15 22:19:43 Only if [_controllers] is empty—that is, there are
24 class ForkableStream<T> extends StreamView<T> {
Lasse Reichstein Nielsen 2015/07/15 20:07:11 AFAICS this differs from just making it a broadcas
nweiz 2015/07/15 22:19:43 I think the idea of a forkable stream is clearer t
25 /// The underlying stream.
26 final Stream _sourceStream;
27
28 /// The subscription to [_sourceStream].
29 ///
30 /// This will be `null` until this stream or any of its forks are listened to.
31 StreamSubscription _subscription;
32
33 /// Whether this has been cancelled and no more forks may be created.
34 bool _isClosed = false;
Lasse Reichstein Nielsen 2015/07/15 20:07:12 We usually use "isClosed" for when "close" has bee
nweiz 2015/07/15 22:19:43 Done. I was trying to use it in the same sense as
35
36 /// The controllers for any branches that have not yet been canceled.
37 ///
38 /// This includes a controller for this stream, until that has been cancelled.
39 final _controllers = new Set<StreamController<T>>();
40
41 /// Creates a new forkable stream wrapping [sourceStream].
42 ForkableStream(Stream sourceStream)
43 // Use a completer here so that we can provide its stream to the
44 // superclass constructor while also adding the stream controller to
45 // [_controllers].
46 : this._(sourceStream, new StreamCompleter());
47
48 ForkableStream._(this._sourceStream, StreamCompleter completer)
49 : super(completer.stream) {
50 completer.setSourceStream(_fork(primary: true));
51 }
52
53 /// Creates a new fork of this stream.
54 ///
55 /// From this point forward, the fork will emit the same events as this
56 /// stream. It will *not* emit any events that have already been emitted by
57 /// this stream. The fork is independent of this stream, which means each one
58 /// may be paused or canceled without affecting the other.
59 ///
60 /// Throws a [StateError] if this stream is done or its subscription has been
61 /// canceled.
62 Stream<T> fork() => _fork(primary: false);
Lasse Reichstein Nielsen 2015/07/16 14:01:23 I don't think fork belongs on Stream. A stream is
nweiz 2015/07/17 20:30:16 I disagree. It's important that the fork can be cr
63
64 /// Creates a stream forwarding [_sourceStream].
65 ///
66 /// If [primary] is true, this is the stream underlying this object;
67 /// otherwise, it's a fork. The only difference is that when the primary
68 /// stream is canceled, [fork] starts throwing [StateError]s.
69 Stream<T> _fork({bool primary: false}) {
70 if (_isClosed) {
71 throw new StateError("Can't fork a closed or canceled stream.");
Lasse Reichstein Nielsen 2015/07/15 20:07:11 Alternatively just return an empty stream (new Str
nweiz 2015/07/15 22:19:43 Done.
72 }
73
74 var controller;
75 controller = new StreamController<T>(
76 onListen: () => _onListenOrResume(controller),
77 onCancel: () => _onCancel(controller, primary: primary),
78 onPause: () => _onPause(controller),
79 onResume: () => _onListenOrResume(controller),
80 sync: true);
81
82 _controllers.add(controller);
83
84 return controller.stream;
85 }
86
87 /// The callback called when `onListen` or `onResume` is called for the branch
88 /// managed by [controller].
89 ///
90 /// This ensures that we're subscribed to [_sourceStream] and that the
91 /// subscription isn't paused.
92 void _onListenOrResume(StreamController<T> controller) {
93 if (controller.isClosed) return;
94 if (_subscription == null) {
95 _subscription =
96 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
97 } else {
98 _subscription.resume();
99 }
100 }
101
102 /// The callback called when `onCancel` is called for the branch managed by
103 /// [controller].
104 ///
105 /// This cancels or pauses the underlying subscription as necessary. If
106 /// [primary] is true, it also ensures that future calls to [fork] throw
107 /// [StateError]s.
108 Future _onCancel(StreamController<T> controller, {bool primary: false}) {
109 if (primary) _isClosed = true;
Lasse Reichstein Nielsen 2015/07/15 20:07:11 So if the primary stream is canceled, you can't fo
nweiz 2015/07/15 22:19:43 That's right, although you can still fork if you w
110
111 if (controller.isClosed) return null;
Lasse Reichstein Nielsen 2015/07/15 20:07:11 A controller is only closed by the "onDone" handle
nweiz 2015/07/15 22:19:43 Not quite—while dispatching _onDone, it's possible
112 _controllers.remove(controller);
113
114 if (_controllers.isEmpty) return _subscription.cancel();
115
116 _onPause(controller);
117 return null;
118 }
119
120 /// The callback called when `onPause` is called for the branch managed by
121 /// [controller].
122 ///
123 /// This pauses the underlying subscription if necessary.
124 void _onPause(StreamController<T> controller) {
125 if (controller.isClosed) return;
126 if (_subscription.isPaused) return;
127 if (_controllers.any((controller) =>
128 controller.hasListener && !controller.isPaused)) {
129 return;
130 }
131
132 _subscription.pause();
133 }
134
135 /// Forwards data events to all branches.
136 void _onData(value) {
137 // Don't iterate directly over the set because [controller.add] might cause
138 // it to be modified synchronously.
139 for (var controller in _controllers.toSet()) {
Lasse Reichstein Nielsen 2015/07/15 20:07:12 I would use `toList()` here. It should have a lowe
nweiz 2015/07/15 22:19:43 Done.
140 controller.add(value);
141 }
142 }
143
144 /// Forwards error events to all branches.
145 void _onError(error, StackTrace stackTrace) {
146 // Don't iterate directly over the set because [controller.addError] might
147 // cause it to be modified synchronously.
148 for (var controller in _controllers.toSet()) {
149 controller.addError(error, stackTrace);
150 }
151 }
152
153 /// Forwards close events to all branches.
154 void _onDone() {
155 // Don't iterate directly over the set because [controller.close] might
156 // cause it to be modified synchronously.
157 for (var controller in _controllers.toSet()) {
158 controller.close();
Lasse Reichstein Nielsen 2015/07/15 20:07:11 You may have a race condition here. The _isClosed
nweiz 2015/07/15 22:19:43 Because the controller is first and the controller
159 }
160 _controllers.clear();
161 }
162 }
163
OLDNEW
« no previous file with comments | « lib/async.dart ('k') | lib/src/stream_queue.dart » ('j') | lib/src/stream_queue.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698