| Index: lib/src/stream_channel_completer.dart
|
| diff --git a/lib/src/stream_channel_completer.dart b/lib/src/stream_channel_completer.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d15adcf94b43d95d47eacd50224caf6144515217
|
| --- /dev/null
|
| +++ b/lib/src/stream_channel_completer.dart
|
| @@ -0,0 +1,77 @@
|
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +import 'dart:async';
|
| +
|
| +import 'package:async/async.dart';
|
| +
|
| +import '../stream_channel.dart';
|
| +
|
| +/// A [channel] where the source and destination are provided later.
|
| +///
|
| +/// The [channel] is a normal channel that can be listened to and that events
|
| +/// can be added to immediately, but until [setChannel] is called it won't emit
|
| +/// any events and all events added to it will be buffered.
|
| +class StreamChannelCompleter<T> {
|
| + /// The completer for this channel's stream.
|
| + final _streamCompleter = new StreamCompleter<T>();
|
| +
|
| + /// The completer for this channel's sink.
|
| + final _sinkCompleter = new StreamSinkCompleter<T>();
|
| +
|
| + /// The channel for this completer.
|
| + StreamChannel<T> get channel => _channel;
|
| + StreamChannel<T> _channel;
|
| +
|
| + /// Whether [setChannel] has been called.
|
| + bool _set = false;
|
| +
|
| + /// Convert a `Future<StreamChannel>` to a `StreamChannel`.
|
| + ///
|
| + /// This creates a channel using a channel completer, and sets the source
|
| + /// channel to the result of the future when the future completes.
|
| + ///
|
| + /// If the future completes with an error, the returned channel's stream will
|
| + /// instead contain just that error. The sink will silently discard all
|
| + /// events.
|
| + static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
|
| + var completer = new StreamChannelCompleter();
|
| + channelFuture.then(completer.setChannel, onError: completer.setError);
|
| + return completer.channel;
|
| + }
|
| +
|
| + StreamChannelCompleter() {
|
| + _channel = new StreamChannel<T>(
|
| + _streamCompleter.stream, _sinkCompleter.sink);
|
| + }
|
| +
|
| + /// Set a channel as the source and destination for [channel].
|
| + ///
|
| + /// A channel may be set at most once.
|
| + ///
|
| + /// Either [setChannel] or [setError] may be called at most once. Trying to
|
| + /// call either of them again will fail.
|
| + void setChannel(StreamChannel<T> channel) {
|
| + if (_set) throw new StateError("The channel has already been set.");
|
| + _set = true;
|
| +
|
| + _streamCompleter.setSourceStream(channel.stream);
|
| + _sinkCompleter.setDestinationSink(channel.sink);
|
| + }
|
| +
|
| + /// Indicates that there was an error connecting the channel.
|
| + ///
|
| + /// This makes the stream emit [error] and close. It makes the sink discard
|
| + /// all its events.
|
| + ///
|
| + /// Either [setChannel] or [setError] may be called at most once. Trying to
|
| + /// call either of them again will fail.
|
| + void setError(error, [StackTrace stackTrace]) {
|
| + if (_set) throw new StateError("The channel has already been set.");
|
| + _set = true;
|
| +
|
| + _streamCompleter.setError(error, stackTrace);
|
| + _sinkCompleter.setDestinationSink(new NullStreamSink());
|
| + }
|
| +}
|
|
|