Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(359)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py b/third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py
new file mode 100644
index 0000000000000000000000000000000000000000..65b12c4160eb28778dc1ba0d1a7ff4055175de25
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -0,0 +1,335 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket transmission during an operation."""
+
+import collections
+import enum
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome(
+ base.Outcome.Kind.TRANSMISSION_FAILURE, None, None)
+
+
+def _explode_completion(completion):
+ if completion is None:
+ return None, None, None, None
+ else:
+ return (
+ completion.terminal_metadata, completion.code, completion.message,
+ links.Ticket.Termination.COMPLETION)
+
+
+class _Abort(
+ collections.namedtuple(
+ '_Abort', ('kind', 'termination', 'code', 'details',))):
+ """Tracks whether the operation aborted and what is to be done about it.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of the _Abort.
+ termination: A links.Ticket.Termination value to be sent to the other side
+ of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+ code: A code value to be sent to the other side of the operation. Only
+ valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+ details: A details value to be sent to the other side of the operation.
+ Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ NOT_ABORTED = 'not aborted'
+ ABORTED_NOTIFY_NEEDED = 'aborted notify needed'
+ ABORTED_NO_NOTIFY = 'aborted no notify'
+
+_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None)
+_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+ """An _interfaces.TransmissionManager that sends links.Tickets."""
+
+ def __init__(
+ self, operation_id, ticket_sink, lock, pool, termination_manager):
+ """Constructor.
+
+ Args:
+ operation_id: The operation's ID.
+ ticket_sink: A callable that accepts tickets and sends them to the other
+ side of the operation.
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting tickets will be
+ performed.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._ticket_sink = ticket_sink
+ self._operation_id = operation_id
+ self._termination_manager = termination_manager
+ self._expiration_manager = None
+
+ self._lowest_unused_sequence_number = 0
+ self._remote_allowance = 1
+ self._remote_complete = False
+ self._timeout = None
+ self._local_allowance = 0
+ self._initial_metadata = None
+ self._payloads = []
+ self._completion = None
+ self._abort = _NOT_ABORTED
+ self._transmitting = False
+
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the ExpirationManager with which this manager will cooperate."""
+ self._expiration_manager = expiration_manager
+
+ def _next_ticket(self):
+ """Creates the next ticket to be transmitted.
+
+ Returns:
+ A links.Ticket to be sent to the other side of the operation or None if
+ there is nothing to be sent at this time.
+ """
+ if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY:
+ return None
+ elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED:
+ termination = self._abort.termination
+ code, details = self._abort.code, self._abort.details
+ self._abort = _ABORTED_NO_NOTIFY
+ return links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, None, None, None, None, code, details, termination, None)
+
+ action = False
+ # TODO(nathaniel): Support other subscriptions.
+ local_subscription = links.Ticket.Subscription.FULL
+ timeout = self._timeout
+ if timeout is not None:
+ self._timeout = None
+ action = True
+ if self._local_allowance <= 0:
+ allowance = None
+ else:
+ allowance = self._local_allowance
+ self._local_allowance = 0
+ action = True
+ initial_metadata = self._initial_metadata
+ if initial_metadata is not None:
+ self._initial_metadata = None
+ action = True
+ if not self._payloads or self._remote_allowance <= 0:
+ payload = None
+ else:
+ payload = self._payloads.pop(0)
+ self._remote_allowance -= 1
+ action = True
+ if self._completion is None or self._payloads:
+ terminal_metadata, code, message, termination = None, None, None, None
+ else:
+ terminal_metadata, code, message, termination = _explode_completion(
+ self._completion)
+ self._completion = None
+ action = True
+
+ if action:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ local_subscription, timeout, allowance, initial_metadata, payload,
+ terminal_metadata, code, message, termination, None)
+ self._lowest_unused_sequence_number += 1
+ return ticket
+ else:
+ return None
+
+ def _transmit(self, ticket):
+ """Commences the transmission loop sending tickets.
+
+ Args:
+ ticket: A links.Ticket to be sent to the other side of the operation.
+ """
+ def transmit(ticket):
+ while True:
+ transmission_outcome = callable_util.call_logging_exceptions(
+ self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
+ if transmission_outcome.exception is None:
+ with self._lock:
+ if ticket.termination is links.Ticket.Termination.COMPLETION:
+ self._termination_manager.transmission_complete()
+ ticket = self._next_ticket()
+ if ticket is None:
+ self._transmitting = False
+ return
+ else:
+ with self._lock:
+ self._abort = _ABORTED_NO_NOTIFY
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME)
+ self._expiration_manager.terminate()
+ return
+
+ self._pool.submit(callable_util.with_exceptions_logged(
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
+ self._transmitting = True
+
+ def kick_off(
+ self, group, method, timeout, protocol_options, initial_metadata,
+ payload, completion, allowance):
+ """See _interfaces.TransmissionManager.kickoff for specification."""
+ # TODO(nathaniel): Support other subscriptions.
+ subscription = links.Ticket.Subscription.FULL
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ self._remote_allowance = 1 if payload is None else 0
+ protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options)
+ ticket = links.Ticket(
+ self._operation_id, 0, group, method, subscription, timeout, allowance,
+ initial_metadata, payload, terminal_metadata, code, message,
+ termination, protocol)
+ self._lowest_unused_sequence_number = 1
+ self._transmit(ticket)
+
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """See _interfaces.TransmissionManager.advance for specification."""
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+ return
+
+ effective_initial_metadata = initial_metadata
+ effective_payload = payload
+ effective_completion = completion
+ if allowance is not None and not self._remote_complete:
+ effective_allowance = allowance
+ else:
+ effective_allowance = None
+ if self._transmitting:
+ if effective_initial_metadata is not None:
+ self._initial_metadata = effective_initial_metadata
+ if effective_payload is not None:
+ self._payloads.append(effective_payload)
+ if effective_completion is not None:
+ self._completion = effective_completion
+ if effective_allowance is not None:
+ self._local_allowance += effective_allowance
+ else:
+ if effective_payload is not None:
+ if 0 < self._remote_allowance:
+ ticket_payload = effective_payload
+ self._remote_allowance -= 1
+ else:
+ self._payloads.append(effective_payload)
+ ticket_payload = None
+ else:
+ ticket_payload = None
+ if effective_completion is not None and not self._payloads:
+ ticket_completion = effective_completion
+ else:
+ self._completion = effective_completion
+ ticket_completion = None
+ if any(
+ (effective_initial_metadata, ticket_payload, ticket_completion,
+ effective_allowance)):
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, allowance, effective_initial_metadata, ticket_payload,
+ terminal_metadata, code, message, termination, None)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def timeout(self, timeout):
+ """See _interfaces.TransmissionManager.timeout for specification."""
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+ return
+ elif self._transmitting:
+ self._timeout = timeout
+ else:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, timeout, None, None, None, None, None, None, None, None)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def allowance(self, allowance):
+ """See _interfaces.TransmissionManager.allowance for specification."""
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+ return
+ elif self._transmitting or not self._payloads:
+ self._remote_allowance += allowance
+ else:
+ self._remote_allowance += allowance - 1
+ payload = self._payloads.pop(0)
+ if self._payloads:
+ completion = None
+ else:
+ completion = self._completion
+ self._completion = None
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, None, None, payload, terminal_metadata, code, message,
+ termination, None)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def remote_complete(self):
+ """See _interfaces.TransmissionManager.remote_complete for specification."""
+ self._remote_complete = True
+ self._local_allowance = 0
+
+ def abort(self, outcome):
+ """See _interfaces.TransmissionManager.abort for specification."""
+ if self._abort.kind is _Abort.Kind.NOT_ABORTED:
+ if outcome is None:
+ self._abort = _ABORTED_NO_NOTIFY
+ else:
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
+ outcome.kind)
+ if termination is None:
+ self._abort = _ABORTED_NO_NOTIFY
+ elif self._transmitting:
+ self._abort = _Abort(
+ _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code,
+ outcome.details)
+ else:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None,
+ None, None, None, None, None, None, None, outcome.code,
+ outcome.details, termination, None)
+ self._transmit(ticket)
+ self._abort = _ABORTED_NO_NOTIFY

Powered by Google App Engine
This is Rietveld 408576698