Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py
new file mode 100644
index 0000000000000000000000000000000000000000..9698ffeabf1f967f8c4c86f147f22c488251d5cc
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/_adapter/_intermediary_low.py
@@ -0,0 +1,258 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Temporary old _low-like layer.
+
+Eases refactoring burden while we overhaul the Python framework.
+
+Plan:
+ The layers used to look like:
+ ... # outside _adapter
+ fore.py + rear.py # visible outside _adapter
+ _low
+ _c
+ The layers currently look like:
+ ... # outside _adapter
+ fore.py + rear.py # visible outside _adapter
+ _low_intermediary # adapter for new '_low' to old '_low'
+ _low # new '_low'
+ _c # new '_c'
+ We will later remove _low_intermediary after refactoring of fore.py and
+ rear.py according to the ticket system refactoring and get:
+ ... # outside _adapter, refactored
+ fore.py + rear.py # visible outside _adapter, refactored
+ _low # new '_low'
+ _c # new '_c'
+"""
+
+import collections
+import enum
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+
+_IGNORE_ME_TAG = object()
+Code = _types.StatusCode
+WriteFlags = _types.OpWriteFlags
+
+
+class Status(collections.namedtuple('Status', ['code', 'details'])):
+ """Describes an RPC's overall status."""
+
+
+class ServiceAcceptance(
+ collections.namedtuple(
+ 'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])):
+ """Describes an RPC on the service side at the start of service."""
+
+
+class Event(
+ collections.namedtuple(
+ 'Event',
+ ['kind', 'tag', 'write_accepted', 'complete_accepted',
+ 'service_acceptance', 'bytes', 'status', 'metadata'])):
+ """Describes an event emitted from a completion queue."""
+
+ @enum.unique
+ class Kind(enum.Enum):
+ """Describes the kind of an event."""
+
+ STOP = object()
+ WRITE_ACCEPTED = object()
+ COMPLETE_ACCEPTED = object()
+ SERVICE_ACCEPTED = object()
+ READ_ACCEPTED = object()
+ METADATA_ACCEPTED = object()
+ FINISH = object()
+
+
+class _TagAdapter(collections.namedtuple('_TagAdapter', [
+ 'user_tag',
+ 'kind'
+ ])):
+ pass
+
+
+class Call(object):
+ """Adapter from old _low.Call interface to new _low.Call."""
+
+ def __init__(self, channel, completion_queue, method, host, deadline):
+ self._internal = channel._internal.create_call(
+ completion_queue._internal, method, host, deadline)
+ self._metadata = []
+
+ @staticmethod
+ def _from_internal(internal):
+ call = Call.__new__(Call)
+ call._internal = internal
+ call._metadata = []
+ return call
+
+ def invoke(self, completion_queue, metadata_tag, finish_tag):
+ err = self._internal.start_batch([
+ _types.OpArgs.send_initial_metadata(self._metadata)
+ ], _IGNORE_ME_TAG)
+ if err != _types.CallError.OK:
+ return err
+ err = self._internal.start_batch([
+ _types.OpArgs.recv_initial_metadata()
+ ], _TagAdapter(metadata_tag, Event.Kind.METADATA_ACCEPTED))
+ if err != _types.CallError.OK:
+ return err
+ err = self._internal.start_batch([
+ _types.OpArgs.recv_status_on_client()
+ ], _TagAdapter(finish_tag, Event.Kind.FINISH))
+ return err
+
+ def write(self, message, tag, flags):
+ return self._internal.start_batch([
+ _types.OpArgs.send_message(message, flags)
+ ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
+
+ def complete(self, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.send_close_from_client()
+ ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
+
+ def accept(self, completion_queue, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.recv_close_on_server()
+ ], _TagAdapter(tag, Event.Kind.FINISH))
+
+ def add_metadata(self, key, value):
+ self._metadata.append((key, value))
+
+ def premetadata(self):
+ result = self._internal.start_batch([
+ _types.OpArgs.send_initial_metadata(self._metadata)
+ ], _IGNORE_ME_TAG)
+ self._metadata = []
+ return result
+
+ def read(self, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.recv_message()
+ ], _TagAdapter(tag, Event.Kind.READ_ACCEPTED))
+
+ def status(self, status, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.send_status_from_server(
+ self._metadata, status.code, status.details)
+ ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
+
+ def cancel(self):
+ return self._internal.cancel()
+
+ def peer(self):
+ return self._internal.peer()
+
+ def set_credentials(self, creds):
+ return self._internal.set_credentials(creds)
+
+
+class Channel(object):
+ """Adapter from old _low.Channel interface to new _low.Channel."""
+
+ def __init__(self, hostport, channel_credentials, server_host_override=None):
+ args = []
+ if server_host_override:
+ args.append((_types.GrpcChannelArgumentKeys.SSL_TARGET_NAME_OVERRIDE.value, server_host_override))
+ self._internal = _low.Channel(hostport, args, channel_credentials)
+
+
+class CompletionQueue(object):
+ """Adapter from old _low.CompletionQueue interface to new _low.CompletionQueue."""
+
+ def __init__(self):
+ self._internal = _low.CompletionQueue()
+
+ def get(self, deadline=None):
+ if deadline is None:
+ ev = self._internal.next(float('+inf'))
+ else:
+ ev = self._internal.next(deadline)
+ if ev is None:
+ return None
+ elif ev.tag is _IGNORE_ME_TAG:
+ return self.get(deadline)
+ elif ev.type == _types.EventType.QUEUE_SHUTDOWN:
+ kind = Event.Kind.STOP
+ tag = None
+ write_accepted = None
+ complete_accepted = None
+ service_acceptance = None
+ message_bytes = None
+ status = None
+ metadata = None
+ elif ev.type == _types.EventType.OP_COMPLETE:
+ kind = ev.tag.kind
+ tag = ev.tag.user_tag
+ write_accepted = ev.success if kind == Event.Kind.WRITE_ACCEPTED else None
+ complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None
+ service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None
+ message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None
+ status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None
+ metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None)
+ else:
+ raise RuntimeError('unknown event')
+ result_ev = Event(kind=kind, tag=tag, write_accepted=write_accepted, complete_accepted=complete_accepted, service_acceptance=service_acceptance, bytes=message_bytes, status=status, metadata=metadata)
+ return result_ev
+
+ def stop(self):
+ self._internal.shutdown()
+
+
+class Server(object):
+ """Adapter from old _low.Server interface to new _low.Server."""
+
+ def __init__(self, completion_queue):
+ self._internal = _low.Server(completion_queue._internal, [])
+ self._internal_cq = completion_queue._internal
+
+ def add_http2_addr(self, addr):
+ return self._internal.add_http2_port(addr)
+
+ def add_secure_http2_addr(self, addr, server_credentials):
+ if server_credentials is None:
+ return self._internal.add_http2_port(addr, None)
+ else:
+ return self._internal.add_http2_port(addr, server_credentials)
+
+ def start(self):
+ return self._internal.start()
+
+ def service(self, tag):
+ return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED))
+
+ def cancel_all_calls(self):
+ self._internal.cancel_all_calls()
+
+ def stop(self):
+ return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP))
+

Powered by Google App Engine
This is Rietveld 408576698