OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.stream_sink_completer; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 /// A [sink] where the destination is provided later. | |
10 /// | |
11 /// The [sink] is a normal sink that you can add events to to immediately, but | |
12 /// until [setDestinationSink] is called, the events will be buffered. | |
13 /// | |
14 /// The same effect can be achieved by using a [StreamController] and adding it | |
15 /// to the sink using [Sink.addStream] when the destination sink is ready. This | |
16 /// class attempts to shortcut some of the overhead when possible. For example, | |
17 /// if the [sink] only has events added after the destination sink has been set, | |
18 /// those events are added directly to the sink. | |
19 class StreamSinkCompleter<T> { | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
I don't think I like "completer" for this particul
nweiz
2016/01/21 20:59:05
I really think consistency with parallel APIs is i
Lasse Reichstein Nielsen
2016/01/22 12:49:27
Acknowledged.
It doesn't fit my mental model of w
| |
20 /// The sink for this completer. | |
21 /// | |
22 /// When a destination sink is provided, events that have been passed to the | |
23 /// sink will be forwarded to the destination. | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Do you want to promise how the events are forwarde
nweiz
2016/01/21 20:59:05
I don't think we can provide any stronger guarante
Lasse Reichstein Nielsen
2016/01/22 12:49:27
That's actually bad - leaving something unspecifie
| |
24 /// | |
25 /// Events can be added to the sink either before or after a destination sink | |
26 /// is set. | |
27 StreamSink<T> get sink => _sink; | |
28 final _sink = new _CompleterSink<T>(); | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Just make it one field:
final StreamSink<T> sink
nweiz
2016/01/21 20:59:05
Done.
| |
29 | |
30 /// Set a sink as the destination for events from the [StreamSinkCompleter]'s | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Set -> Sets
nweiz
2016/01/21 20:59:05
Done.
| |
31 /// [sink]. | |
32 /// | |
33 /// The completer's [sink] will act exactly as [destinationSink]. | |
34 /// | |
35 /// If the destination sink is set before events are added to [sink], further | |
36 /// events are forwarded directly to [destinationSink]. | |
37 /// | |
38 /// If events are added to [sink] before setting the destination sink, they're | |
39 /// buffered until the destination is available. | |
40 /// | |
41 /// A destination sink may be set at most once. | |
42 void setDestinationSink(StreamSink<T> destinationSink) { | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Make this a setter.
nweiz
2016/01/21 20:59:05
That's inconsistent with StreamCompleter (and Comp
Lasse Reichstein Nielsen
2016/01/22 12:49:27
Argument accepted.
It is probably better as a met
| |
43 if (_sink._destinationSink != null) { | |
44 throw new StateError("Destination sink already set"); | |
45 } | |
46 _sink._setDestinationSink(destinationSink); | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Can you find another name than this (in particular
nweiz
2016/01/21 20:59:05
This is consistent with StreamCompleter (which you
Lasse Reichstein Nielsen
2016/01/22 12:49:27
The problem is that "set" isn't wrong. It just loo
| |
47 } | |
48 } | |
49 | |
50 /// [StreamSink] completed by [StreamSinkCompleter]. | |
51 class _CompleterSink<T> implements StreamSink<T> { | |
52 /// Controller for an intermediate sink. | |
53 /// | |
54 /// Created if the user adds events to this sink before the destination sink | |
55 /// is set. | |
56 StreamController<T> _controller; | |
57 | |
58 /// Completer for [done]. | |
59 /// | |
60 /// Created if the user requests the [done] future before the destination sink | |
61 /// is set. | |
62 Completer _doneCompleter; | |
63 | |
64 /// Destination sink for the events added to this sink. | |
65 /// | |
66 /// Set when [StreamSinkCompleter.setDestinationSink] is called. | |
67 StreamSink<T> _destinationSink; | |
68 | |
69 /// Whether events should be send directly to [_destinationSink], as opposed | |
70 /// to going through [_controller]. | |
71 bool get _sendToSink => _controller == null && _destinationSink != null; | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Name sounds imperative. Maybe "_hasDirectSink" or
nweiz
2016/01/21 20:59:05
Done.
| |
72 | |
73 Future get done { | |
74 if (_doneCompleter != null) return _doneCompleter.future; | |
75 if (_destinationSink == null) { | |
76 _doneCompleter = new Completer.sync(); | |
77 return _doneCompleter.future; | |
78 } | |
79 return _destinationSink.done; | |
80 } | |
81 | |
82 void add(T event) { | |
83 if (_sendToSink) { | |
84 _destinationSink.add(event); | |
85 } else { | |
86 _ensureController(); | |
87 _controller.add(event); | |
88 } | |
89 } | |
90 | |
91 void addError(error, [StackTrace stackTrace]) { | |
92 if (_sendToSink) { | |
93 _destinationSink.addError(error, stackTrace); | |
94 } else { | |
95 _ensureController(); | |
96 _controller.addError(error, stackTrace); | |
97 } | |
98 } | |
99 | |
100 Future addStream(Stream<T> stream) { | |
101 if (_sendToSink) return _destinationSink.addStream(stream); | |
102 | |
103 _ensureController(); | |
104 return _controller.addStream(stream, cancelOnError: false); | |
105 } | |
106 | |
107 Future close() { | |
108 if (_sendToSink) { | |
109 _destinationSink.close(); | |
110 } else { | |
111 _ensureController(); | |
112 _controller.close(); | |
113 } | |
114 return done; | |
115 } | |
116 | |
117 /// Create [_controller] if it doesn't yet exist. | |
118 void _ensureController() { | |
119 if (_controller == null) _controller = new StreamController<T>(sync: true); | |
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Persionally, I'd drop the type parameter on the co
nweiz
2016/01/21 20:59:05
Done.
| |
120 } | |
121 | |
122 /// Sets the destination sink to which events from this sink will be provided. | |
123 /// | |
124 /// If set before the user adds events, events will be added directly to the | |
125 /// destination sink. If the user adds events earlier, an intermediate sink is | |
126 /// created using a stream controller, and the destination sink is linked to | |
127 /// it later. | |
128 void _setDestinationSink(StreamSink<T> sink) { | |
129 assert(_destinationSink == null); | |
130 _destinationSink = sink; | |
131 | |
132 // If the user has already added data, it's buffered in the controller, so | |
133 // we add it to the sink. | |
134 if (_controller != null) { | |
135 // Catch any error that may come from [addStream] or [sink.close]. They'll | |
136 // be reported through [done] anyway. | |
137 sink | |
138 .addStream(_controller.stream) | |
139 .whenComplete(sink.close) | |
140 .catchError((_) {}); | |
141 } | |
142 | |
143 // If the user has already asked when the sink is done, connect the sink's | |
144 // done callback to that completer. | |
145 if (_doneCompleter != null) { | |
146 _doneCompleter.complete(sink.done); | |
147 } | |
148 } | |
149 } | |
OLD | NEW |