Chromium Code Reviews| Index: lib/src/stream_sink_completer.dart |
| diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..f8ec290eb8514d5e6f06170e9e95a00e0b188767 |
| --- /dev/null |
| +++ b/lib/src/stream_sink_completer.dart |
| @@ -0,0 +1,149 @@ |
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.stream_sink_completer; |
| + |
| +import 'dart:async'; |
| + |
| +/// A [sink] where the destination is provided later. |
| +/// |
| +/// The [sink] is a normal sink that you can add events to to immediately, but |
| +/// until [setDestinationSink] is called, the events will be buffered. |
| +/// |
| +/// The same effect can be achieved by using a [StreamController] and adding it |
| +/// to the sink using [Sink.addStream] when the destination sink is ready. This |
| +/// class attempts to shortcut some of the overhead when possible. For example, |
| +/// if the [sink] only has events added after the destination sink has been set, |
| +/// those events are added directly to the sink. |
| +class StreamSinkCompleter<T> { |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
I don't think I like "completer" for this particul
nweiz
2016/01/21 20:59:05
I really think consistency with parallel APIs is i
Lasse Reichstein Nielsen
2016/01/22 12:49:27
Acknowledged.
It doesn't fit my mental model of w
|
| + /// The sink for this completer. |
| + /// |
| + /// When a destination sink is provided, events that have been passed to the |
| + /// sink will be forwarded to the destination. |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Do you want to promise how the events are forwarde
nweiz
2016/01/21 20:59:05
I don't think we can provide any stronger guarante
Lasse Reichstein Nielsen
2016/01/22 12:49:27
That's actually bad - leaving something unspecifie
|
| + /// |
| + /// Events can be added to the sink either before or after a destination sink |
| + /// is set. |
| + StreamSink<T> get sink => _sink; |
| + final _sink = new _CompleterSink<T>(); |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Just make it one field:
final StreamSink<T> sink
nweiz
2016/01/21 20:59:05
Done.
|
| + |
| + /// Set a sink as the destination for events from the [StreamSinkCompleter]'s |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Set -> Sets
nweiz
2016/01/21 20:59:05
Done.
|
| + /// [sink]. |
| + /// |
| + /// The completer's [sink] will act exactly as [destinationSink]. |
| + /// |
| + /// If the destination sink is set before events are added to [sink], further |
| + /// events are forwarded directly to [destinationSink]. |
| + /// |
| + /// If events are added to [sink] before setting the destination sink, they're |
| + /// buffered until the destination is available. |
| + /// |
| + /// A destination sink may be set at most once. |
| + void setDestinationSink(StreamSink<T> destinationSink) { |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Make this a setter.
nweiz
2016/01/21 20:59:05
That's inconsistent with StreamCompleter (and Comp
Lasse Reichstein Nielsen
2016/01/22 12:49:27
Argument accepted.
It is probably better as a met
|
| + if (_sink._destinationSink != null) { |
| + throw new StateError("Destination sink already set"); |
| + } |
| + _sink._setDestinationSink(destinationSink); |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Can you find another name than this (in particular
nweiz
2016/01/21 20:59:05
This is consistent with StreamCompleter (which you
Lasse Reichstein Nielsen
2016/01/22 12:49:27
The problem is that "set" isn't wrong. It just loo
|
| + } |
| +} |
| + |
| +/// [StreamSink] completed by [StreamSinkCompleter]. |
| +class _CompleterSink<T> implements StreamSink<T> { |
| + /// Controller for an intermediate sink. |
| + /// |
| + /// Created if the user adds events to this sink before the destination sink |
| + /// is set. |
| + StreamController<T> _controller; |
| + |
| + /// Completer for [done]. |
| + /// |
| + /// Created if the user requests the [done] future before the destination sink |
| + /// is set. |
| + Completer _doneCompleter; |
| + |
| + /// Destination sink for the events added to this sink. |
| + /// |
| + /// Set when [StreamSinkCompleter.setDestinationSink] is called. |
| + StreamSink<T> _destinationSink; |
| + |
| + /// Whether events should be send directly to [_destinationSink], as opposed |
| + /// to going through [_controller]. |
| + bool get _sendToSink => _controller == null && _destinationSink != null; |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Name sounds imperative. Maybe "_hasDirectSink" or
nweiz
2016/01/21 20:59:05
Done.
|
| + |
| + Future get done { |
| + if (_doneCompleter != null) return _doneCompleter.future; |
| + if (_destinationSink == null) { |
| + _doneCompleter = new Completer.sync(); |
| + return _doneCompleter.future; |
| + } |
| + return _destinationSink.done; |
| + } |
| + |
| + void add(T event) { |
| + if (_sendToSink) { |
| + _destinationSink.add(event); |
| + } else { |
| + _ensureController(); |
| + _controller.add(event); |
| + } |
| + } |
| + |
| + void addError(error, [StackTrace stackTrace]) { |
| + if (_sendToSink) { |
| + _destinationSink.addError(error, stackTrace); |
| + } else { |
| + _ensureController(); |
| + _controller.addError(error, stackTrace); |
| + } |
| + } |
| + |
| + Future addStream(Stream<T> stream) { |
| + if (_sendToSink) return _destinationSink.addStream(stream); |
| + |
| + _ensureController(); |
| + return _controller.addStream(stream, cancelOnError: false); |
| + } |
| + |
| + Future close() { |
| + if (_sendToSink) { |
| + _destinationSink.close(); |
| + } else { |
| + _ensureController(); |
| + _controller.close(); |
| + } |
| + return done; |
| + } |
| + |
| + /// Create [_controller] if it doesn't yet exist. |
| + void _ensureController() { |
| + if (_controller == null) _controller = new StreamController<T>(sync: true); |
|
Lasse Reichstein Nielsen
2016/01/21 07:28:47
Persionally, I'd drop the type parameter on the co
nweiz
2016/01/21 20:59:05
Done.
|
| + } |
| + |
| + /// Sets the destination sink to which events from this sink will be provided. |
| + /// |
| + /// If set before the user adds events, events will be added directly to the |
| + /// destination sink. If the user adds events earlier, an intermediate sink is |
| + /// created using a stream controller, and the destination sink is linked to |
| + /// it later. |
| + void _setDestinationSink(StreamSink<T> sink) { |
| + assert(_destinationSink == null); |
| + _destinationSink = sink; |
| + |
| + // If the user has already added data, it's buffered in the controller, so |
| + // we add it to the sink. |
| + if (_controller != null) { |
| + // Catch any error that may come from [addStream] or [sink.close]. They'll |
| + // be reported through [done] anyway. |
| + sink |
| + .addStream(_controller.stream) |
| + .whenComplete(sink.close) |
| + .catchError((_) {}); |
| + } |
| + |
| + // If the user has already asked when the sink is done, connect the sink's |
| + // done callback to that completer. |
| + if (_doneCompleter != null) { |
| + _doneCompleter.complete(sink.done); |
| + } |
| + } |
| +} |