Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(583)

Unified Diff: utils/pub/error_group.dart

Issue 11941003: Add an ErrorGroup class to Pub. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | utils/tests/pub/error_group_test.dart » ('j') | utils/tests/pub/error_group_test.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/pub/error_group.dart
diff --git a/utils/pub/error_group.dart b/utils/pub/error_group.dart
new file mode 100644
index 0000000000000000000000000000000000000000..afd5f8b75915f88daee06de49c22506eda5bf2e5
--- /dev/null
+++ b/utils/pub/error_group.dart
@@ -0,0 +1,268 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library error_group;
+
+import 'dart:async';
+
+/// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
+/// with one another. This allows APIs to expose multiple [Future]s and
+/// [Stream]s that have identical error conditions without forcing API consumers
+/// to attach error handling to objects they don't care about.
+///
+/// To use an [ErrorGroup], register [Future]s and [Stream]s with it using
+/// [registerFuture] and [registerStream]. These methods return wrapped versions
+/// of the [Future]s and [Stream]s, which should then be used in place of the
+/// originals. For example:
+///
+/// var errorGroup = new ErrorGroup();
+/// future = errorGroup.registerFuture(future);
+/// stream = errorGroup.registerStream(stream);
+///
+/// An [ErrorGroup] has two major effects on its wrapped members:
+///
+/// * An error in any member of the group will be propagated to every member
+/// that hasn't already completed. If those members later complete, their
+/// values will be ignored.
+/// * If any member of this group has a listener, errors on members without
+/// listeners won't get passed to the top-level error handler.
+class ErrorGroup {
+ /// The [Future]s that are members of [this].
+ final _futures = <_ErrorGroupFuture>[];
+
+ /// The [Stream]s that are members of [this].
+ final _streams = <_ErrorGroupStream>[];
+
+ /// Whether [this] has completed, either successfully or with an error.
+ var _isComplete = false;
Bob Nystrom 2013/01/15 22:49:47 var -> bool
nweiz 2013/01/16 00:12:10 You've told me in the past not to use type declara
Bob Nystrom 2013/01/16 18:03:44 I think the rough guideline we've followed is to d
nweiz 2013/01/16 19:29:11 ...why? It's no less inferable in the "var" case t
+
+ /// The [Completer] for [complete].
+ final _completeCompleter = new Completer();
+
+ /// The underlying [Future] for [complete]. We need to be able to access it
+ /// internally as an [_ErrorGroupFuture] so we can check if it has listeners
+ /// and signal errors on it.
+ _ErrorGroupFuture _complete;
+
+ /// Returns a [Future] that completes successully when all members of [this]
+ /// are complete, or with an error if any member receives an error.
+ ///
+ /// This [Future] is effectively in the group in that an error on it won't be
+ /// passed to the top-level error handler unless no members of the group have
+ /// listeners attached.
+ Future get complete => _complete;
Bob Nystrom 2013/01/15 22:49:47 "completed"? The name "complete" seems odd to me h
nweiz 2013/01/16 00:12:10 Changed to "done" as per offline discussion.
+
+ /// Creates a new group with no members.
+ ErrorGroup() {
+ this._complete = new _ErrorGroupFuture(this, _completeCompleter.future);
+ }
+
+ /// Registers a [Future] as a member of [this]. Returns a wrapped version of
+ /// [future] that should be used in its place.
Bob Nystrom 2013/01/15 22:49:47 Document that it's an error to register an already
nweiz 2013/01/16 00:12:10 It's not. It's a little silly, but it'll work fine
+ ///
+ /// If all members of [this] have already completed successfully or with an
+ /// error, it's a [StateError] to try to register a new [Future].
+ Future registerFuture(Future future) {
Bob Nystrom 2013/01/15 22:49:47 What do you think of just "addFuture" instead of "
nweiz 2013/01/16 00:12:10 I think that makes it sound a little too much like
+ if (_isComplete) {
+ throw new StateError("Can't register new members on a complete "
+ "ErrorGroup.");
+ }
+
+ var wrapped = new _ErrorGroupFuture(this, future);
+ _futures.add(wrapped);
+ return wrapped;
+ }
+
+ /// Registers a [Stream] as a member of [this]. Returns a wrapped version of
+ /// [stream] that should be used in its place. The returned [Stream] will be
+ /// multi-subscription if and only if [stream] is.
+ ///
+ /// Since all errors in a group are passed to all members, the returned
+ /// [Stream] will automatically unsubscribe all its listeners when it
+ /// encounters an error.
+ ///
+ /// If all members of [this] have already completed successfully or with an
+ /// error, it's a [StateError] to try to register a new [Stream].
Bob Nystrom 2013/01/15 22:49:47 Document that it's an error to register an already
nweiz 2013/01/16 00:12:10 See above.
+ Stream registerStream(Stream stream) {
+ if (_isComplete) {
+ throw new StateError("Can't register new members on a complete "
+ "ErrorGroup.");
+ }
+
+ var wrapped = new _ErrorGroupStream(this, stream);
+ _streams.add(wrapped);
+ return wrapped;
+ }
+
+ /// Sends [error] to all members of [this]. Like errors that come from
+ /// members, this will only be passed to the top-level error handler if no
+ /// members have listeners.
+ ///
+ /// If all members of [this] have already completed successfully or with an
+ /// error, it's a [StateError] to try to signal an error.
+ void signalError(AsyncError error) {
+ if (_isComplete) {
+ throw new StateError("Can't signal errors on a complete ErrorGroup.");
+ }
+
+ _signalError(error);
+ }
+
+ /// Signal an error internally. This is just like [signalError], but instead
+ /// of throwing an error if [this] is complete, it just does nothing.
+ void _signalError(AsyncError error) {
+ if (_isComplete) return;
+
+ var caught = false;
+ for (var future in _futures) {
+ if (future._isComplete || future._hasListeners) caught = true;
+ future._signalError(error);
+ }
+
+ for (var stream in _streams) {
+ if (stream._isComplete || stream._hasListeners) caught = true;
+ stream._signalError(error);
+ }
+
+ _isComplete = true;
+ _complete._signalError(error);
+ if (!caught && !_complete._hasListeners) error.throwDelayed();
+ }
+
+ /// Notifies [this] that one of its member [Future]s is complete.
+ void _signalFutureComplete(_ErrorGroupFuture future) {
+ if (_isComplete) return;
+
+ _isComplete = _futures.every((future) => future._isComplete) &&
+ _streams.every((stream) => stream._isComplete);
Bob Nystrom 2013/01/15 22:49:47 Indent another 2.
nweiz 2013/01/16 00:12:10 Done.
+ if (_isComplete) _completeCompleter.complete();
+ }
+
+ /// Notifies [this] that one of its member [Stream]s is complete.
+ void _signalStreamComplete(_ErrorGroupStream stream) {
+ if (_isComplete) return;
+
+ _isComplete = _futures.every((future) => future._isComplete) &&
+ _streams.every((stream) => stream._isComplete);
Bob Nystrom 2013/01/15 22:49:47 Ditto.
nweiz 2013/01/16 00:12:10 Done.
+ if (_isComplete) _completeCompleter.complete();
+ }
+}
+
+/// A [Future] wrapper that keeps track of whether it's been completed and
+/// whether it has any listeners. It also notifies its parent [ErrorGroup] when
+/// it completes successfully or receives an error.
+class _ErrorGroupFuture implements Future {
+ /// The parent [ErrorGroup].
+ final ErrorGroup _group;
+
+ /// Whether [this] has completed, either successfully or with an error.
+ var _isComplete = false;
+
+ /// The underlying [Completer] for [this].
+ final _completer = new Completer();
+
+ /// Whether [this] has any listeners.
+ bool _hasListeners = false;
+
+ /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
+ /// [inner].
+ _ErrorGroupFuture(this._group, Future inner) {
+ inner.then((value) {
+ if (!_isComplete) _completer.complete(value);
+ _isComplete = true;
+ _group._signalFutureComplete(this);
+ }).catchError((error) => _group._signalError(error));
+
+ // Make sure _completer.future doesn't automatically send errors to the
+ // top-level.
+ _completer.future.catchError((_) {});
+ }
+
+ Future then(onValue(T value), {onError(AsyncError asyncError)}) {
+ _hasListeners = true;
+ return _completer.future.then(onValue, onError: onError);
+ }
+
+ Future catchError(onError(AsyncError asyncError), {bool test(Object error)}) {
+ _hasListeners = true;
+ return _completer.future.catchError(onError, test: test);
+ }
+
+ Future whenComplete(void action()) {
+ _hasListeners = true;
+ return _completer.future.whenComplete(action);
+ }
+
+ Stream<T> asStream() {
+ _hasListeners = true;
+ return _completer.future.asStream();
+ }
+
+ /// Signal that an error from [_group] should be propagated through [this],
+ /// unless it's already complete.
+ void _signalError(AsyncError error) {
+ if (!_isComplete) _completer.completeError(error.error, error.stackTrace);
+ _isComplete = true;
+ }
+}
+
+// TODO(nweiz): currently streams never top-level unhandled errors (issue 7843).
+// When this is fixed, this class will need to prevent such errors from being
+// top-leveled.
+/// A [Stream] wrapper that keeps track of whether it's been completed and
+/// whether it has any listeners. It also notifies its parent [ErrorGroup] when
+/// it completes successfully or receives an error.
+class _ErrorGroupStream extends Stream {
+ /// The parent [ErrorGroup].
+ final ErrorGroup _group;
+
+ /// Whether [this] has completed, either successfully or with an error.
+ var _isComplete = false;
+
+ /// The underlying [StreamController] for [this].
+ final StreamController _controller;
+
+ /// The [StreamSubscription] that connects the wrapped [Stream] to
+ /// [_controller].
+ StreamSubscription _subscription;
+
+ /// Whether [this] has any listeners.
+ bool get _hasListeners => _controller.hasSubscribers;
+
+ /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
+ /// [inner].
+ _ErrorGroupStream(this._group, Stream inner)
+ : _controller = inner.isSingleSubscription ?
+ new StreamController() :
+ new StreamController.multiSubscription() {
+ _subscription = inner.listen(_controller.add,
+ onError: (e) => _group._signalError(e),
+ onDone: () {
+ _isComplete = true;
+ _group._signalStreamComplete(this);
+ _controller.close();
+ });
+ }
+
+ StreamSubscription listen(void onData(value),
+ {void onError(AsyncError error), void onDone(),
+ bool unsubscribeOnError}) {
+ return _controller.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: true);
+ }
+
+ /// Signal that an error from [_group] should be propagated through [this],
+ /// unless it's already complete.
+ void _signalError(AsyncError e) {
+ if (_isComplete) return;
+ _subscription.cancel();
+ // Call these asynchronously to work around issue 7913.
+ new Future.immediate(null).then((_) {
+ _controller.signalError(e.error, e.stackTrace);
+ _controller.close();
+ });
+ }
+}
« no previous file with comments | « no previous file | utils/tests/pub/error_group_test.dart » ('j') | utils/tests/pub/error_group_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698