Index: third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py |
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py b/third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..87edeb0f0e511942ee2b679393215c5f67569730 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py |
@@ -0,0 +1,422 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""Utility functions for invoking RPCs.""" |
+ |
+import sys |
+import threading |
+ |
+from grpc.framework.base import interfaces as base_interfaces |
+from grpc.framework.base import util as base_util |
+from grpc.framework.face import _control |
+from grpc.framework.face import interfaces |
+from grpc.framework.foundation import callable_util |
+from grpc.framework.foundation import future |
+ |
+_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!' |
+_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!' |
+ |
+ |
+class _RendezvousServicedIngestor(base_interfaces.ServicedIngestor): |
+ |
+ def __init__(self, rendezvous): |
+ self._rendezvous = rendezvous |
+ |
+ def consumer(self, operation_context): |
+ return self._rendezvous |
+ |
+ |
+class _EventServicedIngestor(base_interfaces.ServicedIngestor): |
+ |
+ def __init__(self, result_consumer, abortion_callback): |
+ self._result_consumer = result_consumer |
+ self._abortion_callback = abortion_callback |
+ |
+ def consumer(self, operation_context): |
+ operation_context.add_termination_callback( |
+ _control.as_operation_termination_callback(self._abortion_callback)) |
+ return self._result_consumer |
+ |
+ |
+def _rendezvous_subscription(rendezvous): |
+ return base_util.full_serviced_subscription( |
+ _RendezvousServicedIngestor(rendezvous)) |
+ |
+ |
+def _unary_event_subscription(completion_callback, abortion_callback): |
+ return base_util.full_serviced_subscription( |
+ _EventServicedIngestor( |
+ _control.UnaryConsumer(completion_callback), abortion_callback)) |
+ |
+ |
+def _stream_event_subscription(result_consumer, abortion_callback): |
+ return base_util.full_serviced_subscription( |
+ _EventServicedIngestor(result_consumer, abortion_callback)) |
+ |
+ |
+# NOTE(nathaniel): This class has some extremely special semantics around |
+# cancellation that allow it to be used by both "blocking" APIs and "futures" |
+# APIs. |
+# |
+# Since futures.Future defines its own exception for cancellation, we want these |
+# objects, when returned by methods of a returning-Futures-from-other-methods |
+# object, to raise the same exception for cancellation. But that's weird in a |
+# blocking API - why should this object, also returned by methods of blocking |
+# APIs, raise exceptions from the "future" module? Should we do something like |
+# have this class be parameterized by the type of exception that it raises in |
+# cancellation circumstances? |
+# |
+# We don't have to take such a dramatic step: since blocking APIs define no |
+# cancellation semantics whatsoever, there is no supported way for |
+# blocking-API-users of these objects to cancel RPCs, and thus no supported way |
+# for them to see an exception the type of which would be weird to them. |
+# |
+# Bonus: in both blocking and futures APIs, this object still properly raises |
+# exceptions.CancellationError for any *server-side cancellation* of an RPC. |
+class _OperationCancellableIterator(interfaces.CancellableIterator): |
+ """An interfaces.CancellableIterator for response-streaming operations.""" |
+ |
+ def __init__(self, rendezvous, operation): |
+ self._lock = threading.Lock() |
+ self._rendezvous = rendezvous |
+ self._operation = operation |
+ self._cancelled = False |
+ |
+ def __iter__(self): |
+ return self |
+ |
+ def next(self): |
+ with self._lock: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ return next(self._rendezvous) |
+ |
+ def cancel(self): |
+ with self._lock: |
+ self._cancelled = True |
+ self._operation.cancel() |
+ self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED) |
+ |
+ |
+class _OperationFuture(future.Future): |
+ """A future.Future interface to an operation.""" |
+ |
+ def __init__(self, rendezvous, operation): |
+ self._condition = threading.Condition() |
+ self._rendezvous = rendezvous |
+ self._operation = operation |
+ |
+ self._cancelled = False |
+ self._computed = False |
+ self._payload = None |
+ self._exception = None |
+ self._traceback = None |
+ self._callbacks = [] |
+ |
+ def cancel(self): |
+ """See future.Future.cancel for specification.""" |
+ with self._condition: |
+ if not self._cancelled and not self._computed: |
+ self._operation.cancel() |
+ self._cancelled = True |
+ self._condition.notify_all() |
+ return False |
+ |
+ def cancelled(self): |
+ """See future.Future.cancelled for specification.""" |
+ with self._condition: |
+ return self._cancelled |
+ |
+ def running(self): |
+ """See future.Future.running for specification.""" |
+ with self._condition: |
+ return not self._cancelled and not self._computed |
+ |
+ def done(self): |
+ """See future.Future.done for specification.""" |
+ with self._condition: |
+ return self._cancelled or self._computed |
+ |
+ def result(self, timeout=None): |
+ """See future.Future.result for specification.""" |
+ with self._condition: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ if self._computed: |
+ if self._payload is None: |
+ raise self._exception # pylint: disable=raising-bad-type |
+ else: |
+ return self._payload |
+ |
+ condition = threading.Condition() |
+ def notify_condition(unused_future): |
+ with condition: |
+ condition.notify() |
+ self._callbacks.append(notify_condition) |
+ |
+ with condition: |
+ condition.wait(timeout=timeout) |
+ |
+ with self._condition: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ elif self._computed: |
+ if self._payload is None: |
+ raise self._exception # pylint: disable=raising-bad-type |
+ else: |
+ return self._payload |
+ else: |
+ raise future.TimeoutError() |
+ |
+ def exception(self, timeout=None): |
+ """See future.Future.exception for specification.""" |
+ with self._condition: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ if self._computed: |
+ return self._exception |
+ |
+ condition = threading.Condition() |
+ def notify_condition(unused_future): |
+ with condition: |
+ condition.notify() |
+ self._callbacks.append(notify_condition) |
+ |
+ with condition: |
+ condition.wait(timeout=timeout) |
+ |
+ with self._condition: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ elif self._computed: |
+ return self._exception |
+ else: |
+ raise future.TimeoutError() |
+ |
+ def traceback(self, timeout=None): |
+ """See future.Future.traceback for specification.""" |
+ with self._condition: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ if self._computed: |
+ return self._traceback |
+ |
+ condition = threading.Condition() |
+ def notify_condition(unused_future): |
+ with condition: |
+ condition.notify() |
+ self._callbacks.append(notify_condition) |
+ |
+ with condition: |
+ condition.wait(timeout=timeout) |
+ |
+ with self._condition: |
+ if self._cancelled: |
+ raise future.CancelledError() |
+ elif self._computed: |
+ return self._traceback |
+ else: |
+ raise future.TimeoutError() |
+ |
+ def add_done_callback(self, fn): |
+ """See future.Future.add_done_callback for specification.""" |
+ with self._condition: |
+ if self._callbacks is not None: |
+ self._callbacks.append(fn) |
+ return |
+ |
+ callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self) |
+ |
+ def on_operation_termination(self, operation_outcome): |
+ """Indicates to this object that the operation has terminated. |
+ |
+ Args: |
+ operation_outcome: A base_interfaces.Outcome value indicating the |
+ outcome of the operation. |
+ """ |
+ with self._condition: |
+ cancelled = self._cancelled |
+ if cancelled: |
+ callbacks = list(self._callbacks) |
+ self._callbacks = None |
+ else: |
+ rendezvous = self._rendezvous |
+ |
+ if not cancelled: |
+ payload = None |
+ exception = None |
+ traceback = None |
+ if operation_outcome == base_interfaces.Outcome.COMPLETED: |
+ try: |
+ payload = next(rendezvous) |
+ except Exception as e: # pylint: disable=broad-except |
+ exception = e |
+ traceback = sys.exc_info()[2] |
+ else: |
+ try: |
+ # We raise and then immediately catch in order to create a traceback. |
+ raise _control.abortion_outcome_to_exception(operation_outcome) |
+ except Exception as e: # pylint: disable=broad-except |
+ exception = e |
+ traceback = sys.exc_info()[2] |
+ with self._condition: |
+ if not self._cancelled: |
+ self._computed = True |
+ self._payload = payload |
+ self._exception = exception |
+ self._traceback = traceback |
+ callbacks = list(self._callbacks) |
+ self._callbacks = None |
+ |
+ for callback in callbacks: |
+ callable_util.call_logging_exceptions( |
+ callback, _DONE_CALLBACK_LOG_MESSAGE, self) |
+ |
+ |
+class _Call(interfaces.Call): |
+ |
+ def __init__(self, operation): |
+ self._operation = operation |
+ self.context = _control.RpcContext(operation.context) |
+ |
+ def cancel(self): |
+ self._operation.cancel() |
+ |
+ |
+def blocking_value_in_value_out(front, name, payload, timeout, trace_id): |
+ """Services in a blocking fashion a value-in value-out servicer method.""" |
+ rendezvous = _control.Rendezvous() |
+ subscription = _rendezvous_subscription(rendezvous) |
+ operation = front.operate( |
+ name, payload, True, timeout, subscription, trace_id) |
+ operation.context.add_termination_callback(rendezvous.set_outcome) |
+ return next(rendezvous) |
+ |
+ |
+def future_value_in_value_out(front, name, payload, timeout, trace_id): |
+ """Services a value-in value-out servicer method by returning a Future.""" |
+ rendezvous = _control.Rendezvous() |
+ subscription = _rendezvous_subscription(rendezvous) |
+ operation = front.operate( |
+ name, payload, True, timeout, subscription, trace_id) |
+ operation.context.add_termination_callback(rendezvous.set_outcome) |
+ operation_future = _OperationFuture(rendezvous, operation) |
+ operation.context.add_termination_callback( |
+ operation_future.on_operation_termination) |
+ return operation_future |
+ |
+ |
+def inline_value_in_stream_out(front, name, payload, timeout, trace_id): |
+ """Services a value-in stream-out servicer method.""" |
+ rendezvous = _control.Rendezvous() |
+ subscription = _rendezvous_subscription(rendezvous) |
+ operation = front.operate( |
+ name, payload, True, timeout, subscription, trace_id) |
+ operation.context.add_termination_callback(rendezvous.set_outcome) |
+ return _OperationCancellableIterator(rendezvous, operation) |
+ |
+ |
+def blocking_stream_in_value_out( |
+ front, name, payload_iterator, timeout, trace_id): |
+ """Services in a blocking fashion a stream-in value-out servicer method.""" |
+ rendezvous = _control.Rendezvous() |
+ subscription = _rendezvous_subscription(rendezvous) |
+ operation = front.operate(name, None, False, timeout, subscription, trace_id) |
+ operation.context.add_termination_callback(rendezvous.set_outcome) |
+ for payload in payload_iterator: |
+ operation.consumer.consume(payload) |
+ operation.consumer.terminate() |
+ return next(rendezvous) |
+ |
+ |
+def future_stream_in_value_out( |
+ front, name, payload_iterator, timeout, trace_id, pool): |
+ """Services a stream-in value-out servicer method by returning a Future.""" |
+ rendezvous = _control.Rendezvous() |
+ subscription = _rendezvous_subscription(rendezvous) |
+ operation = front.operate(name, None, False, timeout, subscription, trace_id) |
+ operation.context.add_termination_callback(rendezvous.set_outcome) |
+ pool.submit( |
+ callable_util.with_exceptions_logged( |
+ _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE), |
+ payload_iterator, operation.consumer, lambda: True, True) |
+ operation_future = _OperationFuture(rendezvous, operation) |
+ operation.context.add_termination_callback( |
+ operation_future.on_operation_termination) |
+ return operation_future |
+ |
+ |
+def inline_stream_in_stream_out( |
+ front, name, payload_iterator, timeout, trace_id, pool): |
+ """Services a stream-in stream-out servicer method.""" |
+ rendezvous = _control.Rendezvous() |
+ subscription = _rendezvous_subscription(rendezvous) |
+ operation = front.operate(name, None, False, timeout, subscription, trace_id) |
+ operation.context.add_termination_callback(rendezvous.set_outcome) |
+ pool.submit( |
+ callable_util.with_exceptions_logged( |
+ _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE), |
+ payload_iterator, operation.consumer, lambda: True, True) |
+ return _OperationCancellableIterator(rendezvous, operation) |
+ |
+ |
+def event_value_in_value_out( |
+ front, name, payload, completion_callback, abortion_callback, timeout, |
+ trace_id): |
+ subscription = _unary_event_subscription( |
+ completion_callback, abortion_callback) |
+ operation = front.operate( |
+ name, payload, True, timeout, subscription, trace_id) |
+ return _Call(operation) |
+ |
+ |
+def event_value_in_stream_out( |
+ front, name, payload, result_payload_consumer, abortion_callback, timeout, |
+ trace_id): |
+ subscription = _stream_event_subscription( |
+ result_payload_consumer, abortion_callback) |
+ operation = front.operate( |
+ name, payload, True, timeout, subscription, trace_id) |
+ return _Call(operation) |
+ |
+ |
+def event_stream_in_value_out( |
+ front, name, completion_callback, abortion_callback, timeout, trace_id): |
+ subscription = _unary_event_subscription( |
+ completion_callback, abortion_callback) |
+ operation = front.operate(name, None, False, timeout, subscription, trace_id) |
+ return _Call(operation), operation.consumer |
+ |
+ |
+def event_stream_in_stream_out( |
+ front, name, result_payload_consumer, abortion_callback, timeout, trace_id): |
+ subscription = _stream_event_subscription( |
+ result_payload_consumer, abortion_callback) |
+ operation = front.operate(name, None, False, timeout, subscription, trace_id) |
+ return _Call(operation), operation.consumer |