Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/_links/invocation.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/_links/invocation.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/_links/invocation.py b/third_party/grpc/src/python/grpcio/grpc/_links/invocation.py
new file mode 100644
index 0000000000000000000000000000000000000000..5ca0a0ee606810dd3ec3070e323a989f61ff022b
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/_links/invocation.py
@@ -0,0 +1,452 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""The RPC-invocation-side bridge between RPC Framework and GRPC-on-the-wire."""
+
+import abc
+import enum
+import logging
+import threading
+import time
+
+from grpc._adapter import _intermediary_low
+from grpc._links import _constants
+from grpc.beta import interfaces as beta_interfaces
+from grpc.framework.foundation import activated
+from grpc.framework.foundation import logging_pool
+from grpc.framework.foundation import relay
+from grpc.framework.interfaces.links import links
+
+_IDENTITY = lambda x: x
+
+_STOP = _intermediary_low.Event.Kind.STOP
+_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED
+_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED
+_READ = _intermediary_low.Event.Kind.READ_ACCEPTED
+_METADATA = _intermediary_low.Event.Kind.METADATA_ACCEPTED
+_FINISH = _intermediary_low.Event.Kind.FINISH
+
+
+@enum.unique
+class _Read(enum.Enum):
+ AWAITING_METADATA = 'awaiting metadata'
+ READING = 'reading'
+ AWAITING_ALLOWANCE = 'awaiting allowance'
+ CLOSED = 'closed'
+
+
+@enum.unique
+class _HighWrite(enum.Enum):
+ OPEN = 'open'
+ CLOSED = 'closed'
+
+
+@enum.unique
+class _LowWrite(enum.Enum):
+ OPEN = 'OPEN'
+ ACTIVE = 'ACTIVE'
+ CLOSED = 'CLOSED'
+
+
+class _Context(beta_interfaces.GRPCInvocationContext):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._disable_next_compression = False
+
+ def disable_next_request_compression(self):
+ with self._lock:
+ self._disable_next_compression = True
+
+ def next_compression_disabled(self):
+ with self._lock:
+ disabled = self._disable_next_compression
+ self._disable_next_compression = False
+ return disabled
+
+
+class _RPCState(object):
+
+ def __init__(
+ self, call, request_serializer, response_deserializer, sequence_number,
+ read, allowance, high_write, low_write, due, context):
+ self.call = call
+ self.request_serializer = request_serializer
+ self.response_deserializer = response_deserializer
+ self.sequence_number = sequence_number
+ self.read = read
+ self.allowance = allowance
+ self.high_write = high_write
+ self.low_write = low_write
+ self.due = due
+ self.context = context
+
+
+def _no_longer_due(kind, rpc_state, key, rpc_states):
+ rpc_state.due.remove(kind)
+ if not rpc_state.due:
+ del rpc_states[key]
+
+
+class _Kernel(object):
+
+ def __init__(
+ self, channel, host, metadata_transformer, request_serializers,
+ response_deserializers, ticket_relay):
+ self._lock = threading.Lock()
+ self._channel = channel
+ self._host = host
+ self._metadata_transformer = metadata_transformer
+ self._request_serializers = request_serializers
+ self._response_deserializers = response_deserializers
+ self._relay = ticket_relay
+
+ self._completion_queue = None
+ self._rpc_states = {}
+ self._pool = None
+
+ def _on_write_event(self, operation_id, unused_event, rpc_state):
+ if rpc_state.high_write is _HighWrite.CLOSED:
+ rpc_state.call.complete(operation_id)
+ rpc_state.due.add(_COMPLETE)
+ rpc_state.due.remove(_WRITE)
+ rpc_state.low_write = _LowWrite.CLOSED
+ else:
+ ticket = links.Ticket(
+ operation_id, rpc_state.sequence_number, None, None, None, None, 1,
+ None, None, None, None, None, None, None)
+ rpc_state.sequence_number += 1
+ self._relay.add_value(ticket)
+ rpc_state.low_write = _LowWrite.OPEN
+ _no_longer_due(_WRITE, rpc_state, operation_id, self._rpc_states)
+
+ def _on_read_event(self, operation_id, event, rpc_state):
+ if event.bytes is None or _FINISH not in rpc_state.due:
+ rpc_state.read = _Read.CLOSED
+ _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states)
+ else:
+ if 0 < rpc_state.allowance:
+ rpc_state.allowance -= 1
+ rpc_state.call.read(operation_id)
+ else:
+ rpc_state.read = _Read.AWAITING_ALLOWANCE
+ _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states)
+ ticket = links.Ticket(
+ operation_id, rpc_state.sequence_number, None, None, None, None, None,
+ None, rpc_state.response_deserializer(event.bytes), None, None, None,
+ None, None)
+ rpc_state.sequence_number += 1
+ self._relay.add_value(ticket)
+
+ def _on_metadata_event(self, operation_id, event, rpc_state):
+ if _FINISH in rpc_state.due:
+ rpc_state.allowance -= 1
+ rpc_state.call.read(operation_id)
+ rpc_state.read = _Read.READING
+ rpc_state.due.add(_READ)
+ rpc_state.due.remove(_METADATA)
+ ticket = links.Ticket(
+ operation_id, rpc_state.sequence_number, None, None,
+ links.Ticket.Subscription.FULL, None, None, event.metadata, None,
+ None, None, None, None, None)
+ rpc_state.sequence_number += 1
+ self._relay.add_value(ticket)
+ else:
+ _no_longer_due(_METADATA, rpc_state, operation_id, self._rpc_states)
+
+ def _on_finish_event(self, operation_id, event, rpc_state):
+ _no_longer_due(_FINISH, rpc_state, operation_id, self._rpc_states)
+ if event.status.code == _intermediary_low.Code.OK:
+ termination = links.Ticket.Termination.COMPLETION
+ elif event.status.code == _intermediary_low.Code.CANCELLED:
+ termination = links.Ticket.Termination.CANCELLATION
+ elif event.status.code == _intermediary_low.Code.DEADLINE_EXCEEDED:
+ termination = links.Ticket.Termination.EXPIRATION
+ elif event.status.code == _intermediary_low.Code.UNIMPLEMENTED:
+ termination = links.Ticket.Termination.REMOTE_FAILURE
+ elif event.status.code == _intermediary_low.Code.UNKNOWN:
+ termination = links.Ticket.Termination.LOCAL_FAILURE
+ else:
+ termination = links.Ticket.Termination.TRANSMISSION_FAILURE
+ code = _constants.LOW_STATUS_CODE_TO_HIGH_STATUS_CODE[event.status.code]
+ ticket = links.Ticket(
+ operation_id, rpc_state.sequence_number, None, None, None, None, None,
+ None, None, event.metadata, code, event.status.details, termination,
+ None)
+ rpc_state.sequence_number += 1
+ self._relay.add_value(ticket)
+
+ def _spin(self, completion_queue):
+ while True:
+ event = completion_queue.get(None)
+ with self._lock:
+ rpc_state = self._rpc_states.get(event.tag, None)
+ if event.kind is _STOP:
+ pass
+ elif event.kind is _WRITE:
+ self._on_write_event(event.tag, event, rpc_state)
+ elif event.kind is _METADATA:
+ self._on_metadata_event(event.tag, event, rpc_state)
+ elif event.kind is _READ:
+ self._on_read_event(event.tag, event, rpc_state)
+ elif event.kind is _FINISH:
+ self._on_finish_event(event.tag, event, rpc_state)
+ elif event.kind is _COMPLETE:
+ _no_longer_due(_COMPLETE, rpc_state, event.tag, self._rpc_states)
+ else:
+ logging.error('Illegal RPC event! %s', (event,))
+
+ if self._completion_queue is None and not self._rpc_states:
+ completion_queue.stop()
+ return
+
+ def _invoke(
+ self, operation_id, group, method, initial_metadata, payload, termination,
+ timeout, allowance, options):
+ """Invoke an RPC.
+
+ Args:
+ operation_id: Any object to be used as an operation ID for the RPC.
+ group: The group to which the RPC method belongs.
+ method: The RPC method name.
+ initial_metadata: The initial metadata object for the RPC.
+ payload: A payload object for the RPC or None if no payload was given at
+ invocation-time.
+ termination: A links.Ticket.Termination value or None indicated whether or
+ not more writes will follow from this side of the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ allowance: The number of payloads (beyond the free first one) that the
+ local ticket exchange mate has granted permission to be read.
+ options: A beta_interfaces.GRPCCallOptions value or None.
+ """
+ if termination is links.Ticket.Termination.COMPLETION:
+ high_write = _HighWrite.CLOSED
+ elif termination is None:
+ high_write = _HighWrite.OPEN
+ else:
+ return
+
+ transformed_initial_metadata = self._metadata_transformer(initial_metadata)
+ request_serializer = self._request_serializers.get(
+ (group, method), _IDENTITY)
+ response_deserializer = self._response_deserializers.get(
+ (group, method), _IDENTITY)
+
+ call = _intermediary_low.Call(
+ self._channel, self._completion_queue, '/%s/%s' % (group, method),
+ self._host, time.time() + timeout)
+ if options is not None and options.credentials is not None:
+ call.set_credentials(options.credentials._low_credentials)
+ if transformed_initial_metadata is not None:
+ for metadata_key, metadata_value in transformed_initial_metadata:
+ call.add_metadata(metadata_key, metadata_value)
+ call.invoke(self._completion_queue, operation_id, operation_id)
+ if payload is None:
+ if high_write is _HighWrite.CLOSED:
+ call.complete(operation_id)
+ low_write = _LowWrite.CLOSED
+ due = set((_METADATA, _COMPLETE, _FINISH,))
+ else:
+ low_write = _LowWrite.OPEN
+ due = set((_METADATA, _FINISH,))
+ else:
+ if options is not None and options.disable_compression:
+ flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
+ else:
+ flags = 0
+ call.write(request_serializer(payload), operation_id, flags)
+ low_write = _LowWrite.ACTIVE
+ due = set((_WRITE, _METADATA, _FINISH,))
+ context = _Context()
+ self._rpc_states[operation_id] = _RPCState(
+ call, request_serializer, response_deserializer, 1,
+ _Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance),
+ high_write, low_write, due, context)
+ protocol = links.Protocol(links.Protocol.Kind.INVOCATION_CONTEXT, context)
+ ticket = links.Ticket(
+ operation_id, 0, None, None, None, None, None, None, None, None, None,
+ None, None, protocol)
+ self._relay.add_value(ticket)
+
+ def _advance(self, operation_id, rpc_state, payload, termination, allowance):
+ if payload is not None:
+ disable_compression = rpc_state.context.next_compression_disabled()
+ if disable_compression:
+ flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
+ else:
+ flags = 0
+ rpc_state.call.write(
+ rpc_state.request_serializer(payload), operation_id, flags)
+ rpc_state.low_write = _LowWrite.ACTIVE
+ rpc_state.due.add(_WRITE)
+
+ if allowance is not None:
+ if rpc_state.read is _Read.AWAITING_ALLOWANCE:
+ rpc_state.allowance += allowance - 1
+ rpc_state.call.read(operation_id)
+ rpc_state.read = _Read.READING
+ rpc_state.due.add(_READ)
+ else:
+ rpc_state.allowance += allowance
+
+ if termination is links.Ticket.Termination.COMPLETION:
+ rpc_state.high_write = _HighWrite.CLOSED
+ if rpc_state.low_write is _LowWrite.OPEN:
+ rpc_state.call.complete(operation_id)
+ rpc_state.due.add(_COMPLETE)
+ rpc_state.low_write = _LowWrite.CLOSED
+ elif termination is not None:
+ rpc_state.call.cancel()
+
+ def add_ticket(self, ticket):
+ with self._lock:
+ if ticket.sequence_number == 0:
+ if self._completion_queue is None:
+ logging.error('Received invocation ticket %s after stop!', ticket)
+ else:
+ if (ticket.protocol is not None and
+ ticket.protocol.kind is links.Protocol.Kind.CALL_OPTION):
+ grpc_call_options = ticket.protocol.value
+ else:
+ grpc_call_options = None
+ self._invoke(
+ ticket.operation_id, ticket.group, ticket.method,
+ ticket.initial_metadata, ticket.payload, ticket.termination,
+ ticket.timeout, ticket.allowance, grpc_call_options)
+ else:
+ rpc_state = self._rpc_states.get(ticket.operation_id)
+ if rpc_state is not None:
+ self._advance(
+ ticket.operation_id, rpc_state, ticket.payload,
+ ticket.termination, ticket.allowance)
+
+ def start(self):
+ """Starts this object.
+
+ This method must be called before attempting to exchange tickets with this
+ object.
+ """
+ with self._lock:
+ self._completion_queue = _intermediary_low.CompletionQueue()
+ self._pool = logging_pool.pool(1)
+ self._pool.submit(self._spin, self._completion_queue)
+
+ def stop(self):
+ """Stops this object.
+
+ This method must be called for proper termination of this object, and no
+ attempts to exchange tickets with this object may be made after this method
+ has been called.
+ """
+ with self._lock:
+ if not self._rpc_states:
+ self._completion_queue.stop()
+ self._completion_queue = None
+ pool = self._pool
+ pool.shutdown(wait=True)
+
+
+class InvocationLink(links.Link, activated.Activated):
+ """A links.Link for use on the invocation-side of a gRPC connection.
+
+ Implementations of this interface are only valid for use when activated.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _InvocationLink(InvocationLink):
+
+ def __init__(
+ self, channel, host, metadata_transformer, request_serializers,
+ response_deserializers):
+ self._relay = relay.relay(None)
+ self._kernel = _Kernel(
+ channel, host,
+ _IDENTITY if metadata_transformer is None else metadata_transformer,
+ {} if request_serializers is None else request_serializers,
+ {} if response_deserializers is None else response_deserializers,
+ self._relay)
+
+ def _start(self):
+ self._relay.start()
+ self._kernel.start()
+ return self
+
+ def _stop(self):
+ self._kernel.stop()
+ self._relay.stop()
+
+ def accept_ticket(self, ticket):
+ """See links.Link.accept_ticket for specification."""
+ self._kernel.add_ticket(ticket)
+
+ def join_link(self, link):
+ """See links.Link.join_link for specification."""
+ self._relay.set_behavior(link.accept_ticket)
+
+ def __enter__(self):
+ """See activated.Activated.__enter__ for specification."""
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See activated.Activated.__exit__ for specification."""
+ self._stop()
+ return False
+
+ def start(self):
+ """See activated.Activated.start for specification."""
+ return self._start()
+
+ def stop(self):
+ """See activated.Activated.stop for specification."""
+ self._stop()
+
+
+def invocation_link(
+ channel, host, metadata_transformer, request_serializers,
+ response_deserializers):
+ """Creates an InvocationLink.
+
+ Args:
+ channel: An _intermediary_low.Channel for use by the link.
+ host: The host to specify when invoking RPCs.
+ metadata_transformer: A callable that takes an invocation-side initial
+ metadata value and returns another metadata value to send in its place.
+ May be None.
+ request_serializers: A dict from group-method pair to request object
+ serialization behavior.
+ response_deserializers: A dict from group-method pair to response object
+ deserialization behavior.
+
+ Returns:
+ An InvocationLink.
+ """
+ return _InvocationLink(
+ channel, host, metadata_transformer, request_serializers,
+ response_deserializers)

Powered by Google App Engine
This is Rietveld 408576698