Chromium Code Reviews| Index: lib/src/stream_channel_completer.dart |
| diff --git a/lib/src/stream_channel_completer.dart b/lib/src/stream_channel_completer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..bd72542ed239ed8f4f0430618be3cc04f1ac9bd9 |
| --- /dev/null |
| +++ b/lib/src/stream_channel_completer.dart |
| @@ -0,0 +1,77 @@ |
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import 'dart:async'; |
| + |
| +import 'package:async/async.dart'; |
| + |
| +import '../stream_channel.dart'; |
| + |
| +/// A [channel] where the source and destination are provided later. |
| +/// |
| +/// The [channel] is a normal channel that can be listened to and that events |
| +/// can be added to immediately, but until [setChannel] is called it won't emit |
| +/// any events and all events added to it will be buffered. |
| +class StreamChannelCompleter<T> { |
| + /// The completer for this channel's stream. |
| + final _streamCompleter = new StreamCompleter<T>(); |
| + |
| + /// The completer for this channel's sink. |
| + final _sinkCompleter = new StreamSinkCompleter<T>(); |
| + |
| + /// The channel for this completer. |
| + StreamChannel<T> get channel => _channel; |
| + StreamChannel<T> _channel; |
| + |
| + /// Whether [setChannel] has been called. |
| + bool _set = false; |
|
Bob Nystrom
2016/01/26 19:00:18
Instead of a separate field, can you just do:
_st
nweiz
2016/01/26 23:24:57
No, the point of a completer is that the object it
|
| + |
| + /// Convert a `Future<StreamChannel>` to a `StreamChannel`. |
| + /// |
| + /// This creates a channel using a channel completer, and sets the source |
| + /// channel to the result of the future when the future completes. |
| + /// |
| + /// If the future completes with an error, the returned channel's stream will |
| + /// instead contain just that error. The sink will silently discard all |
| + /// events. |
| + static StreamChannel fromFuture(Future<StreamChannel> channelFuture) { |
| + var completer = new StreamChannelCompleter(); |
| + channelFuture.then(completer.setChannel, onError: completer.setError); |
| + return completer.channel; |
| + } |
| + |
| + StreamChannelCompleter() { |
| + _channel = new StreamChannel<T>( |
| + _streamCompleter.stream, _sinkCompleter.sink); |
| + } |
| + |
| + /// Set a channel as the source and destination for [channel]. |
| + /// |
| + /// A channel may be set at most once. |
| + /// |
| + /// Either [setChannel] or [setError] may be called at most once. Trying to |
| + /// call any of them again will fail. |
|
Bob Nystrom
2016/01/26 19:00:18
"any" -> "either".
nweiz
2016/01/26 23:24:57
Done.
|
| + void setChannel(StreamChannel<T> channel) { |
| + if (_set) throw new StateError("The channel has already been set."); |
| + _set = true; |
| + |
| + _streamCompleter.setSourceStream(channel.stream); |
| + _sinkCompleter.setDestinationSink(channel.sink); |
| + } |
| + |
| + /// Indicates that there was an error connecting the channel. |
| + /// |
| + /// This makes the stream emit [error] and close. It makes the sink discard |
| + /// all its events. |
| + /// |
| + /// Either [setChannel] or [setError] may be called at most once. Trying to |
| + /// call any of them again will fail. |
|
Bob Nystrom
2016/01/26 19:00:18
Ditto.
nweiz
2016/01/26 23:24:57
Done.
|
| + void setError(error, [StackTrace stackTrace]) { |
| + if (_set) throw new StateError("The channel has already been set."); |
| + _set = true; |
| + |
| + _streamCompleter.setError(error, stackTrace); |
| + _sinkCompleter.setDestinationSink(new NullStreamSink()); |
| + } |
| +} |