| Index: third_party/grpc/src/python/grpcio/grpc/framework/core/_end.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/core/_end.py b/third_party/grpc/src/python/grpcio/grpc/framework/core/_end.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9c615672aa71ee843f976cce15b97f68012bb6e1
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/framework/core/_end.py
|
| @@ -0,0 +1,243 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""Implementation of base.End."""
|
| +
|
| +import abc
|
| +import threading
|
| +import uuid
|
| +
|
| +from grpc.framework.core import _operation
|
| +from grpc.framework.core import _utilities
|
| +from grpc.framework.foundation import callable_util
|
| +from grpc.framework.foundation import later
|
| +from grpc.framework.foundation import logging_pool
|
| +from grpc.framework.interfaces.base import base
|
| +from grpc.framework.interfaces.links import links
|
| +from grpc.framework.interfaces.links import utilities
|
| +
|
| +_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
|
| +
|
| +
|
| +class End(base.End, links.Link):
|
| + """A bridge between base.End and links.Link.
|
| +
|
| + Implementations of this interface translate arriving tickets into
|
| + calls on application objects implementing base interfaces and
|
| + translate calls from application objects implementing base interfaces
|
| + into tickets sent to a joined link.
|
| + """
|
| + __metaclass__ = abc.ABCMeta
|
| +
|
| +
|
| +class _Cycle(object):
|
| + """State for a single start-stop End lifecycle."""
|
| +
|
| + def __init__(self, pool):
|
| + self.pool = pool
|
| + self.grace = False
|
| + self.futures = []
|
| + self.operations = {}
|
| + self.idle_actions = []
|
| +
|
| +
|
| +def _abort(operations):
|
| + for operation in operations:
|
| + operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN)
|
| +
|
| +
|
| +def _cancel_futures(futures):
|
| + for future in futures:
|
| + future.cancel()
|
| +
|
| +
|
| +def _future_shutdown(lock, cycle, event):
|
| + def in_future():
|
| + with lock:
|
| + _abort(cycle.operations.values())
|
| + _cancel_futures(cycle.futures)
|
| + return in_future
|
| +
|
| +
|
| +class _End(End):
|
| + """An End implementation."""
|
| +
|
| + def __init__(self, servicer_package):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + servicer_package: A _ServicerPackage for servicing operations or None if
|
| + this end will not be used to service operations.
|
| + """
|
| + self._lock = threading.Condition()
|
| + self._servicer_package = servicer_package
|
| +
|
| + self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
|
| +
|
| + self._mate = None
|
| +
|
| + self._cycle = None
|
| +
|
| + def _termination_action(self, operation_id):
|
| + """Constructs the termination action for a single operation.
|
| +
|
| + Args:
|
| + operation_id: The operation ID for the termination action.
|
| +
|
| + Returns:
|
| + A callable that takes an operation outcome kind as its sole parameter and
|
| + that should be used as the termination action for the operation
|
| + associated with the given operation ID.
|
| + """
|
| + def termination_action(outcome_kind):
|
| + with self._lock:
|
| + self._stats[outcome_kind] += 1
|
| + self._cycle.operations.pop(operation_id, None)
|
| + if not self._cycle.operations:
|
| + for action in self._cycle.idle_actions:
|
| + self._cycle.pool.submit(action)
|
| + self._cycle.idle_actions = []
|
| + if self._cycle.grace:
|
| + _cancel_futures(self._cycle.futures)
|
| + self._cycle.pool.shutdown(wait=False)
|
| + self._cycle = None
|
| + return termination_action
|
| +
|
| + def start(self):
|
| + """See base.End.start for specification."""
|
| + with self._lock:
|
| + if self._cycle is not None:
|
| + raise ValueError('Tried to start a not-stopped End!')
|
| + else:
|
| + self._cycle = _Cycle(logging_pool.pool(1))
|
| +
|
| + def stop(self, grace):
|
| + """See base.End.stop for specification."""
|
| + with self._lock:
|
| + if self._cycle is None:
|
| + event = threading.Event()
|
| + event.set()
|
| + return event
|
| + elif not self._cycle.operations:
|
| + event = threading.Event()
|
| + self._cycle.pool.submit(event.set)
|
| + self._cycle.pool.shutdown(wait=False)
|
| + self._cycle = None
|
| + return event
|
| + else:
|
| + self._cycle.grace = True
|
| + event = threading.Event()
|
| + self._cycle.idle_actions.append(event.set)
|
| + if 0 < grace:
|
| + future = later.later(
|
| + grace, _future_shutdown(self._lock, self._cycle, event))
|
| + self._cycle.futures.append(future)
|
| + else:
|
| + _abort(self._cycle.operations.values())
|
| + return event
|
| +
|
| + def operate(
|
| + self, group, method, subscription, timeout, initial_metadata=None,
|
| + payload=None, completion=None, protocol_options=None):
|
| + """See base.End.operate for specification."""
|
| + operation_id = uuid.uuid4()
|
| + with self._lock:
|
| + if self._cycle is None or self._cycle.grace:
|
| + raise ValueError('Can\'t operate on stopped or stopping End!')
|
| + termination_action = self._termination_action(operation_id)
|
| + operation = _operation.invocation_operate(
|
| + operation_id, group, method, subscription, timeout, protocol_options,
|
| + initial_metadata, payload, completion, self._mate.accept_ticket,
|
| + termination_action, self._cycle.pool)
|
| + self._cycle.operations[operation_id] = operation
|
| + return operation.context, operation.operator
|
| +
|
| + def operation_stats(self):
|
| + """See base.End.operation_stats for specification."""
|
| + with self._lock:
|
| + return dict(self._stats)
|
| +
|
| + def add_idle_action(self, action):
|
| + """See base.End.add_idle_action for specification."""
|
| + with self._lock:
|
| + if self._cycle is None:
|
| + raise ValueError('Can\'t add idle action to stopped End!')
|
| + action_with_exceptions_logged = callable_util.with_exceptions_logged(
|
| + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
|
| + if self._cycle.operations:
|
| + self._cycle.idle_actions.append(action_with_exceptions_logged)
|
| + else:
|
| + self._cycle.pool.submit(action_with_exceptions_logged)
|
| +
|
| + def accept_ticket(self, ticket):
|
| + """See links.Link.accept_ticket for specification."""
|
| + with self._lock:
|
| + if self._cycle is not None:
|
| + operation = self._cycle.operations.get(ticket.operation_id)
|
| + if operation is not None:
|
| + operation.handle_ticket(ticket)
|
| + elif self._servicer_package is not None and not self._cycle.grace:
|
| + termination_action = self._termination_action(ticket.operation_id)
|
| + operation = _operation.service_operate(
|
| + self._servicer_package, ticket, self._mate.accept_ticket,
|
| + termination_action, self._cycle.pool)
|
| + if operation is not None:
|
| + self._cycle.operations[ticket.operation_id] = operation
|
| +
|
| + def join_link(self, link):
|
| + """See links.Link.join_link for specification."""
|
| + with self._lock:
|
| + self._mate = utilities.NULL_LINK if link is None else link
|
| +
|
| +
|
| +def serviceless_end_link():
|
| + """Constructs an End usable only for invoking operations.
|
| +
|
| + Returns:
|
| + An End usable for translating operations into ticket exchange.
|
| + """
|
| + return _End(None)
|
| +
|
| +
|
| +def serviceful_end_link(servicer, default_timeout, maximum_timeout):
|
| + """Constructs an End capable of servicing operations.
|
| +
|
| + Args:
|
| + servicer: An interfaces.Servicer for servicing operations.
|
| + default_timeout: A length of time in seconds to be used as the default
|
| + time alloted for a single operation.
|
| + maximum_timeout: A length of time in seconds to be used as the maximum
|
| + time alloted for a single operation.
|
| +
|
| + Returns:
|
| + An End capable of servicing the operations requested of it through ticket
|
| + exchange.
|
| + """
|
| + return _End(
|
| + _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))
|
|
|