| Index: third_party/grpc/src/python/grpcio/grpc/_adapter/rear.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/_adapter/rear.py b/third_party/grpc/src/python/grpcio/grpc/_adapter/rear.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..17fa47f74600f08459aba055296af89ae10a61cf
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/_adapter/rear.py
|
| @@ -0,0 +1,395 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""The RPC-invocation-side bridge between RPC Framework and GRPC-on-the-wire."""
|
| +
|
| +import enum
|
| +import logging
|
| +import threading
|
| +import time
|
| +
|
| +from grpc._adapter import _common
|
| +from grpc._adapter import _intermediary_low as _low
|
| +from grpc.framework.base import interfaces as base_interfaces
|
| +from grpc.framework.base import null
|
| +from grpc.framework.foundation import activated
|
| +from grpc.framework.foundation import logging_pool
|
| +
|
| +_THREAD_POOL_SIZE = 10
|
| +
|
| +_INVOCATION_EVENT_KINDS = (
|
| + _low.Event.Kind.METADATA_ACCEPTED,
|
| + _low.Event.Kind.FINISH
|
| +)
|
| +
|
| +
|
| +@enum.unique
|
| +class _LowWrite(enum.Enum):
|
| + """The possible categories of low-level write state."""
|
| +
|
| + OPEN = 'OPEN'
|
| + ACTIVE = 'ACTIVE'
|
| + CLOSED = 'CLOSED'
|
| +
|
| +
|
| +class _RPCState(object):
|
| + """The full state of any tracked RPC.
|
| +
|
| + Attributes:
|
| + call: The _low.Call object for the RPC.
|
| + outstanding: The set of Event.Kind values describing expected future events
|
| + for the RPC.
|
| + active: A boolean indicating whether or not the RPC is active.
|
| + common: An _common.RPCState describing additional state for the RPC.
|
| + """
|
| +
|
| + def __init__(self, call, outstanding, active, common):
|
| + self.call = call
|
| + self.outstanding = outstanding
|
| + self.active = active
|
| + self.common = common
|
| +
|
| +
|
| +def _write(operation_id, call, outstanding, write_state, serialized_payload):
|
| + if write_state.low is _LowWrite.OPEN:
|
| + call.write(serialized_payload, operation_id, 0)
|
| + outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
|
| + write_state.low = _LowWrite.ACTIVE
|
| + elif write_state.low is _LowWrite.ACTIVE:
|
| + write_state.pending.append(serialized_payload)
|
| + else:
|
| + raise ValueError('Write attempted after writes completed!')
|
| +
|
| +
|
| +class RearLink(base_interfaces.RearLink, activated.Activated):
|
| + """An invocation-side bridge between RPC Framework and the C-ish _low code."""
|
| +
|
| + def __init__(
|
| + self, host, port, pool, request_serializers, response_deserializers,
|
| + secure, root_certificates, private_key, certificate_chain,
|
| + metadata_transformer=None, server_host_override=None):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + host: The host to which to connect for RPC service.
|
| + port: The port to which to connect for RPC service.
|
| + pool: A thread pool.
|
| + request_serializers: A dict from RPC method names to request object
|
| + serializer behaviors.
|
| + response_deserializers: A dict from RPC method names to response object
|
| + deserializer behaviors.
|
| + secure: A boolean indicating whether or not to use a secure connection.
|
| + root_certificates: The PEM-encoded root certificates or None to ask for
|
| + them to be retrieved from a default location.
|
| + private_key: The PEM-encoded private key to use or None if no private
|
| + key should be used.
|
| + certificate_chain: The PEM-encoded certificate chain to use or None if
|
| + no certificate chain should be used.
|
| + metadata_transformer: A function that given a metadata object produces
|
| + another metadata to be used in the underlying communication on the
|
| + wire.
|
| + server_host_override: (For testing only) the target name used for SSL
|
| + host name checking.
|
| + """
|
| + self._condition = threading.Condition()
|
| + self._host = host
|
| + self._port = port
|
| + self._pool = pool
|
| + self._request_serializers = request_serializers
|
| + self._response_deserializers = response_deserializers
|
| +
|
| + self._fore_link = null.NULL_FORE_LINK
|
| + self._completion_queue = None
|
| + self._channel = None
|
| + self._rpc_states = {}
|
| + self._spinning = False
|
| + if secure:
|
| + self._client_credentials = _low.ClientCredentials(
|
| + root_certificates, private_key, certificate_chain)
|
| + else:
|
| + self._client_credentials = None
|
| + self._root_certificates = root_certificates
|
| + self._private_key = private_key
|
| + self._certificate_chain = certificate_chain
|
| + self._metadata_transformer = metadata_transformer
|
| + self._server_host_override = server_host_override
|
| +
|
| + def _on_write_event(self, operation_id, event, rpc_state):
|
| + if event.write_accepted:
|
| + if rpc_state.common.write.pending:
|
| + rpc_state.call.write(
|
| + rpc_state.common.write.pending.pop(0), operation_id, 0)
|
| + rpc_state.outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
|
| + elif rpc_state.common.write.high is _common.HighWrite.CLOSED:
|
| + rpc_state.call.complete(operation_id)
|
| + rpc_state.outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED)
|
| + rpc_state.common.write.low = _LowWrite.CLOSED
|
| + else:
|
| + rpc_state.common.write.low = _LowWrite.OPEN
|
| + else:
|
| + logging.error('RPC write not accepted! Event: %s', (event,))
|
| + rpc_state.active = False
|
| + ticket = base_interfaces.BackToFrontTicket(
|
| + operation_id, rpc_state.common.sequence_number,
|
| + base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None)
|
| + rpc_state.common.sequence_number += 1
|
| + self._fore_link.accept_back_to_front_ticket(ticket)
|
| +
|
| + def _on_read_event(self, operation_id, event, rpc_state):
|
| + if event.bytes is not None:
|
| + rpc_state.call.read(operation_id)
|
| + rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED)
|
| +
|
| + ticket = base_interfaces.BackToFrontTicket(
|
| + operation_id, rpc_state.common.sequence_number,
|
| + base_interfaces.BackToFrontTicket.Kind.CONTINUATION,
|
| + rpc_state.common.deserializer(event.bytes))
|
| + rpc_state.common.sequence_number += 1
|
| + self._fore_link.accept_back_to_front_ticket(ticket)
|
| +
|
| + def _on_complete_event(self, operation_id, event, rpc_state):
|
| + if not event.complete_accepted:
|
| + logging.error('RPC complete not accepted! Event: %s', (event,))
|
| + rpc_state.active = False
|
| + ticket = base_interfaces.BackToFrontTicket(
|
| + operation_id, rpc_state.common.sequence_number,
|
| + base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None)
|
| + rpc_state.common.sequence_number += 1
|
| + self._fore_link.accept_back_to_front_ticket(ticket)
|
| +
|
| + # TODO(nathaniel): Metadata support.
|
| + def _on_metadata_event(self, operation_id, event, rpc_state): # pylint: disable=unused-argument
|
| + rpc_state.call.read(operation_id)
|
| + rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED)
|
| +
|
| + def _on_finish_event(self, operation_id, event, rpc_state):
|
| + """Handle termination of an RPC."""
|
| + # TODO(nathaniel): Cover all statuses.
|
| + if event.status.code is _low.Code.OK:
|
| + kind = base_interfaces.BackToFrontTicket.Kind.COMPLETION
|
| + elif event.status.code is _low.Code.CANCELLED:
|
| + kind = base_interfaces.BackToFrontTicket.Kind.CANCELLATION
|
| + elif event.status.code is _low.Code.DEADLINE_EXCEEDED:
|
| + kind = base_interfaces.BackToFrontTicket.Kind.EXPIRATION
|
| + else:
|
| + kind = base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE
|
| + ticket = base_interfaces.BackToFrontTicket(
|
| + operation_id, rpc_state.common.sequence_number, kind, None)
|
| + rpc_state.common.sequence_number += 1
|
| + self._fore_link.accept_back_to_front_ticket(ticket)
|
| +
|
| + def _spin(self, completion_queue):
|
| + while True:
|
| + event = completion_queue.get(None)
|
| + operation_id = event.tag
|
| +
|
| + with self._condition:
|
| + rpc_state = self._rpc_states[operation_id]
|
| + rpc_state.outstanding.remove(event.kind)
|
| + if rpc_state.active and self._completion_queue is not None:
|
| + if event.kind is _low.Event.Kind.WRITE_ACCEPTED:
|
| + self._on_write_event(operation_id, event, rpc_state)
|
| + elif event.kind is _low.Event.Kind.METADATA_ACCEPTED:
|
| + self._on_metadata_event(operation_id, event, rpc_state)
|
| + elif event.kind is _low.Event.Kind.READ_ACCEPTED:
|
| + self._on_read_event(operation_id, event, rpc_state)
|
| + elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED:
|
| + self._on_complete_event(operation_id, event, rpc_state)
|
| + elif event.kind is _low.Event.Kind.FINISH:
|
| + self._on_finish_event(operation_id, event, rpc_state)
|
| + else:
|
| + logging.error('Illegal RPC event! %s', (event,))
|
| +
|
| + if not rpc_state.outstanding:
|
| + self._rpc_states.pop(operation_id)
|
| + if not self._rpc_states:
|
| + self._spinning = False
|
| + self._condition.notify_all()
|
| + return
|
| +
|
| + def _invoke(self, operation_id, name, high_state, payload, timeout):
|
| + """Invoke an RPC.
|
| +
|
| + Args:
|
| + operation_id: Any object to be used as an operation ID for the RPC.
|
| + name: The RPC method name.
|
| + high_state: A _common.HighWrite value representing the "high write state"
|
| + of the RPC.
|
| + payload: A payload object for the RPC or None if no payload was given at
|
| + invocation-time.
|
| + timeout: A duration of time in seconds to allow for the RPC.
|
| + """
|
| + request_serializer = self._request_serializers[name]
|
| + call = _low.Call(self._channel, self._completion_queue, name, self._host, time.time() + timeout)
|
| + if self._metadata_transformer is not None:
|
| + metadata = self._metadata_transformer([])
|
| + for metadata_key, metadata_value in metadata:
|
| + call.add_metadata(metadata_key, metadata_value)
|
| + call.invoke(self._completion_queue, operation_id, operation_id)
|
| + outstanding = set(_INVOCATION_EVENT_KINDS)
|
| +
|
| + if payload is None:
|
| + if high_state is _common.HighWrite.CLOSED:
|
| + call.complete(operation_id)
|
| + low_state = _LowWrite.CLOSED
|
| + outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED)
|
| + else:
|
| + low_state = _LowWrite.OPEN
|
| + else:
|
| + serialized_payload = request_serializer(payload)
|
| + call.write(serialized_payload, operation_id, 0)
|
| + outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
|
| + low_state = _LowWrite.ACTIVE
|
| +
|
| + write_state = _common.WriteState(low_state, high_state, [])
|
| + common_state = _common.CommonRPCState(
|
| + write_state, 0, self._response_deserializers[name], request_serializer)
|
| + self._rpc_states[operation_id] = _RPCState(
|
| + call, outstanding, True, common_state)
|
| +
|
| + if not self._spinning:
|
| + self._pool.submit(self._spin, self._completion_queue)
|
| + self._spinning = True
|
| +
|
| + def _commence(self, operation_id, name, payload, timeout):
|
| + self._invoke(operation_id, name, _common.HighWrite.OPEN, payload, timeout)
|
| +
|
| + def _continue(self, operation_id, payload):
|
| + rpc_state = self._rpc_states.get(operation_id, None)
|
| + if rpc_state is None or not rpc_state.active:
|
| + return
|
| +
|
| + _write(
|
| + operation_id, rpc_state.call, rpc_state.outstanding,
|
| + rpc_state.common.write, rpc_state.common.serializer(payload))
|
| +
|
| + def _complete(self, operation_id, payload):
|
| + """Close writes associated with an ongoing RPC.
|
| +
|
| + Args:
|
| + operation_id: Any object being use as an operation ID for the RPC.
|
| + payload: A payload object for the RPC (and thus the last payload object
|
| + for the RPC) or None if no payload was given along with the instruction
|
| + to indicate the end of writes for the RPC.
|
| + """
|
| + rpc_state = self._rpc_states.get(operation_id, None)
|
| + if rpc_state is None or not rpc_state.active:
|
| + return
|
| +
|
| + write_state = rpc_state.common.write
|
| + if payload is None:
|
| + if write_state.low is _LowWrite.OPEN:
|
| + rpc_state.call.complete(operation_id)
|
| + rpc_state.outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED)
|
| + write_state.low = _LowWrite.CLOSED
|
| + else:
|
| + _write(
|
| + operation_id, rpc_state.call, rpc_state.outstanding, write_state,
|
| + rpc_state.common.serializer(payload))
|
| + write_state.high = _common.HighWrite.CLOSED
|
| +
|
| + def _entire(self, operation_id, name, payload, timeout):
|
| + self._invoke(operation_id, name, _common.HighWrite.CLOSED, payload, timeout)
|
| +
|
| + def _cancel(self, operation_id):
|
| + rpc_state = self._rpc_states.get(operation_id, None)
|
| + if rpc_state is not None and rpc_state.active:
|
| + rpc_state.call.cancel()
|
| + rpc_state.active = False
|
| +
|
| + def join_fore_link(self, fore_link):
|
| + """See base_interfaces.RearLink.join_fore_link for specification."""
|
| + with self._condition:
|
| + self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
|
| +
|
| + def _start(self):
|
| + """Starts this RearLink.
|
| +
|
| + This method must be called before attempting to exchange tickets with this
|
| + object.
|
| + """
|
| + with self._condition:
|
| + self._completion_queue = _low.CompletionQueue()
|
| + self._channel = _low.Channel(
|
| + '%s:%d' % (self._host, self._port), self._client_credentials,
|
| + server_host_override=self._server_host_override)
|
| + return self
|
| +
|
| + def _stop(self):
|
| + """Stops this RearLink.
|
| +
|
| + This method must be called for proper termination of this object, and no
|
| + attempts to exchange tickets with this object may be made after this method
|
| + has been called.
|
| + """
|
| + with self._condition:
|
| + self._completion_queue.stop()
|
| + self._completion_queue = None
|
| +
|
| + while self._spinning:
|
| + self._condition.wait()
|
| +
|
| + def __enter__(self):
|
| + """See activated.Activated.__enter__ for specification."""
|
| + return self._start()
|
| +
|
| + def __exit__(self, exc_type, exc_val, exc_tb):
|
| + """See activated.Activated.__exit__ for specification."""
|
| + self._stop()
|
| + return False
|
| +
|
| + def start(self):
|
| + """See activated.Activated.start for specification."""
|
| + return self._start()
|
| +
|
| + def stop(self):
|
| + """See activated.Activated.stop for specification."""
|
| + self._stop()
|
| +
|
| + def accept_front_to_back_ticket(self, ticket):
|
| + """See base_interfaces.RearLink.accept_front_to_back_ticket for spec."""
|
| + with self._condition:
|
| + if self._completion_queue is None:
|
| + return
|
| +
|
| + if ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT:
|
| + self._commence(
|
| + ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
|
| + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CONTINUATION:
|
| + self._continue(ticket.operation_id, ticket.payload)
|
| + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMPLETION:
|
| + self._complete(ticket.operation_id, ticket.payload)
|
| + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.ENTIRE:
|
| + self._entire(
|
| + ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
|
| + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CANCELLATION:
|
| + self._cancel(ticket.operation_id)
|
| + else:
|
| + # NOTE(nathaniel): All other categories are treated as cancellation.
|
| + self._cancel(ticket.operation_id)
|
|
|