Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Unified Diff: lib/src/guarantee_channel.dart

Issue 1669953002: Provide more error-handling customization. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | lib/src/stream_channel_controller.dart » ('j') | test/with_guarantees_test.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/guarantee_channel.dart
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
index 849cc2ffe74edc22e0ff79afe65749e0f5636af3..6a2f2e05ce1525bc51f2b738ee2e3a54eb1fc46a 100644
--- a/lib/src/guarantee_channel.dart
+++ b/lib/src/guarantee_channel.dart
@@ -30,8 +30,10 @@ class GuaranteeChannel<T> extends StreamChannelMixin<T> {
/// Whether the sink has closed, causing the underlying channel to disconnect.
bool _disconnected = false;
- GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
- _sink = new _GuaranteeSink<T>(innerSink, this);
+ GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
+ {bool allowSinkErrors: true}) {
+ _sink = new _GuaranteeSink<T>(innerSink, this,
+ allowErrors: allowSinkErrors);
// Enforce the single-subscription guarantee by changing a broadcast stream
// to single-subscription.
@@ -76,10 +78,13 @@ class _GuaranteeSink<T> implements StreamSink<T> {
/// The [GuaranteeChannel] this belongs to.
final GuaranteeChannel<T> _channel;
- Future get done => _inner.done;
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
- /// Whether the stream has emitted a done event, causing the underlying
- /// channel to disconnect.
+ /// Whether connection is disconnected.
+ ///
+ /// This can happen because the stream has emitted a done event, or because
+ /// the user added an error when [_allowErrors] is `false`.
bool _disconnected = false;
/// Whether the user has called [close].
@@ -96,7 +101,14 @@ class _GuaranteeSink<T> implements StreamSink<T> {
/// Whether we're currently adding a stream with [addStream].
bool get _inAddStream => _addStreamSubscription != null;
- _GuaranteeSink(this._inner, this._channel);
+ /// Whether errors are passed on to the underlying sink.
+ ///
+ /// If this is `false`, any error passed to the sink is piped to [done] and
+ /// the underlying sink is closed.
+ final bool _allowErrors;
+
+ _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true})
+ : _allowErrors = allowErrors;
void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
@@ -115,7 +127,20 @@ class _GuaranteeSink<T> implements StreamSink<T> {
}
if (_disconnected) return;
- _inner.addError(error, stackTrace);
+ if (_allowErrors) {
+ _inner.addError(error, stackTrace);
+ return;
+ }
+
+ _doneCompleter.completeError(error, stackTrace);
+
+ // Treat an error like both the stream and sink disconnecting.
+ _onStreamDisconnected();
+ _channel._onSinkDisconnected();
+
+ // Ignore errors from the inner sink. We're already surfacing one error, and
+ // if the user handles it we don't want them to have another top-level.
+ _inner.close().catchError((_) {});
}
Future addStream(Stream<T> stream) {
@@ -142,10 +167,11 @@ class _GuaranteeSink<T> implements StreamSink<T> {
}
_closed = true;
- if (_disconnected) return new Future.value();
+ if (_disconnected) return done;
tjblasi 2016/02/04 23:16:01 mega-nit: Since here & line 174 return the same va
_channel._onSinkDisconnected();
- return _inner.close();
+ _doneCompleter.complete(_inner.close());
+ return done;
}
/// Called by [GuaranteeChannel] when the stream emits a done event.
@@ -154,8 +180,9 @@ class _GuaranteeSink<T> implements StreamSink<T> {
/// sink should stop forwarding events.
void _onStreamDisconnected() {
_disconnected = true;
- if (!_inAddStream) return;
+ if (!_doneCompleter.isCompleted) _doneCompleter.complete();
+ if (!_inAddStream) return;
_addStreamCompleter.complete(_addStreamSubscription.cancel());
_addStreamCompleter = null;
_addStreamSubscription = null;
« no previous file with comments | « no previous file | lib/src/stream_channel_controller.dart » ('j') | test/with_guarantees_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698