Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(499)

Side by Side Diff: lib/src/stream_sink_completer.dart

Issue 1616543002: Add a StreamSinkCompleter class. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/async.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_sink_completer;
6
7 import 'dart:async';
8
9 /// A [sink] where the destination is provided later.
10 ///
11 /// The [sink] is a normal sink that you can add events to to immediately, but
12 /// until [setDestinationSink] is called, the events will be buffered.
13 ///
14 /// The same effect can be achieved by using a [StreamController] and adding it
15 /// to the sink using [Sink.addStream] when the destination sink is ready. This
16 /// class attempts to shortcut some of the overhead when possible. For example,
17 /// if the [sink] only has events added after the destination sink has been set,
18 /// those events are added directly to the sink.
19 class StreamSinkCompleter<T> {
20 /// The sink for this completer.
21 ///
22 /// When a destination sink is provided, events that have been passed to the
23 /// sink will be forwarded to the destination.
24 ///
25 /// Events can be added to the sink either before or after a destination sink
26 /// is set.
27 final StreamSink<T> sink = new _CompleterSink<T>();
28
29 /// Returns [sink] typed as a [_CompleterSink].
30 _CompleterSink<T> get _sink => sink;
31
32 /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
33 /// [sink].
34 ///
35 /// The completer's [sink] will act exactly as [destinationSink].
36 ///
37 /// If the destination sink is set before events are added to [sink], further
38 /// events are forwarded directly to [destinationSink].
39 ///
40 /// If events are added to [sink] before setting the destination sink, they're
41 /// buffered until the destination is available.
42 ///
43 /// A destination sink may be set at most once.
44 void setDestinationSink(StreamSink<T> destinationSink) {
45 if (_sink._destinationSink != null) {
46 throw new StateError("Destination sink already set");
47 }
48 _sink._setDestinationSink(destinationSink);
49 }
50 }
51
52 /// [StreamSink] completed by [StreamSinkCompleter].
53 class _CompleterSink<T> implements StreamSink<T> {
54 /// Controller for an intermediate sink.
55 ///
56 /// Created if the user adds events to this sink before the destination sink
57 /// is set.
58 StreamController<T> _controller;
59
60 /// Completer for [done].
61 ///
62 /// Created if the user requests the [done] future before the destination sink
63 /// is set.
64 Completer _doneCompleter;
65
66 /// Destination sink for the events added to this sink.
67 ///
68 /// Set when [StreamSinkCompleter.setDestinationSink] is called.
69 StreamSink<T> _destinationSink;
70
71 /// Whether events should be sent directly to [_destinationSink], as opposed
72 /// to going through [_controller].
73 bool get _canSendDirectly => _controller == null && _destinationSink != null;
74
75 Future get done {
76 if (_doneCompleter != null) return _doneCompleter.future;
77 if (_destinationSink == null) {
78 _doneCompleter = new Completer.sync();
79 return _doneCompleter.future;
80 }
81 return _destinationSink.done;
82 }
83
84 void add(T event) {
85 if (_canSendDirectly) {
86 _destinationSink.add(event);
87 } else {
88 _ensureController();
89 _controller.add(event);
90 }
91 }
92
93 void addError(error, [StackTrace stackTrace]) {
94 if (_canSendDirectly) {
95 _destinationSink.addError(error, stackTrace);
96 } else {
97 _ensureController();
98 _controller.addError(error, stackTrace);
99 }
100 }
101
102 Future addStream(Stream<T> stream) {
103 if (_canSendDirectly) return _destinationSink.addStream(stream);
104
105 _ensureController();
106 return _controller.addStream(stream, cancelOnError: false);
107 }
108
109 Future close() {
110 if (_canSendDirectly) {
111 _destinationSink.close();
112 } else {
113 _ensureController();
114 _controller.close();
115 }
116 return done;
117 }
118
119 /// Create [_controller] if it doesn't yet exist.
120 void _ensureController() {
121 if (_controller == null) _controller = new StreamController(sync: true);
122 }
123
124 /// Sets the destination sink to which events from this sink will be provided.
125 ///
126 /// If set before the user adds events, events will be added directly to the
127 /// destination sink. If the user adds events earlier, an intermediate sink is
128 /// created using a stream controller, and the destination sink is linked to
129 /// it later.
130 void _setDestinationSink(StreamSink<T> sink) {
131 assert(_destinationSink == null);
132 _destinationSink = sink;
133
134 // If the user has already added data, it's buffered in the controller, so
135 // we add it to the sink.
136 if (_controller != null) {
137 // Catch any error that may come from [addStream] or [sink.close]. They'll
138 // be reported through [done] anyway.
139 sink
140 .addStream(_controller.stream)
141 .whenComplete(sink.close)
142 .catchError((_) {});
143 }
144
145 // If the user has already asked when the sink is done, connect the sink's
146 // done callback to that completer.
147 if (_doneCompleter != null) {
148 _doneCompleter.complete(sink.done);
149 }
150 }
151 }
OLDNEW
« no previous file with comments | « lib/async.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698