Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
| 8 | 8 |
| 9 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
| 10 | 10 |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 23 /// event even after the user has canceled their subscription, and to send our | 23 /// event even after the user has canceled their subscription, and to send our |
| 24 /// own done event when the sink is closed. | 24 /// own done event when the sink is closed. |
| 25 StreamController<T> _streamController; | 25 StreamController<T> _streamController; |
| 26 | 26 |
| 27 /// The subscription to the inner stream. | 27 /// The subscription to the inner stream. |
| 28 StreamSubscription<T> _subscription; | 28 StreamSubscription<T> _subscription; |
| 29 | 29 |
| 30 /// Whether the sink has closed, causing the underlying channel to disconnect. | 30 /// Whether the sink has closed, causing the underlying channel to disconnect. |
| 31 bool _disconnected = false; | 31 bool _disconnected = false; |
| 32 | 32 |
| 33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { | 33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink, |
| 34 _sink = new _GuaranteeSink<T>(innerSink, this); | 34 {bool allowSinkErrors: true}) { |
| 35 _sink = new _GuaranteeSink<T>(innerSink, this, | |
| 36 allowErrors: allowSinkErrors); | |
| 35 | 37 |
| 36 // Enforce the single-subscription guarantee by changing a broadcast stream | 38 // Enforce the single-subscription guarantee by changing a broadcast stream |
| 37 // to single-subscription. | 39 // to single-subscription. |
| 38 if (innerStream.isBroadcast) { | 40 if (innerStream.isBroadcast) { |
| 39 innerStream = innerStream.transform( | 41 innerStream = innerStream.transform( |
| 40 const SingleSubscriptionTransformer()); | 42 const SingleSubscriptionTransformer()); |
| 41 } | 43 } |
| 42 | 44 |
| 43 _streamController = new StreamController<T>(onListen: () { | 45 _streamController = new StreamController<T>(onListen: () { |
| 44 // If the sink has disconnected, we've already called | 46 // If the sink has disconnected, we've already called |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 69 /// | 71 /// |
| 70 /// This wraps the inner sink to ignore events and cancel any in-progress | 72 /// This wraps the inner sink to ignore events and cancel any in-progress |
| 71 /// [addStream] calls when the underlying channel closes. | 73 /// [addStream] calls when the underlying channel closes. |
| 72 class _GuaranteeSink<T> implements StreamSink<T> { | 74 class _GuaranteeSink<T> implements StreamSink<T> { |
| 73 /// The inner sink being wrapped. | 75 /// The inner sink being wrapped. |
| 74 final StreamSink<T> _inner; | 76 final StreamSink<T> _inner; |
| 75 | 77 |
| 76 /// The [GuaranteeChannel] this belongs to. | 78 /// The [GuaranteeChannel] this belongs to. |
| 77 final GuaranteeChannel<T> _channel; | 79 final GuaranteeChannel<T> _channel; |
| 78 | 80 |
| 79 Future get done => _inner.done; | 81 Future get done => _doneCompleter.future; |
| 82 final _doneCompleter = new Completer(); | |
| 80 | 83 |
| 81 /// Whether the stream has emitted a done event, causing the underlying | 84 /// Whether connection is disconnected. |
| 82 /// channel to disconnect. | 85 /// |
| 86 /// This can happen because the stream has emitted a done event, or because | |
| 87 /// the user added an error when [_allowErrors] is `false`. | |
| 83 bool _disconnected = false; | 88 bool _disconnected = false; |
| 84 | 89 |
| 85 /// Whether the user has called [close]. | 90 /// Whether the user has called [close]. |
| 86 bool _closed = false; | 91 bool _closed = false; |
| 87 | 92 |
| 88 /// The subscription to the stream passed to [addStream], if a stream is | 93 /// The subscription to the stream passed to [addStream], if a stream is |
| 89 /// currently being added. | 94 /// currently being added. |
| 90 StreamSubscription<T> _addStreamSubscription; | 95 StreamSubscription<T> _addStreamSubscription; |
| 91 | 96 |
| 92 /// The completer for the future returned by [addStream], if a stream is | 97 /// The completer for the future returned by [addStream], if a stream is |
| 93 /// currently being added. | 98 /// currently being added. |
| 94 Completer _addStreamCompleter; | 99 Completer _addStreamCompleter; |
| 95 | 100 |
| 96 /// Whether we're currently adding a stream with [addStream]. | 101 /// Whether we're currently adding a stream with [addStream]. |
| 97 bool get _inAddStream => _addStreamSubscription != null; | 102 bool get _inAddStream => _addStreamSubscription != null; |
| 98 | 103 |
| 99 _GuaranteeSink(this._inner, this._channel); | 104 /// Whether errors are passed on to the underlying sink. |
| 105 /// | |
| 106 /// If this is `false`, any error passed to the sink is piped to [done] and | |
| 107 /// the underlying sink is closed. | |
| 108 final bool _allowErrors; | |
| 109 | |
| 110 _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true}) | |
| 111 : _allowErrors = allowErrors; | |
| 100 | 112 |
| 101 void add(T data) { | 113 void add(T data) { |
| 102 if (_closed) throw new StateError("Cannot add event after closing."); | 114 if (_closed) throw new StateError("Cannot add event after closing."); |
| 103 if (_inAddStream) { | 115 if (_inAddStream) { |
| 104 throw new StateError("Cannot add event while adding stream."); | 116 throw new StateError("Cannot add event while adding stream."); |
| 105 } | 117 } |
| 106 if (_disconnected) return; | 118 if (_disconnected) return; |
| 107 | 119 |
| 108 _inner.add(data); | 120 _inner.add(data); |
| 109 } | 121 } |
| 110 | 122 |
| 111 void addError(error, [StackTrace stackTrace]) { | 123 void addError(error, [StackTrace stackTrace]) { |
| 112 if (_closed) throw new StateError("Cannot add event after closing."); | 124 if (_closed) throw new StateError("Cannot add event after closing."); |
| 113 if (_inAddStream) { | 125 if (_inAddStream) { |
| 114 throw new StateError("Cannot add event while adding stream."); | 126 throw new StateError("Cannot add event while adding stream."); |
| 115 } | 127 } |
| 116 if (_disconnected) return; | 128 if (_disconnected) return; |
| 117 | 129 |
| 118 _inner.addError(error, stackTrace); | 130 if (_allowErrors) { |
| 131 _inner.addError(error, stackTrace); | |
| 132 return; | |
| 133 } | |
| 134 | |
| 135 _doneCompleter.completeError(error, stackTrace); | |
| 136 | |
| 137 // Treat an error like both the stream and sink disconnecting. | |
| 138 _onStreamDisconnected(); | |
| 139 _channel._onSinkDisconnected(); | |
| 140 | |
| 141 // Ignore errors from the inner sink. We're already surfacing one error, and | |
| 142 // if the user handles it we don't want them to have another top-level. | |
| 143 _inner.close().catchError((_) {}); | |
| 119 } | 144 } |
| 120 | 145 |
| 121 Future addStream(Stream<T> stream) { | 146 Future addStream(Stream<T> stream) { |
| 122 if (_closed) throw new StateError("Cannot add stream after closing."); | 147 if (_closed) throw new StateError("Cannot add stream after closing."); |
| 123 if (_inAddStream) { | 148 if (_inAddStream) { |
| 124 throw new StateError("Cannot add stream while adding stream."); | 149 throw new StateError("Cannot add stream while adding stream."); |
| 125 } | 150 } |
| 126 if (_disconnected) return new Future.value(); | 151 if (_disconnected) return new Future.value(); |
| 127 | 152 |
| 128 _addStreamCompleter = new Completer.sync(); | 153 _addStreamCompleter = new Completer.sync(); |
| 129 _addStreamSubscription = stream.listen( | 154 _addStreamSubscription = stream.listen( |
| 130 _inner.add, | 155 _inner.add, |
| 131 onError: _inner.addError, | 156 onError: _inner.addError, |
| 132 onDone: _addStreamCompleter.complete); | 157 onDone: _addStreamCompleter.complete); |
| 133 return _addStreamCompleter.future.then((_) { | 158 return _addStreamCompleter.future.then((_) { |
| 134 _addStreamCompleter = null; | 159 _addStreamCompleter = null; |
| 135 _addStreamSubscription = null; | 160 _addStreamSubscription = null; |
| 136 }); | 161 }); |
| 137 } | 162 } |
| 138 | 163 |
| 139 Future close() { | 164 Future close() { |
| 140 if (_inAddStream) { | 165 if (_inAddStream) { |
| 141 throw new StateError("Cannot close sink while adding stream."); | 166 throw new StateError("Cannot close sink while adding stream."); |
| 142 } | 167 } |
| 143 | 168 |
| 144 _closed = true; | 169 _closed = true; |
| 145 if (_disconnected) return new Future.value(); | 170 if (_disconnected) return done; |
|
tjblasi
2016/02/04 23:16:01
mega-nit: Since here & line 174 return the same va
| |
| 146 | 171 |
| 147 _channel._onSinkDisconnected(); | 172 _channel._onSinkDisconnected(); |
| 148 return _inner.close(); | 173 _doneCompleter.complete(_inner.close()); |
| 174 return done; | |
| 149 } | 175 } |
| 150 | 176 |
| 151 /// Called by [GuaranteeChannel] when the stream emits a done event. | 177 /// Called by [GuaranteeChannel] when the stream emits a done event. |
| 152 /// | 178 /// |
| 153 /// The stream being done indicates that the connection is closed, so the | 179 /// The stream being done indicates that the connection is closed, so the |
| 154 /// sink should stop forwarding events. | 180 /// sink should stop forwarding events. |
| 155 void _onStreamDisconnected() { | 181 void _onStreamDisconnected() { |
| 156 _disconnected = true; | 182 _disconnected = true; |
| 183 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | |
| 184 | |
| 157 if (!_inAddStream) return; | 185 if (!_inAddStream) return; |
| 158 | |
| 159 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | 186 _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
| 160 _addStreamCompleter = null; | 187 _addStreamCompleter = null; |
| 161 _addStreamSubscription = null; | 188 _addStreamSubscription = null; |
| 162 } | 189 } |
| 163 } | 190 } |
| OLD | NEW |