| Index: third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9698ffeabf1f967f8c4c86f147f22c488251d5cc
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py
|
| @@ -0,0 +1,258 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""Temporary old _low-like layer.
|
| +
|
| +Eases refactoring burden while we overhaul the Python framework.
|
| +
|
| +Plan:
|
| + The layers used to look like:
|
| + ... # outside _adapter
|
| + fore.py + rear.py # visible outside _adapter
|
| + _low
|
| + _c
|
| + The layers currently look like:
|
| + ... # outside _adapter
|
| + fore.py + rear.py # visible outside _adapter
|
| + _low_intermediary # adapter for new '_low' to old '_low'
|
| + _low # new '_low'
|
| + _c # new '_c'
|
| + We will later remove _low_intermediary after refactoring of fore.py and
|
| + rear.py according to the ticket system refactoring and get:
|
| + ... # outside _adapter, refactored
|
| + fore.py + rear.py # visible outside _adapter, refactored
|
| + _low # new '_low'
|
| + _c # new '_c'
|
| +"""
|
| +
|
| +import collections
|
| +import enum
|
| +
|
| +from grpc._adapter import _low
|
| +from grpc._adapter import _types
|
| +
|
| +_IGNORE_ME_TAG = object()
|
| +Code = _types.StatusCode
|
| +WriteFlags = _types.OpWriteFlags
|
| +
|
| +
|
| +class Status(collections.namedtuple('Status', ['code', 'details'])):
|
| + """Describes an RPC's overall status."""
|
| +
|
| +
|
| +class ServiceAcceptance(
|
| + collections.namedtuple(
|
| + 'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])):
|
| + """Describes an RPC on the service side at the start of service."""
|
| +
|
| +
|
| +class Event(
|
| + collections.namedtuple(
|
| + 'Event',
|
| + ['kind', 'tag', 'write_accepted', 'complete_accepted',
|
| + 'service_acceptance', 'bytes', 'status', 'metadata'])):
|
| + """Describes an event emitted from a completion queue."""
|
| +
|
| + @enum.unique
|
| + class Kind(enum.Enum):
|
| + """Describes the kind of an event."""
|
| +
|
| + STOP = object()
|
| + WRITE_ACCEPTED = object()
|
| + COMPLETE_ACCEPTED = object()
|
| + SERVICE_ACCEPTED = object()
|
| + READ_ACCEPTED = object()
|
| + METADATA_ACCEPTED = object()
|
| + FINISH = object()
|
| +
|
| +
|
| +class _TagAdapter(collections.namedtuple('_TagAdapter', [
|
| + 'user_tag',
|
| + 'kind'
|
| + ])):
|
| + pass
|
| +
|
| +
|
| +class Call(object):
|
| + """Adapter from old _low.Call interface to new _low.Call."""
|
| +
|
| + def __init__(self, channel, completion_queue, method, host, deadline):
|
| + self._internal = channel._internal.create_call(
|
| + completion_queue._internal, method, host, deadline)
|
| + self._metadata = []
|
| +
|
| + @staticmethod
|
| + def _from_internal(internal):
|
| + call = Call.__new__(Call)
|
| + call._internal = internal
|
| + call._metadata = []
|
| + return call
|
| +
|
| + def invoke(self, completion_queue, metadata_tag, finish_tag):
|
| + err = self._internal.start_batch([
|
| + _types.OpArgs.send_initial_metadata(self._metadata)
|
| + ], _IGNORE_ME_TAG)
|
| + if err != _types.CallError.OK:
|
| + return err
|
| + err = self._internal.start_batch([
|
| + _types.OpArgs.recv_initial_metadata()
|
| + ], _TagAdapter(metadata_tag, Event.Kind.METADATA_ACCEPTED))
|
| + if err != _types.CallError.OK:
|
| + return err
|
| + err = self._internal.start_batch([
|
| + _types.OpArgs.recv_status_on_client()
|
| + ], _TagAdapter(finish_tag, Event.Kind.FINISH))
|
| + return err
|
| +
|
| + def write(self, message, tag, flags):
|
| + return self._internal.start_batch([
|
| + _types.OpArgs.send_message(message, flags)
|
| + ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
|
| +
|
| + def complete(self, tag):
|
| + return self._internal.start_batch([
|
| + _types.OpArgs.send_close_from_client()
|
| + ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
|
| +
|
| + def accept(self, completion_queue, tag):
|
| + return self._internal.start_batch([
|
| + _types.OpArgs.recv_close_on_server()
|
| + ], _TagAdapter(tag, Event.Kind.FINISH))
|
| +
|
| + def add_metadata(self, key, value):
|
| + self._metadata.append((key, value))
|
| +
|
| + def premetadata(self):
|
| + result = self._internal.start_batch([
|
| + _types.OpArgs.send_initial_metadata(self._metadata)
|
| + ], _IGNORE_ME_TAG)
|
| + self._metadata = []
|
| + return result
|
| +
|
| + def read(self, tag):
|
| + return self._internal.start_batch([
|
| + _types.OpArgs.recv_message()
|
| + ], _TagAdapter(tag, Event.Kind.READ_ACCEPTED))
|
| +
|
| + def status(self, status, tag):
|
| + return self._internal.start_batch([
|
| + _types.OpArgs.send_status_from_server(
|
| + self._metadata, status.code, status.details)
|
| + ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
|
| +
|
| + def cancel(self):
|
| + return self._internal.cancel()
|
| +
|
| + def peer(self):
|
| + return self._internal.peer()
|
| +
|
| + def set_credentials(self, creds):
|
| + return self._internal.set_credentials(creds)
|
| +
|
| +
|
| +class Channel(object):
|
| + """Adapter from old _low.Channel interface to new _low.Channel."""
|
| +
|
| + def __init__(self, hostport, channel_credentials, server_host_override=None):
|
| + args = []
|
| + if server_host_override:
|
| + args.append((_types.GrpcChannelArgumentKeys.SSL_TARGET_NAME_OVERRIDE.value, server_host_override))
|
| + self._internal = _low.Channel(hostport, args, channel_credentials)
|
| +
|
| +
|
| +class CompletionQueue(object):
|
| + """Adapter from old _low.CompletionQueue interface to new _low.CompletionQueue."""
|
| +
|
| + def __init__(self):
|
| + self._internal = _low.CompletionQueue()
|
| +
|
| + def get(self, deadline=None):
|
| + if deadline is None:
|
| + ev = self._internal.next(float('+inf'))
|
| + else:
|
| + ev = self._internal.next(deadline)
|
| + if ev is None:
|
| + return None
|
| + elif ev.tag is _IGNORE_ME_TAG:
|
| + return self.get(deadline)
|
| + elif ev.type == _types.EventType.QUEUE_SHUTDOWN:
|
| + kind = Event.Kind.STOP
|
| + tag = None
|
| + write_accepted = None
|
| + complete_accepted = None
|
| + service_acceptance = None
|
| + message_bytes = None
|
| + status = None
|
| + metadata = None
|
| + elif ev.type == _types.EventType.OP_COMPLETE:
|
| + kind = ev.tag.kind
|
| + tag = ev.tag.user_tag
|
| + write_accepted = ev.success if kind == Event.Kind.WRITE_ACCEPTED else None
|
| + complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None
|
| + service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None
|
| + message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None
|
| + status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None
|
| + metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None)
|
| + else:
|
| + raise RuntimeError('unknown event')
|
| + result_ev = Event(kind=kind, tag=tag, write_accepted=write_accepted, complete_accepted=complete_accepted, service_acceptance=service_acceptance, bytes=message_bytes, status=status, metadata=metadata)
|
| + return result_ev
|
| +
|
| + def stop(self):
|
| + self._internal.shutdown()
|
| +
|
| +
|
| +class Server(object):
|
| + """Adapter from old _low.Server interface to new _low.Server."""
|
| +
|
| + def __init__(self, completion_queue):
|
| + self._internal = _low.Server(completion_queue._internal, [])
|
| + self._internal_cq = completion_queue._internal
|
| +
|
| + def add_http2_addr(self, addr):
|
| + return self._internal.add_http2_port(addr)
|
| +
|
| + def add_secure_http2_addr(self, addr, server_credentials):
|
| + if server_credentials is None:
|
| + return self._internal.add_http2_port(addr, None)
|
| + else:
|
| + return self._internal.add_http2_port(addr, server_credentials)
|
| +
|
| + def start(self):
|
| + return self._internal.start()
|
| +
|
| + def service(self, tag):
|
| + return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED))
|
| +
|
| + def cancel_all_calls(self):
|
| + self._internal.cancel_all_calls()
|
| +
|
| + def stop(self):
|
| + return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP))
|
| +
|
|
|