Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(169)

Side by Side Diff: packages/async/lib/src/stream_completer.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/async/lib/src/result_future.dart ('k') | packages/async/lib/src/stream_group.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_completer;
6
7 import "dart:async";
8
9 /// A single-subscription [stream] where the contents are provided later.
10 ///
11 /// It is generally recommended that you never create a `Future<Stream>`
12 /// because you can just directly create a stream that doesn't do anything
13 /// until it's ready to do so.
14 /// This class can be used to create such a stream.
15 ///
16 /// The [stream] is a normal stream that you can listen to immediately,
17 /// but until either [setSourceStream] or [setEmpty] is called,
18 /// the stream won't produce any events.
19 ///
20 /// The same effect can be achieved by using a [StreamController]
21 /// and adding the stream using `addStream` when both
22 /// the controller's stream is listened to and the source stream is ready.
23 /// This class attempts to shortcut some of the overhead when possible.
24 /// For example, if the [stream] is only listened to
25 /// after the source stream has been set,
26 /// the listen is performed directly on the source stream.
27 class StreamCompleter<T> {
28 /// The stream doing the actual work, is returned by [stream].
29 final _CompleterStream _stream = new _CompleterStream<T>();
30
31 /// Convert a `Future<Stream>` to a `Stream`.
32 ///
33 /// This creates a stream using a stream completer,
34 /// and sets the source stream to the result of the future when the
35 /// future completes.
36 ///
37 /// If the future completes with an error, the returned stream will
38 /// instead contain just that error.
39 static Stream fromFuture(Future<Stream> streamFuture) {
40 var completer = new StreamCompleter();
41 streamFuture.then(completer.setSourceStream,
42 onError: (e, s) {
43 completer.setSourceStream(streamFuture.asStream());
44 });
45 return completer.stream;
46 }
47
48 /// The stream of this completer.
49 ///
50 /// This stream is always a single-subscription stream.
51 ///
52 /// When a source stream is provided, its events will be forwarded to
53 /// listeners on this stream.
54 ///
55 /// The stream can be listened either before or after a source stream
56 /// is set.
57 Stream<T> get stream => _stream;
58
59 /// Set a stream as the source of events for the [StreamCompleter]'s
60 /// [stream].
61 ///
62 /// The completer's `stream` will act exactly as [sourceStream].
63 ///
64 /// If the source stream is set before [stream] is listened to,
65 /// the listen call on [stream] is forwarded directly to [sourceStream].
66 ///
67 /// If [stream] is listened to before setting the source stream,
68 /// an intermediate subscription is created. It looks like a completely
69 /// normal subscription, and can be paused or canceled, but it won't
70 /// produce any events until a source stream is provided.
71 ///
72 /// If the `stream` subscription is canceled before a source stream is set,
73 /// the source stream will be listened to and immediately canceled again.
74 ///
75 /// Otherwise, when the source stream is then set,
76 /// it is immediately listened to, and its events are forwarded to the
77 /// existing subscription.
78 ///
79 /// Either [setSourceStream] or [setEmpty] may be called at most once.
80 /// Trying to call either of them again will fail.
81 void setSourceStream(Stream<T> sourceStream) {
82 if (_stream._isSourceStreamSet) {
83 throw new StateError("Source stream already set");
84 }
85 _stream._setSourceStream(sourceStream);
86 }
87
88 /// Equivalent to setting an empty stream using [setSourceStream].
89 ///
90 /// Either [setSourceStream] or [setEmpty] may be called at most once.
91 /// Trying to call either of them again will fail.
92 void setEmpty() {
93 if (_stream._isSourceStreamSet) {
94 throw new StateError("Source stream already set");
95 }
96 _stream._setEmpty();
97 }
98 }
99
100 /// Stream completed by [StreamCompleter].
101 class _CompleterStream<T> extends Stream<T> {
102 /// Controller for an intermediate stream.
103 ///
104 /// Created if the user listens on this stream before the source stream
105 /// is set, or if using [_setEmpty] so there is no source stream.
106 StreamController _controller;
107
108 /// Source stream for the events provided by this stream.
109 ///
110 /// Set when the completer sets the source stream using [_setSourceStream]
111 /// or [_setEmpty].
112 Stream _sourceStream;
113
114 StreamSubscription<T> listen(onData(T data),
115 {Function onError,
116 void onDone(),
117 bool cancelOnError}) {
118 if (_controller == null) {
119 if (_sourceStream != null && !_sourceStream.isBroadcast) {
120 // If the source stream is itself single subscription,
121 // just listen to it directly instead of creating a controller.
122 return _sourceStream.listen(onData, onError: onError, onDone: onDone,
123 cancelOnError: cancelOnError);
124 }
125 _createController();
126 if (_sourceStream != null) {
127 _linkStreamToController();
128 }
129 }
130 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
131 cancelOnError: cancelOnError);
132 }
133
134 /// Whether a source stream has been set.
135 ///
136 /// Used to throw an error if trying to set a source stream twice.
137 bool get _isSourceStreamSet => _sourceStream != null;
138
139 /// Sets the source stream providing the events for this stream.
140 ///
141 /// If set before the user listens, listen calls will be directed directly
142 /// to the source stream. If the user listenes earlier, and intermediate
143 /// stream is created using a stream controller, and the source stream is
144 /// linked into that stream later.
145 void _setSourceStream(Stream<T> sourceStream) {
146 assert(_sourceStream == null);
147 _sourceStream = sourceStream;
148 if (_controller != null) {
149 // User has already listened, so provide the data through controller.
150 _linkStreamToController();
151 }
152 }
153
154 /// Links source stream to controller when both are available.
155 void _linkStreamToController() {
156 assert(_controller != null);
157 assert(_sourceStream != null);
158 _controller.addStream(_sourceStream, cancelOnError: false)
159 .whenComplete(_controller.close);
160 }
161
162 /// Sets an empty source stream.
163 ///
164 /// Uses [_controller] for the stream, then closes the controller
165 /// immediately.
166 void _setEmpty() {
167 assert(_sourceStream == null);
168 if (_controller == null) {
169 _createController();
170 }
171 _sourceStream = _controller.stream; // Mark stream as set.
172 _controller.close();
173 }
174
175 // Creates the [_controller].
176 void _createController() {
177 assert(_controller == null);
178 _controller = new StreamController<T>(sync: true);
179 }
180 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/result_future.dart ('k') | packages/async/lib/src/stream_group.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698