Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(421)

Side by Side Diff: packages/async/lib/src/stream_sink_completer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'null_stream_sink.dart';
8
9 /// A [sink] where the destination is provided later.
10 ///
11 /// The [sink] is a normal sink that you can add events to to immediately, but
12 /// until [setDestinationSink] is called, the events will be buffered.
13 ///
14 /// The same effect can be achieved by using a [StreamController] and adding it
15 /// to the sink using [Sink.addStream] when the destination sink is ready. This
16 /// class attempts to shortcut some of the overhead when possible. For example,
17 /// if the [sink] only has events added after the destination sink has been set,
18 /// those events are added directly to the sink.
19 class StreamSinkCompleter<T> {
20 /// The sink for this completer.
21 ///
22 /// When a destination sink is provided, events that have been passed to the
23 /// sink will be forwarded to the destination.
24 ///
25 /// Events can be added to the sink either before or after a destination sink
26 /// is set.
27 final StreamSink<T> sink = new _CompleterSink<T>();
28
29 /// Returns [sink] typed as a [_CompleterSink].
30 _CompleterSink<T> get _sink => sink;
31
32 /// Convert a `Future<StreamSink>` to a `StreamSink`.
33 ///
34 /// This creates a sink using a sink completer, and sets the destination sink
35 /// to the result of the future when the future completes.
36 ///
37 /// If the future completes with an error, the returned sink will instead
38 /// be closed. Its [Sink.done] future will contain the error.
39 static StreamSink<T> fromFuture<T>(Future<StreamSink<T>> sinkFuture) {
40 var completer = new StreamSinkCompleter<T>();
41 sinkFuture.then(completer.setDestinationSink, onError: completer.setError);
42 return completer.sink;
43 }
44
45 /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
46 /// [sink].
47 ///
48 /// The completer's [sink] will act exactly as [destinationSink].
49 ///
50 /// If the destination sink is set before events are added to [sink], further
51 /// events are forwarded directly to [destinationSink].
52 ///
53 /// If events are added to [sink] before setting the destination sink, they're
54 /// buffered until the destination is available.
55 ///
56 /// A destination sink may be set at most once.
57 ///
58 /// Either of [setDestinationSink] or [setError] may be called at most once.
59 /// Trying to call either of them again will fail.
60 void setDestinationSink(StreamSink<T> destinationSink) {
61 if (_sink._destinationSink != null) {
62 throw new StateError("Destination sink already set");
63 }
64 _sink._setDestinationSink(destinationSink);
65 }
66
67 /// Completes this to a closed sink whose [Sink.done] future emits [error].
68 ///
69 /// This is useful when the process of loading the sink fails.
70 ///
71 /// Either of [setDestinationSink] or [setError] may be called at most once.
72 /// Trying to call either of them again will fail.
73 void setError(error, [StackTrace stackTrace]) {
74 setDestinationSink(new NullStreamSink.error(error, stackTrace));
75 }
76 }
77
78 /// [StreamSink] completed by [StreamSinkCompleter].
79 class _CompleterSink<T> implements StreamSink<T> {
80 /// Controller for an intermediate sink.
81 ///
82 /// Created if the user adds events to this sink before the destination sink
83 /// is set.
84 StreamController<T> _controller;
85
86 /// Completer for [done].
87 ///
88 /// Created if the user requests the [done] future before the destination sink
89 /// is set.
90 Completer _doneCompleter;
91
92 /// Destination sink for the events added to this sink.
93 ///
94 /// Set when [StreamSinkCompleter.setDestinationSink] is called.
95 StreamSink<T> _destinationSink;
96
97 /// Whether events should be sent directly to [_destinationSink], as opposed
98 /// to going through [_controller].
99 bool get _canSendDirectly => _controller == null && _destinationSink != null;
100
101 Future get done {
102 if (_doneCompleter != null) return _doneCompleter.future;
103 if (_destinationSink == null) {
104 _doneCompleter = new Completer.sync();
105 return _doneCompleter.future;
106 }
107 return _destinationSink.done;
108 }
109
110 void add(T event) {
111 if (_canSendDirectly) {
112 _destinationSink.add(event);
113 } else {
114 _ensureController();
115 _controller.add(event);
116 }
117 }
118
119 void addError(error, [StackTrace stackTrace]) {
120 if (_canSendDirectly) {
121 _destinationSink.addError(error, stackTrace);
122 } else {
123 _ensureController();
124 _controller.addError(error, stackTrace);
125 }
126 }
127
128 Future addStream(Stream<T> stream) {
129 if (_canSendDirectly) return _destinationSink.addStream(stream);
130
131 _ensureController();
132 return _controller.addStream(stream, cancelOnError: false);
133 }
134
135 Future close() {
136 if (_canSendDirectly) {
137 _destinationSink.close();
138 } else {
139 _ensureController();
140 _controller.close();
141 }
142 return done;
143 }
144
145 /// Create [_controller] if it doesn't yet exist.
146 void _ensureController() {
147 if (_controller == null) _controller = new StreamController(sync: true);
148 }
149
150 /// Sets the destination sink to which events from this sink will be provided.
151 ///
152 /// If set before the user adds events, events will be added directly to the
153 /// destination sink. If the user adds events earlier, an intermediate sink is
154 /// created using a stream controller, and the destination sink is linked to
155 /// it later.
156 void _setDestinationSink(StreamSink<T> sink) {
157 assert(_destinationSink == null);
158 _destinationSink = sink;
159
160 // If the user has already added data, it's buffered in the controller, so
161 // we add it to the sink.
162 if (_controller != null) {
163 // Catch any error that may come from [addStream] or [sink.close]. They'll
164 // be reported through [done] anyway.
165 sink
166 .addStream(_controller.stream)
167 .whenComplete(sink.close)
168 .catchError((_) {});
169 }
170
171 // If the user has already asked when the sink is done, connect the sink's
172 // done callback to that completer.
173 if (_doneCompleter != null) {
174 _doneCompleter.complete(sink.done);
175 }
176 }
177 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/stream_queue.dart ('k') | packages/async/lib/src/stream_sink_transformer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698