Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(629)

Unified Diff: third_party/grpc/src/python/grpcio/grpc/framework/crust/_calls.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/python/grpcio/grpc/framework/crust/_calls.py
diff --git a/third_party/grpc/src/python/grpcio/grpc/framework/crust/_calls.py b/third_party/grpc/src/python/grpcio/grpc/framework/crust/_calls.py
new file mode 100644
index 0000000000000000000000000000000000000000..bff940d74710da3a6be3a2c099cf8e661f51f910
--- /dev/null
+++ b/third_party/grpc/src/python/grpcio/grpc/framework/crust/_calls.py
@@ -0,0 +1,223 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility functions for invoking RPCs."""
+
+from grpc.framework.crust import _control
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.face import face
+
+_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
+
+_EMPTY_COMPLETION = utilities.completion(None, None, None)
+
+
+def _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ complete):
+ rendezvous = _control.Rendezvous(None, None)
+ subscription = utilities.full_subscription(
+ rendezvous, _control.protocol_receiver(rendezvous))
+ operation_context, operator = end.operate(
+ group, method, subscription, timeout, protocol_options=protocol_options,
+ initial_metadata=initial_metadata, payload=payload,
+ completion=_EMPTY_COMPLETION if complete else None)
+ rendezvous.set_operator_and_context(operator, operation_context)
+ outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
+ if outcome is not None:
+ rendezvous.set_outcome(outcome)
+ return rendezvous, operation_context, outcome
+
+
+def _event_return_unary(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
+ if outcome is None:
+ def in_pool():
+ abortion = rendezvous.add_abortion_callback(abortion_callback)
+ if abortion is None:
+ try:
+ receiver.initial_metadata(rendezvous.initial_metadata())
+ receiver.response(next(rendezvous))
+ receiver.complete(
+ rendezvous.terminal_metadata(), rendezvous.code(),
+ rendezvous.details())
+ except face.AbortionError:
+ pass
+ else:
+ abortion_callback(abortion)
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def _event_return_stream(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
+ if outcome is None:
+ def in_pool():
+ abortion = rendezvous.add_abortion_callback(abortion_callback)
+ if abortion is None:
+ try:
+ receiver.initial_metadata(rendezvous.initial_metadata())
+ for response in rendezvous:
+ receiver.response(response)
+ receiver.complete(
+ rendezvous.terminal_metadata(), rendezvous.code(),
+ rendezvous.details())
+ except face.AbortionError:
+ pass
+ else:
+ abortion_callback(abortion)
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def blocking_unary_unary(
+ end, group, method, timeout, with_call, protocol_options, initial_metadata,
+ payload):
+ """Services in a blocking fashion a unary-unary servicer method."""
+ rendezvous, unused_operation_context, unused_outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
+ if with_call:
+ return next(rendezvous), rendezvous
+ else:
+ return next(rendezvous)
+
+
+def future_unary_unary(
+ end, group, method, timeout, protocol_options, initial_metadata, payload):
+ """Services a value-in value-out servicer method by returning a Future."""
+ rendezvous, unused_operation_context, unused_outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
+ return rendezvous
+
+
+def inline_unary_stream(
+ end, group, method, timeout, protocol_options, initial_metadata, payload):
+ """Services a value-in stream-out servicer method."""
+ rendezvous, unused_operation_context, unused_outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
+ return rendezvous
+
+
+def blocking_stream_unary(
+ end, group, method, timeout, with_call, protocol_options, initial_metadata,
+ payload_iterator, pool):
+ """Services in a blocking fashion a stream-in value-out servicer method."""
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
+ if outcome is None:
+ def in_pool():
+ for payload in payload_iterator:
+ rendezvous.consume(payload)
+ rendezvous.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ if with_call:
+ return next(rendezvous), rendezvous
+ else:
+ return next(rendezvous)
+ else:
+ if with_call:
+ return next(rendezvous), rendezvous
+ else:
+ return next(rendezvous)
+
+
+def future_stream_unary(
+ end, group, method, timeout, protocol_options, initial_metadata,
+ payload_iterator, pool):
+ """Services a stream-in value-out servicer method by returning a Future."""
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
+ if outcome is None:
+ def in_pool():
+ for payload in payload_iterator:
+ rendezvous.consume(payload)
+ rendezvous.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def inline_stream_stream(
+ end, group, method, timeout, protocol_options, initial_metadata,
+ payload_iterator, pool):
+ """Services a stream-in stream-out servicer method."""
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
+ if outcome is None:
+ def in_pool():
+ for payload in payload_iterator:
+ rendezvous.consume(payload)
+ rendezvous.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def event_unary_unary(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ receiver, abortion_callback, pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
+ return _event_return_unary(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
+
+
+def event_unary_stream(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ receiver, abortion_callback, pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
+ return _event_return_stream(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
+
+
+def event_stream_unary(
+ end, group, method, timeout, protocol_options, initial_metadata, receiver,
+ abortion_callback, pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
+ return _event_return_unary(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
+
+
+def event_stream_stream(
+ end, group, method, timeout, protocol_options, initial_metadata, receiver,
+ abortion_callback, pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
+ return _event_return_stream(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)

Powered by Google App Engine
This is Rietveld 408576698