Index: lib/src/stream_channel_completer.dart |
diff --git a/lib/src/stream_channel_completer.dart b/lib/src/stream_channel_completer.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..d15adcf94b43d95d47eacd50224caf6144515217 |
--- /dev/null |
+++ b/lib/src/stream_channel_completer.dart |
@@ -0,0 +1,77 @@ |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:async/async.dart'; |
+ |
+import '../stream_channel.dart'; |
+ |
+/// A [channel] where the source and destination are provided later. |
+/// |
+/// The [channel] is a normal channel that can be listened to and that events |
+/// can be added to immediately, but until [setChannel] is called it won't emit |
+/// any events and all events added to it will be buffered. |
+class StreamChannelCompleter<T> { |
+ /// The completer for this channel's stream. |
+ final _streamCompleter = new StreamCompleter<T>(); |
+ |
+ /// The completer for this channel's sink. |
+ final _sinkCompleter = new StreamSinkCompleter<T>(); |
+ |
+ /// The channel for this completer. |
+ StreamChannel<T> get channel => _channel; |
+ StreamChannel<T> _channel; |
+ |
+ /// Whether [setChannel] has been called. |
+ bool _set = false; |
+ |
+ /// Convert a `Future<StreamChannel>` to a `StreamChannel`. |
+ /// |
+ /// This creates a channel using a channel completer, and sets the source |
+ /// channel to the result of the future when the future completes. |
+ /// |
+ /// If the future completes with an error, the returned channel's stream will |
+ /// instead contain just that error. The sink will silently discard all |
+ /// events. |
+ static StreamChannel fromFuture(Future<StreamChannel> channelFuture) { |
+ var completer = new StreamChannelCompleter(); |
+ channelFuture.then(completer.setChannel, onError: completer.setError); |
+ return completer.channel; |
+ } |
+ |
+ StreamChannelCompleter() { |
+ _channel = new StreamChannel<T>( |
+ _streamCompleter.stream, _sinkCompleter.sink); |
+ } |
+ |
+ /// Set a channel as the source and destination for [channel]. |
+ /// |
+ /// A channel may be set at most once. |
+ /// |
+ /// Either [setChannel] or [setError] may be called at most once. Trying to |
+ /// call either of them again will fail. |
+ void setChannel(StreamChannel<T> channel) { |
+ if (_set) throw new StateError("The channel has already been set."); |
+ _set = true; |
+ |
+ _streamCompleter.setSourceStream(channel.stream); |
+ _sinkCompleter.setDestinationSink(channel.sink); |
+ } |
+ |
+ /// Indicates that there was an error connecting the channel. |
+ /// |
+ /// This makes the stream emit [error] and close. It makes the sink discard |
+ /// all its events. |
+ /// |
+ /// Either [setChannel] or [setError] may be called at most once. Trying to |
+ /// call either of them again will fail. |
+ void setError(error, [StackTrace stackTrace]) { |
+ if (_set) throw new StateError("The channel has already been set."); |
+ _set = true; |
+ |
+ _streamCompleter.setError(error, stackTrace); |
+ _sinkCompleter.setDestinationSink(new NullStreamSink()); |
+ } |
+} |