Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(322)

Side by Side Diff: lib/src/stream_sink_completer.dart

Issue 1616543002: Add a StreamSinkCompleter class. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: add another test Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_sink_completer;
6
7 import 'dart:async';
8
9 /// A [sink] where the destination is provided later.
10 ///
11 /// The [sink] is a normal sink that you can add events to to immediately, but
12 /// until [setDestinationSink] is called, the events will be buffered.
13 ///
14 /// The same effect can be achieved by using a [StreamController] and adding it
15 /// to the sink using [Sink.addStream] when the destination sink is ready. This
16 /// class attempts to shortcut some of the overhead when possible. For example,
17 /// if the [sink] only has events added after the destination sink has been set,
18 /// those events are added directly to the sink.
19 class StreamSinkCompleter<T> {
Lasse Reichstein Nielsen 2016/01/21 07:28:47 I don't think I like "completer" for this particul
nweiz 2016/01/21 20:59:05 I really think consistency with parallel APIs is i
Lasse Reichstein Nielsen 2016/01/22 12:49:27 Acknowledged. It doesn't fit my mental model of w
20 /// The sink for this completer.
21 ///
22 /// When a destination sink is provided, events that have been passed to the
23 /// sink will be forwarded to the destination.
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Do you want to promise how the events are forwarde
nweiz 2016/01/21 20:59:05 I don't think we can provide any stronger guarante
Lasse Reichstein Nielsen 2016/01/22 12:49:27 That's actually bad - leaving something unspecifie
24 ///
25 /// Events can be added to the sink either before or after a destination sink
26 /// is set.
27 StreamSink<T> get sink => _sink;
28 final _sink = new _CompleterSink<T>();
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Just make it one field: final StreamSink<T> sink
nweiz 2016/01/21 20:59:05 Done.
29
30 /// Set a sink as the destination for events from the [StreamSinkCompleter]'s
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Set -> Sets
nweiz 2016/01/21 20:59:05 Done.
31 /// [sink].
32 ///
33 /// The completer's [sink] will act exactly as [destinationSink].
34 ///
35 /// If the destination sink is set before events are added to [sink], further
36 /// events are forwarded directly to [destinationSink].
37 ///
38 /// If events are added to [sink] before setting the destination sink, they're
39 /// buffered until the destination is available.
40 ///
41 /// A destination sink may be set at most once.
42 void setDestinationSink(StreamSink<T> destinationSink) {
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Make this a setter.
nweiz 2016/01/21 20:59:05 That's inconsistent with StreamCompleter (and Comp
Lasse Reichstein Nielsen 2016/01/22 12:49:27 Argument accepted. It is probably better as a met
43 if (_sink._destinationSink != null) {
44 throw new StateError("Destination sink already set");
45 }
46 _sink._setDestinationSink(destinationSink);
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Can you find another name than this (in particular
nweiz 2016/01/21 20:59:05 This is consistent with StreamCompleter (which you
Lasse Reichstein Nielsen 2016/01/22 12:49:27 The problem is that "set" isn't wrong. It just loo
47 }
48 }
49
50 /// [StreamSink] completed by [StreamSinkCompleter].
51 class _CompleterSink<T> implements StreamSink<T> {
52 /// Controller for an intermediate sink.
53 ///
54 /// Created if the user adds events to this sink before the destination sink
55 /// is set.
56 StreamController<T> _controller;
57
58 /// Completer for [done].
59 ///
60 /// Created if the user requests the [done] future before the destination sink
61 /// is set.
62 Completer _doneCompleter;
63
64 /// Destination sink for the events added to this sink.
65 ///
66 /// Set when [StreamSinkCompleter.setDestinationSink] is called.
67 StreamSink<T> _destinationSink;
68
69 /// Whether events should be send directly to [_destinationSink], as opposed
70 /// to going through [_controller].
71 bool get _sendToSink => _controller == null && _destinationSink != null;
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Name sounds imperative. Maybe "_hasDirectSink" or
nweiz 2016/01/21 20:59:05 Done.
72
73 Future get done {
74 if (_doneCompleter != null) return _doneCompleter.future;
75 if (_destinationSink == null) {
76 _doneCompleter = new Completer.sync();
77 return _doneCompleter.future;
78 }
79 return _destinationSink.done;
80 }
81
82 void add(T event) {
83 if (_sendToSink) {
84 _destinationSink.add(event);
85 } else {
86 _ensureController();
87 _controller.add(event);
88 }
89 }
90
91 void addError(error, [StackTrace stackTrace]) {
92 if (_sendToSink) {
93 _destinationSink.addError(error, stackTrace);
94 } else {
95 _ensureController();
96 _controller.addError(error, stackTrace);
97 }
98 }
99
100 Future addStream(Stream<T> stream) {
101 if (_sendToSink) return _destinationSink.addStream(stream);
102
103 _ensureController();
104 return _controller.addStream(stream, cancelOnError: false);
105 }
106
107 Future close() {
108 if (_sendToSink) {
109 _destinationSink.close();
110 } else {
111 _ensureController();
112 _controller.close();
113 }
114 return done;
115 }
116
117 /// Create [_controller] if it doesn't yet exist.
118 void _ensureController() {
119 if (_controller == null) _controller = new StreamController<T>(sync: true);
Lasse Reichstein Nielsen 2016/01/21 07:28:47 Persionally, I'd drop the type parameter on the co
nweiz 2016/01/21 20:59:05 Done.
120 }
121
122 /// Sets the destination sink to which events from this sink will be provided.
123 ///
124 /// If set before the user adds events, events will be added directly to the
125 /// destination sink. If the user adds events earlier, an intermediate sink is
126 /// created using a stream controller, and the destination sink is linked to
127 /// it later.
128 void _setDestinationSink(StreamSink<T> sink) {
129 assert(_destinationSink == null);
130 _destinationSink = sink;
131
132 // If the user has already added data, it's buffered in the controller, so
133 // we add it to the sink.
134 if (_controller != null) {
135 // Catch any error that may come from [addStream] or [sink.close]. They'll
136 // be reported through [done] anyway.
137 sink
138 .addStream(_controller.stream)
139 .whenComplete(sink.close)
140 .catchError((_) {});
141 }
142
143 // If the user has already asked when the sink is done, connect the sink's
144 // done callback to that completer.
145 if (_doneCompleter != null) {
146 _doneCompleter.complete(sink.done);
147 }
148 }
149 }
OLDNEW
« no previous file with comments | « lib/async.dart ('k') | pubspec.yaml » ('j') | test/stream_sink_completer_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698