Index: third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py b/third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..65b12c4160eb28778dc1ba0d1a7ff4055175de25 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py |
@@ -0,0 +1,335 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""State and behavior for ticket transmission during an operation.""" |
+ |
+import collections |
+import enum |
+ |
+from grpc.framework.core import _constants |
+from grpc.framework.core import _interfaces |
+from grpc.framework.core import _utilities |
+from grpc.framework.foundation import callable_util |
+from grpc.framework.interfaces.base import base |
+from grpc.framework.interfaces.links import links |
+ |
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' |
+ |
+_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome( |
+ base.Outcome.Kind.TRANSMISSION_FAILURE, None, None) |
+ |
+ |
+def _explode_completion(completion): |
+ if completion is None: |
+ return None, None, None, None |
+ else: |
+ return ( |
+ completion.terminal_metadata, completion.code, completion.message, |
+ links.Ticket.Termination.COMPLETION) |
+ |
+ |
+class _Abort( |
+ collections.namedtuple( |
+ '_Abort', ('kind', 'termination', 'code', 'details',))): |
+ """Tracks whether the operation aborted and what is to be done about it. |
+ |
+ Attributes: |
+ kind: A Kind value describing the overall kind of the _Abort. |
+ termination: A links.Ticket.Termination value to be sent to the other side |
+ of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED. |
+ code: A code value to be sent to the other side of the operation. Only |
+ valid if kind is Kind.ABORTED_NOTIFY_NEEDED. |
+ details: A details value to be sent to the other side of the operation. |
+ Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED. |
+ """ |
+ |
+ @enum.unique |
+ class Kind(enum.Enum): |
+ NOT_ABORTED = 'not aborted' |
+ ABORTED_NOTIFY_NEEDED = 'aborted notify needed' |
+ ABORTED_NO_NOTIFY = 'aborted no notify' |
+ |
+_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None) |
+_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None) |
+ |
+ |
+class TransmissionManager(_interfaces.TransmissionManager): |
+ """An _interfaces.TransmissionManager that sends links.Tickets.""" |
+ |
+ def __init__( |
+ self, operation_id, ticket_sink, lock, pool, termination_manager): |
+ """Constructor. |
+ |
+ Args: |
+ operation_id: The operation's ID. |
+ ticket_sink: A callable that accepts tickets and sends them to the other |
+ side of the operation. |
+ lock: The operation-servicing-wide lock object. |
+ pool: A thread pool in which the work of transmitting tickets will be |
+ performed. |
+ termination_manager: The _interfaces.TerminationManager associated with |
+ this operation. |
+ """ |
+ self._lock = lock |
+ self._pool = pool |
+ self._ticket_sink = ticket_sink |
+ self._operation_id = operation_id |
+ self._termination_manager = termination_manager |
+ self._expiration_manager = None |
+ |
+ self._lowest_unused_sequence_number = 0 |
+ self._remote_allowance = 1 |
+ self._remote_complete = False |
+ self._timeout = None |
+ self._local_allowance = 0 |
+ self._initial_metadata = None |
+ self._payloads = [] |
+ self._completion = None |
+ self._abort = _NOT_ABORTED |
+ self._transmitting = False |
+ |
+ def set_expiration_manager(self, expiration_manager): |
+ """Sets the ExpirationManager with which this manager will cooperate.""" |
+ self._expiration_manager = expiration_manager |
+ |
+ def _next_ticket(self): |
+ """Creates the next ticket to be transmitted. |
+ |
+ Returns: |
+ A links.Ticket to be sent to the other side of the operation or None if |
+ there is nothing to be sent at this time. |
+ """ |
+ if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY: |
+ return None |
+ elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED: |
+ termination = self._abort.termination |
+ code, details = self._abort.code, self._abort.details |
+ self._abort = _ABORTED_NO_NOTIFY |
+ return links.Ticket( |
+ self._operation_id, self._lowest_unused_sequence_number, None, None, |
+ None, None, None, None, None, None, code, details, termination, None) |
+ |
+ action = False |
+ # TODO(nathaniel): Support other subscriptions. |
+ local_subscription = links.Ticket.Subscription.FULL |
+ timeout = self._timeout |
+ if timeout is not None: |
+ self._timeout = None |
+ action = True |
+ if self._local_allowance <= 0: |
+ allowance = None |
+ else: |
+ allowance = self._local_allowance |
+ self._local_allowance = 0 |
+ action = True |
+ initial_metadata = self._initial_metadata |
+ if initial_metadata is not None: |
+ self._initial_metadata = None |
+ action = True |
+ if not self._payloads or self._remote_allowance <= 0: |
+ payload = None |
+ else: |
+ payload = self._payloads.pop(0) |
+ self._remote_allowance -= 1 |
+ action = True |
+ if self._completion is None or self._payloads: |
+ terminal_metadata, code, message, termination = None, None, None, None |
+ else: |
+ terminal_metadata, code, message, termination = _explode_completion( |
+ self._completion) |
+ self._completion = None |
+ action = True |
+ |
+ if action: |
+ ticket = links.Ticket( |
+ self._operation_id, self._lowest_unused_sequence_number, None, None, |
+ local_subscription, timeout, allowance, initial_metadata, payload, |
+ terminal_metadata, code, message, termination, None) |
+ self._lowest_unused_sequence_number += 1 |
+ return ticket |
+ else: |
+ return None |
+ |
+ def _transmit(self, ticket): |
+ """Commences the transmission loop sending tickets. |
+ |
+ Args: |
+ ticket: A links.Ticket to be sent to the other side of the operation. |
+ """ |
+ def transmit(ticket): |
+ while True: |
+ transmission_outcome = callable_util.call_logging_exceptions( |
+ self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) |
+ if transmission_outcome.exception is None: |
+ with self._lock: |
+ if ticket.termination is links.Ticket.Termination.COMPLETION: |
+ self._termination_manager.transmission_complete() |
+ ticket = self._next_ticket() |
+ if ticket is None: |
+ self._transmitting = False |
+ return |
+ else: |
+ with self._lock: |
+ self._abort = _ABORTED_NO_NOTIFY |
+ if self._termination_manager.outcome is None: |
+ self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME) |
+ self._expiration_manager.terminate() |
+ return |
+ |
+ self._pool.submit(callable_util.with_exceptions_logged( |
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) |
+ self._transmitting = True |
+ |
+ def kick_off( |
+ self, group, method, timeout, protocol_options, initial_metadata, |
+ payload, completion, allowance): |
+ """See _interfaces.TransmissionManager.kickoff for specification.""" |
+ # TODO(nathaniel): Support other subscriptions. |
+ subscription = links.Ticket.Subscription.FULL |
+ terminal_metadata, code, message, termination = _explode_completion( |
+ completion) |
+ self._remote_allowance = 1 if payload is None else 0 |
+ protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options) |
+ ticket = links.Ticket( |
+ self._operation_id, 0, group, method, subscription, timeout, allowance, |
+ initial_metadata, payload, terminal_metadata, code, message, |
+ termination, protocol) |
+ self._lowest_unused_sequence_number = 1 |
+ self._transmit(ticket) |
+ |
+ def advance(self, initial_metadata, payload, completion, allowance): |
+ """See _interfaces.TransmissionManager.advance for specification.""" |
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED: |
+ return |
+ |
+ effective_initial_metadata = initial_metadata |
+ effective_payload = payload |
+ effective_completion = completion |
+ if allowance is not None and not self._remote_complete: |
+ effective_allowance = allowance |
+ else: |
+ effective_allowance = None |
+ if self._transmitting: |
+ if effective_initial_metadata is not None: |
+ self._initial_metadata = effective_initial_metadata |
+ if effective_payload is not None: |
+ self._payloads.append(effective_payload) |
+ if effective_completion is not None: |
+ self._completion = effective_completion |
+ if effective_allowance is not None: |
+ self._local_allowance += effective_allowance |
+ else: |
+ if effective_payload is not None: |
+ if 0 < self._remote_allowance: |
+ ticket_payload = effective_payload |
+ self._remote_allowance -= 1 |
+ else: |
+ self._payloads.append(effective_payload) |
+ ticket_payload = None |
+ else: |
+ ticket_payload = None |
+ if effective_completion is not None and not self._payloads: |
+ ticket_completion = effective_completion |
+ else: |
+ self._completion = effective_completion |
+ ticket_completion = None |
+ if any( |
+ (effective_initial_metadata, ticket_payload, ticket_completion, |
+ effective_allowance)): |
+ terminal_metadata, code, message, termination = _explode_completion( |
+ completion) |
+ ticket = links.Ticket( |
+ self._operation_id, self._lowest_unused_sequence_number, None, None, |
+ None, None, allowance, effective_initial_metadata, ticket_payload, |
+ terminal_metadata, code, message, termination, None) |
+ self._lowest_unused_sequence_number += 1 |
+ self._transmit(ticket) |
+ |
+ def timeout(self, timeout): |
+ """See _interfaces.TransmissionManager.timeout for specification.""" |
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED: |
+ return |
+ elif self._transmitting: |
+ self._timeout = timeout |
+ else: |
+ ticket = links.Ticket( |
+ self._operation_id, self._lowest_unused_sequence_number, None, None, |
+ None, timeout, None, None, None, None, None, None, None, None) |
+ self._lowest_unused_sequence_number += 1 |
+ self._transmit(ticket) |
+ |
+ def allowance(self, allowance): |
+ """See _interfaces.TransmissionManager.allowance for specification.""" |
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED: |
+ return |
+ elif self._transmitting or not self._payloads: |
+ self._remote_allowance += allowance |
+ else: |
+ self._remote_allowance += allowance - 1 |
+ payload = self._payloads.pop(0) |
+ if self._payloads: |
+ completion = None |
+ else: |
+ completion = self._completion |
+ self._completion = None |
+ terminal_metadata, code, message, termination = _explode_completion( |
+ completion) |
+ ticket = links.Ticket( |
+ self._operation_id, self._lowest_unused_sequence_number, None, None, |
+ None, None, None, None, payload, terminal_metadata, code, message, |
+ termination, None) |
+ self._lowest_unused_sequence_number += 1 |
+ self._transmit(ticket) |
+ |
+ def remote_complete(self): |
+ """See _interfaces.TransmissionManager.remote_complete for specification.""" |
+ self._remote_complete = True |
+ self._local_allowance = 0 |
+ |
+ def abort(self, outcome): |
+ """See _interfaces.TransmissionManager.abort for specification.""" |
+ if self._abort.kind is _Abort.Kind.NOT_ABORTED: |
+ if outcome is None: |
+ self._abort = _ABORTED_NO_NOTIFY |
+ else: |
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get( |
+ outcome.kind) |
+ if termination is None: |
+ self._abort = _ABORTED_NO_NOTIFY |
+ elif self._transmitting: |
+ self._abort = _Abort( |
+ _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code, |
+ outcome.details) |
+ else: |
+ ticket = links.Ticket( |
+ self._operation_id, self._lowest_unused_sequence_number, None, |
+ None, None, None, None, None, None, None, outcome.code, |
+ outcome.details, termination, None) |
+ self._transmit(ticket) |
+ self._abort = _ABORTED_NO_NOTIFY |