OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 |
| 7 import 'package:async/async.dart'; |
| 8 |
| 9 import '../stream_channel.dart'; |
| 10 |
| 11 /// A [StreamChannel] that specifically enforces the stream channel guarantee |
| 12 /// that closing the sink causes the stream to close before it emits any more |
| 13 /// events |
| 14 /// |
| 15 /// This is exposed via [new StreamChannel.withCloseGuarantee]. |
| 16 class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> { |
| 17 Stream<T> get stream => _stream; |
| 18 _CloseGuaranteeStream<T> _stream; |
| 19 |
| 20 StreamSink<T> get sink => _sink; |
| 21 _CloseGuaranteeSink<T> _sink; |
| 22 |
| 23 /// The subscription to the inner stream. |
| 24 StreamSubscription<T> _subscription; |
| 25 |
| 26 /// Whether the sink has closed, causing the underlying channel to disconnect. |
| 27 bool _disconnected = false; |
| 28 |
| 29 CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { |
| 30 _sink = new _CloseGuaranteeSink<T>(innerSink, this); |
| 31 _stream = new _CloseGuaranteeStream<T>(innerStream, this); |
| 32 } |
| 33 } |
| 34 |
| 35 /// The stream for [CloseGuaranteeChannel]. |
| 36 /// |
| 37 /// This wraps the inner stream to save the subscription on the channel when |
| 38 /// [listen] is called. |
| 39 class _CloseGuaranteeStream<T> extends Stream<T> { |
| 40 /// The inner stream this is delegating to. |
| 41 final Stream<T> _inner; |
| 42 |
| 43 /// The [CloseGuaranteeChannel] this belongs to. |
| 44 final CloseGuaranteeChannel<T> _channel; |
| 45 |
| 46 _CloseGuaranteeStream(this._inner, this._channel); |
| 47 |
| 48 StreamSubscription<T> listen(void onData(T event), |
| 49 {Function onError, void onDone(), bool cancelOnError}) { |
| 50 // If the channel is already disconnected, we shouldn't dispatch anything |
| 51 // but a done event. |
| 52 if (_channel._disconnected) { |
| 53 onData = null; |
| 54 onError = null; |
| 55 } |
| 56 |
| 57 var subscription = _inner.listen(onData, |
| 58 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| 59 if (!_channel._disconnected) { |
| 60 _channel._subscription = subscription; |
| 61 } |
| 62 return subscription; |
| 63 } |
| 64 } |
| 65 |
| 66 /// The sink for [CloseGuaranteeChannel]. |
| 67 /// |
| 68 /// This wraps the inner sink to cancel the stream subscription when the sink is |
| 69 /// canceled. |
| 70 class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> { |
| 71 /// The [CloseGuaranteeChannel] this belongs to. |
| 72 final CloseGuaranteeChannel<T> _channel; |
| 73 |
| 74 _CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner); |
| 75 |
| 76 Future close() { |
| 77 var done = super.close(); |
| 78 _channel._disconnected = true; |
| 79 if (_channel._subscription != null) { |
| 80 // Don't dispatch anything but a done event. |
| 81 _channel._subscription.onData(null); |
| 82 _channel._subscription.onError(null); |
| 83 } |
| 84 return done; |
| 85 } |
| 86 } |
OLD | NEW |