| Index: third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py b/third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5e9efdf7322790c3f5c790907c30ed231fd5eefc
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/framework/crust/_control.py
|
| @@ -0,0 +1,581 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""State and behavior for translating between sync and async control flow."""
|
| +
|
| +import collections
|
| +import enum
|
| +import sys
|
| +import threading
|
| +import time
|
| +
|
| +from grpc.framework.foundation import abandonment
|
| +from grpc.framework.foundation import callable_util
|
| +from grpc.framework.foundation import future
|
| +from grpc.framework.foundation import stream
|
| +from grpc.framework.interfaces.base import base
|
| +from grpc.framework.interfaces.base import utilities
|
| +from grpc.framework.interfaces.face import face
|
| +
|
| +_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
|
| +_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:'
|
| +
|
| +_CANNOT_SET_INITIAL_METADATA = (
|
| + 'Could not set initial metadata - has it already been set, or has a ' +
|
| + 'payload already been sent?')
|
| +_CANNOT_SET_TERMINAL_METADATA = (
|
| + 'Could not set terminal metadata - has it already been set, or has RPC ' +
|
| + 'completion already been indicated?')
|
| +_CANNOT_SET_CODE = (
|
| + 'Could not set code - has it already been set, or has RPC completion ' +
|
| + 'already been indicated?')
|
| +_CANNOT_SET_DETAILS = (
|
| + 'Could not set details - has it already been set, or has RPC completion ' +
|
| + 'already been indicated?')
|
| +
|
| +
|
| +class _DummyOperator(base.Operator):
|
| +
|
| + def advance(
|
| + self, initial_metadata=None, payload=None, completion=None,
|
| + allowance=None):
|
| + pass
|
| +
|
| +_DUMMY_OPERATOR = _DummyOperator()
|
| +
|
| +
|
| +class _Awaited(
|
| + collections.namedtuple('_Awaited', ('kind', 'value',))):
|
| +
|
| + @enum.unique
|
| + class Kind(enum.Enum):
|
| + NOT_YET_ARRIVED = 'not yet arrived'
|
| + ARRIVED = 'arrived'
|
| +
|
| +_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
|
| +_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
|
| +
|
| +
|
| +class _Transitory(
|
| + collections.namedtuple('_Transitory', ('kind', 'value',))):
|
| +
|
| + @enum.unique
|
| + class Kind(enum.Enum):
|
| + NOT_YET_SEEN = 'not yet seen'
|
| + PRESENT = 'present'
|
| + GONE = 'gone'
|
| +
|
| +_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
|
| +_GONE = _Transitory(_Transitory.Kind.GONE, None)
|
| +
|
| +
|
| +class _Termination(
|
| + collections.namedtuple(
|
| + '_Termination', ('terminated', 'abortion', 'abortion_error',))):
|
| + """Values indicating whether and how an RPC has terminated.
|
| +
|
| + Attributes:
|
| + terminated: A boolean indicating whether or not the RPC has terminated.
|
| + abortion: A face.Abortion value describing the RPC's abortion or None if the
|
| + RPC did not abort.
|
| + abortion_error: A face.AbortionError describing the RPC's abortion or None
|
| + if the RPC did not abort.
|
| + """
|
| +
|
| +_NOT_TERMINATED = _Termination(False, None, None)
|
| +
|
| +_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = {
|
| + base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination(
|
| + True, None, None),
|
| + base.Outcome.Kind.CANCELLED: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
|
| + face.CancellationError(*args)),
|
| + base.Outcome.Kind.EXPIRED: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
|
| + face.ExpirationError(*args)),
|
| + base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
|
| + face.LocalShutdownError(*args)),
|
| + base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
|
| + face.RemoteShutdownError(*args)),
|
| + base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
|
| + face.NetworkError(*args)),
|
| + base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
|
| + face.NetworkError(*args)),
|
| + base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
|
| + face.LocalError(*args)),
|
| + base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination(
|
| + True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
|
| + face.RemoteError(*args)),
|
| +}
|
| +
|
| +
|
| +def _wait_once_until(condition, until):
|
| + if until is None:
|
| + condition.wait()
|
| + else:
|
| + remaining = until - time.time()
|
| + if remaining < 0:
|
| + raise future.TimeoutError()
|
| + else:
|
| + condition.wait(timeout=remaining)
|
| +
|
| +
|
| +def _done_callback_as_operation_termination_callback(
|
| + done_callback, rendezvous):
|
| + def operation_termination_callback(operation_outcome):
|
| + rendezvous.set_outcome(operation_outcome)
|
| + done_callback(rendezvous)
|
| + return operation_termination_callback
|
| +
|
| +
|
| +def _abortion_callback_as_operation_termination_callback(
|
| + rpc_abortion_callback, rendezvous_set_outcome):
|
| + def operation_termination_callback(operation_outcome):
|
| + termination = rendezvous_set_outcome(operation_outcome)
|
| + if termination.abortion is not None:
|
| + rpc_abortion_callback(termination.abortion)
|
| + return operation_termination_callback
|
| +
|
| +
|
| +class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
|
| + """A rendez-vous for the threads of an operation.
|
| +
|
| + Instances of this object present iterator and stream.Consumer interfaces for
|
| + interacting with application code and present a base.Operator interface and
|
| + maintain a base.Operator internally for interacting with base interface code.
|
| + """
|
| +
|
| + def __init__(self, operator, operation_context):
|
| + self._condition = threading.Condition()
|
| +
|
| + self._operator = operator
|
| + self._operation_context = operation_context
|
| +
|
| + self._protocol_context = _NOT_YET_ARRIVED
|
| +
|
| + self._up_initial_metadata = _NOT_YET_ARRIVED
|
| + self._up_payload = None
|
| + self._up_allowance = 1
|
| + self._up_completion = _NOT_YET_ARRIVED
|
| + self._down_initial_metadata = _NOT_YET_SEEN
|
| + self._down_payload = None
|
| + self._down_allowance = 1
|
| + self._down_terminal_metadata = _NOT_YET_SEEN
|
| + self._down_code = _NOT_YET_SEEN
|
| + self._down_details = _NOT_YET_SEEN
|
| +
|
| + self._termination = _NOT_TERMINATED
|
| +
|
| + # The semantics of future.Future.cancel and future.Future.cancelled are
|
| + # slightly wonky, so they have to be tracked separately from the rest of the
|
| + # result of the RPC. This field tracks whether cancellation was requested
|
| + # prior to termination of the RPC
|
| + self._cancelled = False
|
| +
|
| + def set_operator_and_context(self, operator, operation_context):
|
| + with self._condition:
|
| + self._operator = operator
|
| + self._operation_context = operation_context
|
| +
|
| + def _down_completion(self):
|
| + if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN:
|
| + terminal_metadata = None
|
| + self._down_terminal_metadata = _GONE
|
| + elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT:
|
| + terminal_metadata = self._down_terminal_metadata.value
|
| + self._down_terminal_metadata = _GONE
|
| + else:
|
| + terminal_metadata = None
|
| + if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN:
|
| + code = None
|
| + self._down_code = _GONE
|
| + elif self._down_code.kind is _Transitory.Kind.PRESENT:
|
| + code = self._down_code.value
|
| + self._down_code = _GONE
|
| + else:
|
| + code = None
|
| + if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN:
|
| + details = None
|
| + self._down_details = _GONE
|
| + elif self._down_details.kind is _Transitory.Kind.PRESENT:
|
| + details = self._down_details.value
|
| + self._down_details = _GONE
|
| + else:
|
| + details = None
|
| + return utilities.completion(terminal_metadata, code, details)
|
| +
|
| + def _set_outcome(self, outcome):
|
| + if not self._termination.terminated:
|
| + self._operator = _DUMMY_OPERATOR
|
| + self._operation_context = None
|
| + self._down_initial_metadata = _GONE
|
| + self._down_payload = None
|
| + self._down_terminal_metadata = _GONE
|
| + self._down_code = _GONE
|
| + self._down_details = _GONE
|
| +
|
| + if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
|
| + initial_metadata = None
|
| + else:
|
| + initial_metadata = self._up_initial_metadata.value
|
| + if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
|
| + terminal_metadata = None
|
| + else:
|
| + terminal_metadata = self._up_completion.value.terminal_metadata
|
| + if outcome.kind is base.Outcome.Kind.COMPLETED:
|
| + code = self._up_completion.value.code
|
| + details = self._up_completion.value.message
|
| + else:
|
| + code = outcome.code
|
| + details = outcome.details
|
| + self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[
|
| + outcome.kind](initial_metadata, terminal_metadata, code, details)
|
| +
|
| + self._condition.notify_all()
|
| +
|
| + return self._termination
|
| +
|
| + def advance(
|
| + self, initial_metadata=None, payload=None, completion=None,
|
| + allowance=None):
|
| + with self._condition:
|
| + if initial_metadata is not None:
|
| + self._up_initial_metadata = _Awaited(
|
| + _Awaited.Kind.ARRIVED, initial_metadata)
|
| + if payload is not None:
|
| + if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
|
| + self._up_initial_metadata = _ARRIVED_AND_NONE
|
| + self._up_payload = payload
|
| + self._up_allowance -= 1
|
| + if completion is not None:
|
| + if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
|
| + self._up_initial_metadata = _ARRIVED_AND_NONE
|
| + self._up_completion = _Awaited(
|
| + _Awaited.Kind.ARRIVED, completion)
|
| + if allowance is not None:
|
| + if self._down_payload is not None:
|
| + self._operator.advance(payload=self._down_payload)
|
| + self._down_payload = None
|
| + self._down_allowance += allowance - 1
|
| + else:
|
| + self._down_allowance += allowance
|
| + self._condition.notify_all()
|
| +
|
| + def cancel(self):
|
| + with self._condition:
|
| + if self._operation_context is not None:
|
| + self._operation_context.cancel()
|
| + self._cancelled = True
|
| + return False
|
| +
|
| + def cancelled(self):
|
| + with self._condition:
|
| + return self._cancelled
|
| +
|
| + def running(self):
|
| + with self._condition:
|
| + return not self._termination.terminated
|
| +
|
| + def done(self):
|
| + with self._condition:
|
| + return self._termination.terminated
|
| +
|
| + def result(self, timeout=None):
|
| + until = None if timeout is None else time.time() + timeout
|
| + with self._condition:
|
| + while True:
|
| + if self._termination.terminated:
|
| + if self._termination.abortion is None:
|
| + return self._up_payload
|
| + elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED:
|
| + raise future.CancelledError()
|
| + else:
|
| + raise self._termination.abortion_error # pylint: disable=raising-bad-type
|
| + else:
|
| + _wait_once_until(self._condition, until)
|
| +
|
| + def exception(self, timeout=None):
|
| + until = None if timeout is None else time.time() + timeout
|
| + with self._condition:
|
| + while True:
|
| + if self._termination.terminated:
|
| + if self._termination.abortion is None:
|
| + return None
|
| + else:
|
| + return self._termination.abortion_error
|
| + else:
|
| + _wait_once_until(self._condition, until)
|
| +
|
| + def traceback(self, timeout=None):
|
| + until = None if timeout is None else time.time() + timeout
|
| + with self._condition:
|
| + while True:
|
| + if self._termination.terminated:
|
| + if self._termination.abortion_error is None:
|
| + return None
|
| + else:
|
| + abortion_error = self._termination.abortion_error
|
| + break
|
| + else:
|
| + _wait_once_until(self._condition, until)
|
| +
|
| + try:
|
| + raise abortion_error
|
| + except face.AbortionError:
|
| + return sys.exc_info()[2]
|
| +
|
| + def add_done_callback(self, fn):
|
| + with self._condition:
|
| + if self._operation_context is not None:
|
| + outcome = self._operation_context.add_termination_callback(
|
| + _done_callback_as_operation_termination_callback(fn, self))
|
| + if outcome is None:
|
| + return
|
| + else:
|
| + self._set_outcome(outcome)
|
| +
|
| + fn(self)
|
| +
|
| + def consume(self, value):
|
| + with self._condition:
|
| + while True:
|
| + if self._termination.terminated:
|
| + return
|
| + elif 0 < self._down_allowance:
|
| + self._operator.advance(payload=value)
|
| + self._down_allowance -= 1
|
| + return
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def terminate(self):
|
| + with self._condition:
|
| + if self._termination.terminated:
|
| + return
|
| + elif self._down_code.kind is _Transitory.Kind.GONE:
|
| + # Conform to specified idempotence of terminate by ignoring extra calls.
|
| + return
|
| + else:
|
| + completion = self._down_completion()
|
| + self._operator.advance(completion=completion)
|
| +
|
| + def consume_and_terminate(self, value):
|
| + with self._condition:
|
| + while True:
|
| + if self._termination.terminated:
|
| + return
|
| + elif 0 < self._down_allowance:
|
| + completion = self._down_completion()
|
| + self._operator.advance(payload=value, completion=completion)
|
| + return
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def __iter__(self):
|
| + return self
|
| +
|
| + def next(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._termination.abortion_error is not None:
|
| + raise self._termination.abortion_error
|
| + elif self._up_payload is not None:
|
| + payload = self._up_payload
|
| + self._up_payload = None
|
| + if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
|
| + self._operator.advance(allowance=1)
|
| + return payload
|
| + elif self._up_completion.kind is _Awaited.Kind.ARRIVED:
|
| + raise StopIteration()
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def is_active(self):
|
| + with self._condition:
|
| + return not self._termination.terminated
|
| +
|
| + def time_remaining(self):
|
| + if self._operation_context is None:
|
| + return 0
|
| + else:
|
| + return self._operation_context.time_remaining()
|
| +
|
| + def add_abortion_callback(self, abortion_callback):
|
| + with self._condition:
|
| + if self._operation_context is None:
|
| + return self._termination.abortion
|
| + else:
|
| + outcome = self._operation_context.add_termination_callback(
|
| + _abortion_callback_as_operation_termination_callback(
|
| + abortion_callback, self.set_outcome))
|
| + if outcome is not None:
|
| + return self._set_outcome(outcome).abortion
|
| + else:
|
| + return self._termination.abortion
|
| +
|
| + def protocol_context(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._protocol_context.kind is _Awaited.Kind.ARRIVED:
|
| + return self._protocol_context.value
|
| + elif self._termination.abortion_error is not None:
|
| + raise self._termination.abortion_error
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def initial_metadata(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED:
|
| + return self._up_initial_metadata.value
|
| + elif self._termination.terminated:
|
| + return None
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def terminal_metadata(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._up_completion.kind is _Awaited.Kind.ARRIVED:
|
| + return self._up_completion.value.terminal_metadata
|
| + elif self._termination.terminated:
|
| + return None
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def code(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._up_completion.kind is _Awaited.Kind.ARRIVED:
|
| + return self._up_completion.value.code
|
| + elif self._termination.terminated:
|
| + return None
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def details(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._up_completion.kind is _Awaited.Kind.ARRIVED:
|
| + return self._up_completion.value.message
|
| + elif self._termination.terminated:
|
| + return None
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def set_initial_metadata(self, initial_metadata):
|
| + with self._condition:
|
| + if (self._down_initial_metadata.kind is not
|
| + _Transitory.Kind.NOT_YET_SEEN):
|
| + raise ValueError(_CANNOT_SET_INITIAL_METADATA)
|
| + else:
|
| + self._down_initial_metadata = _GONE
|
| + self._operator.advance(initial_metadata=initial_metadata)
|
| +
|
| + def set_terminal_metadata(self, terminal_metadata):
|
| + with self._condition:
|
| + if (self._down_terminal_metadata.kind is not
|
| + _Transitory.Kind.NOT_YET_SEEN):
|
| + raise ValueError(_CANNOT_SET_TERMINAL_METADATA)
|
| + else:
|
| + self._down_terminal_metadata = _Transitory(
|
| + _Transitory.Kind.PRESENT, terminal_metadata)
|
| +
|
| + def set_code(self, code):
|
| + with self._condition:
|
| + if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN:
|
| + raise ValueError(_CANNOT_SET_CODE)
|
| + else:
|
| + self._down_code = _Transitory(_Transitory.Kind.PRESENT, code)
|
| +
|
| + def set_details(self, details):
|
| + with self._condition:
|
| + if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN:
|
| + raise ValueError(_CANNOT_SET_DETAILS)
|
| + else:
|
| + self._down_details = _Transitory(_Transitory.Kind.PRESENT, details)
|
| +
|
| + def set_protocol_context(self, protocol_context):
|
| + with self._condition:
|
| + self._protocol_context = _Awaited(
|
| + _Awaited.Kind.ARRIVED, protocol_context)
|
| + self._condition.notify_all()
|
| +
|
| + def set_outcome(self, outcome):
|
| + with self._condition:
|
| + return self._set_outcome(outcome)
|
| +
|
| +
|
| +class _ProtocolReceiver(base.ProtocolReceiver):
|
| +
|
| + def __init__(self, rendezvous):
|
| + self._rendezvous = rendezvous
|
| +
|
| + def context(self, protocol_context):
|
| + self._rendezvous.set_protocol_context(protocol_context)
|
| +
|
| +
|
| +def protocol_receiver(rendezvous):
|
| + return _ProtocolReceiver(rendezvous)
|
| +
|
| +
|
| +def pool_wrap(behavior, operation_context):
|
| + """Wraps an operation-related behavior so that it may be called in a pool.
|
| +
|
| + Args:
|
| + behavior: A callable related to carrying out an operation.
|
| + operation_context: A base_interfaces.OperationContext for the operation.
|
| +
|
| + Returns:
|
| + A callable that when called carries out the behavior of the given callable
|
| + and handles whatever exceptions it raises appropriately.
|
| + """
|
| + def translation(*args):
|
| + try:
|
| + behavior(*args)
|
| + except (
|
| + abandonment.Abandoned,
|
| + face.CancellationError,
|
| + face.ExpirationError,
|
| + face.LocalShutdownError,
|
| + face.RemoteShutdownError,
|
| + face.NetworkError,
|
| + face.RemoteError,
|
| + ) as e:
|
| + if operation_context.outcome() is None:
|
| + operation_context.fail(e)
|
| + except Exception as e:
|
| + operation_context.fail(e)
|
| + return callable_util.with_exceptions_logged(
|
| + translation, _INTERNAL_ERROR_LOG_MESSAGE)
|
|
|