Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(115)

Side by Side Diff: lib/src/guarantee_channel.dart

Issue 1669953002: Provide more error-handling customization. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | lib/src/stream_channel_controller.dart » ('j') | test/with_guarantees_test.dart » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'package:async/async.dart'; 7 import 'package:async/async.dart';
8 8
9 import '../stream_channel.dart'; 9 import '../stream_channel.dart';
10 10
(...skipping 12 matching lines...) Expand all
23 /// event even after the user has canceled their subscription, and to send our 23 /// event even after the user has canceled their subscription, and to send our
24 /// own done event when the sink is closed. 24 /// own done event when the sink is closed.
25 StreamController<T> _streamController; 25 StreamController<T> _streamController;
26 26
27 /// The subscription to the inner stream. 27 /// The subscription to the inner stream.
28 StreamSubscription<T> _subscription; 28 StreamSubscription<T> _subscription;
29 29
30 /// Whether the sink has closed, causing the underlying channel to disconnect. 30 /// Whether the sink has closed, causing the underlying channel to disconnect.
31 bool _disconnected = false; 31 bool _disconnected = false;
32 32
33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) { 33 GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
34 _sink = new _GuaranteeSink<T>(innerSink, this); 34 {bool allowSinkErrors: true}) {
35 _sink = new _GuaranteeSink<T>(innerSink, this,
36 allowErrors: allowSinkErrors);
35 37
36 // Enforce the single-subscription guarantee by changing a broadcast stream 38 // Enforce the single-subscription guarantee by changing a broadcast stream
37 // to single-subscription. 39 // to single-subscription.
38 if (innerStream.isBroadcast) { 40 if (innerStream.isBroadcast) {
39 innerStream = innerStream.transform( 41 innerStream = innerStream.transform(
40 const SingleSubscriptionTransformer()); 42 const SingleSubscriptionTransformer());
41 } 43 }
42 44
43 _streamController = new StreamController<T>(onListen: () { 45 _streamController = new StreamController<T>(onListen: () {
44 // If the sink has disconnected, we've already called 46 // If the sink has disconnected, we've already called
(...skipping 24 matching lines...) Expand all
69 /// 71 ///
70 /// This wraps the inner sink to ignore events and cancel any in-progress 72 /// This wraps the inner sink to ignore events and cancel any in-progress
71 /// [addStream] calls when the underlying channel closes. 73 /// [addStream] calls when the underlying channel closes.
72 class _GuaranteeSink<T> implements StreamSink<T> { 74 class _GuaranteeSink<T> implements StreamSink<T> {
73 /// The inner sink being wrapped. 75 /// The inner sink being wrapped.
74 final StreamSink<T> _inner; 76 final StreamSink<T> _inner;
75 77
76 /// The [GuaranteeChannel] this belongs to. 78 /// The [GuaranteeChannel] this belongs to.
77 final GuaranteeChannel<T> _channel; 79 final GuaranteeChannel<T> _channel;
78 80
79 Future get done => _inner.done; 81 Future get done => _doneCompleter.future;
82 final _doneCompleter = new Completer();
80 83
81 /// Whether the stream has emitted a done event, causing the underlying 84 /// Whether connection is disconnected.
82 /// channel to disconnect. 85 ///
86 /// This can happen because the stream has emitted a done event, or because
87 /// the user added an error when [_allowErrors] is `false`.
83 bool _disconnected = false; 88 bool _disconnected = false;
84 89
85 /// Whether the user has called [close]. 90 /// Whether the user has called [close].
86 bool _closed = false; 91 bool _closed = false;
87 92
88 /// The subscription to the stream passed to [addStream], if a stream is 93 /// The subscription to the stream passed to [addStream], if a stream is
89 /// currently being added. 94 /// currently being added.
90 StreamSubscription<T> _addStreamSubscription; 95 StreamSubscription<T> _addStreamSubscription;
91 96
92 /// The completer for the future returned by [addStream], if a stream is 97 /// The completer for the future returned by [addStream], if a stream is
93 /// currently being added. 98 /// currently being added.
94 Completer _addStreamCompleter; 99 Completer _addStreamCompleter;
95 100
96 /// Whether we're currently adding a stream with [addStream]. 101 /// Whether we're currently adding a stream with [addStream].
97 bool get _inAddStream => _addStreamSubscription != null; 102 bool get _inAddStream => _addStreamSubscription != null;
98 103
99 _GuaranteeSink(this._inner, this._channel); 104 /// Whether errors are passed on to the underlying sink.
105 ///
106 /// If this is `false`, any error passed to the sink is piped to [done] and
107 /// the underlying sink is closed.
108 final bool _allowErrors;
109
110 _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true})
111 : _allowErrors = allowErrors;
100 112
101 void add(T data) { 113 void add(T data) {
102 if (_closed) throw new StateError("Cannot add event after closing."); 114 if (_closed) throw new StateError("Cannot add event after closing.");
103 if (_inAddStream) { 115 if (_inAddStream) {
104 throw new StateError("Cannot add event while adding stream."); 116 throw new StateError("Cannot add event while adding stream.");
105 } 117 }
106 if (_disconnected) return; 118 if (_disconnected) return;
107 119
108 _inner.add(data); 120 _inner.add(data);
109 } 121 }
110 122
111 void addError(error, [StackTrace stackTrace]) { 123 void addError(error, [StackTrace stackTrace]) {
112 if (_closed) throw new StateError("Cannot add event after closing."); 124 if (_closed) throw new StateError("Cannot add event after closing.");
113 if (_inAddStream) { 125 if (_inAddStream) {
114 throw new StateError("Cannot add event while adding stream."); 126 throw new StateError("Cannot add event while adding stream.");
115 } 127 }
116 if (_disconnected) return; 128 if (_disconnected) return;
117 129
118 _inner.addError(error, stackTrace); 130 if (_allowErrors) {
131 _inner.addError(error, stackTrace);
132 return;
133 }
134
135 _doneCompleter.completeError(error, stackTrace);
136
137 // Treat an error like both the stream and sink disconnecting.
138 _onStreamDisconnected();
139 _channel._onSinkDisconnected();
140
141 // Ignore errors from the inner sink. We're already surfacing one error, and
142 // if the user handles it we don't want them to have another top-level.
143 _inner.close().catchError((_) {});
119 } 144 }
120 145
121 Future addStream(Stream<T> stream) { 146 Future addStream(Stream<T> stream) {
122 if (_closed) throw new StateError("Cannot add stream after closing."); 147 if (_closed) throw new StateError("Cannot add stream after closing.");
123 if (_inAddStream) { 148 if (_inAddStream) {
124 throw new StateError("Cannot add stream while adding stream."); 149 throw new StateError("Cannot add stream while adding stream.");
125 } 150 }
126 if (_disconnected) return new Future.value(); 151 if (_disconnected) return new Future.value();
127 152
128 _addStreamCompleter = new Completer.sync(); 153 _addStreamCompleter = new Completer.sync();
129 _addStreamSubscription = stream.listen( 154 _addStreamSubscription = stream.listen(
130 _inner.add, 155 _inner.add,
131 onError: _inner.addError, 156 onError: _inner.addError,
132 onDone: _addStreamCompleter.complete); 157 onDone: _addStreamCompleter.complete);
133 return _addStreamCompleter.future.then((_) { 158 return _addStreamCompleter.future.then((_) {
134 _addStreamCompleter = null; 159 _addStreamCompleter = null;
135 _addStreamSubscription = null; 160 _addStreamSubscription = null;
136 }); 161 });
137 } 162 }
138 163
139 Future close() { 164 Future close() {
140 if (_inAddStream) { 165 if (_inAddStream) {
141 throw new StateError("Cannot close sink while adding stream."); 166 throw new StateError("Cannot close sink while adding stream.");
142 } 167 }
143 168
144 _closed = true; 169 _closed = true;
145 if (_disconnected) return new Future.value(); 170 if (_disconnected) return done;
tjblasi 2016/02/04 23:16:01 mega-nit: Since here & line 174 return the same va
146 171
147 _channel._onSinkDisconnected(); 172 _channel._onSinkDisconnected();
148 return _inner.close(); 173 _doneCompleter.complete(_inner.close());
174 return done;
149 } 175 }
150 176
151 /// Called by [GuaranteeChannel] when the stream emits a done event. 177 /// Called by [GuaranteeChannel] when the stream emits a done event.
152 /// 178 ///
153 /// The stream being done indicates that the connection is closed, so the 179 /// The stream being done indicates that the connection is closed, so the
154 /// sink should stop forwarding events. 180 /// sink should stop forwarding events.
155 void _onStreamDisconnected() { 181 void _onStreamDisconnected() {
156 _disconnected = true; 182 _disconnected = true;
183 if (!_doneCompleter.isCompleted) _doneCompleter.complete();
184
157 if (!_inAddStream) return; 185 if (!_inAddStream) return;
158
159 _addStreamCompleter.complete(_addStreamSubscription.cancel()); 186 _addStreamCompleter.complete(_addStreamSubscription.cancel());
160 _addStreamCompleter = null; 187 _addStreamCompleter = null;
161 _addStreamSubscription = null; 188 _addStreamSubscription = null;
162 } 189 }
163 } 190 }
OLDNEW
« no previous file with comments | « no previous file | lib/src/stream_channel_controller.dart » ('j') | test/with_guarantees_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698