Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(588)

Side by Side Diff: lib/src/stream_sink_completer.dart

Issue 1615253002: Add ClosedStreamSink, and *Completer.setError, and StreamSinkCompleter.fromFuture. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/stream_completer.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_sink_completer; 5 library async.stream_sink_completer;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 8
9 import 'null_stream_sink.dart';
10
9 /// A [sink] where the destination is provided later. 11 /// A [sink] where the destination is provided later.
10 /// 12 ///
11 /// The [sink] is a normal sink that you can add events to to immediately, but 13 /// The [sink] is a normal sink that you can add events to to immediately, but
12 /// until [setDestinationSink] is called, the events will be buffered. 14 /// until [setDestinationSink] is called, the events will be buffered.
13 /// 15 ///
14 /// The same effect can be achieved by using a [StreamController] and adding it 16 /// The same effect can be achieved by using a [StreamController] and adding it
15 /// to the sink using [Sink.addStream] when the destination sink is ready. This 17 /// to the sink using [Sink.addStream] when the destination sink is ready. This
16 /// class attempts to shortcut some of the overhead when possible. For example, 18 /// class attempts to shortcut some of the overhead when possible. For example,
17 /// if the [sink] only has events added after the destination sink has been set, 19 /// if the [sink] only has events added after the destination sink has been set,
18 /// those events are added directly to the sink. 20 /// those events are added directly to the sink.
19 class StreamSinkCompleter<T> { 21 class StreamSinkCompleter<T> {
20 /// The sink for this completer. 22 /// The sink for this completer.
21 /// 23 ///
22 /// When a destination sink is provided, events that have been passed to the 24 /// When a destination sink is provided, events that have been passed to the
23 /// sink will be forwarded to the destination. 25 /// sink will be forwarded to the destination.
24 /// 26 ///
25 /// Events can be added to the sink either before or after a destination sink 27 /// Events can be added to the sink either before or after a destination sink
26 /// is set. 28 /// is set.
27 final StreamSink<T> sink = new _CompleterSink<T>(); 29 final StreamSink<T> sink = new _CompleterSink<T>();
28 30
29 /// Returns [sink] typed as a [_CompleterSink]. 31 /// Returns [sink] typed as a [_CompleterSink].
30 _CompleterSink<T> get _sink => sink; 32 _CompleterSink<T> get _sink => sink;
31 33
34 /// Convert a `Future<StreamSink>` to a `StreamSink`.
35 ///
36 /// This creates a sink using a sink completer, and sets the destination sink
37 /// to the result of the future when the future completes.
38 ///
39 /// If the future completes with an error, the returned sink will instead
40 /// be closed. Its [Sink.done] future will contain the error.
41 static StreamSink fromFuture(Future<StreamSink> sinkFuture) {
42 var completer = new StreamSinkCompleter();
43 sinkFuture.then(completer.setDestinationSink,
44 onError: completer.setError);
45 return completer.sink;
46 }
47
32 /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s 48 /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
33 /// [sink]. 49 /// [sink].
34 /// 50 ///
35 /// The completer's [sink] will act exactly as [destinationSink]. 51 /// The completer's [sink] will act exactly as [destinationSink].
36 /// 52 ///
37 /// If the destination sink is set before events are added to [sink], further 53 /// If the destination sink is set before events are added to [sink], further
38 /// events are forwarded directly to [destinationSink]. 54 /// events are forwarded directly to [destinationSink].
39 /// 55 ///
40 /// If events are added to [sink] before setting the destination sink, they're 56 /// If events are added to [sink] before setting the destination sink, they're
41 /// buffered until the destination is available. 57 /// buffered until the destination is available.
42 /// 58 ///
43 /// A destination sink may be set at most once. 59 /// A destination sink may be set at most once.
60 ///
61 /// Either of [setDestinationSink] or [setError] may be called at most once.
62 /// Trying to call either of them again will fail.
44 void setDestinationSink(StreamSink<T> destinationSink) { 63 void setDestinationSink(StreamSink<T> destinationSink) {
45 if (_sink._destinationSink != null) { 64 if (_sink._destinationSink != null) {
46 throw new StateError("Destination sink already set"); 65 throw new StateError("Destination sink already set");
47 } 66 }
48 _sink._setDestinationSink(destinationSink); 67 _sink._setDestinationSink(destinationSink);
49 } 68 }
69
70 /// Completes this to a closed sink whose [Sink.done] future emits [error].
71 ///
72 /// This is useful when the process of loading the sink fails.
73 ///
74 /// Either of [setDestinationSink] or [setError] may be called at most once.
75 /// Trying to call either of them again will fail.
76 void setError(error, [StackTrace stackTrace]) {
77 setDestinationSink(new NullStreamSink.error(error, stackTrace));
78 }
50 } 79 }
51 80
52 /// [StreamSink] completed by [StreamSinkCompleter]. 81 /// [StreamSink] completed by [StreamSinkCompleter].
53 class _CompleterSink<T> implements StreamSink<T> { 82 class _CompleterSink<T> implements StreamSink<T> {
54 /// Controller for an intermediate sink. 83 /// Controller for an intermediate sink.
55 /// 84 ///
56 /// Created if the user adds events to this sink before the destination sink 85 /// Created if the user adds events to this sink before the destination sink
57 /// is set. 86 /// is set.
58 StreamController<T> _controller; 87 StreamController<T> _controller;
59 88
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 .catchError((_) {}); 171 .catchError((_) {});
143 } 172 }
144 173
145 // If the user has already asked when the sink is done, connect the sink's 174 // If the user has already asked when the sink is done, connect the sink's
146 // done callback to that completer. 175 // done callback to that completer.
147 if (_doneCompleter != null) { 176 if (_doneCompleter != null) {
148 _doneCompleter.complete(sink.done); 177 _doneCompleter.complete(sink.done);
149 } 178 }
150 } 179 }
151 } 180 }
OLDNEW
« no previous file with comments | « lib/src/stream_completer.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698