Index: lib/src/guarantee_channel.dart |
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart |
index 849cc2ffe74edc22e0ff79afe65749e0f5636af3..6a2f2e05ce1525bc51f2b738ee2e3a54eb1fc46a 100644 |
--- a/lib/src/guarantee_channel.dart |
+++ b/lib/src/guarantee_channel.dart |
@@ -30,8 +30,10 @@ class GuaranteeChannel<T> extends StreamChannelMixin<T> { |
/// Whether the sink has closed, causing the underlying channel to disconnect. |
bool _disconnected = false; |
- GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { |
- _sink = new _GuaranteeSink<T>(innerSink, this); |
+ GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink, |
+ {bool allowSinkErrors: true}) { |
+ _sink = new _GuaranteeSink<T>(innerSink, this, |
+ allowErrors: allowSinkErrors); |
// Enforce the single-subscription guarantee by changing a broadcast stream |
// to single-subscription. |
@@ -76,10 +78,13 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
/// The [GuaranteeChannel] this belongs to. |
final GuaranteeChannel<T> _channel; |
- Future get done => _inner.done; |
+ Future get done => _doneCompleter.future; |
+ final _doneCompleter = new Completer(); |
- /// Whether the stream has emitted a done event, causing the underlying |
- /// channel to disconnect. |
+ /// Whether connection is disconnected. |
+ /// |
+ /// This can happen because the stream has emitted a done event, or because |
+ /// the user added an error when [_allowErrors] is `false`. |
bool _disconnected = false; |
/// Whether the user has called [close]. |
@@ -96,7 +101,14 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
/// Whether we're currently adding a stream with [addStream]. |
bool get _inAddStream => _addStreamSubscription != null; |
- _GuaranteeSink(this._inner, this._channel); |
+ /// Whether errors are passed on to the underlying sink. |
+ /// |
+ /// If this is `false`, any error passed to the sink is piped to [done] and |
+ /// the underlying sink is closed. |
+ final bool _allowErrors; |
+ |
+ _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true}) |
+ : _allowErrors = allowErrors; |
void add(T data) { |
if (_closed) throw new StateError("Cannot add event after closing."); |
@@ -115,7 +127,20 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
} |
if (_disconnected) return; |
- _inner.addError(error, stackTrace); |
+ if (_allowErrors) { |
+ _inner.addError(error, stackTrace); |
+ return; |
+ } |
+ |
+ _doneCompleter.completeError(error, stackTrace); |
+ |
+ // Treat an error like both the stream and sink disconnecting. |
+ _onStreamDisconnected(); |
+ _channel._onSinkDisconnected(); |
+ |
+ // Ignore errors from the inner sink. We're already surfacing one error, and |
+ // if the user handles it we don't want them to have another top-level. |
+ _inner.close().catchError((_) {}); |
} |
Future addStream(Stream<T> stream) { |
@@ -142,10 +167,11 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
} |
_closed = true; |
- if (_disconnected) return new Future.value(); |
+ if (_disconnected) return done; |
tjblasi
2016/02/04 23:16:01
mega-nit: Since here & line 174 return the same va
|
_channel._onSinkDisconnected(); |
- return _inner.close(); |
+ _doneCompleter.complete(_inner.close()); |
+ return done; |
} |
/// Called by [GuaranteeChannel] when the stream emits a done event. |
@@ -154,8 +180,9 @@ class _GuaranteeSink<T> implements StreamSink<T> { |
/// sink should stop forwarding events. |
void _onStreamDisconnected() { |
_disconnected = true; |
- if (!_inAddStream) return; |
+ if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
+ if (!_inAddStream) return; |
_addStreamCompleter.complete(_addStreamSubscription.cancel()); |
_addStreamCompleter = null; |
_addStreamSubscription = null; |