| Index: third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py
|
| diff --git a/third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py b/third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..62fd52ab4013921144722f11e21c8ecc039afcd4
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/grpc/_adapter/_low.py
|
| @@ -0,0 +1,295 @@
|
| +# Copyright 2015-2016, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +import threading
|
| +
|
| +from grpc import _grpcio_metadata
|
| +from grpc._cython import cygrpc
|
| +from grpc._adapter import _implementations
|
| +from grpc._adapter import _types
|
| +
|
| +_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
|
| +
|
| +ChannelCredentials = cygrpc.ChannelCredentials
|
| +CallCredentials = cygrpc.CallCredentials
|
| +ServerCredentials = cygrpc.ServerCredentials
|
| +
|
| +channel_credentials_composite = cygrpc.channel_credentials_composite
|
| +call_credentials_composite = cygrpc.call_credentials_composite
|
| +
|
| +def server_credentials_ssl(root_credentials, pair_sequence, force_client_auth):
|
| + return cygrpc.server_credentials_ssl(
|
| + root_credentials,
|
| + [cygrpc.SslPemKeyCertPair(key, pem) for key, pem in pair_sequence],
|
| + force_client_auth)
|
| +
|
| +def channel_credentials_ssl(
|
| + root_certificates, private_key, certificate_chain):
|
| + pair = None
|
| + if private_key is not None or certificate_chain is not None:
|
| + pair = cygrpc.SslPemKeyCertPair(private_key, certificate_chain)
|
| + return cygrpc.channel_credentials_ssl(root_certificates, pair)
|
| +
|
| +
|
| +class _WrappedCygrpcCallback(object):
|
| +
|
| + def __init__(self, cygrpc_callback):
|
| + self.is_called = False
|
| + self.error = None
|
| + self.is_called_lock = threading.Lock()
|
| + self.cygrpc_callback = cygrpc_callback
|
| +
|
| + def _invoke_failure(self, error):
|
| + # TODO(atash) translate different Exception superclasses into different
|
| + # status codes.
|
| + self.cygrpc_callback(
|
| + cygrpc.Metadata([]), cygrpc.StatusCode.internal, error.message)
|
| +
|
| + def _invoke_success(self, metadata):
|
| + try:
|
| + cygrpc_metadata = cygrpc.Metadata(
|
| + cygrpc.Metadatum(key, value)
|
| + for key, value in metadata)
|
| + except Exception as error:
|
| + self._invoke_failure(error)
|
| + return
|
| + self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, '')
|
| +
|
| + def __call__(self, metadata, error):
|
| + with self.is_called_lock:
|
| + if self.is_called:
|
| + raise RuntimeError('callback should only ever be invoked once')
|
| + if self.error:
|
| + self._invoke_failure(self.error)
|
| + return
|
| + self.is_called = True
|
| + if error is None:
|
| + self._invoke_success(metadata)
|
| + else:
|
| + self._invoke_failure(error)
|
| +
|
| + def notify_failure(self, error):
|
| + with self.is_called_lock:
|
| + if not self.is_called:
|
| + self.error = error
|
| +
|
| +
|
| +class _WrappedPlugin(object):
|
| +
|
| + def __init__(self, plugin):
|
| + self.plugin = plugin
|
| +
|
| + def __call__(self, context, cygrpc_callback):
|
| + wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
|
| + wrapped_context = _implementations.AuthMetadataContext(context.service_url,
|
| + context.method_name)
|
| + try:
|
| + self.plugin(
|
| + wrapped_context,
|
| + _implementations.AuthMetadataPluginCallback(wrapped_cygrpc_callback))
|
| + except Exception as error:
|
| + wrapped_cygrpc_callback.notify_failure(error)
|
| + raise
|
| +
|
| +
|
| +def call_credentials_metadata_plugin(plugin, name):
|
| + """
|
| + Args:
|
| + plugin: A callable accepting a _types.AuthMetadataContext
|
| + object and a callback (itself accepting a list of metadata key/value
|
| + 2-tuples and a None-able exception value). The callback must be eventually
|
| + called, but need not be called in plugin's invocation.
|
| + plugin's invocation must be non-blocking.
|
| + """
|
| + return cygrpc.call_credentials_metadata_plugin(
|
| + cygrpc.CredentialsMetadataPlugin(_WrappedPlugin(plugin), name))
|
| +
|
| +
|
| +class CompletionQueue(_types.CompletionQueue):
|
| +
|
| + def __init__(self):
|
| + self.completion_queue = cygrpc.CompletionQueue()
|
| +
|
| + def next(self, deadline=float('+inf')):
|
| + raw_event = self.completion_queue.poll(cygrpc.Timespec(deadline))
|
| + if raw_event.type == cygrpc.CompletionType.queue_timeout:
|
| + return None
|
| + event_type = raw_event.type
|
| + event_tag = raw_event.tag
|
| + event_call = Call(raw_event.operation_call)
|
| + if raw_event.request_call_details:
|
| + event_call_details = _types.CallDetails(
|
| + raw_event.request_call_details.method,
|
| + raw_event.request_call_details.host,
|
| + float(raw_event.request_call_details.deadline))
|
| + else:
|
| + event_call_details = None
|
| + event_success = raw_event.success
|
| + event_results = []
|
| + if raw_event.is_new_request:
|
| + event_results.append(_types.OpResult(
|
| + _types.OpType.RECV_INITIAL_METADATA, raw_event.request_metadata,
|
| + None, None, None, None))
|
| + else:
|
| + if raw_event.batch_operations:
|
| + for operation in raw_event.batch_operations:
|
| + result_type = operation.type
|
| + result_initial_metadata = operation.received_metadata_or_none
|
| + result_trailing_metadata = operation.received_metadata_or_none
|
| + result_message = operation.received_message_or_none
|
| + if result_message is not None:
|
| + result_message = result_message.bytes()
|
| + result_cancelled = operation.received_cancelled_or_none
|
| + if operation.has_status:
|
| + result_status = _types.Status(
|
| + operation.received_status_code_or_none,
|
| + operation.received_status_details_or_none)
|
| + else:
|
| + result_status = None
|
| + event_results.append(
|
| + _types.OpResult(result_type, result_initial_metadata,
|
| + result_trailing_metadata, result_message,
|
| + result_status, result_cancelled))
|
| + return _types.Event(event_type, event_tag, event_call, event_call_details,
|
| + event_results, event_success)
|
| +
|
| + def shutdown(self):
|
| + self.completion_queue.shutdown()
|
| +
|
| +
|
| +class Call(_types.Call):
|
| +
|
| + def __init__(self, call):
|
| + self.call = call
|
| +
|
| + def start_batch(self, ops, tag):
|
| + translated_ops = []
|
| + for op in ops:
|
| + if op.type == _types.OpType.SEND_INITIAL_METADATA:
|
| + translated_op = cygrpc.operation_send_initial_metadata(
|
| + cygrpc.Metadata(
|
| + cygrpc.Metadatum(key, value)
|
| + for key, value in op.initial_metadata))
|
| + elif op.type == _types.OpType.SEND_MESSAGE:
|
| + translated_op = cygrpc.operation_send_message(op.message)
|
| + elif op.type == _types.OpType.SEND_CLOSE_FROM_CLIENT:
|
| + translated_op = cygrpc.operation_send_close_from_client()
|
| + elif op.type == _types.OpType.SEND_STATUS_FROM_SERVER:
|
| + translated_op = cygrpc.operation_send_status_from_server(
|
| + cygrpc.Metadata(
|
| + cygrpc.Metadatum(key, value)
|
| + for key, value in op.trailing_metadata),
|
| + op.status.code,
|
| + op.status.details)
|
| + elif op.type == _types.OpType.RECV_INITIAL_METADATA:
|
| + translated_op = cygrpc.operation_receive_initial_metadata()
|
| + elif op.type == _types.OpType.RECV_MESSAGE:
|
| + translated_op = cygrpc.operation_receive_message()
|
| + elif op.type == _types.OpType.RECV_STATUS_ON_CLIENT:
|
| + translated_op = cygrpc.operation_receive_status_on_client()
|
| + elif op.type == _types.OpType.RECV_CLOSE_ON_SERVER:
|
| + translated_op = cygrpc.operation_receive_close_on_server()
|
| + else:
|
| + raise ValueError('unexpected operation type {}'.format(op.type))
|
| + translated_ops.append(translated_op)
|
| + return self.call.start_batch(cygrpc.Operations(translated_ops), tag)
|
| +
|
| + def cancel(self, code=None, details=None):
|
| + if code is None and details is None:
|
| + return self.call.cancel()
|
| + else:
|
| + return self.call.cancel(code, details)
|
| +
|
| + def peer(self):
|
| + return self.call.peer()
|
| +
|
| + def set_credentials(self, creds):
|
| + return self.call.set_credentials(creds)
|
| +
|
| +
|
| +class Channel(_types.Channel):
|
| +
|
| + def __init__(self, target, args, creds=None):
|
| + args = list(args) + [
|
| + (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT)]
|
| + args = cygrpc.ChannelArgs(
|
| + cygrpc.ChannelArg(key, value) for key, value in args)
|
| + if creds is None:
|
| + self.channel = cygrpc.Channel(target, args)
|
| + else:
|
| + self.channel = cygrpc.Channel(target, args, creds)
|
| +
|
| + def create_call(self, completion_queue, method, host, deadline=None):
|
| + internal_call = self.channel.create_call(
|
| + None, 0, completion_queue.completion_queue, method, host,
|
| + cygrpc.Timespec(deadline))
|
| + return Call(internal_call)
|
| +
|
| + def check_connectivity_state(self, try_to_connect):
|
| + return self.channel.check_connectivity_state(try_to_connect)
|
| +
|
| + def watch_connectivity_state(self, last_observed_state, deadline,
|
| + completion_queue, tag):
|
| + self.channel.watch_connectivity_state(
|
| + last_observed_state, cygrpc.Timespec(deadline),
|
| + completion_queue.completion_queue, tag)
|
| +
|
| + def target(self):
|
| + return self.channel.target()
|
| +
|
| +
|
| +_NO_TAG = object()
|
| +
|
| +class Server(_types.Server):
|
| +
|
| + def __init__(self, completion_queue, args):
|
| + args = cygrpc.ChannelArgs(
|
| + cygrpc.ChannelArg(key, value) for key, value in args)
|
| + self.server = cygrpc.Server(args)
|
| + self.server.register_completion_queue(completion_queue.completion_queue)
|
| + self.server_queue = completion_queue
|
| +
|
| + def add_http2_port(self, addr, creds=None):
|
| + if creds is None:
|
| + return self.server.add_http2_port(addr)
|
| + else:
|
| + return self.server.add_http2_port(addr, creds)
|
| +
|
| + def start(self):
|
| + return self.server.start()
|
| +
|
| + def shutdown(self, tag=None):
|
| + return self.server.shutdown(self.server_queue.completion_queue, tag)
|
| +
|
| + def request_call(self, completion_queue, tag):
|
| + return self.server.request_call(completion_queue.completion_queue,
|
| + self.server_queue.completion_queue, tag)
|
| +
|
| + def cancel_all_calls(self):
|
| + return self.server.cancel_all_calls()
|
|
|