Index: third_party/grpc/src/python/grpcio/grpc/framework/base/_ends.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/base/_ends.py b/third_party/grpc/src/python/grpcio/grpc/framework/base/_ends.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..176f3ac06e3d77e5f15d96258fe0456e1132764b |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/base/_ends.py |
@@ -0,0 +1,399 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""Implementations of FrontLinks and BackLinks.""" |
+ |
+import collections |
+import threading |
+import uuid |
+ |
+# _interfaces is referenced from specification in this module. |
+from grpc.framework.base import _cancellation |
+from grpc.framework.base import _context |
+from grpc.framework.base import _emission |
+from grpc.framework.base import _expiration |
+from grpc.framework.base import _ingestion |
+from grpc.framework.base import _interfaces # pylint: disable=unused-import |
+from grpc.framework.base import _reception |
+from grpc.framework.base import _termination |
+from grpc.framework.base import _transmission |
+from grpc.framework.base import interfaces |
+from grpc.framework.foundation import callable_util |
+ |
+_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' |
+ |
+ |
+class _EasyOperation(interfaces.Operation): |
+ """A trivial implementation of interfaces.Operation.""" |
+ |
+ def __init__(self, emission_manager, context, cancellation_manager): |
+ """Constructor. |
+ |
+ Args: |
+ emission_manager: The _interfaces.EmissionManager for the operation that |
+ will accept values emitted by customer code. |
+ context: The interfaces.OperationContext for use by the customer |
+ during the operation. |
+ cancellation_manager: The _interfaces.CancellationManager for the |
+ operation. |
+ """ |
+ self.consumer = emission_manager |
+ self.context = context |
+ self._cancellation_manager = cancellation_manager |
+ |
+ def cancel(self): |
+ self._cancellation_manager.cancel() |
+ |
+ |
+class _Endlette(object): |
+ """Utility for stateful behavior common to Fronts and Backs.""" |
+ |
+ def __init__(self, pool): |
+ """Constructor. |
+ |
+ Args: |
+ pool: A thread pool to use when calling registered idle actions. |
+ """ |
+ self._lock = threading.Lock() |
+ self._pool = pool |
+ # Dictionary from operation IDs to ReceptionManager-or-None. A None value |
+ # indicates an in-progress fire-and-forget operation for which the customer |
+ # has chosen to ignore results. |
+ self._operations = {} |
+ self._stats = {outcome: 0 for outcome in interfaces.Outcome} |
+ self._idle_actions = [] |
+ |
+ def terminal_action(self, operation_id): |
+ """Constructs the termination action for a single operation. |
+ |
+ Args: |
+ operation_id: An operation ID. |
+ |
+ Returns: |
+ A callable that takes an operation outcome for an argument to be used as |
+ the termination action for the operation associated with the given |
+ operation ID. |
+ """ |
+ def termination_action(outcome): |
+ with self._lock: |
+ self._stats[outcome] += 1 |
+ self._operations.pop(operation_id, None) |
+ if not self._operations: |
+ for action in self._idle_actions: |
+ self._pool.submit(callable_util.with_exceptions_logged( |
+ action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)) |
+ self._idle_actions = [] |
+ return termination_action |
+ |
+ def __enter__(self): |
+ self._lock.acquire() |
+ |
+ def __exit__(self, exc_type, exc_val, exc_tb): |
+ self._lock.release() |
+ |
+ def get_operation(self, operation_id): |
+ return self._operations.get(operation_id, None) |
+ |
+ def add_operation(self, operation_id, operation_reception_manager): |
+ self._operations[operation_id] = operation_reception_manager |
+ |
+ def operation_stats(self): |
+ with self._lock: |
+ return dict(self._stats) |
+ |
+ def add_idle_action(self, action): |
+ with self._lock: |
+ if self._operations: |
+ self._idle_actions.append(action) |
+ else: |
+ self._pool.submit(callable_util.with_exceptions_logged( |
+ action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)) |
+ |
+ |
+class _FrontManagement( |
+ collections.namedtuple( |
+ '_FrontManagement', |
+ ('reception', 'emission', 'operation', 'cancellation'))): |
+ """Just a trivial helper class to bundle four fellow-traveling objects.""" |
+ |
+ |
+def _front_operate( |
+ callback, work_pool, transmission_pool, utility_pool, |
+ termination_action, operation_id, name, payload, complete, timeout, |
+ subscription, trace_id): |
+ """Constructs objects necessary for front-side operation management. |
+ |
+ Args: |
+ callback: A callable that accepts interfaces.FrontToBackTickets and |
+ delivers them to the other side of the operation. Execution of this |
+ callable may take any arbitrary length of time. |
+ work_pool: A thread pool in which to execute customer code. |
+ transmission_pool: A thread pool to use for transmitting to the other side |
+ of the operation. |
+ utility_pool: A thread pool for utility tasks. |
+ termination_action: A no-arg behavior to be called upon operation |
+ completion. |
+ operation_id: An object identifying the operation. |
+ name: The name of the method being called during the operation. |
+ payload: The first customer-significant value to be transmitted to the other |
+ side. May be None if there is no such value or if the customer chose not |
+ to pass it at operation invocation. |
+ complete: A boolean indicating whether or not additional payloads will be |
+ supplied by the customer. |
+ timeout: A length of time in seconds to allow for the operation. |
+ subscription: A interfaces.ServicedSubscription describing the |
+ customer's interest in the results of the operation. |
+ trace_id: A uuid.UUID identifying a set of related operations to which this |
+ operation belongs. May be None. |
+ |
+ Returns: |
+ A _FrontManagement object bundling together the |
+ _interfaces.ReceptionManager, _interfaces.EmissionManager, |
+ _context.OperationContext, and _interfaces.CancellationManager for the |
+ operation. |
+ """ |
+ lock = threading.Lock() |
+ with lock: |
+ termination_manager = _termination.front_termination_manager( |
+ work_pool, utility_pool, termination_action, subscription.kind) |
+ transmission_manager = _transmission.front_transmission_manager( |
+ lock, transmission_pool, callback, operation_id, name, |
+ subscription.kind, trace_id, timeout, termination_manager) |
+ operation_context = _context.OperationContext( |
+ lock, operation_id, interfaces.Outcome.SERVICED_FAILURE, |
+ termination_manager, transmission_manager) |
+ emission_manager = _emission.front_emission_manager( |
+ lock, termination_manager, transmission_manager) |
+ ingestion_manager = _ingestion.front_ingestion_manager( |
+ lock, work_pool, subscription, termination_manager, |
+ transmission_manager, operation_context) |
+ expiration_manager = _expiration.front_expiration_manager( |
+ lock, termination_manager, transmission_manager, ingestion_manager, |
+ timeout) |
+ reception_manager = _reception.front_reception_manager( |
+ lock, termination_manager, transmission_manager, ingestion_manager, |
+ expiration_manager) |
+ cancellation_manager = _cancellation.CancellationManager( |
+ lock, termination_manager, transmission_manager, ingestion_manager, |
+ expiration_manager) |
+ |
+ termination_manager.set_expiration_manager(expiration_manager) |
+ transmission_manager.set_ingestion_and_expiration_managers( |
+ ingestion_manager, expiration_manager) |
+ operation_context.set_ingestion_and_expiration_managers( |
+ ingestion_manager, expiration_manager) |
+ emission_manager.set_ingestion_manager_and_expiration_manager( |
+ ingestion_manager, expiration_manager) |
+ ingestion_manager.set_expiration_manager(expiration_manager) |
+ |
+ transmission_manager.inmit(payload, complete) |
+ |
+ if subscription.kind is interfaces.ServicedSubscription.Kind.NONE: |
+ returned_reception_manager = None |
+ else: |
+ returned_reception_manager = reception_manager |
+ |
+ return _FrontManagement( |
+ returned_reception_manager, emission_manager, operation_context, |
+ cancellation_manager) |
+ |
+ |
+class FrontLink(interfaces.FrontLink): |
+ """An implementation of interfaces.FrontLink.""" |
+ |
+ def __init__(self, work_pool, transmission_pool, utility_pool): |
+ """Constructor. |
+ |
+ Args: |
+ work_pool: A thread pool to be used for executing customer code. |
+ transmission_pool: A thread pool to be used for transmitting values to |
+ the other side of the operation. |
+ utility_pool: A thread pool to be used for utility tasks. |
+ """ |
+ self._endlette = _Endlette(utility_pool) |
+ self._work_pool = work_pool |
+ self._transmission_pool = transmission_pool |
+ self._utility_pool = utility_pool |
+ self._callback = None |
+ |
+ self._operations = {} |
+ |
+ def join_rear_link(self, rear_link): |
+ """See interfaces.ForeLink.join_rear_link for specification.""" |
+ with self._endlette: |
+ self._callback = rear_link.accept_front_to_back_ticket |
+ |
+ def operation_stats(self): |
+ """See interfaces.End.operation_stats for specification.""" |
+ return self._endlette.operation_stats() |
+ |
+ def add_idle_action(self, action): |
+ """See interfaces.End.add_idle_action for specification.""" |
+ self._endlette.add_idle_action(action) |
+ |
+ def operate( |
+ self, name, payload, complete, timeout, subscription, trace_id): |
+ """See interfaces.Front.operate for specification.""" |
+ operation_id = uuid.uuid4() |
+ with self._endlette: |
+ management = _front_operate( |
+ self._callback, self._work_pool, self._transmission_pool, |
+ self._utility_pool, self._endlette.terminal_action(operation_id), |
+ operation_id, name, payload, complete, timeout, subscription, |
+ trace_id) |
+ self._endlette.add_operation(operation_id, management.reception) |
+ return _EasyOperation( |
+ management.emission, management.operation, management.cancellation) |
+ |
+ def accept_back_to_front_ticket(self, ticket): |
+ """See interfaces.End.act for specification.""" |
+ with self._endlette: |
+ reception_manager = self._endlette.get_operation(ticket.operation_id) |
+ if reception_manager: |
+ reception_manager.receive_ticket(ticket) |
+ |
+ |
+def _back_operate( |
+ servicer, callback, work_pool, transmission_pool, utility_pool, |
+ termination_action, ticket, default_timeout, maximum_timeout): |
+ """Constructs objects necessary for back-side operation management. |
+ |
+ Also begins back-side operation by feeding the first received ticket into the |
+ constructed _interfaces.ReceptionManager. |
+ |
+ Args: |
+ servicer: An interfaces.Servicer for servicing operations. |
+ callback: A callable that accepts interfaces.BackToFrontTickets and |
+ delivers them to the other side of the operation. Execution of this |
+ callable may take any arbitrary length of time. |
+ work_pool: A thread pool in which to execute customer code. |
+ transmission_pool: A thread pool to use for transmitting to the other side |
+ of the operation. |
+ utility_pool: A thread pool for utility tasks. |
+ termination_action: A no-arg behavior to be called upon operation |
+ completion. |
+ ticket: The first interfaces.FrontToBackTicket received for the operation. |
+ default_timeout: A length of time in seconds to be used as the default |
+ time alloted for a single operation. |
+ maximum_timeout: A length of time in seconds to be used as the maximum |
+ time alloted for a single operation. |
+ |
+ Returns: |
+ The _interfaces.ReceptionManager to be used for the operation. |
+ """ |
+ lock = threading.Lock() |
+ with lock: |
+ termination_manager = _termination.back_termination_manager( |
+ work_pool, utility_pool, termination_action, ticket.subscription) |
+ transmission_manager = _transmission.back_transmission_manager( |
+ lock, transmission_pool, callback, ticket.operation_id, |
+ termination_manager, ticket.subscription) |
+ operation_context = _context.OperationContext( |
+ lock, ticket.operation_id, interfaces.Outcome.SERVICER_FAILURE, |
+ termination_manager, transmission_manager) |
+ emission_manager = _emission.back_emission_manager( |
+ lock, termination_manager, transmission_manager) |
+ ingestion_manager = _ingestion.back_ingestion_manager( |
+ lock, work_pool, servicer, termination_manager, |
+ transmission_manager, operation_context, emission_manager) |
+ expiration_manager = _expiration.back_expiration_manager( |
+ lock, termination_manager, transmission_manager, ingestion_manager, |
+ ticket.timeout, default_timeout, maximum_timeout) |
+ reception_manager = _reception.back_reception_manager( |
+ lock, termination_manager, transmission_manager, ingestion_manager, |
+ expiration_manager) |
+ |
+ termination_manager.set_expiration_manager(expiration_manager) |
+ transmission_manager.set_ingestion_and_expiration_managers( |
+ ingestion_manager, expiration_manager) |
+ operation_context.set_ingestion_and_expiration_managers( |
+ ingestion_manager, expiration_manager) |
+ emission_manager.set_ingestion_manager_and_expiration_manager( |
+ ingestion_manager, expiration_manager) |
+ ingestion_manager.set_expiration_manager(expiration_manager) |
+ |
+ reception_manager.receive_ticket(ticket) |
+ |
+ return reception_manager |
+ |
+ |
+class BackLink(interfaces.BackLink): |
+ """An implementation of interfaces.BackLink.""" |
+ |
+ def __init__( |
+ self, servicer, work_pool, transmission_pool, utility_pool, |
+ default_timeout, maximum_timeout): |
+ """Constructor. |
+ |
+ Args: |
+ servicer: An interfaces.Servicer for servicing operations. |
+ work_pool: A thread pool in which to execute customer code. |
+ transmission_pool: A thread pool to use for transmitting to the other side |
+ of the operation. |
+ utility_pool: A thread pool for utility tasks. |
+ default_timeout: A length of time in seconds to be used as the default |
+ time alloted for a single operation. |
+ maximum_timeout: A length of time in seconds to be used as the maximum |
+ time alloted for a single operation. |
+ """ |
+ self._endlette = _Endlette(utility_pool) |
+ self._servicer = servicer |
+ self._work_pool = work_pool |
+ self._transmission_pool = transmission_pool |
+ self._utility_pool = utility_pool |
+ self._default_timeout = default_timeout |
+ self._maximum_timeout = maximum_timeout |
+ self._callback = None |
+ |
+ def join_fore_link(self, fore_link): |
+ """See interfaces.RearLink.join_fore_link for specification.""" |
+ with self._endlette: |
+ self._callback = fore_link.accept_back_to_front_ticket |
+ |
+ def accept_front_to_back_ticket(self, ticket): |
+ """See interfaces.RearLink.accept_front_to_back_ticket for specification.""" |
+ with self._endlette: |
+ reception_manager = self._endlette.get_operation(ticket.operation_id) |
+ if reception_manager is None: |
+ reception_manager = _back_operate( |
+ self._servicer, self._callback, self._work_pool, |
+ self._transmission_pool, self._utility_pool, |
+ self._endlette.terminal_action(ticket.operation_id), ticket, |
+ self._default_timeout, self._maximum_timeout) |
+ self._endlette.add_operation(ticket.operation_id, reception_manager) |
+ else: |
+ reception_manager.receive_ticket(ticket) |
+ |
+ def operation_stats(self): |
+ """See interfaces.End.operation_stats for specification.""" |
+ return self._endlette.operation_stats() |
+ |
+ def add_idle_action(self, action): |
+ """See interfaces.End.add_idle_action for specification.""" |
+ self._endlette.add_idle_action(action) |