OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library pub.error_group; | 5 library pub.error_group; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 | 8 |
9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | 9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s |
10 /// with one another. | 10 /// with one another. |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
62 } | 62 } |
63 | 63 |
64 /// Registers a [Future] as a member of [this]. | 64 /// Registers a [Future] as a member of [this]. |
65 /// | 65 /// |
66 /// Returns a wrapped version of [future] that should be used in its place. | 66 /// Returns a wrapped version of [future] that should be used in its place. |
67 /// | 67 /// |
68 /// If all members of [this] have already completed successfully or with an | 68 /// If all members of [this] have already completed successfully or with an |
69 /// error, it's a [StateError] to try to register a new [Future]. | 69 /// error, it's a [StateError] to try to register a new [Future]. |
70 Future registerFuture(Future future) { | 70 Future registerFuture(Future future) { |
71 if (_isDone) { | 71 if (_isDone) { |
72 throw new StateError("Can't register new members on a complete " | 72 throw new StateError( |
73 "ErrorGroup."); | 73 "Can't register new members on a complete " "ErrorGroup."); |
74 } | 74 } |
75 | 75 |
76 var wrapped = new _ErrorGroupFuture(this, future); | 76 var wrapped = new _ErrorGroupFuture(this, future); |
77 _futures.add(wrapped); | 77 _futures.add(wrapped); |
78 return wrapped; | 78 return wrapped; |
79 } | 79 } |
80 | 80 |
81 /// Registers a [Stream] as a member of [this]. | 81 /// Registers a [Stream] as a member of [this]. |
82 /// | 82 /// |
83 /// Returns a wrapped version of [stream] that should be used in its place. | 83 /// Returns a wrapped version of [stream] that should be used in its place. |
84 /// The returned [Stream] will be multi-subscription if and only if [stream] | 84 /// The returned [Stream] will be multi-subscription if and only if [stream] |
85 /// is. | 85 /// is. |
86 /// | 86 /// |
87 /// Since all errors in a group are passed to all members, the returned | 87 /// Since all errors in a group are passed to all members, the returned |
88 /// [Stream] will automatically unsubscribe all its listeners when it | 88 /// [Stream] will automatically unsubscribe all its listeners when it |
89 /// encounters an error. | 89 /// encounters an error. |
90 /// | 90 /// |
91 /// If all members of [this] have already completed successfully or with an | 91 /// If all members of [this] have already completed successfully or with an |
92 /// error, it's a [StateError] to try to register a new [Stream]. | 92 /// error, it's a [StateError] to try to register a new [Stream]. |
93 Stream registerStream(Stream stream) { | 93 Stream registerStream(Stream stream) { |
94 if (_isDone) { | 94 if (_isDone) { |
95 throw new StateError("Can't register new members on a complete " | 95 throw new StateError( |
96 "ErrorGroup."); | 96 "Can't register new members on a complete " "ErrorGroup."); |
97 } | 97 } |
98 | 98 |
99 var wrapped = new _ErrorGroupStream(this, stream); | 99 var wrapped = new _ErrorGroupStream(this, stream); |
100 _streams.add(wrapped); | 100 _streams.add(wrapped); |
101 return wrapped; | 101 return wrapped; |
102 } | 102 } |
103 | 103 |
104 /// Sends [error] to all members of [this]. | 104 /// Sends [error] to all members of [this]. |
105 /// | 105 /// |
106 /// Like errors that come from members, this will only be passed to the | 106 /// Like errors that come from members, this will only be passed to the |
(...skipping 22 matching lines...) Expand all Loading... |
129 future._signalError(error, stackTrace); | 129 future._signalError(error, stackTrace); |
130 } | 130 } |
131 | 131 |
132 for (var stream in _streams) { | 132 for (var stream in _streams) { |
133 if (stream._isDone || stream._hasListeners) caught = true; | 133 if (stream._isDone || stream._hasListeners) caught = true; |
134 stream._signalError(error, stackTrace); | 134 stream._signalError(error, stackTrace); |
135 } | 135 } |
136 | 136 |
137 _isDone = true; | 137 _isDone = true; |
138 _done._signalError(error, stackTrace); | 138 _done._signalError(error, stackTrace); |
139 if (!caught && !_done._hasListeners) scheduleMicrotask((){ throw error; }); | 139 if (!caught && !_done._hasListeners) scheduleMicrotask(() { |
| 140 throw error; |
| 141 }); |
140 } | 142 } |
141 | 143 |
142 /// Notifies [this] that one of its member [Future]s is complete. | 144 /// Notifies [this] that one of its member [Future]s is complete. |
143 void _signalFutureComplete(_ErrorGroupFuture future) { | 145 void _signalFutureComplete(_ErrorGroupFuture future) { |
144 if (_isDone) return; | 146 if (_isDone) return; |
145 | 147 |
146 _isDone = _futures.every((future) => future._isDone) && | 148 _isDone = _futures.every((future) => future._isDone) && |
147 _streams.every((stream) => stream._isDone); | 149 _streams.every((stream) => stream._isDone); |
148 if (_isDone) _doneCompleter.complete(); | 150 if (_isDone) _doneCompleter.complete(); |
149 } | 151 } |
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
250 /// The [StreamSubscription] that connects the wrapped [Stream] to | 252 /// The [StreamSubscription] that connects the wrapped [Stream] to |
251 /// [_controller]. | 253 /// [_controller]. |
252 StreamSubscription _subscription; | 254 StreamSubscription _subscription; |
253 | 255 |
254 /// Whether [this] has any listeners. | 256 /// Whether [this] has any listeners. |
255 bool get _hasListeners => _controller.hasListener; | 257 bool get _hasListeners => _controller.hasListener; |
256 | 258 |
257 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 259 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
258 /// [inner]. | 260 /// [inner]. |
259 _ErrorGroupStream(this._group, Stream inner) | 261 _ErrorGroupStream(this._group, Stream inner) |
260 : _controller = new StreamController(sync: true) { | 262 : _controller = new StreamController(sync: true) { |
261 // Use old-style asBroadcastStream behavior - cancel source _subscription | 263 // Use old-style asBroadcastStream behavior - cancel source _subscription |
262 // the first time the stream has no listeners. | 264 // the first time the stream has no listeners. |
263 _stream = inner.isBroadcast | 265 _stream = inner.isBroadcast ? |
264 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) | 266 _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) : |
265 : _controller.stream; | 267 _controller.stream; |
266 _subscription = inner.listen((v) { | 268 _subscription = inner.listen((v) { |
267 _controller.add(v); | 269 _controller.add(v); |
268 }, onError: (e, [stackTrace]) { | 270 }, onError: (e, [stackTrace]) { |
269 _group._signalError(e, stackTrace); | 271 _group._signalError(e, stackTrace); |
270 }, onDone: () { | 272 }, onDone: () { |
271 _isDone = true; | 273 _isDone = true; |
272 _group._signalStreamComplete(this); | 274 _group._signalStreamComplete(this); |
273 _controller.close(); | 275 _controller.close(); |
274 }); | 276 }); |
275 } | 277 } |
276 | 278 |
277 StreamSubscription listen(void onData(value), | 279 StreamSubscription listen(void onData(value), {Function onError, void |
278 {Function onError, void onDone(), | 280 onDone(), bool cancelOnError}) { |
279 bool cancelOnError}) { | 281 return _stream.listen( |
280 return _stream.listen(onData, | 282 onData, |
281 onError: onError, | 283 onError: onError, |
282 onDone: onDone, | 284 onDone: onDone, |
283 cancelOnError: true); | 285 cancelOnError: true); |
284 } | 286 } |
285 | 287 |
286 /// Signal that an error from [_group] should be propagated through [this], | 288 /// Signal that an error from [_group] should be propagated through [this], |
287 /// unless it's already complete. | 289 /// unless it's already complete. |
288 void _signalError(var e, [StackTrace stackTrace]) { | 290 void _signalError(var e, [StackTrace stackTrace]) { |
289 if (_isDone) return; | 291 if (_isDone) return; |
290 _subscription.cancel(); | 292 _subscription.cancel(); |
291 // Call these asynchronously to work around issue 7913. | 293 // Call these asynchronously to work around issue 7913. |
292 new Future.value().then((_) { | 294 new Future.value().then((_) { |
293 _controller.addError(e, stackTrace); | 295 _controller.addError(e, stackTrace); |
294 _controller.close(); | 296 _controller.close(); |
295 }); | 297 }); |
296 } | 298 } |
297 } | 299 } |
OLD | NEW |