Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 import 'dart:async'; | |
| 6 | |
| 7 import 'package:async/async.dart'; | |
| 8 | |
| 9 import '../stream_channel.dart'; | |
| 10 | |
| 11 /// A [StreamChannel] that enforces the stream channel guarantees. | |
|
tjblasi
2016/02/04 00:33:47
Consider adding a pointer to docs explaining what
nweiz
2016/02/04 01:35:04
The guarantees are explained in the StreamChannel
| |
| 12 /// | |
| 13 /// This is exposed via [new StreamChannel.withGuarantees]. | |
| 14 class GuaranteeChannel<T> extends StreamChannelMixin<T> { | |
| 15 Stream<T> get stream => _streamController.stream; | |
| 16 | |
| 17 StreamSink<T> get sink => _sink; | |
| 18 _GuaranteeSink<T> _sink; | |
|
tjblasi
2016/02/04 00:33:47
`_sink`, `_streamController`, and `_subscription`
nweiz
2016/02/04 01:35:04
Unfortunately not :(. _sink's constructor needs to
| |
| 19 | |
| 20 /// The controller for [stream]. | |
| 21 /// | |
| 22 /// This intermediate controller allows us to continue listening for a done | |
| 23 /// event even after the user has canceled their subscription, and to send our | |
| 24 /// own done event when the sink is closed. | |
| 25 StreamController<T> _streamController; | |
| 26 | |
| 27 /// The subscription to the inner stream. | |
| 28 StreamSubscription<T> _subscription; | |
| 29 | |
| 30 /// Whether the sink has closed, causing the underlying channel to disconnect. | |
| 31 bool _disconnected = false; | |
| 32 | |
| 33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { | |
| 34 _sink = new _GuaranteeSink<T>(innerSink, this); | |
| 35 | |
| 36 // Enforce the single-subscription guarantee by changing a broadcast stream | |
| 37 // to single-subscription. | |
| 38 if (innerStream.isBroadcast) { | |
| 39 innerStream = innerStream.transform( | |
| 40 const SingleSubscriptionTransformer()); | |
| 41 } | |
| 42 | |
| 43 _streamController = new StreamController<T>(onListen: () { | |
| 44 // If the sink has disconnected, we've already called | |
| 45 // [_streamController.close]. | |
| 46 if (_disconnected) return; | |
| 47 | |
| 48 _subscription = innerStream.listen(_streamController.add, | |
| 49 onError: _streamController.addError, | |
| 50 onDone: () { | |
| 51 _sink._onStreamDisconnected(); | |
| 52 _streamController.close(); | |
| 53 }); | |
| 54 }, sync: true); | |
| 55 } | |
| 56 | |
| 57 /// Called by [_GuaranteeSink] when the user closes it. | |
| 58 /// | |
| 59 /// The sink closing indicates that the connection is closed, so the stream | |
| 60 /// should stop emitting events. | |
| 61 void _onSinkDisconnected() { | |
| 62 _disconnected = true; | |
| 63 if (_subscription != null) _subscription.cancel(); | |
| 64 _streamController.close(); | |
| 65 } | |
| 66 } | |
| 67 | |
| 68 /// The sink for [GuaranteeChannel]. | |
| 69 /// | |
| 70 /// This wraps the inner sink to ignore events and cancel any in-progress | |
| 71 /// [addStream] calls when the underlying channel closes. | |
| 72 class _GuaranteeSink<T> implements StreamSink<T> { | |
| 73 /// The inner sink being wrapped. | |
| 74 final StreamSink<T> _inner; | |
| 75 | |
| 76 /// The [GuaranteeChannel] this belongs to. | |
| 77 final GuaranteeChannel<T> _channel; | |
| 78 | |
| 79 Future get done => _inner.done; | |
| 80 | |
| 81 /// Whether the stream has emitted a done event, causing the underlying | |
| 82 /// channel to disconnect. | |
| 83 bool _disconnected = false; | |
| 84 | |
| 85 /// Whether the user has called [close]. | |
| 86 bool _closed = false; | |
| 87 | |
| 88 /// The subscription to the stream passed to [addStream], if a stream is | |
| 89 /// currently being added. | |
| 90 StreamSubscription<T> _addStreamSubscription; | |
| 91 | |
| 92 /// The completer for the future returned by [addStream], if a stream is | |
| 93 /// currently being added. | |
| 94 Completer _addStreamCompleter; | |
| 95 | |
| 96 /// Whether we're currently adding a stream with [addStream]. | |
| 97 bool get _inAddStream => _addStreamSubscription != null; | |
| 98 | |
| 99 _GuaranteeSink(this._inner, this._channel); | |
| 100 | |
| 101 void add(T data) { | |
| 102 if (_closed) throw new StateError("Cannot add event after closing."); | |
| 103 if (_inAddStream) { | |
| 104 throw new StateError("Cannot add event while adding stream."); | |
| 105 } | |
| 106 if (_disconnected) return; | |
| 107 | |
| 108 _inner.add(data); | |
| 109 } | |
| 110 | |
| 111 void addError(error, [StackTrace stackTrace]) { | |
| 112 if (_closed) throw new StateError("Cannot add event after closing."); | |
| 113 if (_inAddStream) { | |
| 114 throw new StateError("Cannot add event while adding stream."); | |
| 115 } | |
| 116 if (_disconnected) return; | |
| 117 | |
| 118 _inner.addError(error, stackTrace); | |
| 119 } | |
| 120 | |
| 121 Future addStream(Stream<T> stream) { | |
| 122 if (_closed) throw new StateError("Cannot add stream after closing."); | |
| 123 if (_inAddStream) { | |
| 124 throw new StateError("Cannot add stream while adding stream."); | |
| 125 } | |
| 126 if (_disconnected) return new Future.value(); | |
| 127 | |
| 128 _addStreamCompleter = new Completer.sync(); | |
| 129 _addStreamSubscription = stream.listen( | |
| 130 _inner.add, | |
| 131 onError: _inner.addError, | |
| 132 onDone: _addStreamCompleter.complete); | |
| 133 return _addStreamCompleter.future.then((_) { | |
| 134 _addStreamCompleter = null; | |
| 135 _addStreamSubscription = null; | |
| 136 }); | |
| 137 } | |
| 138 | |
| 139 Future close() { | |
| 140 if (_inAddStream) { | |
| 141 throw new StateError("Cannot close sink while adding stream."); | |
| 142 } | |
| 143 | |
| 144 _closed = true; | |
| 145 if (_disconnected) return new Future.value(); | |
| 146 | |
| 147 _channel._onSinkDisconnected(); | |
| 148 return _inner.close(); | |
| 149 } | |
| 150 | |
| 151 /// Called by [GuaranteeChannel] when the stream emits a done event. | |
| 152 /// | |
| 153 /// The stream being done indicates that the connection is closed, so the | |
| 154 /// sink should stop forwarding events. | |
| 155 void _onStreamDisconnected() { | |
| 156 _disconnected = true; | |
| 157 if (!_inAddStream) return; | |
| 158 | |
| 159 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | |
| 160 _addStreamCompleter = null; | |
| 161 _addStreamSubscription = null; | |
| 162 } | |
| 163 } | |
| OLD | NEW |