Chromium Code Reviews| Index: lib/src/guarantee_channel.dart |
| diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart |
| index 849cc2ffe74edc22e0ff79afe65749e0f5636af3..6a2f2e05ce1525bc51f2b738ee2e3a54eb1fc46a 100644 |
| --- a/lib/src/guarantee_channel.dart |
| +++ b/lib/src/guarantee_channel.dart |
| @@ -30,8 +30,10 @@ class GuaranteeChannel<T> extends StreamChannelMixin<T> { |
| /// Whether the sink has closed, causing the underlying channel to disconnect. |
| bool _disconnected = false; |
| - GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { |
| - _sink = new _GuaranteeSink<T>(innerSink, this); |
| + GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink, |
| + {bool allowSinkErrors: true}) { |
| + _sink = new _GuaranteeSink<T>(innerSink, this, |
| + allowErrors: allowSinkErrors); |
| // Enforce the single-subscription guarantee by changing a broadcast stream |
| // to single-subscription. |
| @@ -76,10 +78,13 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
| /// The [GuaranteeChannel] this belongs to. |
| final GuaranteeChannel<T> _channel; |
| - Future get done => _inner.done; |
| + Future get done => _doneCompleter.future; |
| + final _doneCompleter = new Completer(); |
| - /// Whether the stream has emitted a done event, causing the underlying |
| - /// channel to disconnect. |
| + /// Whether connection is disconnected. |
| + /// |
| + /// This can happen because the stream has emitted a done event, or because |
| + /// the user added an error when [_allowErrors] is `false`. |
| bool _disconnected = false; |
| /// Whether the user has called [close]. |
| @@ -96,7 +101,14 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
| /// Whether we're currently adding a stream with [addStream]. |
| bool get _inAddStream => _addStreamSubscription != null; |
| - _GuaranteeSink(this._inner, this._channel); |
| + /// Whether errors are passed on to the underlying sink. |
| + /// |
| + /// If this is `false`, any error passed to the sink is piped to [done] and |
| + /// the underlying sink is closed. |
| + final bool _allowErrors; |
| + |
| + _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true}) |
| + : _allowErrors = allowErrors; |
| void add(T data) { |
| if (_closed) throw new StateError("Cannot add event after closing."); |
| @@ -115,7 +127,20 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
| } |
| if (_disconnected) return; |
| - _inner.addError(error, stackTrace); |
| + if (_allowErrors) { |
| + _inner.addError(error, stackTrace); |
| + return; |
| + } |
| + |
| + _doneCompleter.completeError(error, stackTrace); |
| + |
| + // Treat an error like both the stream and sink disconnecting. |
| + _onStreamDisconnected(); |
| + _channel._onSinkDisconnected(); |
| + |
| + // Ignore errors from the inner sink. We're already surfacing one error, and |
| + // if the user handles it we don't want them to have another top-level. |
| + _inner.close().catchError((_) {}); |
| } |
| Future addStream(Stream<T> stream) { |
| @@ -142,10 +167,11 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
| } |
| _closed = true; |
| - if (_disconnected) return new Future.value(); |
| + if (_disconnected) return done; |
|
tjblasi
2016/02/04 23:16:01
mega-nit: Since here & line 174 return the same va
|
| _channel._onSinkDisconnected(); |
| - return _inner.close(); |
| + _doneCompleter.complete(_inner.close()); |
| + return done; |
| } |
| /// Called by [GuaranteeChannel] when the stream emits a done event. |
| @@ -154,8 +180,9 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
| /// sink should stop forwarding events. |
| void _onStreamDisconnected() { |
| _disconnected = true; |
| - if (!_inAddStream) return; |
| + if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
| + if (!_inAddStream) return; |
| _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
| _addStreamCompleter = null; |
| _addStreamSubscription = null; |