OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 |
| 7 import 'package:async/async.dart'; |
| 8 |
| 9 import '../stream_channel.dart'; |
| 10 |
| 11 /// A [StreamChannel] that enforces the stream channel guarantees. |
| 12 /// |
| 13 /// This is exposed via [new StreamChannel.withGuarantees]. |
| 14 class GuaranteeChannel<T> extends StreamChannelMixin<T> { |
| 15 Stream<T> get stream => _streamController.stream; |
| 16 |
| 17 StreamSink<T> get sink => _sink; |
| 18 _GuaranteeSink<T> _sink; |
| 19 |
| 20 /// The controller for [stream]. |
| 21 /// |
| 22 /// This intermediate controller allows us to continue listening for a done |
| 23 /// event even after the user has canceled their subscription, and to send our |
| 24 /// own done event when the sink is closed. |
| 25 StreamController<T> _streamController; |
| 26 |
| 27 /// The subscription to the inner stream. |
| 28 StreamSubscription<T> _subscription; |
| 29 |
| 30 /// Whether the sink has closed, causing the underlying channel to disconnect. |
| 31 bool _disconnected = false; |
| 32 |
| 33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { |
| 34 _sink = new _GuaranteeSink<T>(innerSink, this); |
| 35 |
| 36 // Enforce the single-subscription guarantee by changing a broadcast stream |
| 37 // to single-subscription. |
| 38 if (innerStream.isBroadcast) { |
| 39 innerStream = innerStream.transform( |
| 40 const SingleSubscriptionTransformer()); |
| 41 } |
| 42 |
| 43 _streamController = new StreamController<T>(onListen: () { |
| 44 // If the sink has disconnected, we've already called |
| 45 // [_streamController.close]. |
| 46 if (_disconnected) return; |
| 47 |
| 48 _subscription = innerStream.listen(_streamController.add, |
| 49 onError: _streamController.addError, |
| 50 onDone: () { |
| 51 _sink._onStreamDisconnected(); |
| 52 _streamController.close(); |
| 53 }); |
| 54 }, sync: true); |
| 55 } |
| 56 |
| 57 /// Called by [_GuaranteeSink] when the user closes it. |
| 58 /// |
| 59 /// The sink closing indicates that the connection is closed, so the stream |
| 60 /// should stop emitting events. |
| 61 void _onSinkDisconnected() { |
| 62 _disconnected = true; |
| 63 if (_subscription != null) _subscription.cancel(); |
| 64 _streamController.close(); |
| 65 } |
| 66 } |
| 67 |
| 68 /// The sink for [GuaranteeChannel]. |
| 69 /// |
| 70 /// This wraps the inner sink to ignore events and cancel any in-progress |
| 71 /// [addStream] calls when the underlying channel closes. |
| 72 class _GuaranteeSink<T> implements StreamSink<T> { |
| 73 /// The inner sink being wrapped. |
| 74 final StreamSink<T> _inner; |
| 75 |
| 76 /// The [GuaranteeChannel] this belongs to. |
| 77 final GuaranteeChannel<T> _channel; |
| 78 |
| 79 Future get done => _inner.done; |
| 80 |
| 81 /// Whether the stream has emitted a done event, causing the underlying |
| 82 /// channel to disconnect. |
| 83 bool _disconnected = false; |
| 84 |
| 85 /// Whether the user has called [close]. |
| 86 bool _closed = false; |
| 87 |
| 88 /// The subscription to the stream passed to [addStream], if a stream is |
| 89 /// currently being added. |
| 90 StreamSubscription<T> _addStreamSubscription; |
| 91 |
| 92 /// The completer for the future returned by [addStream], if a stream is |
| 93 /// currently being added. |
| 94 Completer _addStreamCompleter; |
| 95 |
| 96 /// Whether we're currently adding a stream with [addStream]. |
| 97 bool get _inAddStream => _addStreamSubscription != null; |
| 98 |
| 99 _GuaranteeSink(this._inner, this._channel); |
| 100 |
| 101 void add(T data) { |
| 102 if (_closed) throw new StateError("Cannot add event after closing."); |
| 103 if (_inAddStream) { |
| 104 throw new StateError("Cannot add event while adding stream."); |
| 105 } |
| 106 if (_disconnected) return; |
| 107 |
| 108 _inner.add(data); |
| 109 } |
| 110 |
| 111 void addError(error, [StackTrace stackTrace]) { |
| 112 if (_closed) throw new StateError("Cannot add event after closing."); |
| 113 if (_inAddStream) { |
| 114 throw new StateError("Cannot add event while adding stream."); |
| 115 } |
| 116 if (_disconnected) return; |
| 117 |
| 118 _inner.addError(error, stackTrace); |
| 119 } |
| 120 |
| 121 Future addStream(Stream<T> stream) { |
| 122 if (_closed) throw new StateError("Cannot add stream after closing."); |
| 123 if (_inAddStream) { |
| 124 throw new StateError("Cannot add stream while adding stream."); |
| 125 } |
| 126 if (_disconnected) return new Future.value(); |
| 127 |
| 128 _addStreamCompleter = new Completer.sync(); |
| 129 _addStreamSubscription = stream.listen( |
| 130 _inner.add, |
| 131 onError: _inner.addError, |
| 132 onDone: _addStreamCompleter.complete); |
| 133 return _addStreamCompleter.future.then((_) { |
| 134 _addStreamCompleter = null; |
| 135 _addStreamSubscription = null; |
| 136 }); |
| 137 } |
| 138 |
| 139 Future close() { |
| 140 if (_inAddStream) { |
| 141 throw new StateError("Cannot close sink while adding stream."); |
| 142 } |
| 143 |
| 144 _closed = true; |
| 145 if (_disconnected) return new Future.value(); |
| 146 |
| 147 _channel._onSinkDisconnected(); |
| 148 return _inner.close(); |
| 149 } |
| 150 |
| 151 /// Called by [GuaranteeChannel] when the stream emits a done event. |
| 152 /// |
| 153 /// The stream being done indicates that the connection is closed, so the |
| 154 /// sink should stop forwarding events. |
| 155 void _onStreamDisconnected() { |
| 156 _disconnected = true; |
| 157 if (!_inAddStream) return; |
| 158 |
| 159 _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
| 160 _addStreamCompleter = null; |
| 161 _addStreamSubscription = null; |
| 162 } |
| 163 } |
OLD | NEW |