Index: lib/src/stream_channel_completer.dart |
diff --git a/lib/src/stream_channel_completer.dart b/lib/src/stream_channel_completer.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..bd72542ed239ed8f4f0430618be3cc04f1ac9bd9 |
--- /dev/null |
+++ b/lib/src/stream_channel_completer.dart |
@@ -0,0 +1,77 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:async/async.dart'; |
+ |
+import '../stream_channel.dart'; |
+ |
+/// A [channel] where the source and destination are provided later. |
+/// |
+/// The [channel] is a normal channel that can be listened to and that events |
+/// can be added to immediately, but until [setChannel] is called it won't emit |
+/// any events and all events added to it will be buffered. |
+class StreamChannelCompleter<T> { |
+ /// The completer for this channel's stream. |
+ final _streamCompleter = new StreamCompleter<T>(); |
+ |
+ /// The completer for this channel's sink. |
+ final _sinkCompleter = new StreamSinkCompleter<T>(); |
+ |
+ /// The channel for this completer. |
+ StreamChannel<T> get channel => _channel; |
+ StreamChannel<T> _channel; |
+ |
+ /// Whether [setChannel] has been called. |
+ bool _set = false; |
Bob Nystrom
2016/01/26 19:00:18
Instead of a separate field, can you just do:
_st
nweiz
2016/01/26 23:24:57
No, the point of a completer is that the object it
|
+ |
+ /// Convert a `Future<StreamChannel>` to a `StreamChannel`. |
+ /// |
+ /// This creates a channel using a channel completer, and sets the source |
+ /// channel to the result of the future when the future completes. |
+ /// |
+ /// If the future completes with an error, the returned channel's stream will |
+ /// instead contain just that error. The sink will silently discard all |
+ /// events. |
+ static StreamChannel fromFuture(Future<StreamChannel> channelFuture) { |
+ var completer = new StreamChannelCompleter(); |
+ channelFuture.then(completer.setChannel, onError: completer.setError); |
+ return completer.channel; |
+ } |
+ |
+ StreamChannelCompleter() { |
+ _channel = new StreamChannel<T>( |
+ _streamCompleter.stream, _sinkCompleter.sink); |
+ } |
+ |
+ /// Set a channel as the source and destination for [channel]. |
+ /// |
+ /// A channel may be set at most once. |
+ /// |
+ /// Either [setChannel] or [setError] may be called at most once. Trying to |
+ /// call any of them again will fail. |
Bob Nystrom
2016/01/26 19:00:18
"any" -> "either".
nweiz
2016/01/26 23:24:57
Done.
|
+ void setChannel(StreamChannel<T> channel) { |
+ if (_set) throw new StateError("The channel has already been set."); |
+ _set = true; |
+ |
+ _streamCompleter.setSourceStream(channel.stream); |
+ _sinkCompleter.setDestinationSink(channel.sink); |
+ } |
+ |
+ /// Indicates that there was an error connecting the channel. |
+ /// |
+ /// This makes the stream emit [error] and close. It makes the sink discard |
+ /// all its events. |
+ /// |
+ /// Either [setChannel] or [setError] may be called at most once. Trying to |
+ /// call any of them again will fail. |
Bob Nystrom
2016/01/26 19:00:18
Ditto.
nweiz
2016/01/26 23:24:57
Done.
|
+ void setError(error, [StackTrace stackTrace]) { |
+ if (_set) throw new StateError("The channel has already been set."); |
+ _set = true; |
+ |
+ _streamCompleter.setError(error, stackTrace); |
+ _sinkCompleter.setDestinationSink(new NullStreamSink()); |
+ } |
+} |