Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/framework/base/_termination.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/framework/base/_termination.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/base/_termination.py b/third_party/grpc/src/python/grpcio/grpc/framework/base/_termination.py
new file mode 100644
index 0000000000000000000000000000000000000000..ddcbc60293d010c82dda7266cd25edc47590123e
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/base/_termination.py
@@ -0,0 +1,204 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation termination."""
+
+import enum
+
+from grpc.framework.base import _constants
+from grpc.framework.base import _interfaces
+from grpc.framework.base import interfaces
+from grpc.framework.foundation import callable_util
+
+_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
+
+
+@enum.unique
+class _Requirement(enum.Enum):
+ """Symbols indicating events required for termination."""
+
+ EMISSION = 'emission'
+ TRANSMISSION = 'transmission'
+ INGESTION = 'ingestion'
+
+_FRONT_NOT_LISTENING_REQUIREMENTS = (_Requirement.TRANSMISSION,)
+_BACK_NOT_LISTENING_REQUIREMENTS = (
+ _Requirement.EMISSION, _Requirement.INGESTION,)
+_LISTENING_REQUIREMENTS = (
+ _Requirement.TRANSMISSION, _Requirement.INGESTION,)
+
+
+class _TerminationManager(_interfaces.TerminationManager):
+ """An implementation of _interfaces.TerminationManager."""
+
+ def __init__(
+ self, work_pool, utility_pool, action, requirements, local_failure):
+ """Constructor.
+
+ Args:
+ work_pool: A thread pool in which customer work will be done.
+ utility_pool: A thread pool in which work utility work will be done.
+ action: An action to call on operation termination.
+ requirements: A combination of _Requirement values identifying what
+ must finish for the operation to be considered completed.
+ local_failure: An interfaces.Outcome specifying what constitutes local
+ failure of customer work.
+ """
+ self._work_pool = work_pool
+ self._utility_pool = utility_pool
+ self._action = action
+ self._local_failure = local_failure
+ self._has_locally_failed = False
+ self._expiration_manager = None
+
+ self._outstanding_requirements = set(requirements)
+ self._outcome = None
+ self._callbacks = []
+
+ def set_expiration_manager(self, expiration_manager):
+ self._expiration_manager = expiration_manager
+
+ def _terminate(self, outcome):
+ """Terminates the operation.
+
+ Args:
+ outcome: An interfaces.Outcome describing the outcome of the operation.
+ """
+ self._expiration_manager.abort()
+ self._outstanding_requirements = None
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+ self._outcome = outcome
+
+ act = callable_util.with_exceptions_logged(
+ self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
+
+ if self._has_locally_failed:
+ self._utility_pool.submit(act, outcome)
+ else:
+ def call_callbacks_and_act(callbacks, outcome):
+ for callback in callbacks:
+ callback_outcome = callable_util.call_logging_exceptions(
+ callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome)
+ if callback_outcome.exception is not None:
+ outcome = self._local_failure
+ break
+ self._utility_pool.submit(act, outcome)
+
+ self._work_pool.submit(callable_util.with_exceptions_logged(
+ call_callbacks_and_act,
+ _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ callbacks, outcome)
+
+ def is_active(self):
+ """See _interfaces.TerminationManager.is_active for specification."""
+ return self._outstanding_requirements is not None
+
+ def add_callback(self, callback):
+ """See _interfaces.TerminationManager.add_callback for specification."""
+ if not self._has_locally_failed:
+ if self._outstanding_requirements is None:
+ self._work_pool.submit(
+ callable_util.with_exceptions_logged(
+ callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), self._outcome)
+ else:
+ self._callbacks.append(callback)
+
+ def emission_complete(self):
+ """See superclass method for specification."""
+ if self._outstanding_requirements is not None:
+ self._outstanding_requirements.discard(_Requirement.EMISSION)
+ if not self._outstanding_requirements:
+ self._terminate(interfaces.Outcome.COMPLETED)
+
+ def transmission_complete(self):
+ """See superclass method for specification."""
+ if self._outstanding_requirements is not None:
+ self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
+ if not self._outstanding_requirements:
+ self._terminate(interfaces.Outcome.COMPLETED)
+
+ def ingestion_complete(self):
+ """See superclass method for specification."""
+ if self._outstanding_requirements is not None:
+ self._outstanding_requirements.discard(_Requirement.INGESTION)
+ if not self._outstanding_requirements:
+ self._terminate(interfaces.Outcome.COMPLETED)
+
+ def abort(self, outcome):
+ """See _interfaces.TerminationManager.abort for specification."""
+ if outcome is self._local_failure:
+ self._has_failed_locally = True
+ if self._outstanding_requirements is not None:
+ self._terminate(outcome)
+
+
+def front_termination_manager(
+ work_pool, utility_pool, action, subscription_kind):
+ """Creates a TerminationManager appropriate for front-side use.
+
+ Args:
+ work_pool: A thread pool in which customer work will be done.
+ utility_pool: A thread pool in which work utility work will be done.
+ action: An action to call on operation termination.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value.
+
+ Returns:
+ A TerminationManager appropriate for front-side use.
+ """
+ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
+ requirements = _FRONT_NOT_LISTENING_REQUIREMENTS
+ else:
+ requirements = _LISTENING_REQUIREMENTS
+
+ return _TerminationManager(
+ work_pool, utility_pool, action, requirements,
+ interfaces.Outcome.SERVICED_FAILURE)
+
+
+def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
+ """Creates a TerminationManager appropriate for back-side use.
+
+ Args:
+ work_pool: A thread pool in which customer work will be done.
+ utility_pool: A thread pool in which work utility work will be done.
+ action: An action to call on operation termination.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value.
+
+ Returns:
+ A TerminationManager appropriate for back-side use.
+ """
+ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
+ requirements = _BACK_NOT_LISTENING_REQUIREMENTS
+ else:
+ requirements = _LISTENING_REQUIREMENTS
+
+ return _TerminationManager(
+ work_pool, utility_pool, action, requirements,
+ interfaces.Outcome.SERVICER_FAILURE)

Powered by Google App Engine
This is Rietveld 408576698