Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py b/third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py
new file mode 100644
index 0000000000000000000000000000000000000000..62fd52ab4013921144722f11e21c8ecc039afcd4
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py
@@ -0,0 +1,295 @@
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import threading
+
+from grpc import _grpcio_metadata
+from grpc._cython import cygrpc
+from grpc._adapter import _implementations
+from grpc._adapter import _types
+
+_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
+
+ChannelCredentials = cygrpc.ChannelCredentials
+CallCredentials = cygrpc.CallCredentials
+ServerCredentials = cygrpc.ServerCredentials
+
+channel_credentials_composite = cygrpc.channel_credentials_composite
+call_credentials_composite = cygrpc.call_credentials_composite
+
+def server_credentials_ssl(root_credentials, pair_sequence, force_client_auth):
+ return cygrpc.server_credentials_ssl(
+ root_credentials,
+ [cygrpc.SslPemKeyCertPair(key, pem) for key, pem in pair_sequence],
+ force_client_auth)
+
+def channel_credentials_ssl(
+ root_certificates, private_key, certificate_chain):
+ pair = None
+ if private_key is not None or certificate_chain is not None:
+ pair = cygrpc.SslPemKeyCertPair(private_key, certificate_chain)
+ return cygrpc.channel_credentials_ssl(root_certificates, pair)
+
+
+class _WrappedCygrpcCallback(object):
+
+ def __init__(self, cygrpc_callback):
+ self.is_called = False
+ self.error = None
+ self.is_called_lock = threading.Lock()
+ self.cygrpc_callback = cygrpc_callback
+
+ def _invoke_failure(self, error):
+ # TODO(atash) translate different Exception superclasses into different
+ # status codes.
+ self.cygrpc_callback(
+ cygrpc.Metadata([]), cygrpc.StatusCode.internal, error.message)
+
+ def _invoke_success(self, metadata):
+ try:
+ cygrpc_metadata = cygrpc.Metadata(
+ cygrpc.Metadatum(key, value)
+ for key, value in metadata)
+ except Exception as error:
+ self._invoke_failure(error)
+ return
+ self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, '')
+
+ def __call__(self, metadata, error):
+ with self.is_called_lock:
+ if self.is_called:
+ raise RuntimeError('callback should only ever be invoked once')
+ if self.error:
+ self._invoke_failure(self.error)
+ return
+ self.is_called = True
+ if error is None:
+ self._invoke_success(metadata)
+ else:
+ self._invoke_failure(error)
+
+ def notify_failure(self, error):
+ with self.is_called_lock:
+ if not self.is_called:
+ self.error = error
+
+
+class _WrappedPlugin(object):
+
+ def __init__(self, plugin):
+ self.plugin = plugin
+
+ def __call__(self, context, cygrpc_callback):
+ wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
+ wrapped_context = _implementations.AuthMetadataContext(context.service_url,
+ context.method_name)
+ try:
+ self.plugin(
+ wrapped_context,
+ _implementations.AuthMetadataPluginCallback(wrapped_cygrpc_callback))
+ except Exception as error:
+ wrapped_cygrpc_callback.notify_failure(error)
+ raise
+
+
+def call_credentials_metadata_plugin(plugin, name):
+ """
+ Args:
+ plugin: A callable accepting a _types.AuthMetadataContext
+ object and a callback (itself accepting a list of metadata key/value
+ 2-tuples and a None-able exception value). The callback must be eventually
+ called, but need not be called in plugin's invocation.
+ plugin's invocation must be non-blocking.
+ """
+ return cygrpc.call_credentials_metadata_plugin(
+ cygrpc.CredentialsMetadataPlugin(_WrappedPlugin(plugin), name))
+
+
+class CompletionQueue(_types.CompletionQueue):
+
+ def __init__(self):
+ self.completion_queue = cygrpc.CompletionQueue()
+
+ def next(self, deadline=float('+inf')):
+ raw_event = self.completion_queue.poll(cygrpc.Timespec(deadline))
+ if raw_event.type == cygrpc.CompletionType.queue_timeout:
+ return None
+ event_type = raw_event.type
+ event_tag = raw_event.tag
+ event_call = Call(raw_event.operation_call)
+ if raw_event.request_call_details:
+ event_call_details = _types.CallDetails(
+ raw_event.request_call_details.method,
+ raw_event.request_call_details.host,
+ float(raw_event.request_call_details.deadline))
+ else:
+ event_call_details = None
+ event_success = raw_event.success
+ event_results = []
+ if raw_event.is_new_request:
+ event_results.append(_types.OpResult(
+ _types.OpType.RECV_INITIAL_METADATA, raw_event.request_metadata,
+ None, None, None, None))
+ else:
+ if raw_event.batch_operations:
+ for operation in raw_event.batch_operations:
+ result_type = operation.type
+ result_initial_metadata = operation.received_metadata_or_none
+ result_trailing_metadata = operation.received_metadata_or_none
+ result_message = operation.received_message_or_none
+ if result_message is not None:
+ result_message = result_message.bytes()
+ result_cancelled = operation.received_cancelled_or_none
+ if operation.has_status:
+ result_status = _types.Status(
+ operation.received_status_code_or_none,
+ operation.received_status_details_or_none)
+ else:
+ result_status = None
+ event_results.append(
+ _types.OpResult(result_type, result_initial_metadata,
+ result_trailing_metadata, result_message,
+ result_status, result_cancelled))
+ return _types.Event(event_type, event_tag, event_call, event_call_details,
+ event_results, event_success)
+
+ def shutdown(self):
+ self.completion_queue.shutdown()
+
+
+class Call(_types.Call):
+
+ def __init__(self, call):
+ self.call = call
+
+ def start_batch(self, ops, tag):
+ translated_ops = []
+ for op in ops:
+ if op.type == _types.OpType.SEND_INITIAL_METADATA:
+ translated_op = cygrpc.operation_send_initial_metadata(
+ cygrpc.Metadata(
+ cygrpc.Metadatum(key, value)
+ for key, value in op.initial_metadata))
+ elif op.type == _types.OpType.SEND_MESSAGE:
+ translated_op = cygrpc.operation_send_message(op.message)
+ elif op.type == _types.OpType.SEND_CLOSE_FROM_CLIENT:
+ translated_op = cygrpc.operation_send_close_from_client()
+ elif op.type == _types.OpType.SEND_STATUS_FROM_SERVER:
+ translated_op = cygrpc.operation_send_status_from_server(
+ cygrpc.Metadata(
+ cygrpc.Metadatum(key, value)
+ for key, value in op.trailing_metadata),
+ op.status.code,
+ op.status.details)
+ elif op.type == _types.OpType.RECV_INITIAL_METADATA:
+ translated_op = cygrpc.operation_receive_initial_metadata()
+ elif op.type == _types.OpType.RECV_MESSAGE:
+ translated_op = cygrpc.operation_receive_message()
+ elif op.type == _types.OpType.RECV_STATUS_ON_CLIENT:
+ translated_op = cygrpc.operation_receive_status_on_client()
+ elif op.type == _types.OpType.RECV_CLOSE_ON_SERVER:
+ translated_op = cygrpc.operation_receive_close_on_server()
+ else:
+ raise ValueError('unexpected operation type {}'.format(op.type))
+ translated_ops.append(translated_op)
+ return self.call.start_batch(cygrpc.Operations(translated_ops), tag)
+
+ def cancel(self, code=None, details=None):
+ if code is None and details is None:
+ return self.call.cancel()
+ else:
+ return self.call.cancel(code, details)
+
+ def peer(self):
+ return self.call.peer()
+
+ def set_credentials(self, creds):
+ return self.call.set_credentials(creds)
+
+
+class Channel(_types.Channel):
+
+ def __init__(self, target, args, creds=None):
+ args = list(args) + [
+ (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT)]
+ args = cygrpc.ChannelArgs(
+ cygrpc.ChannelArg(key, value) for key, value in args)
+ if creds is None:
+ self.channel = cygrpc.Channel(target, args)
+ else:
+ self.channel = cygrpc.Channel(target, args, creds)
+
+ def create_call(self, completion_queue, method, host, deadline=None):
+ internal_call = self.channel.create_call(
+ None, 0, completion_queue.completion_queue, method, host,
+ cygrpc.Timespec(deadline))
+ return Call(internal_call)
+
+ def check_connectivity_state(self, try_to_connect):
+ return self.channel.check_connectivity_state(try_to_connect)
+
+ def watch_connectivity_state(self, last_observed_state, deadline,
+ completion_queue, tag):
+ self.channel.watch_connectivity_state(
+ last_observed_state, cygrpc.Timespec(deadline),
+ completion_queue.completion_queue, tag)
+
+ def target(self):
+ return self.channel.target()
+
+
+_NO_TAG = object()
+
+class Server(_types.Server):
+
+ def __init__(self, completion_queue, args):
+ args = cygrpc.ChannelArgs(
+ cygrpc.ChannelArg(key, value) for key, value in args)
+ self.server = cygrpc.Server(args)
+ self.server.register_completion_queue(completion_queue.completion_queue)
+ self.server_queue = completion_queue
+
+ def add_http2_port(self, addr, creds=None):
+ if creds is None:
+ return self.server.add_http2_port(addr)
+ else:
+ return self.server.add_http2_port(addr, creds)
+
+ def start(self):
+ return self.server.start()
+
+ def shutdown(self, tag=None):
+ return self.server.shutdown(self.server_queue.completion_queue, tag)
+
+ def request_call(self, completion_queue, tag):
+ return self.server.request_call(completion_queue.completion_queue,
+ self.server_queue.completion_queue, tag)
+
+ def cancel_all_calls(self):
+ return self.server.cancel_all_calls()

Powered by Google App Engine
This is Rietveld 408576698