Index: sdk/lib/_internal/pub_generated/lib/src/error_group.dart |
diff --git a/sdk/lib/_internal/pub_generated/lib/src/error_group.dart b/sdk/lib/_internal/pub_generated/lib/src/error_group.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..0262085cf0b61243c61d2ca87b0c3d8c6b28a997 |
--- /dev/null |
+++ b/sdk/lib/_internal/pub_generated/lib/src/error_group.dart |
@@ -0,0 +1,143 @@ |
+library pub.error_group; |
+import 'dart:async'; |
+class ErrorGroup { |
+ final _futures = <_ErrorGroupFuture>[]; |
+ final _streams = <_ErrorGroupStream>[]; |
+ var _isDone = false; |
+ final _doneCompleter = new Completer(); |
+ _ErrorGroupFuture _done; |
+ Future get done => _done; |
+ ErrorGroup() { |
+ this._done = new _ErrorGroupFuture(this, _doneCompleter.future); |
+ } |
+ Future registerFuture(Future future) { |
+ if (_isDone) { |
+ throw new StateError( |
+ "Can't register new members on a complete " "ErrorGroup."); |
+ } |
+ var wrapped = new _ErrorGroupFuture(this, future); |
+ _futures.add(wrapped); |
+ return wrapped; |
+ } |
+ Stream registerStream(Stream stream) { |
+ if (_isDone) { |
+ throw new StateError( |
+ "Can't register new members on a complete " "ErrorGroup."); |
+ } |
+ var wrapped = new _ErrorGroupStream(this, stream); |
+ _streams.add(wrapped); |
+ return wrapped; |
+ } |
+ void signalError(var error, [StackTrace stackTrace]) { |
+ if (_isDone) { |
+ throw new StateError("Can't signal errors on a complete ErrorGroup."); |
+ } |
+ _signalError(error, stackTrace); |
+ } |
+ void _signalError(var error, [StackTrace stackTrace]) { |
+ if (_isDone) return; |
+ var caught = false; |
+ for (var future in _futures) { |
+ if (future._isDone || future._hasListeners) caught = true; |
+ future._signalError(error, stackTrace); |
+ } |
+ for (var stream in _streams) { |
+ if (stream._isDone || stream._hasListeners) caught = true; |
+ stream._signalError(error, stackTrace); |
+ } |
+ _isDone = true; |
+ _done._signalError(error, stackTrace); |
+ if (!caught && !_done._hasListeners) scheduleMicrotask(() { |
+ throw error; |
+ }); |
+ } |
+ void _signalFutureComplete(_ErrorGroupFuture future) { |
+ if (_isDone) return; |
+ _isDone = _futures.every((future) => future._isDone) && |
+ _streams.every((stream) => stream._isDone); |
+ if (_isDone) _doneCompleter.complete(); |
+ } |
+ void _signalStreamComplete(_ErrorGroupStream stream) { |
+ if (_isDone) return; |
+ _isDone = _futures.every((future) => future._isDone) && |
+ _streams.every((stream) => stream._isDone); |
+ if (_isDone) _doneCompleter.complete(); |
+ } |
+} |
+class _ErrorGroupFuture implements Future { |
+ final ErrorGroup _group; |
+ var _isDone = false; |
+ final _completer = new Completer(); |
+ bool _hasListeners = false; |
+ _ErrorGroupFuture(this._group, Future inner) { |
+ inner.then((value) { |
+ if (!_isDone) _completer.complete(value); |
+ _isDone = true; |
+ _group._signalFutureComplete(this); |
+ }).catchError(_group._signalError); |
+ _completer.future.catchError((_) {}); |
+ } |
+ Future then(onValue(value), {Function onError}) { |
+ _hasListeners = true; |
+ return _completer.future.then(onValue, onError: onError); |
+ } |
+ Future catchError(Function onError, {bool test(Object error)}) { |
+ _hasListeners = true; |
+ return _completer.future.catchError(onError, test: test); |
+ } |
+ Future whenComplete(void action()) { |
+ _hasListeners = true; |
+ return _completer.future.whenComplete(action); |
+ } |
+ Future timeout(Duration timeLimit, {void onTimeout()}) { |
+ _hasListeners = true; |
+ return _completer.future.timeout(timeLimit, onTimeout: onTimeout); |
+ } |
+ Stream asStream() { |
+ _hasListeners = true; |
+ return _completer.future.asStream(); |
+ } |
+ void _signalError(var error, [StackTrace stackTrace]) { |
+ if (!_isDone) _completer.completeError(error, stackTrace); |
+ _isDone = true; |
+ } |
+} |
+class _ErrorGroupStream extends Stream { |
+ final ErrorGroup _group; |
+ var _isDone = false; |
+ final StreamController _controller; |
+ Stream _stream; |
+ StreamSubscription _subscription; |
+ bool get _hasListeners => _controller.hasListener; |
+ _ErrorGroupStream(this._group, Stream inner) |
+ : _controller = new StreamController(sync: true) { |
+ _stream = inner.isBroadcast ? |
+ _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) : |
+ _controller.stream; |
+ _subscription = inner.listen((v) { |
+ _controller.add(v); |
+ }, onError: (e, [stackTrace]) { |
+ _group._signalError(e, stackTrace); |
+ }, onDone: () { |
+ _isDone = true; |
+ _group._signalStreamComplete(this); |
+ _controller.close(); |
+ }); |
+ } |
+ StreamSubscription listen(void onData(value), {Function onError, void |
+ onDone(), bool cancelOnError}) { |
+ return _stream.listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ cancelOnError: true); |
+ } |
+ void _signalError(var e, [StackTrace stackTrace]) { |
+ if (_isDone) return; |
+ _subscription.cancel(); |
+ new Future.value().then((_) { |
+ _controller.addError(e, stackTrace); |
+ _controller.close(); |
+ }); |
+ } |
+} |