Index: third_party/grpc/src/python/grpcio/grpc/framework/core/_protocol.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/core/_protocol.py b/third_party/grpc/src/python/grpcio/grpc/framework/core/_protocol.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3177b5e302a1e4fdc2fa2a2f10c4eb3c0ef3f035 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/core/_protocol.py |
@@ -0,0 +1,176 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""State and behavior for passing protocol objects in an operation.""" |
+ |
+import collections |
+import enum |
+ |
+from grpc.framework.core import _constants |
+from grpc.framework.core import _interfaces |
+from grpc.framework.core import _utilities |
+from grpc.framework.foundation import callable_util |
+from grpc.framework.interfaces.base import base |
+ |
+_EXCEPTION_LOG_MESSAGE = 'Exception delivering protocol object!' |
+ |
+_LOCAL_FAILURE_OUTCOME = _utilities.Outcome( |
+ base.Outcome.Kind.LOCAL_FAILURE, None, None) |
+ |
+ |
+class _Awaited( |
+ collections.namedtuple('_Awaited', ('kind', 'value',))): |
+ |
+ @enum.unique |
+ class Kind(enum.Enum): |
+ NOT_YET_ARRIVED = 'not yet arrived' |
+ ARRIVED = 'arrived' |
+ |
+_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None) |
+_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None) |
+ |
+ |
+class _Transitory( |
+ collections.namedtuple('_Transitory', ('kind', 'value',))): |
+ |
+ @enum.unique |
+ class Kind(enum.Enum): |
+ NOT_YET_SEEN = 'not yet seen' |
+ PRESENT = 'present' |
+ GONE = 'gone' |
+ |
+_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None) |
+_GONE = _Transitory(_Transitory.Kind.GONE, None) |
+ |
+ |
+class _ProtocolManager(_interfaces.ProtocolManager): |
+ """An implementation of _interfaces.ExpirationManager.""" |
+ |
+ def __init__( |
+ self, protocol_receiver, lock, pool, termination_manager, |
+ transmission_manager, expiration_manager): |
+ """Constructor. |
+ |
+ Args: |
+ protocol_receiver: An _Awaited wrapping of the base.ProtocolReceiver to |
+ which protocol objects should be passed during the operation. May be |
+ of kind _Awaited.Kind.NOT_YET_ARRIVED if the customer's subscription is |
+ not yet known and may be of kind _Awaited.Kind.ARRIVED but with a value |
+ of None if the customer's subscription did not include a |
+ ProtocolReceiver. |
+ lock: The operation-wide lock. |
+ pool: A thread pool. |
+ termination_manager: The _interfaces.TerminationManager for the operation. |
+ transmission_manager: The _interfaces.TransmissionManager for the |
+ operation. |
+ expiration_manager: The _interfaces.ExpirationManager for the operation. |
+ """ |
+ self._lock = lock |
+ self._pool = pool |
+ self._termination_manager = termination_manager |
+ self._transmission_manager = transmission_manager |
+ self._expiration_manager = expiration_manager |
+ |
+ self._protocol_receiver = protocol_receiver |
+ self._context = _NOT_YET_SEEN |
+ |
+ def _abort_and_notify(self, outcome): |
+ if self._termination_manager.outcome is None: |
+ self._termination_manager.abort(outcome) |
+ self._transmission_manager.abort(outcome) |
+ self._expiration_manager.terminate() |
+ |
+ def _deliver(self, behavior, value): |
+ def deliver(): |
+ delivery_outcome = callable_util.call_logging_exceptions( |
+ behavior, _EXCEPTION_LOG_MESSAGE, value) |
+ if delivery_outcome.kind is callable_util.Outcome.Kind.RAISED: |
+ with self._lock: |
+ self._abort_and_notify(_LOCAL_FAILURE_OUTCOME) |
+ self._pool.submit( |
+ callable_util.with_exceptions_logged( |
+ deliver, _constants.INTERNAL_ERROR_LOG_MESSAGE)) |
+ |
+ def set_protocol_receiver(self, protocol_receiver): |
+ """See _interfaces.ProtocolManager.set_protocol_receiver for spec.""" |
+ self._protocol_receiver = _Awaited(_Awaited.Kind.ARRIVED, protocol_receiver) |
+ if (self._context.kind is _Transitory.Kind.PRESENT and |
+ protocol_receiver is not None): |
+ self._deliver(protocol_receiver.context, self._context.value) |
+ self._context = _GONE |
+ |
+ def accept_protocol_context(self, protocol_context): |
+ """See _interfaces.ProtocolManager.accept_protocol_context for spec.""" |
+ if self._protocol_receiver.kind is _Awaited.Kind.ARRIVED: |
+ if self._protocol_receiver.value is not None: |
+ self._deliver(self._protocol_receiver.value.context, protocol_context) |
+ self._context = _GONE |
+ else: |
+ self._context = _Transitory(_Transitory.Kind.PRESENT, protocol_context) |
+ |
+ |
+def invocation_protocol_manager( |
+ subscription, lock, pool, termination_manager, transmission_manager, |
+ expiration_manager): |
+ """Creates an _interfaces.ProtocolManager for invocation-side use. |
+ |
+ Args: |
+ subscription: The local customer's subscription to the operation. |
+ lock: The operation-wide lock. |
+ pool: A thread pool. |
+ termination_manager: The _interfaces.TerminationManager for the operation. |
+ transmission_manager: The _interfaces.TransmissionManager for the |
+ operation. |
+ expiration_manager: The _interfaces.ExpirationManager for the operation. |
+ """ |
+ if subscription.kind is base.Subscription.Kind.FULL: |
+ awaited_protocol_receiver = _Awaited( |
+ _Awaited.Kind.ARRIVED, subscription.protocol_receiver) |
+ else: |
+ awaited_protocol_receiver = _ARRIVED_AND_NONE |
+ return _ProtocolManager( |
+ awaited_protocol_receiver, lock, pool, termination_manager, |
+ transmission_manager, expiration_manager) |
+ |
+ |
+def service_protocol_manager( |
+ lock, pool, termination_manager, transmission_manager, expiration_manager): |
+ """Creates an _interfaces.ProtocolManager for service-side use. |
+ |
+ Args: |
+ lock: The operation-wide lock. |
+ pool: A thread pool. |
+ termination_manager: The _interfaces.TerminationManager for the operation. |
+ transmission_manager: The _interfaces.TransmissionManager for the |
+ operation. |
+ expiration_manager: The _interfaces.ExpirationManager for the operation. |
+ """ |
+ return _ProtocolManager( |
+ _NOT_YET_ARRIVED, lock, pool, termination_manager, transmission_manager, |
+ expiration_manager) |