| Index: third_party/grpc/src/python/grpcio/grpc/framework/base/_transmission.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/base/_transmission.py b/third_party/grpc/src/python/grpcio/grpc/framework/base/_transmission.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..684512923443a8ca4e85ec5989762f77180cdd04
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/framework/base/_transmission.py
|
| @@ -0,0 +1,429 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""State and behavior for ticket transmission during an operation."""
|
| +
|
| +import abc
|
| +
|
| +from grpc.framework.base import _constants
|
| +from grpc.framework.base import _interfaces
|
| +from grpc.framework.base import interfaces
|
| +from grpc.framework.foundation import callable_util
|
| +
|
| +_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
|
| +
|
| +_FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = (
|
| + interfaces.Outcome.SERVICER_FAILURE,
|
| + )
|
| +_BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = (
|
| + interfaces.Outcome.CANCELLED,
|
| + interfaces.Outcome.SERVICED_FAILURE,
|
| + )
|
| +
|
| +_ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = {
|
| + interfaces.Outcome.CANCELLED:
|
| + interfaces.FrontToBackTicket.Kind.CANCELLATION,
|
| + interfaces.Outcome.EXPIRED:
|
| + interfaces.FrontToBackTicket.Kind.EXPIRATION,
|
| + interfaces.Outcome.RECEPTION_FAILURE:
|
| + interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE,
|
| + interfaces.Outcome.TRANSMISSION_FAILURE:
|
| + interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE,
|
| + interfaces.Outcome.SERVICED_FAILURE:
|
| + interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE,
|
| + interfaces.Outcome.SERVICER_FAILURE:
|
| + interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE,
|
| +}
|
| +
|
| +_ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = {
|
| + interfaces.Outcome.CANCELLED:
|
| + interfaces.BackToFrontTicket.Kind.CANCELLATION,
|
| + interfaces.Outcome.EXPIRED:
|
| + interfaces.BackToFrontTicket.Kind.EXPIRATION,
|
| + interfaces.Outcome.RECEPTION_FAILURE:
|
| + interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE,
|
| + interfaces.Outcome.TRANSMISSION_FAILURE:
|
| + interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE,
|
| + interfaces.Outcome.SERVICED_FAILURE:
|
| + interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE,
|
| + interfaces.Outcome.SERVICER_FAILURE:
|
| + interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE,
|
| +}
|
| +
|
| +
|
| +class _Ticketizer(object):
|
| + """Common specification of different ticket-creating behavior."""
|
| + __metaclass__ = abc.ABCMeta
|
| +
|
| + @abc.abstractmethod
|
| + def ticketize(self, operation_id, sequence_number, payload, complete):
|
| + """Creates a ticket indicating ordinary operation progress.
|
| +
|
| + Args:
|
| + operation_id: The operation ID for the current operation.
|
| + sequence_number: A sequence number for the ticket.
|
| + payload: A customer payload object. May be None if sequence_number is
|
| + zero or complete is true.
|
| + complete: A boolean indicating whether or not the ticket should describe
|
| + itself as (but for a later indication of operation abortion) the last
|
| + ticket to be sent.
|
| +
|
| + Returns:
|
| + An object of an appropriate type suitable for transmission to the other
|
| + side of the operation.
|
| + """
|
| + raise NotImplementedError()
|
| +
|
| + @abc.abstractmethod
|
| + def ticketize_abortion(self, operation_id, sequence_number, outcome):
|
| + """Creates a ticket indicating that the operation is aborted.
|
| +
|
| + Args:
|
| + operation_id: The operation ID for the current operation.
|
| + sequence_number: A sequence number for the ticket.
|
| + outcome: An interfaces.Outcome value describing the operation abortion.
|
| +
|
| + Returns:
|
| + An object of an appropriate type suitable for transmission to the other
|
| + side of the operation, or None if transmission is not appropriate for
|
| + the given outcome.
|
| + """
|
| + raise NotImplementedError()
|
| +
|
| +
|
| +class _FrontTicketizer(_Ticketizer):
|
| + """Front-side ticket-creating behavior."""
|
| +
|
| + def __init__(self, name, subscription_kind, trace_id, timeout):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + name: The name of the operation.
|
| + subscription_kind: An interfaces.ServicedSubscription.Kind value
|
| + describing the interest the front has in tickets sent from the back.
|
| + trace_id: A uuid.UUID identifying a set of related operations to which
|
| + this operation belongs.
|
| + timeout: A length of time in seconds to allow for the entire operation.
|
| + """
|
| + self._name = name
|
| + self._subscription_kind = subscription_kind
|
| + self._trace_id = trace_id
|
| + self._timeout = timeout
|
| +
|
| + def ticketize(self, operation_id, sequence_number, payload, complete):
|
| + """See _Ticketizer.ticketize for specification."""
|
| + if sequence_number:
|
| + if complete:
|
| + kind = interfaces.FrontToBackTicket.Kind.COMPLETION
|
| + else:
|
| + kind = interfaces.FrontToBackTicket.Kind.CONTINUATION
|
| + return interfaces.FrontToBackTicket(
|
| + operation_id, sequence_number, kind, self._name,
|
| + self._subscription_kind, self._trace_id, payload, self._timeout)
|
| + else:
|
| + if complete:
|
| + kind = interfaces.FrontToBackTicket.Kind.ENTIRE
|
| + else:
|
| + kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT
|
| + return interfaces.FrontToBackTicket(
|
| + operation_id, 0, kind, self._name, self._subscription_kind,
|
| + self._trace_id, payload, self._timeout)
|
| +
|
| + def ticketize_abortion(self, operation_id, sequence_number, outcome):
|
| + """See _Ticketizer.ticketize_abortion for specification."""
|
| + if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES:
|
| + return None
|
| + else:
|
| + kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome]
|
| + return interfaces.FrontToBackTicket(
|
| + operation_id, sequence_number, kind, None, None, None, None, None)
|
| +
|
| +
|
| +class _BackTicketizer(_Ticketizer):
|
| + """Back-side ticket-creating behavior."""
|
| +
|
| + def ticketize(self, operation_id, sequence_number, payload, complete):
|
| + """See _Ticketizer.ticketize for specification."""
|
| + if complete:
|
| + kind = interfaces.BackToFrontTicket.Kind.COMPLETION
|
| + else:
|
| + kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
|
| + return interfaces.BackToFrontTicket(
|
| + operation_id, sequence_number, kind, payload)
|
| +
|
| + def ticketize_abortion(self, operation_id, sequence_number, outcome):
|
| + """See _Ticketizer.ticketize_abortion for specification."""
|
| + if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES:
|
| + return None
|
| + else:
|
| + kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome]
|
| + return interfaces.BackToFrontTicket(
|
| + operation_id, sequence_number, kind, None)
|
| +
|
| +
|
| +class TransmissionManager(_interfaces.TransmissionManager):
|
| + """A _interfaces.TransmissionManager on which other managers may be set."""
|
| + __metaclass__ = abc.ABCMeta
|
| +
|
| + @abc.abstractmethod
|
| + def set_ingestion_and_expiration_managers(
|
| + self, ingestion_manager, expiration_manager):
|
| + """Sets two of the other managers with which this manager may interact.
|
| +
|
| + Args:
|
| + ingestion_manager: The _interfaces.IngestionManager associated with the
|
| + current operation.
|
| + expiration_manager: The _interfaces.ExpirationManager associated with the
|
| + current operation.
|
| + """
|
| + raise NotImplementedError()
|
| +
|
| +
|
| +class _EmptyTransmissionManager(TransmissionManager):
|
| + """A completely no-operative _interfaces.TransmissionManager."""
|
| +
|
| + def set_ingestion_and_expiration_managers(
|
| + self, ingestion_manager, expiration_manager):
|
| + """See overriden method for specification."""
|
| +
|
| + def inmit(self, emission, complete):
|
| + """See _interfaces.TransmissionManager.inmit for specification."""
|
| +
|
| + def abort(self, outcome):
|
| + """See _interfaces.TransmissionManager.abort for specification."""
|
| +
|
| +
|
| +class _TransmittingTransmissionManager(TransmissionManager):
|
| + """A TransmissionManager implementation that sends tickets."""
|
| +
|
| + def __init__(
|
| + self, lock, pool, callback, operation_id, ticketizer,
|
| + termination_manager):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + lock: The operation-servicing-wide lock object.
|
| + pool: A thread pool in which the work of transmitting tickets will be
|
| + performed.
|
| + callback: A callable that accepts tickets and sends them to the other side
|
| + of the operation.
|
| + operation_id: The operation's ID.
|
| + ticketizer: A _Ticketizer for ticket creation.
|
| + termination_manager: The _interfaces.TerminationManager associated with
|
| + this operation.
|
| + """
|
| + self._lock = lock
|
| + self._pool = pool
|
| + self._callback = callback
|
| + self._operation_id = operation_id
|
| + self._ticketizer = ticketizer
|
| + self._termination_manager = termination_manager
|
| + self._ingestion_manager = None
|
| + self._expiration_manager = None
|
| +
|
| + self._emissions = []
|
| + self._emission_complete = False
|
| + self._outcome = None
|
| + self._lowest_unused_sequence_number = 0
|
| + self._transmitting = False
|
| +
|
| + def set_ingestion_and_expiration_managers(
|
| + self, ingestion_manager, expiration_manager):
|
| + """See overridden method for specification."""
|
| + self._ingestion_manager = ingestion_manager
|
| + self._expiration_manager = expiration_manager
|
| +
|
| + def _lead_ticket(self, emission, complete):
|
| + """Creates a ticket suitable for leading off the transmission loop.
|
| +
|
| + Args:
|
| + emission: A customer payload object to be sent to the other side of the
|
| + operation.
|
| + complete: Whether or not the sequence of customer payloads ends with
|
| + the passed object.
|
| +
|
| + Returns:
|
| + A ticket with which to lead off the transmission loop.
|
| + """
|
| + sequence_number = self._lowest_unused_sequence_number
|
| + self._lowest_unused_sequence_number += 1
|
| + return self._ticketizer.ticketize(
|
| + self._operation_id, sequence_number, emission, complete)
|
| +
|
| + def _abortive_response_ticket(self, outcome):
|
| + """Creates a ticket indicating operation abortion.
|
| +
|
| + Args:
|
| + outcome: An interfaces.Outcome value describing operation abortion.
|
| +
|
| + Returns:
|
| + A ticket indicating operation abortion.
|
| + """
|
| + ticket = self._ticketizer.ticketize_abortion(
|
| + self._operation_id, self._lowest_unused_sequence_number, outcome)
|
| + if ticket is None:
|
| + return None
|
| + else:
|
| + self._lowest_unused_sequence_number += 1
|
| + return ticket
|
| +
|
| + def _next_ticket(self):
|
| + """Creates the next ticket to be sent to the other side of the operation.
|
| +
|
| + Returns:
|
| + A (completed, ticket) tuple comprised of a boolean indicating whether or
|
| + not the sequence of tickets has completed normally and a ticket to send
|
| + to the other side if the sequence of tickets hasn't completed. The tuple
|
| + will never have both a True first element and a non-None second element.
|
| + """
|
| + if self._emissions is None:
|
| + return False, None
|
| + elif self._outcome is None:
|
| + if self._emissions:
|
| + payload = self._emissions.pop(0)
|
| + complete = self._emission_complete and not self._emissions
|
| + sequence_number = self._lowest_unused_sequence_number
|
| + self._lowest_unused_sequence_number += 1
|
| + return complete, self._ticketizer.ticketize(
|
| + self._operation_id, sequence_number, payload, complete)
|
| + else:
|
| + return self._emission_complete, None
|
| + else:
|
| + ticket = self._abortive_response_ticket(self._outcome)
|
| + self._emissions = None
|
| + return False, None if ticket is None else ticket
|
| +
|
| + def _transmit(self, ticket):
|
| + """Commences the transmission loop sending tickets.
|
| +
|
| + Args:
|
| + ticket: A ticket to be sent to the other side of the operation.
|
| + """
|
| + def transmit(ticket):
|
| + while True:
|
| + transmission_outcome = callable_util.call_logging_exceptions(
|
| + self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
|
| + if transmission_outcome.exception is None:
|
| + with self._lock:
|
| + complete, ticket = self._next_ticket()
|
| + if ticket is None:
|
| + if complete:
|
| + self._termination_manager.transmission_complete()
|
| + self._transmitting = False
|
| + return
|
| + else:
|
| + with self._lock:
|
| + self._emissions = None
|
| + self._termination_manager.abort(
|
| + interfaces.Outcome.TRANSMISSION_FAILURE)
|
| + self._ingestion_manager.abort()
|
| + self._expiration_manager.abort()
|
| + self._transmitting = False
|
| + return
|
| +
|
| + self._pool.submit(callable_util.with_exceptions_logged(
|
| + transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
|
| + self._transmitting = True
|
| +
|
| + def inmit(self, emission, complete):
|
| + """See _interfaces.TransmissionManager.inmit for specification."""
|
| + if self._emissions is not None and self._outcome is None:
|
| + self._emission_complete = complete
|
| + if self._transmitting:
|
| + self._emissions.append(emission)
|
| + else:
|
| + self._transmit(self._lead_ticket(emission, complete))
|
| +
|
| + def abort(self, outcome):
|
| + """See _interfaces.TransmissionManager.abort for specification."""
|
| + if self._emissions is not None and self._outcome is None:
|
| + self._outcome = outcome
|
| + if not self._transmitting:
|
| + ticket = self._abortive_response_ticket(outcome)
|
| + self._emissions = None
|
| + if ticket is not None:
|
| + self._transmit(ticket)
|
| +
|
| +
|
| +def front_transmission_manager(
|
| + lock, pool, callback, operation_id, name, subscription_kind, trace_id,
|
| + timeout, termination_manager):
|
| + """Creates a TransmissionManager appropriate for front-side use.
|
| +
|
| + Args:
|
| + lock: The operation-servicing-wide lock object.
|
| + pool: A thread pool in which the work of transmitting tickets will be
|
| + performed.
|
| + callback: A callable that accepts tickets and sends them to the other side
|
| + of the operation.
|
| + operation_id: The operation's ID.
|
| + name: The name of the operation.
|
| + subscription_kind: An interfaces.ServicedSubscription.Kind value
|
| + describing the interest the front has in tickets sent from the back.
|
| + trace_id: A uuid.UUID identifying a set of related operations to which
|
| + this operation belongs.
|
| + timeout: A length of time in seconds to allow for the entire operation.
|
| + termination_manager: The _interfaces.TerminationManager associated with
|
| + this operation.
|
| +
|
| + Returns:
|
| + A TransmissionManager appropriate for front-side use.
|
| + """
|
| + return _TransmittingTransmissionManager(
|
| + lock, pool, callback, operation_id, _FrontTicketizer(
|
| + name, subscription_kind, trace_id, timeout),
|
| + termination_manager)
|
| +
|
| +
|
| +def back_transmission_manager(
|
| + lock, pool, callback, operation_id, termination_manager,
|
| + subscription_kind):
|
| + """Creates a TransmissionManager appropriate for back-side use.
|
| +
|
| + Args:
|
| + lock: The operation-servicing-wide lock object.
|
| + pool: A thread pool in which the work of transmitting tickets will be
|
| + performed.
|
| + callback: A callable that accepts tickets and sends them to the other side
|
| + of the operation.
|
| + operation_id: The operation's ID.
|
| + termination_manager: The _interfaces.TerminationManager associated with
|
| + this operation.
|
| + subscription_kind: An interfaces.ServicedSubscription.Kind value
|
| + describing the interest the front has in tickets sent from the back.
|
| +
|
| + Returns:
|
| + A TransmissionManager appropriate for back-side use.
|
| + """
|
| + if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
|
| + return _EmptyTransmissionManager()
|
| + else:
|
| + return _TransmittingTransmissionManager(
|
| + lock, pool, callback, operation_id, _BackTicketizer(),
|
| + termination_manager)
|
|
|