| OLD | NEW | 
|---|
| (Empty) |  | 
|  | 1 // Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file | 
|  | 2 // for details. All rights reserved. Use of this source code is governed by a | 
|  | 3 // BSD-style license that can be found in the LICENSE file. | 
|  | 4 | 
|  | 5 import 'dart:async'; | 
|  | 6 | 
|  | 7 import 'package:async/async.dart'; | 
|  | 8 | 
|  | 9 import '../stream_channel.dart'; | 
|  | 10 | 
|  | 11 /// A [StreamChannel] that specifically enforces the stream channel guarantee | 
|  | 12 /// that closing the sink causes the stream to close before it emits any more | 
|  | 13 /// events | 
|  | 14 /// | 
|  | 15 /// This is exposed via [new StreamChannel.withCloseGuarantee]. | 
|  | 16 class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> { | 
|  | 17   Stream<T> get stream => _stream; | 
|  | 18   _CloseGuaranteeStream<T> _stream; | 
|  | 19 | 
|  | 20   StreamSink<T> get sink => _sink; | 
|  | 21   _CloseGuaranteeSink<T> _sink; | 
|  | 22 | 
|  | 23   /// The subscription to the inner stream. | 
|  | 24   StreamSubscription<T> _subscription; | 
|  | 25 | 
|  | 26   /// Whether the sink has closed, causing the underlying channel to disconnect. | 
|  | 27   bool _disconnected = false; | 
|  | 28 | 
|  | 29   CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { | 
|  | 30     _sink = new _CloseGuaranteeSink<T>(innerSink, this); | 
|  | 31     _stream = new _CloseGuaranteeStream<T>(innerStream, this); | 
|  | 32   } | 
|  | 33 } | 
|  | 34 | 
|  | 35 /// The stream for [CloseGuaranteeChannel]. | 
|  | 36 /// | 
|  | 37 /// This wraps the inner stream to save the subscription on the channel when | 
|  | 38 /// [listen] is called. | 
|  | 39 class _CloseGuaranteeStream<T> extends Stream<T> { | 
|  | 40   /// The inner stream this is delegating to. | 
|  | 41   final Stream<T> _inner; | 
|  | 42 | 
|  | 43   /// The [CloseGuaranteeChannel] this belongs to. | 
|  | 44   final CloseGuaranteeChannel<T> _channel; | 
|  | 45 | 
|  | 46   _CloseGuaranteeStream(this._inner, this._channel); | 
|  | 47 | 
|  | 48   StreamSubscription<T> listen(void onData(T event), | 
|  | 49       {Function onError, void onDone(), bool cancelOnError}) { | 
|  | 50     // If the channel is already disconnected, we shouldn't dispatch anything | 
|  | 51     // but a done event. | 
|  | 52     if (_channel._disconnected) { | 
|  | 53       onData = null; | 
|  | 54       onError = null; | 
|  | 55     } | 
|  | 56 | 
|  | 57     var subscription = _inner.listen(onData, | 
|  | 58         onError: onError, onDone: onDone, cancelOnError: cancelOnError); | 
|  | 59     if (!_channel._disconnected) { | 
|  | 60       _channel._subscription = subscription; | 
|  | 61     } | 
|  | 62     return subscription; | 
|  | 63   } | 
|  | 64 } | 
|  | 65 | 
|  | 66 /// The sink for [CloseGuaranteeChannel]. | 
|  | 67 /// | 
|  | 68 /// This wraps the inner sink to cancel the stream subscription when the sink is | 
|  | 69 /// canceled. | 
|  | 70 class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> { | 
|  | 71   /// The [CloseGuaranteeChannel] this belongs to. | 
|  | 72   final CloseGuaranteeChannel<T> _channel; | 
|  | 73 | 
|  | 74   _CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner); | 
|  | 75 | 
|  | 76   Future close() { | 
|  | 77     var done = super.close(); | 
|  | 78     _channel._disconnected = true; | 
|  | 79     if (_channel._subscription != null) { | 
|  | 80       // Don't dispatch anything but a done event. | 
|  | 81       _channel._subscription.onData(null); | 
|  | 82       _channel._subscription.onError(null); | 
|  | 83     } | 
|  | 84     return done; | 
|  | 85   } | 
|  | 86 } | 
| OLD | NEW | 
|---|