Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Unified Diff: client/utils/grpc_proxy.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Refactor all gRPC proxy code into a single class. Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/utils/grpc_proxy.py
diff --git a/client/utils/grpc_proxy.py b/client/utils/grpc_proxy.py
new file mode 100644
index 0000000000000000000000000000000000000000..a3eb165f48b6d17b053e5dd2a3f1820717bba842
--- /dev/null
+++ b/client/utils/grpc_proxy.py
@@ -0,0 +1,260 @@
+#!/usr/bin/env python
M-A Ruel 2017/08/02 21:05:11 This file is not executable.
aludwin 2017/08/04 01:44:27 Done.
+# Copyright 2017 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+"""Common gRPC implementation for Swarming and Isolate"""
+
+import logging
+import os
+import re
+from utils import net
+
+# gRPC may not be installed on the worker machine. This is fine, as long as
+# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
+# Full external requirements are: grpcio, certifi.
+try:
+ import grpc
+ from google import auth as google_auth
+ from google.auth.transport import grpc as google_auth_transport_grpc
+ from google.auth.transport import requests as google_auth_transport_requests
+except ImportError as err:
+ grpc = None
+
+
+# If gRPC was successfully imported, try to import certifi as well. This is not
+# actually used anywhere in this module, but if certifi is missing,
+# google.auth.transport (https://stackoverflow.com/questions/24973326). So this
M-A Ruel 2017/08/02 21:05:11 the sentence is incomplete
aludwin 2017/08/04 01:44:26 Done.
+# allows us to print out a somewhat-sane error message.
+certifi = None
+if grpc is not None:
+ try:
+ import certifi
+ except ImportError:
+ # Will print out error messages later (ie when we have a logger)
+ pass
+
+
+# How many times to retry a gRPC call
+MAX_GRPC_ATTEMPTS = 30
+
+
+# Longest time to sleep between gRPC calls
+MAX_GRPC_SLEEP = 10.
+
+
+# Start the timeout at three minutes.
+GRPC_TIMEOUT_SEC = 3 * 60
+
+
+def available():
+ """Returns true if gRPC can be used on this host."""
+ return grpc != None
+
+
+class Proxy(object):
+ """Represents a gRPC proxy.
+
+ If the proxy begins with 'https', the returned channel will be secure and
+ authorized using default application credentials
+ (https://developers.google.com/identity/protocols/application-default-credentials).
M-A Ruel 2017/08/02 21:05:11 () are unnecessary
aludwin 2017/08/04 01:44:26 Done.
+ Currently, we're using Cloud Container Builder scopes for testing; this may
+ change in the future to allow different scopes to be passed in for different
+ channels.
+
+ To use the returned channel to call methods directly, say:
+
+ proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
+ myapi_pb2.MyApiStub)
+
+ To make a unary call with retries (recommended):
+
+ proto_output = proxy.call_unary('MyMethod', proto_input)
+
+ To make a unary call without retries, or to pass in a client side stream
+ (proto_input can be an iterator here):
+
+ proto_output = proxy.call_no_retries('MyMethod', proto_input)
+
+ You can also call the stub directly (not recommended, since no errors will be
+ caught or logged):
+
+ proto_output = proxy.stub.MyMethod(proto_input)
+
+ To make a call to a server-side streaming call (these are not retried):
+
+ for response in proxy.get_stream('MyStreaminingMethod', proto_input):
+ <process response>
+
+ To retrieve the prefix:
+
+ prefix = proxy.prefix # returns "prefix/for/resource/names"
+
+ All exceptions are logged using "logging.warning."
+ """
+
+ def __init__(self, proxy, stub_class):
+ # `proxy` must be of the form: http[s]://<server>[:port][/prefix]
+ m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
M-A Ruel 2017/08/02 21:05:11 I prefer to use urlparse + startswith(('http://',
aludwin 2017/08/04 01:44:26 Done.
+ if not m:
+ raise ValueError(('gRPC proxy must have the form: '
+ 'http[s]://<server>[:port][/prefix] '
+ '(given: %s)') % proxy)
+ transport = m.group(1)
+ if transport == 'http':
+ self._secure = False
+ elif transport == 'https':
+ self._secure = True
+ else:
+ raise ValueError('unknown transport %s (should be http[s])' % transport)
+ self._host = m.group(2)
+ self._prefix = m.group(3)
+ self._debug_info = []
+ self._channel = self._create_channel()
+ self._stub = stub_class(self._channel)
+ logging.info('%s: initialized', self.name)
+
+ @property
+ def host(self):
+ return self._host
+
+ @property
+ def prefix(self):
+ return self._prefix
+
+ @property
+ def secure(self):
+ return self._secure
+
+ @property
+ def channel(self):
+ return self._channel
+
+ @property
+ def stub(self):
+ return self._stub
+
+ @property
+ def name(self):
+ security = 'insecure'
+ if self._secure:
+ security = 'secure'
+ return 'gRPC %s proxy %s/%s' % (
+ security, self.host, self._stub.__class__.__name__)
+
+ def call_unary(self, name, request):
+ """Calls a method, waiting if the service is not available.
+
+ Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
+ """
+ grpc_error = None
+ for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
+ try:
+ return self.call_no_retries(name, request)
+ except grpc.RpcError as g:
+ if g.code() is not grpc.StatusCode.UNAVAILABLE:
+ raise
+ logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
+ self.name, attempt, MAX_GRPC_ATTEMPTS)
+ # Save the error in case we need to return it
+ grpc_error = g
+ time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
+ # If we get here, it must be because we got (and saved) an error
+ assert grpc_error is not None
+ raise grpc_error
+
+ def get_stream(self, name, request):
+ """Calls a server-side streaming method, returning an iterator.
+
+ Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
+ """
+ def stream_proxy(stream, name):
+ while True:
+ # The lambda "next(stream, 1)" will return a protobuf on success, or the
+ # integer 1 if the stream has ended. This allows us to avoid attempting
+ # to catch StopIteration, which gets logged by _wrap_grpc_operation.
+ response = self._wrap_grpc_operation(name + ' pull from stream',
+ lambda: next(stream, 1))
+ if isinstance(response, int):
+ # Iteration is finished
+ return
+ yield response
+ stream = self.call_no_retries(name, request)
+ sp = lambda x: stream_proxy(x, name)
M-A Ruel 2017/08/02 21:05:11 I don't understand why you create a lambda just to
aludwin 2017/08/04 01:44:26 I created a lambda because I forgot how generators
+ return sp(stream)
+
+ def call_no_retries(self, name, request):
+ """Calls a method without any retries.
+
+ Recommended for client-side streaming or nonidempotent unary calls.
+ """
+ method = getattr(self._stub, name)
+ if method is None:
+ raise NameError('%s: "%s" is not a valid method name', self.name, name)
+ return self._wrap_grpc_operation(name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
M-A Ruel 2017/08/02 21:05:11 wrap (Everywhere)
aludwin 2017/08/04 01:44:27 Done.
+
+ def _wrap_grpc_operation(self, name, fn):
+ """Wraps a gRPC operation (call or iterator increment) for logging."""
+ try:
+ return fn()
+ except grpc.RpcError as g:
+ logging.warning('%s/%s - gRPC error: %s', self.name, name, g)
+ self._dump_proxy_info()
+ raise g
+ except Exception as e:
+ logging.warning('caught exception %s; none is %s', repr(e), e is None)
+ logging.warning('%s/%s - generic error: %s', self.name, name, e)
+ self._dump_proxy_info()
+ raise e
+
+ def _dump_proxy_info(self):
+ logging.warning('%s: DETAILED PROXY INFO', self.name)
+ logging.warning('%s: prefix = %s', self.name, self.prefix)
+ logging.warning('%s: debug info:\n\t%s', self.name, '\n\t'.join(self._debug_info))
+
+ def _create_channel(self):
+ # Make sure grpc was successfully imported
+ assert available()
+
+ if not self._secure:
+ return grpc.insecure_channel(self._host)
+
+ # Authenticate the host.
+ #
+ # You're allowed to override the root certs and server if necessary. For
+ # example, if you're running your proxy on localhost, you'll need to set
+ # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate for
+ # the root CA that your localhost server has used to certify itself, and the
+ # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify
+ # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while
+ # the OVERRIDE env var might be "test_server," if this is what's used by the
+ # server you're running.
+ #
+ # If you're connecting to a real server with real SSL, none of this should be
+ # used.
+ if not certifi:
+ self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MAY FAIL')
+ root_certs = None
+ roots = os.environ.get('GRPC_PROXY_TLS_ROOTS')
M-A Ruel 2017/08/02 21:05:11 We'd need to document these under doc/
aludwin 2017/08/04 01:44:26 Done.
+ if roots is not None:
+ self._debug_info.append('Overridden root CA: %s' % roots)
+ with open(roots) as f:
+ root_certs = f.read()
+ else:
+ self._debug_info.append('Using default root CAs from certifi')
+ overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE')
+ options = ()
+ if overd is not None:
+ options=(('grpc.ssl_target_name_override', overd),)
+ ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
+
+ # Authenticate the user.
+ scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
+ self._debug_info.append('Scopes are: %s' % repr(scopes))
+ user_creds, _ = google_auth.default(scopes=scopes)
+
+ # Create the channel.
+ request = google_auth_transport_requests.Request()
+ self._debug_info.append('Options are: %s' % repr(options))
+ return google_auth_transport_grpc.secure_authorized_channel(
+ user_creds, request, self._host, ssl_creds, options=options)
« appengine/swarming/swarming_bot/bot_code/remote_client.py ('K') | « client/isolate_storage.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698