Index: third_party/grpc/src/python/grpcio/grpc/framework/core/_ingestion.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/core/_ingestion.py b/third_party/grpc/src/python/grpcio/grpc/framework/core/_ingestion.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4129a8ce43e10ee8d6e4217013371faf34998348 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/core/_ingestion.py |
@@ -0,0 +1,438 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""State and behavior for ingestion during an operation.""" |
+ |
+import abc |
+import collections |
+import enum |
+ |
+from grpc.framework.core import _constants |
+from grpc.framework.core import _interfaces |
+from grpc.framework.core import _utilities |
+from grpc.framework.foundation import abandonment |
+from grpc.framework.foundation import callable_util |
+from grpc.framework.interfaces.base import base |
+ |
+_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' |
+_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' |
+ |
+ |
+class _SubscriptionCreation( |
+ collections.namedtuple( |
+ '_SubscriptionCreation', |
+ ('kind', 'subscription', 'code', 'details',))): |
+ """A sum type for the outcome of ingestion initialization. |
+ |
+ Attributes: |
+ kind: A Kind value coarsely indicating how subscription creation completed. |
+ subscription: The created subscription. Only present if kind is |
+ Kind.SUBSCRIPTION. |
+ code: A code value to be sent to the other side of the operation along with |
+ an indication that the operation is being aborted due to an error on the |
+ remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. |
+ details: A details value to be sent to the other side of the operation |
+ along with an indication that the operation is being aborted due to an |
+ error on the remote side of the operation. Only present if kind is |
+ Kind.REMOTE_ERROR. |
+ """ |
+ |
+ @enum.unique |
+ class Kind(enum.Enum): |
+ SUBSCRIPTION = 'subscription' |
+ REMOTE_ERROR = 'remote error' |
+ ABANDONED = 'abandoned' |
+ |
+ |
+class _SubscriptionCreator(object): |
+ """Common specification of subscription-creating behavior.""" |
+ __metaclass__ = abc.ABCMeta |
+ |
+ @abc.abstractmethod |
+ def create(self, group, method): |
+ """Creates the base.Subscription of the local customer. |
+ |
+ Any exceptions raised by this method should be attributed to and treated as |
+ defects in the customer code called by this method. |
+ |
+ Args: |
+ group: The group identifier of the operation. |
+ method: The method identifier of the operation. |
+ |
+ Returns: |
+ A _SubscriptionCreation describing the result of subscription creation. |
+ """ |
+ raise NotImplementedError() |
+ |
+ |
+class _ServiceSubscriptionCreator(_SubscriptionCreator): |
+ """A _SubscriptionCreator appropriate for service-side use.""" |
+ |
+ def __init__(self, servicer, operation_context, output_operator): |
+ """Constructor. |
+ |
+ Args: |
+ servicer: The base.Servicer that will service the operation. |
+ operation_context: A base.OperationContext for the operation to be passed |
+ to the customer. |
+ output_operator: A base.Operator for the operation to be passed to the |
+ customer and to be called by the customer to accept operation data |
+ emitted by the customer. |
+ """ |
+ self._servicer = servicer |
+ self._operation_context = operation_context |
+ self._output_operator = output_operator |
+ |
+ def create(self, group, method): |
+ try: |
+ subscription = self._servicer.service( |
+ group, method, self._operation_context, self._output_operator) |
+ except base.NoSuchMethodError as e: |
+ return _SubscriptionCreation( |
+ _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.details) |
+ except abandonment.Abandoned: |
+ return _SubscriptionCreation( |
+ _SubscriptionCreation.Kind.ABANDONED, None, None, None) |
+ else: |
+ return _SubscriptionCreation( |
+ _SubscriptionCreation.Kind.SUBSCRIPTION, subscription, None, None) |
+ |
+ |
+def _wrap(behavior): |
+ def wrapped(*args, **kwargs): |
+ try: |
+ behavior(*args, **kwargs) |
+ except abandonment.Abandoned: |
+ return False |
+ else: |
+ return True |
+ return wrapped |
+ |
+ |
+class _IngestionManager(_interfaces.IngestionManager): |
+ """An implementation of _interfaces.IngestionManager.""" |
+ |
+ def __init__( |
+ self, lock, pool, subscription, subscription_creator, termination_manager, |
+ transmission_manager, expiration_manager, protocol_manager): |
+ """Constructor. |
+ |
+ Args: |
+ lock: The operation-wide lock. |
+ pool: A thread pool in which to execute customer code. |
+ subscription: A base.Subscription describing the customer's interest in |
+ operation values from the other side. May be None if |
+ subscription_creator is not None. |
+ subscription_creator: A _SubscriptionCreator wrapping the portion of |
+ customer code that when called returns the base.Subscription describing |
+ the customer's interest in operation values from the other side. May be |
+ None if subscription is not None. |
+ termination_manager: The _interfaces.TerminationManager for the operation. |
+ transmission_manager: The _interfaces.TransmissionManager for the |
+ operation. |
+ expiration_manager: The _interfaces.ExpirationManager for the operation. |
+ protocol_manager: The _interfaces.ProtocolManager for the operation. |
+ """ |
+ self._lock = lock |
+ self._pool = pool |
+ self._termination_manager = termination_manager |
+ self._transmission_manager = transmission_manager |
+ self._expiration_manager = expiration_manager |
+ self._protocol_manager = protocol_manager |
+ |
+ if subscription is None: |
+ self._subscription_creator = subscription_creator |
+ self._wrapped_operator = None |
+ elif subscription.kind is base.Subscription.Kind.FULL: |
+ self._subscription_creator = None |
+ self._wrapped_operator = _wrap(subscription.operator.advance) |
+ else: |
+ # TODO(nathaniel): Support other subscriptions. |
+ raise ValueError('Unsupported subscription "%s"!' % subscription.kind) |
+ self._pending_initial_metadata = None |
+ self._pending_payloads = [] |
+ self._pending_completion = None |
+ self._local_allowance = 1 |
+ # A nonnegative integer or None, with None indicating that the local |
+ # customer is done emitting anyway so there's no need to bother it by |
+ # informing it that the remote customer has granted it further permission to |
+ # emit. |
+ self._remote_allowance = 0 |
+ self._processing = False |
+ |
+ def _abort_internal_only(self): |
+ self._subscription_creator = None |
+ self._wrapped_operator = None |
+ self._pending_initial_metadata = None |
+ self._pending_payloads = None |
+ self._pending_completion = None |
+ |
+ def _abort_and_notify(self, outcome_kind, code, details): |
+ self._abort_internal_only() |
+ if self._termination_manager.outcome is None: |
+ outcome = _utilities.Outcome(outcome_kind, code, details) |
+ self._termination_manager.abort(outcome) |
+ self._transmission_manager.abort(outcome) |
+ self._expiration_manager.terminate() |
+ |
+ def _operator_next(self): |
+ """Computes the next step for full-subscription ingestion. |
+ |
+ Returns: |
+ An initial_metadata, payload, completion, allowance, continue quintet |
+ indicating what operation values (if any) are available to pass into |
+ customer code and whether or not there is anything immediately |
+ actionable to call customer code to do. |
+ """ |
+ if self._wrapped_operator is None: |
+ return None, None, None, None, False |
+ else: |
+ initial_metadata, payload, completion, allowance, action = [None] * 5 |
+ if self._pending_initial_metadata is not None: |
+ initial_metadata = self._pending_initial_metadata |
+ self._pending_initial_metadata = None |
+ action = True |
+ if self._pending_payloads and 0 < self._local_allowance: |
+ payload = self._pending_payloads.pop(0) |
+ self._local_allowance -= 1 |
+ action = True |
+ if not self._pending_payloads and self._pending_completion is not None: |
+ completion = self._pending_completion |
+ self._pending_completion = None |
+ action = True |
+ if self._remote_allowance is not None and 0 < self._remote_allowance: |
+ allowance = self._remote_allowance |
+ self._remote_allowance = 0 |
+ action = True |
+ return initial_metadata, payload, completion, allowance, bool(action) |
+ |
+ def _operator_process( |
+ self, wrapped_operator, initial_metadata, payload, |
+ completion, allowance): |
+ while True: |
+ advance_outcome = callable_util.call_logging_exceptions( |
+ wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE, |
+ initial_metadata=initial_metadata, payload=payload, |
+ completion=completion, allowance=allowance) |
+ if advance_outcome.exception is None: |
+ if advance_outcome.return_value: |
+ with self._lock: |
+ if self._termination_manager.outcome is not None: |
+ return |
+ if completion is not None: |
+ self._termination_manager.ingestion_complete() |
+ initial_metadata, payload, completion, allowance, moar = ( |
+ self._operator_next()) |
+ if not moar: |
+ self._processing = False |
+ return |
+ else: |
+ with self._lock: |
+ if self._termination_manager.outcome is None: |
+ self._abort_and_notify( |
+ base.Outcome.Kind.LOCAL_FAILURE, None, None) |
+ return |
+ else: |
+ with self._lock: |
+ if self._termination_manager.outcome is None: |
+ self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) |
+ return |
+ |
+ def _operator_post_create(self, subscription): |
+ wrapped_operator = _wrap(subscription.operator.advance) |
+ with self._lock: |
+ if self._termination_manager.outcome is not None: |
+ return |
+ self._wrapped_operator = wrapped_operator |
+ self._subscription_creator = None |
+ metadata, payload, completion, allowance, moar = self._operator_next() |
+ if not moar: |
+ self._processing = False |
+ return |
+ self._operator_process( |
+ wrapped_operator, metadata, payload, completion, allowance) |
+ |
+ def _create(self, subscription_creator, group, name): |
+ outcome = callable_util.call_logging_exceptions( |
+ subscription_creator.create, |
+ _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, group, name) |
+ if outcome.return_value is None: |
+ with self._lock: |
+ if self._termination_manager.outcome is None: |
+ self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) |
+ elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED: |
+ with self._lock: |
+ if self._termination_manager.outcome is None: |
+ self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) |
+ elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR: |
+ code = outcome.return_value.code |
+ details = outcome.return_value.details |
+ with self._lock: |
+ if self._termination_manager.outcome is None: |
+ self._abort_and_notify( |
+ base.Outcome.Kind.REMOTE_FAILURE, code, details) |
+ elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: |
+ self._protocol_manager.set_protocol_receiver( |
+ outcome.return_value.subscription.protocol_receiver) |
+ self._operator_post_create(outcome.return_value.subscription) |
+ else: |
+ # TODO(nathaniel): Support other subscriptions. |
+ raise ValueError( |
+ 'Unsupported "%s"!' % outcome.return_value.subscription.kind) |
+ |
+ def _store_advance(self, initial_metadata, payload, completion, allowance): |
+ if initial_metadata is not None: |
+ self._pending_initial_metadata = initial_metadata |
+ if payload is not None: |
+ self._pending_payloads.append(payload) |
+ if completion is not None: |
+ self._pending_completion = completion |
+ if allowance is not None and self._remote_allowance is not None: |
+ self._remote_allowance += allowance |
+ |
+ def _operator_advance(self, initial_metadata, payload, completion, allowance): |
+ if self._processing: |
+ self._store_advance(initial_metadata, payload, completion, allowance) |
+ else: |
+ action = False |
+ if initial_metadata is not None: |
+ action = True |
+ if payload is not None: |
+ if 0 < self._local_allowance: |
+ self._local_allowance -= 1 |
+ action = True |
+ else: |
+ self._pending_payloads.append(payload) |
+ payload = False |
+ if completion is not None: |
+ if self._pending_payloads: |
+ self._pending_completion = completion |
+ else: |
+ action = True |
+ if allowance is not None and self._remote_allowance is not None: |
+ allowance += self._remote_allowance |
+ self._remote_allowance = 0 |
+ action = True |
+ if action: |
+ self._pool.submit( |
+ callable_util.with_exceptions_logged( |
+ self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE), |
+ self._wrapped_operator, initial_metadata, payload, completion, |
+ allowance) |
+ |
+ def set_group_and_method(self, group, method): |
+ """See _interfaces.IngestionManager.set_group_and_method for spec.""" |
+ if self._subscription_creator is not None and not self._processing: |
+ self._pool.submit( |
+ callable_util.with_exceptions_logged( |
+ self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE), |
+ self._subscription_creator, group, method) |
+ self._processing = True |
+ |
+ def add_local_allowance(self, allowance): |
+ """See _interfaces.IngestionManager.add_local_allowance for spec.""" |
+ if any((self._subscription_creator, self._wrapped_operator,)): |
+ self._local_allowance += allowance |
+ if not self._processing: |
+ initial_metadata, payload, completion, allowance, moar = ( |
+ self._operator_next()) |
+ if moar: |
+ self._pool.submit( |
+ callable_util.with_exceptions_logged( |
+ self._operator_process, |
+ _constants.INTERNAL_ERROR_LOG_MESSAGE), |
+ initial_metadata, payload, completion, allowance) |
+ |
+ def local_emissions_done(self): |
+ self._remote_allowance = None |
+ |
+ def advance(self, initial_metadata, payload, completion, allowance): |
+ """See _interfaces.IngestionManager.advance for specification.""" |
+ if self._subscription_creator is not None: |
+ self._store_advance(initial_metadata, payload, completion, allowance) |
+ elif self._wrapped_operator is not None: |
+ self._operator_advance(initial_metadata, payload, completion, allowance) |
+ |
+ |
+def invocation_ingestion_manager( |
+ subscription, lock, pool, termination_manager, transmission_manager, |
+ expiration_manager, protocol_manager): |
+ """Creates an IngestionManager appropriate for invocation-side use. |
+ |
+ Args: |
+ subscription: A base.Subscription indicating the customer's interest in the |
+ data and results from the service-side of the operation. |
+ lock: The operation-wide lock. |
+ pool: A thread pool in which to execute customer code. |
+ termination_manager: The _interfaces.TerminationManager for the operation. |
+ transmission_manager: The _interfaces.TransmissionManager for the |
+ operation. |
+ expiration_manager: The _interfaces.ExpirationManager for the operation. |
+ protocol_manager: The _interfaces.ProtocolManager for the operation. |
+ |
+ Returns: |
+ An IngestionManager appropriate for invocation-side use. |
+ """ |
+ return _IngestionManager( |
+ lock, pool, subscription, None, termination_manager, transmission_manager, |
+ expiration_manager, protocol_manager) |
+ |
+ |
+def service_ingestion_manager( |
+ servicer, operation_context, output_operator, lock, pool, |
+ termination_manager, transmission_manager, expiration_manager, |
+ protocol_manager): |
+ """Creates an IngestionManager appropriate for service-side use. |
+ |
+ The returned IngestionManager will require its set_group_and_name method to be |
+ called before its advance method may be called. |
+ |
+ Args: |
+ servicer: A base.Servicer for servicing the operation. |
+ operation_context: A base.OperationContext for the operation to be passed to |
+ the customer. |
+ output_operator: A base.Operator for the operation to be passed to the |
+ customer and to be called by the customer to accept operation data output |
+ by the customer. |
+ lock: The operation-wide lock. |
+ pool: A thread pool in which to execute customer code. |
+ termination_manager: The _interfaces.TerminationManager for the operation. |
+ transmission_manager: The _interfaces.TransmissionManager for the |
+ operation. |
+ expiration_manager: The _interfaces.ExpirationManager for the operation. |
+ protocol_manager: The _interfaces.ProtocolManager for the operation. |
+ |
+ Returns: |
+ An IngestionManager appropriate for service-side use. |
+ """ |
+ subscription_creator = _ServiceSubscriptionCreator( |
+ servicer, operation_context, output_operator) |
+ return _IngestionManager( |
+ lock, pool, None, subscription_creator, termination_manager, |
+ transmission_manager, expiration_manager, protocol_manager) |