| Index: third_party/grpc/src/python/grpcio/grpc/_adapter/fore.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/_adapter/fore.py b/third_party/grpc/src/python/grpcio/grpc/_adapter/fore.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..acdd69c4206b7bea1c8c53d7ee00ef2ba06c936e
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/_adapter/fore.py
|
| @@ -0,0 +1,363 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire."""
|
| +
|
| +import enum
|
| +import logging
|
| +import threading
|
| +import time
|
| +
|
| +from grpc._adapter import _common
|
| +from grpc._adapter import _intermediary_low as _low
|
| +from grpc.framework.base import interfaces as base_interfaces
|
| +from grpc.framework.base import null
|
| +from grpc.framework.foundation import activated
|
| +from grpc.framework.foundation import logging_pool
|
| +
|
| +_THREAD_POOL_SIZE = 10
|
| +
|
| +
|
| +@enum.unique
|
| +class _LowWrite(enum.Enum):
|
| + """The possible categories of low-level write state."""
|
| +
|
| + OPEN = 'OPEN'
|
| + ACTIVE = 'ACTIVE'
|
| + CLOSED = 'CLOSED'
|
| +
|
| +
|
| +def _write(call, rpc_state, payload):
|
| + serialized_payload = rpc_state.serializer(payload)
|
| + if rpc_state.write.low is _LowWrite.OPEN:
|
| + call.write(serialized_payload, call, 0)
|
| + rpc_state.write.low = _LowWrite.ACTIVE
|
| + else:
|
| + rpc_state.write.pending.append(serialized_payload)
|
| +
|
| +
|
| +def _status(call, rpc_state):
|
| + call.status(_low.Status(_low.Code.OK, ''), call)
|
| + rpc_state.write.low = _LowWrite.CLOSED
|
| +
|
| +
|
| +class ForeLink(base_interfaces.ForeLink, activated.Activated):
|
| + """A service-side bridge between RPC Framework and the C-ish _low code."""
|
| +
|
| + def __init__(
|
| + self, pool, request_deserializers, response_serializers,
|
| + root_certificates, key_chain_pairs, port=None):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + pool: A thread pool.
|
| + request_deserializers: A dict from RPC method names to request object
|
| + deserializer behaviors.
|
| + response_serializers: A dict from RPC method names to response object
|
| + serializer behaviors.
|
| + root_certificates: The PEM-encoded client root certificates as a
|
| + bytestring or None.
|
| + key_chain_pairs: A sequence of PEM-encoded private key-certificate chain
|
| + pairs.
|
| + port: The port on which to serve, or None to have a port selected
|
| + automatically.
|
| + """
|
| + self._condition = threading.Condition()
|
| + self._pool = pool
|
| + self._request_deserializers = request_deserializers
|
| + self._response_serializers = response_serializers
|
| + self._root_certificates = root_certificates
|
| + self._key_chain_pairs = key_chain_pairs
|
| + self._requested_port = port
|
| +
|
| + self._rear_link = null.NULL_REAR_LINK
|
| + self._completion_queue = None
|
| + self._server = None
|
| + self._rpc_states = {}
|
| + self._spinning = False
|
| + self._port = None
|
| +
|
| + def _on_stop_event(self):
|
| + self._spinning = False
|
| + self._condition.notify_all()
|
| +
|
| + def _on_service_acceptance_event(self, event, server):
|
| + """Handle a service invocation event."""
|
| + service_acceptance = event.service_acceptance
|
| + if service_acceptance is None:
|
| + return
|
| +
|
| + call = service_acceptance.call
|
| + call.accept(self._completion_queue, call)
|
| + # TODO(nathaniel): Metadata support.
|
| + call.premetadata()
|
| + call.read(call)
|
| + method = service_acceptance.method
|
| +
|
| + self._rpc_states[call] = _common.CommonRPCState(
|
| + _common.WriteState(_LowWrite.OPEN, _common.HighWrite.OPEN, []), 1,
|
| + self._request_deserializers[method],
|
| + self._response_serializers[method])
|
| +
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, 0, base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT, method,
|
| + base_interfaces.ServicedSubscription.Kind.FULL, None, None,
|
| + service_acceptance.deadline - time.time())
|
| + self._rear_link.accept_front_to_back_ticket(ticket)
|
| +
|
| + server.service(None)
|
| +
|
| + def _on_read_event(self, event):
|
| + """Handle data arriving during an RPC."""
|
| + call = event.tag
|
| + rpc_state = self._rpc_states.get(call, None)
|
| + if rpc_state is None:
|
| + return
|
| +
|
| + sequence_number = rpc_state.sequence_number
|
| + rpc_state.sequence_number += 1
|
| + if event.bytes is None:
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, sequence_number,
|
| + base_interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None,
|
| + None, None)
|
| + else:
|
| + call.read(call)
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, sequence_number,
|
| + base_interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None,
|
| + None, rpc_state.deserializer(event.bytes), None)
|
| +
|
| + self._rear_link.accept_front_to_back_ticket(ticket)
|
| +
|
| + def _on_write_event(self, event):
|
| + call = event.tag
|
| + rpc_state = self._rpc_states.get(call, None)
|
| + if rpc_state is None:
|
| + return
|
| +
|
| + if rpc_state.write.pending:
|
| + serialized_payload = rpc_state.write.pending.pop(0)
|
| + call.write(serialized_payload, call, 0)
|
| + elif rpc_state.write.high is _common.HighWrite.CLOSED:
|
| + _status(call, rpc_state)
|
| + else:
|
| + rpc_state.write.low = _LowWrite.OPEN
|
| +
|
| + def _on_complete_event(self, event):
|
| + if not event.complete_accepted:
|
| + logging.error('Complete not accepted! %s', (event,))
|
| + call = event.tag
|
| + rpc_state = self._rpc_states.pop(call, None)
|
| + if rpc_state is None:
|
| + return
|
| +
|
| + sequence_number = rpc_state.sequence_number
|
| + rpc_state.sequence_number += 1
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, sequence_number,
|
| + base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None,
|
| + None, None, None, None)
|
| + self._rear_link.accept_front_to_back_ticket(ticket)
|
| +
|
| + def _on_finish_event(self, event):
|
| + """Handle termination of an RPC."""
|
| + call = event.tag
|
| + rpc_state = self._rpc_states.pop(call, None)
|
| + if rpc_state is None:
|
| + return
|
| +
|
| + code = event.status.code
|
| + if code is _low.Code.OK:
|
| + return
|
| +
|
| + sequence_number = rpc_state.sequence_number
|
| + rpc_state.sequence_number += 1
|
| + if code is _low.Code.CANCELLED:
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, sequence_number,
|
| + base_interfaces.FrontToBackTicket.Kind.CANCELLATION, None, None,
|
| + None, None, None)
|
| + elif code is _low.Code.DEADLINE_EXCEEDED:
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, sequence_number,
|
| + base_interfaces.FrontToBackTicket.Kind.EXPIRATION, None, None, None,
|
| + None, None)
|
| + else:
|
| + # TODO(nathaniel): Better mapping of codes to ticket-categories
|
| + ticket = base_interfaces.FrontToBackTicket(
|
| + call, sequence_number,
|
| + base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None,
|
| + None, None, None, None)
|
| + self._rear_link.accept_front_to_back_ticket(ticket)
|
| +
|
| + def _spin(self, completion_queue, server):
|
| + while True:
|
| + event = completion_queue.get(None)
|
| +
|
| + with self._condition:
|
| + if event.kind is _low.Event.Kind.STOP:
|
| + self._on_stop_event()
|
| + return
|
| + elif self._server is None:
|
| + continue
|
| + elif event.kind is _low.Event.Kind.SERVICE_ACCEPTED:
|
| + self._on_service_acceptance_event(event, server)
|
| + elif event.kind is _low.Event.Kind.READ_ACCEPTED:
|
| + self._on_read_event(event)
|
| + elif event.kind is _low.Event.Kind.WRITE_ACCEPTED:
|
| + self._on_write_event(event)
|
| + elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED:
|
| + self._on_complete_event(event)
|
| + elif event.kind is _low.Event.Kind.FINISH:
|
| + self._on_finish_event(event)
|
| + else:
|
| + logging.error('Illegal event! %s', (event,))
|
| +
|
| + def _continue(self, call, payload):
|
| + rpc_state = self._rpc_states.get(call, None)
|
| + if rpc_state is None:
|
| + return
|
| +
|
| + _write(call, rpc_state, payload)
|
| +
|
| + def _complete(self, call, payload):
|
| + """Handle completion of the writes of an RPC."""
|
| + rpc_state = self._rpc_states.get(call, None)
|
| + if rpc_state is None:
|
| + return
|
| +
|
| + if rpc_state.write.low is _LowWrite.OPEN:
|
| + if payload is None:
|
| + _status(call, rpc_state)
|
| + else:
|
| + _write(call, rpc_state, payload)
|
| + elif rpc_state.write.low is _LowWrite.ACTIVE:
|
| + if payload is not None:
|
| + rpc_state.write.pending.append(rpc_state.serializer(payload))
|
| + else:
|
| + raise ValueError('Called to complete after having already completed!')
|
| + rpc_state.write.high = _common.HighWrite.CLOSED
|
| +
|
| + def _cancel(self, call):
|
| + call.cancel()
|
| + self._rpc_states.pop(call, None)
|
| +
|
| + def join_rear_link(self, rear_link):
|
| + """See base_interfaces.ForeLink.join_rear_link for specification."""
|
| + self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
|
| +
|
| + def _start(self):
|
| + """Starts this ForeLink.
|
| +
|
| + This method must be called before attempting to exchange tickets with this
|
| + object.
|
| + """
|
| + with self._condition:
|
| + address = '[::]:%d' % (
|
| + 0 if self._requested_port is None else self._requested_port)
|
| + self._completion_queue = _low.CompletionQueue()
|
| + if self._root_certificates is None and not self._key_chain_pairs:
|
| + self._server = _low.Server(self._completion_queue)
|
| + self._port = self._server.add_http2_addr(address)
|
| + else:
|
| + server_credentials = _low.ServerCredentials(
|
| + self._root_certificates, self._key_chain_pairs, False)
|
| + self._server = _low.Server(self._completion_queue)
|
| + self._port = self._server.add_secure_http2_addr(
|
| + address, server_credentials)
|
| + self._server.start()
|
| +
|
| + self._server.service(None)
|
| +
|
| + self._pool.submit(self._spin, self._completion_queue, self._server)
|
| + self._spinning = True
|
| +
|
| + return self
|
| +
|
| + # TODO(nathaniel): Expose graceful-shutdown semantics in which this object
|
| + # enters a state in which it finishes ongoing RPCs but refuses new ones.
|
| + def _stop(self):
|
| + """Stops this ForeLink.
|
| +
|
| + This method must be called for proper termination of this object, and no
|
| + attempts to exchange tickets with this object may be made after this method
|
| + has been called.
|
| + """
|
| + with self._condition:
|
| + self._server.stop()
|
| + # TODO(nathaniel): Yep, this is weird. Deleting a server shouldn't have a
|
| + # behaviorally significant side-effect.
|
| + self._server = None
|
| + self._completion_queue.stop()
|
| +
|
| + while self._spinning:
|
| + self._condition.wait()
|
| +
|
| + self._port = None
|
| +
|
| + def __enter__(self):
|
| + """See activated.Activated.__enter__ for specification."""
|
| + return self._start()
|
| +
|
| + def __exit__(self, exc_type, exc_val, exc_tb):
|
| + """See activated.Activated.__exit__ for specification."""
|
| + self._stop()
|
| + return False
|
| +
|
| + def start(self):
|
| + """See activated.Activated.start for specification."""
|
| + return self._start()
|
| +
|
| + def stop(self):
|
| + """See activated.Activated.stop for specification."""
|
| + self._stop()
|
| +
|
| + def port(self):
|
| + """Identifies the port on which this ForeLink is servicing RPCs.
|
| +
|
| + Returns:
|
| + The number of the port on which this ForeLink is servicing RPCs, or None
|
| + if this ForeLink is not currently activated and servicing RPCs.
|
| + """
|
| + with self._condition:
|
| + return self._port
|
| +
|
| + def accept_back_to_front_ticket(self, ticket):
|
| + """See base_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
|
| + with self._condition:
|
| + if self._server is None:
|
| + return
|
| +
|
| + if ticket.kind is base_interfaces.BackToFrontTicket.Kind.CONTINUATION:
|
| + self._continue(ticket.operation_id, ticket.payload)
|
| + elif ticket.kind is base_interfaces.BackToFrontTicket.Kind.COMPLETION:
|
| + self._complete(ticket.operation_id, ticket.payload)
|
| + else:
|
| + self._cancel(ticket.operation_id)
|
|
|