Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(208)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py b/third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py
new file mode 100644
index 0000000000000000000000000000000000000000..87edeb0f0e511942ee2b679393215c5f67569730
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/face/_calls.py
@@ -0,0 +1,422 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility functions for invoking RPCs."""
+
+import sys
+import threading
+
+from grpc.framework.base import interfaces as base_interfaces
+from grpc.framework.base import util as base_util
+from grpc.framework.face import _control
+from grpc.framework.face import interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import future
+
+_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
+_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
+
+
+class _RendezvousServicedIngestor(base_interfaces.ServicedIngestor):
+
+ def __init__(self, rendezvous):
+ self._rendezvous = rendezvous
+
+ def consumer(self, operation_context):
+ return self._rendezvous
+
+
+class _EventServicedIngestor(base_interfaces.ServicedIngestor):
+
+ def __init__(self, result_consumer, abortion_callback):
+ self._result_consumer = result_consumer
+ self._abortion_callback = abortion_callback
+
+ def consumer(self, operation_context):
+ operation_context.add_termination_callback(
+ _control.as_operation_termination_callback(self._abortion_callback))
+ return self._result_consumer
+
+
+def _rendezvous_subscription(rendezvous):
+ return base_util.full_serviced_subscription(
+ _RendezvousServicedIngestor(rendezvous))
+
+
+def _unary_event_subscription(completion_callback, abortion_callback):
+ return base_util.full_serviced_subscription(
+ _EventServicedIngestor(
+ _control.UnaryConsumer(completion_callback), abortion_callback))
+
+
+def _stream_event_subscription(result_consumer, abortion_callback):
+ return base_util.full_serviced_subscription(
+ _EventServicedIngestor(result_consumer, abortion_callback))
+
+
+# NOTE(nathaniel): This class has some extremely special semantics around
+# cancellation that allow it to be used by both "blocking" APIs and "futures"
+# APIs.
+#
+# Since futures.Future defines its own exception for cancellation, we want these
+# objects, when returned by methods of a returning-Futures-from-other-methods
+# object, to raise the same exception for cancellation. But that's weird in a
+# blocking API - why should this object, also returned by methods of blocking
+# APIs, raise exceptions from the "future" module? Should we do something like
+# have this class be parameterized by the type of exception that it raises in
+# cancellation circumstances?
+#
+# We don't have to take such a dramatic step: since blocking APIs define no
+# cancellation semantics whatsoever, there is no supported way for
+# blocking-API-users of these objects to cancel RPCs, and thus no supported way
+# for them to see an exception the type of which would be weird to them.
+#
+# Bonus: in both blocking and futures APIs, this object still properly raises
+# exceptions.CancellationError for any *server-side cancellation* of an RPC.
+class _OperationCancellableIterator(interfaces.CancellableIterator):
+ """An interfaces.CancellableIterator for response-streaming operations."""
+
+ def __init__(self, rendezvous, operation):
+ self._lock = threading.Lock()
+ self._rendezvous = rendezvous
+ self._operation = operation
+ self._cancelled = False
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._lock:
+ if self._cancelled:
+ raise future.CancelledError()
+ return next(self._rendezvous)
+
+ def cancel(self):
+ with self._lock:
+ self._cancelled = True
+ self._operation.cancel()
+ self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED)
+
+
+class _OperationFuture(future.Future):
+ """A future.Future interface to an operation."""
+
+ def __init__(self, rendezvous, operation):
+ self._condition = threading.Condition()
+ self._rendezvous = rendezvous
+ self._operation = operation
+
+ self._cancelled = False
+ self._computed = False
+ self._payload = None
+ self._exception = None
+ self._traceback = None
+ self._callbacks = []
+
+ def cancel(self):
+ """See future.Future.cancel for specification."""
+ with self._condition:
+ if not self._cancelled and not self._computed:
+ self._operation.cancel()
+ self._cancelled = True
+ self._condition.notify_all()
+ return False
+
+ def cancelled(self):
+ """See future.Future.cancelled for specification."""
+ with self._condition:
+ return self._cancelled
+
+ def running(self):
+ """See future.Future.running for specification."""
+ with self._condition:
+ return not self._cancelled and not self._computed
+
+ def done(self):
+ """See future.Future.done for specification."""
+ with self._condition:
+ return self._cancelled or self._computed
+
+ def result(self, timeout=None):
+ """See future.Future.result for specification."""
+ with self._condition:
+ if self._cancelled:
+ raise future.CancelledError()
+ if self._computed:
+ if self._payload is None:
+ raise self._exception # pylint: disable=raising-bad-type
+ else:
+ return self._payload
+
+ condition = threading.Condition()
+ def notify_condition(unused_future):
+ with condition:
+ condition.notify()
+ self._callbacks.append(notify_condition)
+
+ with condition:
+ condition.wait(timeout=timeout)
+
+ with self._condition:
+ if self._cancelled:
+ raise future.CancelledError()
+ elif self._computed:
+ if self._payload is None:
+ raise self._exception # pylint: disable=raising-bad-type
+ else:
+ return self._payload
+ else:
+ raise future.TimeoutError()
+
+ def exception(self, timeout=None):
+ """See future.Future.exception for specification."""
+ with self._condition:
+ if self._cancelled:
+ raise future.CancelledError()
+ if self._computed:
+ return self._exception
+
+ condition = threading.Condition()
+ def notify_condition(unused_future):
+ with condition:
+ condition.notify()
+ self._callbacks.append(notify_condition)
+
+ with condition:
+ condition.wait(timeout=timeout)
+
+ with self._condition:
+ if self._cancelled:
+ raise future.CancelledError()
+ elif self._computed:
+ return self._exception
+ else:
+ raise future.TimeoutError()
+
+ def traceback(self, timeout=None):
+ """See future.Future.traceback for specification."""
+ with self._condition:
+ if self._cancelled:
+ raise future.CancelledError()
+ if self._computed:
+ return self._traceback
+
+ condition = threading.Condition()
+ def notify_condition(unused_future):
+ with condition:
+ condition.notify()
+ self._callbacks.append(notify_condition)
+
+ with condition:
+ condition.wait(timeout=timeout)
+
+ with self._condition:
+ if self._cancelled:
+ raise future.CancelledError()
+ elif self._computed:
+ return self._traceback
+ else:
+ raise future.TimeoutError()
+
+ def add_done_callback(self, fn):
+ """See future.Future.add_done_callback for specification."""
+ with self._condition:
+ if self._callbacks is not None:
+ self._callbacks.append(fn)
+ return
+
+ callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self)
+
+ def on_operation_termination(self, operation_outcome):
+ """Indicates to this object that the operation has terminated.
+
+ Args:
+ operation_outcome: A base_interfaces.Outcome value indicating the
+ outcome of the operation.
+ """
+ with self._condition:
+ cancelled = self._cancelled
+ if cancelled:
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+ else:
+ rendezvous = self._rendezvous
+
+ if not cancelled:
+ payload = None
+ exception = None
+ traceback = None
+ if operation_outcome == base_interfaces.Outcome.COMPLETED:
+ try:
+ payload = next(rendezvous)
+ except Exception as e: # pylint: disable=broad-except
+ exception = e
+ traceback = sys.exc_info()[2]
+ else:
+ try:
+ # We raise and then immediately catch in order to create a traceback.
+ raise _control.abortion_outcome_to_exception(operation_outcome)
+ except Exception as e: # pylint: disable=broad-except
+ exception = e
+ traceback = sys.exc_info()[2]
+ with self._condition:
+ if not self._cancelled:
+ self._computed = True
+ self._payload = payload
+ self._exception = exception
+ self._traceback = traceback
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(
+ callback, _DONE_CALLBACK_LOG_MESSAGE, self)
+
+
+class _Call(interfaces.Call):
+
+ def __init__(self, operation):
+ self._operation = operation
+ self.context = _control.RpcContext(operation.context)
+
+ def cancel(self):
+ self._operation.cancel()
+
+
+def blocking_value_in_value_out(front, name, payload, timeout, trace_id):
+ """Services in a blocking fashion a value-in value-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ return next(rendezvous)
+
+
+def future_value_in_value_out(front, name, payload, timeout, trace_id):
+ """Services a value-in value-out servicer method by returning a Future."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ operation_future = _OperationFuture(rendezvous, operation)
+ operation.context.add_termination_callback(
+ operation_future.on_operation_termination)
+ return operation_future
+
+
+def inline_value_in_stream_out(front, name, payload, timeout, trace_id):
+ """Services a value-in stream-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ return _OperationCancellableIterator(rendezvous, operation)
+
+
+def blocking_stream_in_value_out(
+ front, name, payload_iterator, timeout, trace_id):
+ """Services in a blocking fashion a stream-in value-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ for payload in payload_iterator:
+ operation.consumer.consume(payload)
+ operation.consumer.terminate()
+ return next(rendezvous)
+
+
+def future_stream_in_value_out(
+ front, name, payload_iterator, timeout, trace_id, pool):
+ """Services a stream-in value-out servicer method by returning a Future."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ pool.submit(
+ callable_util.with_exceptions_logged(
+ _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE),
+ payload_iterator, operation.consumer, lambda: True, True)
+ operation_future = _OperationFuture(rendezvous, operation)
+ operation.context.add_termination_callback(
+ operation_future.on_operation_termination)
+ return operation_future
+
+
+def inline_stream_in_stream_out(
+ front, name, payload_iterator, timeout, trace_id, pool):
+ """Services a stream-in stream-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ pool.submit(
+ callable_util.with_exceptions_logged(
+ _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE),
+ payload_iterator, operation.consumer, lambda: True, True)
+ return _OperationCancellableIterator(rendezvous, operation)
+
+
+def event_value_in_value_out(
+ front, name, payload, completion_callback, abortion_callback, timeout,
+ trace_id):
+ subscription = _unary_event_subscription(
+ completion_callback, abortion_callback)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ return _Call(operation)
+
+
+def event_value_in_stream_out(
+ front, name, payload, result_payload_consumer, abortion_callback, timeout,
+ trace_id):
+ subscription = _stream_event_subscription(
+ result_payload_consumer, abortion_callback)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ return _Call(operation)
+
+
+def event_stream_in_value_out(
+ front, name, completion_callback, abortion_callback, timeout, trace_id):
+ subscription = _unary_event_subscription(
+ completion_callback, abortion_callback)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ return _Call(operation), operation.consumer
+
+
+def event_stream_in_stream_out(
+ front, name, result_payload_consumer, abortion_callback, timeout, trace_id):
+ subscription = _stream_event_subscription(
+ result_payload_consumer, abortion_callback)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ return _Call(operation), operation.consumer

Powered by Google App Engine
This is Rietveld 408576698