Chromium Code Reviews| Index: lib/src/close_guarantee_channel.dart |
| diff --git a/lib/src/close_guarantee_channel.dart b/lib/src/close_guarantee_channel.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..f71ac5154dad0c77722df3bb15f3bf106c177db8 |
| --- /dev/null |
| +++ b/lib/src/close_guarantee_channel.dart |
| @@ -0,0 +1,87 @@ |
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import 'dart:async'; |
| + |
| +import 'package:async/async.dart'; |
| + |
| +import '../stream_channel.dart'; |
| + |
| +/// A [StreamChannel] that specifically enforces the second stream channel |
| +/// guarantee. |
|
Bob Nystrom
2016/06/06 22:54:53
To someone reading this doc cold, it's not clear w
nweiz
2016/06/06 23:25:30
Done.
|
| +/// |
| +/// This is exposed via [new StreamChannel.withCloseGuarantee]. |
| +class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> { |
| + Stream<T> get stream => _stream; |
| + _CloseGuaranteeStream<T> _stream; |
| + |
| + StreamSink<T> get sink => _sink; |
| + _CloseGuaranteeSink<T> _sink; |
| + |
| + /// The subscription to the inner stream. |
| + StreamSubscription<T> _subscription; |
| + |
| + /// Whether the sink has closed, causing the underlying channel to disconnect. |
| + bool _disconnected = false; |
| + |
| + CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { |
| + _sink = new _CloseGuaranteeSink<T>(innerSink, this); |
| + _stream = new _CloseGuaranteeStream<T>(innerStream, this); |
| + } |
| +} |
| + |
| +/// The stream for [CloseGuaranteeChannel]. |
| +/// |
| +/// This wraps the inner stream to save the subscription on the channel when |
| +/// [listen] is called. |
| +class _CloseGuaranteeStream<T> extends Stream<T> { |
| + /// The inner stream this is delegating to. |
| + final Stream<T> _inner; |
| + |
| + /// The [CloseGuaranteeChannel] this belongs to. |
| + final CloseGuaranteeChannel<T> _channel; |
| + |
| + _CloseGuaranteeStream(this._inner, this._channel); |
| + |
| + StreamSubscription<T> listen(void onData(T event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
|
Bob Nystrom
2016/06/06 22:54:53
Run dartfmt?
nweiz
2016/06/06 23:25:30
Done.
|
| + // If the channel is already disconnected, we shouldn't dispatch anything |
| + // but a done event. |
| + if (_channel._disconnected) { |
| + onData = null; |
| + onError = null; |
| + } |
| + |
| + var subscription = _inner.listen(onData, |
| + onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| + if (!_channel._disconnected) { |
| + _channel._subscription = subscription; |
| + } |
| + return subscription; |
| + } |
| +} |
| + |
| +/// The sink for [CloseGuaranteeChannel]. |
| +/// |
| +/// This wraps the inner sink to cancel the stream subscription when the sink is |
| +/// canceled. |
| +class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> { |
| + /// The [CloseGuaranteeChannel] this belongs to. |
| + final CloseGuaranteeChannel<T> _channel; |
| + |
| + _CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner); |
| + |
| + Future close() { |
| + var done = super.close(); |
| + _channel._disconnected = true; |
| + if (_channel._subscription != null) { |
| + // Don't dispatch anything but a done event. |
| + _channel._subscription.onData(null); |
| + _channel._subscription.onError(null); |
| + } |
| + return done; |
| + } |
| +} |