Index: third_party/grpc/src/python/grpcio/grpc/_links/service.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/_links/service.py b/third_party/grpc/src/python/grpcio/grpc/_links/service.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..01edee6896186defca95aa10918dd5d1b5cfa293 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/_links/service.py |
@@ -0,0 +1,505 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire.""" |
+ |
+import abc |
+import enum |
+import logging |
+import threading |
+import time |
+ |
+from grpc._adapter import _intermediary_low |
+from grpc._links import _constants |
+from grpc.beta import interfaces as beta_interfaces |
+from grpc.framework.foundation import logging_pool |
+from grpc.framework.foundation import relay |
+from grpc.framework.interfaces.links import links |
+ |
+_IDENTITY = lambda x: x |
+ |
+_TERMINATION_KIND_TO_CODE = { |
+ links.Ticket.Termination.COMPLETION: _intermediary_low.Code.OK, |
+ links.Ticket.Termination.CANCELLATION: _intermediary_low.Code.CANCELLED, |
+ links.Ticket.Termination.EXPIRATION: |
+ _intermediary_low.Code.DEADLINE_EXCEEDED, |
+ links.Ticket.Termination.SHUTDOWN: _intermediary_low.Code.UNAVAILABLE, |
+ links.Ticket.Termination.RECEPTION_FAILURE: _intermediary_low.Code.INTERNAL, |
+ links.Ticket.Termination.TRANSMISSION_FAILURE: |
+ _intermediary_low.Code.INTERNAL, |
+ links.Ticket.Termination.LOCAL_FAILURE: _intermediary_low.Code.UNKNOWN, |
+ links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN, |
+} |
+ |
+_STOP = _intermediary_low.Event.Kind.STOP |
+_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED |
+_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED |
+_SERVICE = _intermediary_low.Event.Kind.SERVICE_ACCEPTED |
+_READ = _intermediary_low.Event.Kind.READ_ACCEPTED |
+_FINISH = _intermediary_low.Event.Kind.FINISH |
+ |
+ |
+@enum.unique |
+class _Read(enum.Enum): |
+ READING = 'reading' |
+ # TODO(issue 2916): This state will again be necessary after eliminating the |
+ # "early_read" field of _RPCState and going back to only reading when granted |
+ # allowance to read. |
+ # AWAITING_ALLOWANCE = 'awaiting allowance' |
+ CLOSED = 'closed' |
+ |
+ |
+@enum.unique |
+class _HighWrite(enum.Enum): |
+ OPEN = 'open' |
+ CLOSED = 'closed' |
+ |
+ |
+@enum.unique |
+class _LowWrite(enum.Enum): |
+ """The possible categories of low-level write state.""" |
+ |
+ OPEN = 'OPEN' |
+ ACTIVE = 'ACTIVE' |
+ CLOSED = 'CLOSED' |
+ |
+ |
+class _Context(beta_interfaces.GRPCServicerContext): |
+ |
+ def __init__(self, call): |
+ self._lock = threading.Lock() |
+ self._call = call |
+ self._disable_next_compression = False |
+ |
+ def peer(self): |
+ with self._lock: |
+ return self._call.peer() |
+ |
+ def disable_next_response_compression(self): |
+ with self._lock: |
+ self._disable_next_compression = True |
+ |
+ def next_compression_disabled(self): |
+ with self._lock: |
+ disabled = self._disable_next_compression |
+ self._disable_next_compression = False |
+ return disabled |
+ |
+ |
+class _RPCState(object): |
+ |
+ def __init__( |
+ self, request_deserializer, response_serializer, sequence_number, read, |
+ early_read, allowance, high_write, low_write, premetadataed, |
+ terminal_metadata, code, message, due, context): |
+ self.request_deserializer = request_deserializer |
+ self.response_serializer = response_serializer |
+ self.sequence_number = sequence_number |
+ self.read = read |
+ # TODO(issue 2916): Eliminate this by eliminating the necessity of calling |
+ # call.read just to advance the RPC. |
+ self.early_read = early_read # A raw (not deserialized) read. |
+ self.allowance = allowance |
+ self.high_write = high_write |
+ self.low_write = low_write |
+ self.premetadataed = premetadataed |
+ self.terminal_metadata = terminal_metadata |
+ self.code = code |
+ self.message = message |
+ self.due = due |
+ self.context = context |
+ |
+ |
+def _no_longer_due(kind, rpc_state, key, rpc_states): |
+ rpc_state.due.remove(kind) |
+ if not rpc_state.due: |
+ del rpc_states[key] |
+ |
+ |
+def _metadatafy(call, metadata): |
+ for metadata_key, metadata_value in metadata: |
+ call.add_metadata(metadata_key, metadata_value) |
+ |
+ |
+def _status(termination_kind, high_code, details): |
+ low_details = b'' if details is None else details |
+ if high_code is None: |
+ low_code = _TERMINATION_KIND_TO_CODE[termination_kind] |
+ else: |
+ low_code = _constants.HIGH_STATUS_CODE_TO_LOW_STATUS_CODE[high_code] |
+ return _intermediary_low.Status(low_code, low_details) |
+ |
+ |
+class _Kernel(object): |
+ |
+ def __init__(self, request_deserializers, response_serializers, ticket_relay): |
+ self._lock = threading.Lock() |
+ self._request_deserializers = request_deserializers |
+ self._response_serializers = response_serializers |
+ self._relay = ticket_relay |
+ |
+ self._completion_queue = None |
+ self._due = set() |
+ self._server = None |
+ self._rpc_states = {} |
+ self._pool = None |
+ |
+ def _on_service_acceptance_event(self, event, server): |
+ server.service(None) |
+ |
+ service_acceptance = event.service_acceptance |
+ call = service_acceptance.call |
+ call.accept(self._completion_queue, call) |
+ try: |
+ group, method = service_acceptance.method.split('/')[1:3] |
+ except ValueError: |
+ logging.info('Illegal path "%s"!', service_acceptance.method) |
+ return |
+ request_deserializer = self._request_deserializers.get( |
+ (group, method), _IDENTITY) |
+ response_serializer = self._response_serializers.get( |
+ (group, method), _IDENTITY) |
+ |
+ call.read(call) |
+ context = _Context(call) |
+ self._rpc_states[call] = _RPCState( |
+ request_deserializer, response_serializer, 1, _Read.READING, None, 1, |
+ _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None, |
+ set((_READ, _FINISH,)), context) |
+ protocol = links.Protocol(links.Protocol.Kind.SERVICER_CONTEXT, context) |
+ ticket = links.Ticket( |
+ call, 0, group, method, links.Ticket.Subscription.FULL, |
+ service_acceptance.deadline - time.time(), None, event.metadata, None, |
+ None, None, None, None, protocol) |
+ self._relay.add_value(ticket) |
+ |
+ def _on_read_event(self, event): |
+ call = event.tag |
+ rpc_state = self._rpc_states[call] |
+ |
+ if event.bytes is None: |
+ rpc_state.read = _Read.CLOSED |
+ payload = None |
+ termination = links.Ticket.Termination.COMPLETION |
+ _no_longer_due(_READ, rpc_state, call, self._rpc_states) |
+ else: |
+ if 0 < rpc_state.allowance: |
+ payload = rpc_state.request_deserializer(event.bytes) |
+ termination = None |
+ rpc_state.allowance -= 1 |
+ call.read(call) |
+ else: |
+ rpc_state.early_read = event.bytes |
+ _no_longer_due(_READ, rpc_state, call, self._rpc_states) |
+ return |
+ # TODO(issue 2916): Instead of returning: |
+ # rpc_state.read = _Read.AWAITING_ALLOWANCE |
+ ticket = links.Ticket( |
+ call, rpc_state.sequence_number, None, None, None, None, None, None, |
+ payload, None, None, None, termination, None) |
+ rpc_state.sequence_number += 1 |
+ self._relay.add_value(ticket) |
+ |
+ def _on_write_event(self, event): |
+ call = event.tag |
+ rpc_state = self._rpc_states[call] |
+ |
+ if rpc_state.high_write is _HighWrite.CLOSED: |
+ if rpc_state.terminal_metadata is not None: |
+ _metadatafy(call, rpc_state.terminal_metadata) |
+ status = _status( |
+ links.Ticket.Termination.COMPLETION, rpc_state.code, |
+ rpc_state.message) |
+ call.status(status, call) |
+ rpc_state.low_write = _LowWrite.CLOSED |
+ rpc_state.due.add(_COMPLETE) |
+ rpc_state.due.remove(_WRITE) |
+ else: |
+ ticket = links.Ticket( |
+ call, rpc_state.sequence_number, None, None, None, None, 1, None, |
+ None, None, None, None, None, None) |
+ rpc_state.sequence_number += 1 |
+ self._relay.add_value(ticket) |
+ rpc_state.low_write = _LowWrite.OPEN |
+ _no_longer_due(_WRITE, rpc_state, call, self._rpc_states) |
+ |
+ def _on_finish_event(self, event): |
+ call = event.tag |
+ rpc_state = self._rpc_states[call] |
+ _no_longer_due(_FINISH, rpc_state, call, self._rpc_states) |
+ code = event.status.code |
+ if code == _intermediary_low.Code.OK: |
+ return |
+ |
+ if code == _intermediary_low.Code.CANCELLED: |
+ termination = links.Ticket.Termination.CANCELLATION |
+ elif code == _intermediary_low.Code.DEADLINE_EXCEEDED: |
+ termination = links.Ticket.Termination.EXPIRATION |
+ else: |
+ termination = links.Ticket.Termination.TRANSMISSION_FAILURE |
+ ticket = links.Ticket( |
+ call, rpc_state.sequence_number, None, None, None, None, None, None, |
+ None, None, None, None, termination, None) |
+ rpc_state.sequence_number += 1 |
+ self._relay.add_value(ticket) |
+ |
+ def _spin(self, completion_queue, server): |
+ while True: |
+ event = completion_queue.get(None) |
+ with self._lock: |
+ if event.kind is _STOP: |
+ self._due.remove(_STOP) |
+ elif event.kind is _READ: |
+ self._on_read_event(event) |
+ elif event.kind is _WRITE: |
+ self._on_write_event(event) |
+ elif event.kind is _COMPLETE: |
+ _no_longer_due( |
+ _COMPLETE, self._rpc_states.get(event.tag), event.tag, |
+ self._rpc_states) |
+ elif event.kind is _intermediary_low.Event.Kind.FINISH: |
+ self._on_finish_event(event) |
+ elif event.kind is _SERVICE: |
+ if self._server is None: |
+ self._due.remove(_SERVICE) |
+ else: |
+ self._on_service_acceptance_event(event, server) |
+ else: |
+ logging.error('Illegal event! %s', (event,)) |
+ |
+ if not self._due and not self._rpc_states: |
+ completion_queue.stop() |
+ return |
+ |
+ def add_ticket(self, ticket): |
+ with self._lock: |
+ call = ticket.operation_id |
+ rpc_state = self._rpc_states.get(call) |
+ if rpc_state is None: |
+ return |
+ |
+ if ticket.initial_metadata is not None: |
+ _metadatafy(call, ticket.initial_metadata) |
+ call.premetadata() |
+ rpc_state.premetadataed = True |
+ elif not rpc_state.premetadataed: |
+ if (ticket.terminal_metadata is not None or |
+ ticket.payload is not None or |
+ ticket.termination is not None or |
+ ticket.code is not None or |
+ ticket.message is not None): |
+ call.premetadata() |
+ rpc_state.premetadataed = True |
+ |
+ if ticket.allowance is not None: |
+ if rpc_state.early_read is None: |
+ rpc_state.allowance += ticket.allowance |
+ else: |
+ payload = rpc_state.request_deserializer(rpc_state.early_read) |
+ rpc_state.allowance += ticket.allowance - 1 |
+ rpc_state.early_read = None |
+ if rpc_state.read is _Read.READING: |
+ call.read(call) |
+ rpc_state.due.add(_READ) |
+ termination = None |
+ else: |
+ termination = links.Ticket.Termination.COMPLETION |
+ early_read_ticket = links.Ticket( |
+ call, rpc_state.sequence_number, None, None, None, None, None, |
+ None, payload, None, None, None, termination, None) |
+ rpc_state.sequence_number += 1 |
+ self._relay.add_value(early_read_ticket) |
+ |
+ if ticket.payload is not None: |
+ disable_compression = rpc_state.context.next_compression_disabled() |
+ if disable_compression: |
+ flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS |
+ else: |
+ flags = 0 |
+ call.write(rpc_state.response_serializer(ticket.payload), call, flags) |
+ rpc_state.due.add(_WRITE) |
+ rpc_state.low_write = _LowWrite.ACTIVE |
+ |
+ if ticket.terminal_metadata is not None: |
+ rpc_state.terminal_metadata = ticket.terminal_metadata |
+ if ticket.code is not None: |
+ rpc_state.code = ticket.code |
+ if ticket.message is not None: |
+ rpc_state.message = ticket.message |
+ |
+ if ticket.termination is links.Ticket.Termination.COMPLETION: |
+ rpc_state.high_write = _HighWrite.CLOSED |
+ if rpc_state.low_write is _LowWrite.OPEN: |
+ if rpc_state.terminal_metadata is not None: |
+ _metadatafy(call, rpc_state.terminal_metadata) |
+ status = _status( |
+ links.Ticket.Termination.COMPLETION, rpc_state.code, |
+ rpc_state.message) |
+ call.status(status, call) |
+ rpc_state.due.add(_COMPLETE) |
+ rpc_state.low_write = _LowWrite.CLOSED |
+ elif ticket.termination is not None: |
+ if rpc_state.terminal_metadata is not None: |
+ _metadatafy(call, rpc_state.terminal_metadata) |
+ status = _status( |
+ ticket.termination, rpc_state.code, rpc_state.message) |
+ call.status(status, call) |
+ rpc_state.due.add(_COMPLETE) |
+ |
+ def add_port(self, address, server_credentials): |
+ with self._lock: |
+ if self._server is None: |
+ self._completion_queue = _intermediary_low.CompletionQueue() |
+ self._server = _intermediary_low.Server(self._completion_queue) |
+ if server_credentials is None: |
+ return self._server.add_http2_addr(address) |
+ else: |
+ return self._server.add_secure_http2_addr(address, server_credentials) |
+ |
+ def start(self): |
+ with self._lock: |
+ if self._server is None: |
+ self._completion_queue = _intermediary_low.CompletionQueue() |
+ self._server = _intermediary_low.Server(self._completion_queue) |
+ self._pool = logging_pool.pool(1) |
+ self._pool.submit(self._spin, self._completion_queue, self._server) |
+ self._server.start() |
+ self._server.service(None) |
+ self._due.add(_SERVICE) |
+ |
+ def begin_stop(self): |
+ with self._lock: |
+ self._server.stop() |
+ self._due.add(_STOP) |
+ self._server = None |
+ |
+ def end_stop(self): |
+ with self._lock: |
+ pool = self._pool |
+ pool.shutdown(wait=True) |
+ |
+ |
+class ServiceLink(links.Link): |
+ """A links.Link for use on the service-side of a gRPC connection. |
+ |
+ Implementations of this interface are only valid for use between calls to |
+ their start method and one of their stop methods. |
+ """ |
+ |
+ @abc.abstractmethod |
+ def add_port(self, address, server_credentials): |
+ """Adds a port on which to service RPCs after this link has been started. |
+ |
+ Args: |
+ address: The address on which to service RPCs with a port number of zero |
+ requesting that a port number be automatically selected and used. |
+ server_credentials: An _intermediary_low.ServerCredentials object, or |
+ None for insecure service. |
+ |
+ Returns: |
+ An integer port on which RPCs will be serviced after this link has been |
+ started. This is typically the same number as the port number contained |
+ in the passed address, but will likely be different if the port number |
+ contained in the passed address was zero. |
+ """ |
+ raise NotImplementedError() |
+ |
+ @abc.abstractmethod |
+ def start(self): |
+ """Starts this object. |
+ |
+ This method must be called before attempting to use this Link in ticket |
+ exchange. |
+ """ |
+ raise NotImplementedError() |
+ |
+ @abc.abstractmethod |
+ def begin_stop(self): |
+ """Indicate imminent link stop and immediate rejection of new RPCs. |
+ |
+ New RPCs will be rejected as soon as this method is called, but ongoing RPCs |
+ will be allowed to continue until they terminate. This method does not |
+ block. |
+ """ |
+ raise NotImplementedError() |
+ |
+ @abc.abstractmethod |
+ def end_stop(self): |
+ """Finishes stopping this link. |
+ |
+ begin_stop must have been called exactly once before calling this method. |
+ |
+ All in-progress RPCs will be terminated immediately. |
+ """ |
+ raise NotImplementedError() |
+ |
+ |
+class _ServiceLink(ServiceLink): |
+ |
+ def __init__(self, request_deserializers, response_serializers): |
+ self._relay = relay.relay(None) |
+ self._kernel = _Kernel( |
+ {} if request_deserializers is None else request_deserializers, |
+ {} if response_serializers is None else response_serializers, |
+ self._relay) |
+ |
+ def accept_ticket(self, ticket): |
+ self._kernel.add_ticket(ticket) |
+ |
+ def join_link(self, link): |
+ self._relay.set_behavior(link.accept_ticket) |
+ |
+ def add_port(self, address, server_credentials): |
+ return self._kernel.add_port(address, server_credentials) |
+ |
+ def start(self): |
+ self._relay.start() |
+ return self._kernel.start() |
+ |
+ def begin_stop(self): |
+ self._kernel.begin_stop() |
+ |
+ def end_stop(self): |
+ self._kernel.end_stop() |
+ self._relay.stop() |
+ |
+ |
+def service_link(request_deserializers, response_serializers): |
+ """Creates a ServiceLink. |
+ |
+ Args: |
+ request_deserializers: A dict from group-method pair to request object |
+ deserialization behavior. |
+ response_serializers: A dict from group-method pair to response ojbect |
+ serialization behavior. |
+ |
+ Returns: |
+ A ServiceLink. |
+ """ |
+ return _ServiceLink(request_deserializers, response_serializers) |