Index: third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py b/third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5e9efdf7322790c3f5c790907c30ed231fd5eefc |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py |
@@ -0,0 +1,581 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""State and behavior for translating between sync and async control flow.""" |
+ |
+import collections |
+import enum |
+import sys |
+import threading |
+import time |
+ |
+from grpc.framework.foundation import abandonment |
+from grpc.framework.foundation import callable_util |
+from grpc.framework.foundation import future |
+from grpc.framework.foundation import stream |
+from grpc.framework.interfaces.base import base |
+from grpc.framework.interfaces.base import utilities |
+from grpc.framework.interfaces.face import face |
+ |
+_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!' |
+_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:' |
+ |
+_CANNOT_SET_INITIAL_METADATA = ( |
+ 'Could not set initial metadata - has it already been set, or has a ' + |
+ 'payload already been sent?') |
+_CANNOT_SET_TERMINAL_METADATA = ( |
+ 'Could not set terminal metadata - has it already been set, or has RPC ' + |
+ 'completion already been indicated?') |
+_CANNOT_SET_CODE = ( |
+ 'Could not set code - has it already been set, or has RPC completion ' + |
+ 'already been indicated?') |
+_CANNOT_SET_DETAILS = ( |
+ 'Could not set details - has it already been set, or has RPC completion ' + |
+ 'already been indicated?') |
+ |
+ |
+class _DummyOperator(base.Operator): |
+ |
+ def advance( |
+ self, initial_metadata=None, payload=None, completion=None, |
+ allowance=None): |
+ pass |
+ |
+_DUMMY_OPERATOR = _DummyOperator() |
+ |
+ |
+class _Awaited( |
+ collections.namedtuple('_Awaited', ('kind', 'value',))): |
+ |
+ @enum.unique |
+ class Kind(enum.Enum): |
+ NOT_YET_ARRIVED = 'not yet arrived' |
+ ARRIVED = 'arrived' |
+ |
+_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None) |
+_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None) |
+ |
+ |
+class _Transitory( |
+ collections.namedtuple('_Transitory', ('kind', 'value',))): |
+ |
+ @enum.unique |
+ class Kind(enum.Enum): |
+ NOT_YET_SEEN = 'not yet seen' |
+ PRESENT = 'present' |
+ GONE = 'gone' |
+ |
+_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None) |
+_GONE = _Transitory(_Transitory.Kind.GONE, None) |
+ |
+ |
+class _Termination( |
+ collections.namedtuple( |
+ '_Termination', ('terminated', 'abortion', 'abortion_error',))): |
+ """Values indicating whether and how an RPC has terminated. |
+ |
+ Attributes: |
+ terminated: A boolean indicating whether or not the RPC has terminated. |
+ abortion: A face.Abortion value describing the RPC's abortion or None if the |
+ RPC did not abort. |
+ abortion_error: A face.AbortionError describing the RPC's abortion or None |
+ if the RPC did not abort. |
+ """ |
+ |
+_NOT_TERMINATED = _Termination(False, None, None) |
+ |
+_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = { |
+ base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination( |
+ True, None, None), |
+ base.Outcome.Kind.CANCELLED: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.CANCELLED, *args), |
+ face.CancellationError(*args)), |
+ base.Outcome.Kind.EXPIRED: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.EXPIRED, *args), |
+ face.ExpirationError(*args)), |
+ base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args), |
+ face.LocalShutdownError(*args)), |
+ base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args), |
+ face.RemoteShutdownError(*args)), |
+ base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), |
+ face.NetworkError(*args)), |
+ base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), |
+ face.NetworkError(*args)), |
+ base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args), |
+ face.LocalError(*args)), |
+ base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination( |
+ True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args), |
+ face.RemoteError(*args)), |
+} |
+ |
+ |
+def _wait_once_until(condition, until): |
+ if until is None: |
+ condition.wait() |
+ else: |
+ remaining = until - time.time() |
+ if remaining < 0: |
+ raise future.TimeoutError() |
+ else: |
+ condition.wait(timeout=remaining) |
+ |
+ |
+def _done_callback_as_operation_termination_callback( |
+ done_callback, rendezvous): |
+ def operation_termination_callback(operation_outcome): |
+ rendezvous.set_outcome(operation_outcome) |
+ done_callback(rendezvous) |
+ return operation_termination_callback |
+ |
+ |
+def _abortion_callback_as_operation_termination_callback( |
+ rpc_abortion_callback, rendezvous_set_outcome): |
+ def operation_termination_callback(operation_outcome): |
+ termination = rendezvous_set_outcome(operation_outcome) |
+ if termination.abortion is not None: |
+ rpc_abortion_callback(termination.abortion) |
+ return operation_termination_callback |
+ |
+ |
+class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call): |
+ """A rendez-vous for the threads of an operation. |
+ |
+ Instances of this object present iterator and stream.Consumer interfaces for |
+ interacting with application code and present a base.Operator interface and |
+ maintain a base.Operator internally for interacting with base interface code. |
+ """ |
+ |
+ def __init__(self, operator, operation_context): |
+ self._condition = threading.Condition() |
+ |
+ self._operator = operator |
+ self._operation_context = operation_context |
+ |
+ self._protocol_context = _NOT_YET_ARRIVED |
+ |
+ self._up_initial_metadata = _NOT_YET_ARRIVED |
+ self._up_payload = None |
+ self._up_allowance = 1 |
+ self._up_completion = _NOT_YET_ARRIVED |
+ self._down_initial_metadata = _NOT_YET_SEEN |
+ self._down_payload = None |
+ self._down_allowance = 1 |
+ self._down_terminal_metadata = _NOT_YET_SEEN |
+ self._down_code = _NOT_YET_SEEN |
+ self._down_details = _NOT_YET_SEEN |
+ |
+ self._termination = _NOT_TERMINATED |
+ |
+ # The semantics of future.Future.cancel and future.Future.cancelled are |
+ # slightly wonky, so they have to be tracked separately from the rest of the |
+ # result of the RPC. This field tracks whether cancellation was requested |
+ # prior to termination of the RPC |
+ self._cancelled = False |
+ |
+ def set_operator_and_context(self, operator, operation_context): |
+ with self._condition: |
+ self._operator = operator |
+ self._operation_context = operation_context |
+ |
+ def _down_completion(self): |
+ if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN: |
+ terminal_metadata = None |
+ self._down_terminal_metadata = _GONE |
+ elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT: |
+ terminal_metadata = self._down_terminal_metadata.value |
+ self._down_terminal_metadata = _GONE |
+ else: |
+ terminal_metadata = None |
+ if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN: |
+ code = None |
+ self._down_code = _GONE |
+ elif self._down_code.kind is _Transitory.Kind.PRESENT: |
+ code = self._down_code.value |
+ self._down_code = _GONE |
+ else: |
+ code = None |
+ if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN: |
+ details = None |
+ self._down_details = _GONE |
+ elif self._down_details.kind is _Transitory.Kind.PRESENT: |
+ details = self._down_details.value |
+ self._down_details = _GONE |
+ else: |
+ details = None |
+ return utilities.completion(terminal_metadata, code, details) |
+ |
+ def _set_outcome(self, outcome): |
+ if not self._termination.terminated: |
+ self._operator = _DUMMY_OPERATOR |
+ self._operation_context = None |
+ self._down_initial_metadata = _GONE |
+ self._down_payload = None |
+ self._down_terminal_metadata = _GONE |
+ self._down_code = _GONE |
+ self._down_details = _GONE |
+ |
+ if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
+ initial_metadata = None |
+ else: |
+ initial_metadata = self._up_initial_metadata.value |
+ if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
+ terminal_metadata = None |
+ else: |
+ terminal_metadata = self._up_completion.value.terminal_metadata |
+ if outcome.kind is base.Outcome.Kind.COMPLETED: |
+ code = self._up_completion.value.code |
+ details = self._up_completion.value.message |
+ else: |
+ code = outcome.code |
+ details = outcome.details |
+ self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[ |
+ outcome.kind](initial_metadata, terminal_metadata, code, details) |
+ |
+ self._condition.notify_all() |
+ |
+ return self._termination |
+ |
+ def advance( |
+ self, initial_metadata=None, payload=None, completion=None, |
+ allowance=None): |
+ with self._condition: |
+ if initial_metadata is not None: |
+ self._up_initial_metadata = _Awaited( |
+ _Awaited.Kind.ARRIVED, initial_metadata) |
+ if payload is not None: |
+ if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
+ self._up_initial_metadata = _ARRIVED_AND_NONE |
+ self._up_payload = payload |
+ self._up_allowance -= 1 |
+ if completion is not None: |
+ if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
+ self._up_initial_metadata = _ARRIVED_AND_NONE |
+ self._up_completion = _Awaited( |
+ _Awaited.Kind.ARRIVED, completion) |
+ if allowance is not None: |
+ if self._down_payload is not None: |
+ self._operator.advance(payload=self._down_payload) |
+ self._down_payload = None |
+ self._down_allowance += allowance - 1 |
+ else: |
+ self._down_allowance += allowance |
+ self._condition.notify_all() |
+ |
+ def cancel(self): |
+ with self._condition: |
+ if self._operation_context is not None: |
+ self._operation_context.cancel() |
+ self._cancelled = True |
+ return False |
+ |
+ def cancelled(self): |
+ with self._condition: |
+ return self._cancelled |
+ |
+ def running(self): |
+ with self._condition: |
+ return not self._termination.terminated |
+ |
+ def done(self): |
+ with self._condition: |
+ return self._termination.terminated |
+ |
+ def result(self, timeout=None): |
+ until = None if timeout is None else time.time() + timeout |
+ with self._condition: |
+ while True: |
+ if self._termination.terminated: |
+ if self._termination.abortion is None: |
+ return self._up_payload |
+ elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED: |
+ raise future.CancelledError() |
+ else: |
+ raise self._termination.abortion_error # pylint: disable=raising-bad-type |
+ else: |
+ _wait_once_until(self._condition, until) |
+ |
+ def exception(self, timeout=None): |
+ until = None if timeout is None else time.time() + timeout |
+ with self._condition: |
+ while True: |
+ if self._termination.terminated: |
+ if self._termination.abortion is None: |
+ return None |
+ else: |
+ return self._termination.abortion_error |
+ else: |
+ _wait_once_until(self._condition, until) |
+ |
+ def traceback(self, timeout=None): |
+ until = None if timeout is None else time.time() + timeout |
+ with self._condition: |
+ while True: |
+ if self._termination.terminated: |
+ if self._termination.abortion_error is None: |
+ return None |
+ else: |
+ abortion_error = self._termination.abortion_error |
+ break |
+ else: |
+ _wait_once_until(self._condition, until) |
+ |
+ try: |
+ raise abortion_error |
+ except face.AbortionError: |
+ return sys.exc_info()[2] |
+ |
+ def add_done_callback(self, fn): |
+ with self._condition: |
+ if self._operation_context is not None: |
+ outcome = self._operation_context.add_termination_callback( |
+ _done_callback_as_operation_termination_callback(fn, self)) |
+ if outcome is None: |
+ return |
+ else: |
+ self._set_outcome(outcome) |
+ |
+ fn(self) |
+ |
+ def consume(self, value): |
+ with self._condition: |
+ while True: |
+ if self._termination.terminated: |
+ return |
+ elif 0 < self._down_allowance: |
+ self._operator.advance(payload=value) |
+ self._down_allowance -= 1 |
+ return |
+ else: |
+ self._condition.wait() |
+ |
+ def terminate(self): |
+ with self._condition: |
+ if self._termination.terminated: |
+ return |
+ elif self._down_code.kind is _Transitory.Kind.GONE: |
+ # Conform to specified idempotence of terminate by ignoring extra calls. |
+ return |
+ else: |
+ completion = self._down_completion() |
+ self._operator.advance(completion=completion) |
+ |
+ def consume_and_terminate(self, value): |
+ with self._condition: |
+ while True: |
+ if self._termination.terminated: |
+ return |
+ elif 0 < self._down_allowance: |
+ completion = self._down_completion() |
+ self._operator.advance(payload=value, completion=completion) |
+ return |
+ else: |
+ self._condition.wait() |
+ |
+ def __iter__(self): |
+ return self |
+ |
+ def next(self): |
+ with self._condition: |
+ while True: |
+ if self._termination.abortion_error is not None: |
+ raise self._termination.abortion_error |
+ elif self._up_payload is not None: |
+ payload = self._up_payload |
+ self._up_payload = None |
+ if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
+ self._operator.advance(allowance=1) |
+ return payload |
+ elif self._up_completion.kind is _Awaited.Kind.ARRIVED: |
+ raise StopIteration() |
+ else: |
+ self._condition.wait() |
+ |
+ def is_active(self): |
+ with self._condition: |
+ return not self._termination.terminated |
+ |
+ def time_remaining(self): |
+ if self._operation_context is None: |
+ return 0 |
+ else: |
+ return self._operation_context.time_remaining() |
+ |
+ def add_abortion_callback(self, abortion_callback): |
+ with self._condition: |
+ if self._operation_context is None: |
+ return self._termination.abortion |
+ else: |
+ outcome = self._operation_context.add_termination_callback( |
+ _abortion_callback_as_operation_termination_callback( |
+ abortion_callback, self.set_outcome)) |
+ if outcome is not None: |
+ return self._set_outcome(outcome).abortion |
+ else: |
+ return self._termination.abortion |
+ |
+ def protocol_context(self): |
+ with self._condition: |
+ while True: |
+ if self._protocol_context.kind is _Awaited.Kind.ARRIVED: |
+ return self._protocol_context.value |
+ elif self._termination.abortion_error is not None: |
+ raise self._termination.abortion_error |
+ else: |
+ self._condition.wait() |
+ |
+ def initial_metadata(self): |
+ with self._condition: |
+ while True: |
+ if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED: |
+ return self._up_initial_metadata.value |
+ elif self._termination.terminated: |
+ return None |
+ else: |
+ self._condition.wait() |
+ |
+ def terminal_metadata(self): |
+ with self._condition: |
+ while True: |
+ if self._up_completion.kind is _Awaited.Kind.ARRIVED: |
+ return self._up_completion.value.terminal_metadata |
+ elif self._termination.terminated: |
+ return None |
+ else: |
+ self._condition.wait() |
+ |
+ def code(self): |
+ with self._condition: |
+ while True: |
+ if self._up_completion.kind is _Awaited.Kind.ARRIVED: |
+ return self._up_completion.value.code |
+ elif self._termination.terminated: |
+ return None |
+ else: |
+ self._condition.wait() |
+ |
+ def details(self): |
+ with self._condition: |
+ while True: |
+ if self._up_completion.kind is _Awaited.Kind.ARRIVED: |
+ return self._up_completion.value.message |
+ elif self._termination.terminated: |
+ return None |
+ else: |
+ self._condition.wait() |
+ |
+ def set_initial_metadata(self, initial_metadata): |
+ with self._condition: |
+ if (self._down_initial_metadata.kind is not |
+ _Transitory.Kind.NOT_YET_SEEN): |
+ raise ValueError(_CANNOT_SET_INITIAL_METADATA) |
+ else: |
+ self._down_initial_metadata = _GONE |
+ self._operator.advance(initial_metadata=initial_metadata) |
+ |
+ def set_terminal_metadata(self, terminal_metadata): |
+ with self._condition: |
+ if (self._down_terminal_metadata.kind is not |
+ _Transitory.Kind.NOT_YET_SEEN): |
+ raise ValueError(_CANNOT_SET_TERMINAL_METADATA) |
+ else: |
+ self._down_terminal_metadata = _Transitory( |
+ _Transitory.Kind.PRESENT, terminal_metadata) |
+ |
+ def set_code(self, code): |
+ with self._condition: |
+ if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN: |
+ raise ValueError(_CANNOT_SET_CODE) |
+ else: |
+ self._down_code = _Transitory(_Transitory.Kind.PRESENT, code) |
+ |
+ def set_details(self, details): |
+ with self._condition: |
+ if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN: |
+ raise ValueError(_CANNOT_SET_DETAILS) |
+ else: |
+ self._down_details = _Transitory(_Transitory.Kind.PRESENT, details) |
+ |
+ def set_protocol_context(self, protocol_context): |
+ with self._condition: |
+ self._protocol_context = _Awaited( |
+ _Awaited.Kind.ARRIVED, protocol_context) |
+ self._condition.notify_all() |
+ |
+ def set_outcome(self, outcome): |
+ with self._condition: |
+ return self._set_outcome(outcome) |
+ |
+ |
+class _ProtocolReceiver(base.ProtocolReceiver): |
+ |
+ def __init__(self, rendezvous): |
+ self._rendezvous = rendezvous |
+ |
+ def context(self, protocol_context): |
+ self._rendezvous.set_protocol_context(protocol_context) |
+ |
+ |
+def protocol_receiver(rendezvous): |
+ return _ProtocolReceiver(rendezvous) |
+ |
+ |
+def pool_wrap(behavior, operation_context): |
+ """Wraps an operation-related behavior so that it may be called in a pool. |
+ |
+ Args: |
+ behavior: A callable related to carrying out an operation. |
+ operation_context: A base_interfaces.OperationContext for the operation. |
+ |
+ Returns: |
+ A callable that when called carries out the behavior of the given callable |
+ and handles whatever exceptions it raises appropriately. |
+ """ |
+ def translation(*args): |
+ try: |
+ behavior(*args) |
+ except ( |
+ abandonment.Abandoned, |
+ face.CancellationError, |
+ face.ExpirationError, |
+ face.LocalShutdownError, |
+ face.RemoteShutdownError, |
+ face.NetworkError, |
+ face.RemoteError, |
+ ) as e: |
+ if operation_context.outcome() is None: |
+ operation_context.fail(e) |
+ except Exception as e: |
+ operation_context.fail(e) |
+ return callable_util.with_exceptions_logged( |
+ translation, _INTERNAL_ERROR_LOG_MESSAGE) |