Index: client/utils/grpc_proxy.py |
diff --git a/client/utils/grpc_proxy.py b/client/utils/grpc_proxy.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..0215df530426d5a08c25cb7f1d924f3d6adc8cd9 |
--- /dev/null |
+++ b/client/utils/grpc_proxy.py |
@@ -0,0 +1,268 @@ |
+# Copyright 2017 The LUCI Authors. All rights reserved. |
+# Use of this source code is governed under the Apache License, Version 2.0 |
+# that can be found in the LICENSE file. |
+ |
+"""Common gRPC implementation for Swarming and Isolate""" |
+ |
+import logging |
+import os |
+import re |
+import time |
+import urlparse |
+from utils import net |
+ |
+# gRPC may not be installed on the worker machine. This is fine, as long as |
+# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
+# Full external requirements are: grpcio, certifi. |
+try: |
+ import grpc |
+ from google import auth as google_auth |
+ from google.auth.transport import grpc as google_auth_transport_grpc |
+ from google.auth.transport import requests as google_auth_transport_requests |
+except ImportError as err: |
+ grpc = None |
+ |
+ |
+# If gRPC was successfully imported, try to import certifi as well. This is not |
+# actually used anywhere in this module, but if certifi is missing, |
+# google.auth.transport will fail (see |
+# https://stackoverflow.com/questions/24973326). So checking it here allows us |
+# to print out a somewhat-sane error message. |
+certifi = None |
+if grpc is not None: |
+ try: |
+ import certifi |
+ except ImportError: |
+ # Will print out error messages later (ie when we have a logger) |
+ pass |
+ |
+ |
+# How many times to retry a gRPC call |
+MAX_GRPC_ATTEMPTS = 30 |
+ |
+ |
+# Longest time to sleep between gRPC calls |
+MAX_GRPC_SLEEP = 10. |
+ |
+ |
+# Start the timeout at three minutes. |
+GRPC_TIMEOUT_SEC = 3 * 60 |
+ |
+ |
+def available(): |
+ """Returns true if gRPC can be used on this host.""" |
+ return grpc != None |
+ |
+ |
+class Proxy(object): |
+ """Represents a gRPC proxy. |
+ |
+ If the proxy begins with 'https', the returned channel will be secure and |
+ authorized using default application credentials - see |
+ https://developers.google.com/identity/protocols/application-default-credentials. |
+ Currently, we're using Cloud Container Builder scopes for testing; this may |
+ change in the future to allow different scopes to be passed in for different |
+ channels. |
+ |
+ To use the returned channel to call methods directly, say: |
+ |
+ proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', |
+ myapi_pb2.MyApiStub) |
+ |
+ To make a unary call with retries (recommended): |
+ |
+ proto_output = proxy.call_unary('MyMethod', proto_input) |
+ |
+ To make a unary call without retries, or to pass in a client side stream |
+ (proto_input can be an iterator here): |
+ |
+ proto_output = proxy.call_no_retries('MyMethod', proto_input) |
+ |
+ You can also call the stub directly (not recommended, since no errors will be |
+ caught or logged): |
+ |
+ proto_output = proxy.stub.MyMethod(proto_input) |
+ |
+ To make a call to a server-side streaming call (these are not retried): |
+ |
+ for response in proxy.get_stream('MyStreaminingMethod', proto_input): |
+ <process response> |
+ |
+ To retrieve the prefix: |
+ |
+ prefix = proxy.prefix # returns "prefix/for/resource/names" |
+ |
+ All exceptions are logged using "logging.warning." |
+ """ |
+ |
+ def __init__(self, proxy, stub_class): |
+ self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE') |
+ if self._verbose: |
+ logging.info('Enabled verbose mode for %s with stub %s', |
+ proxy, stub_class.__name__) |
+ # NB: everything in url is unicode; convert to strings where |
+ # needed. |
+ url = urlparse.urlparse(proxy) |
+ if self._verbose: |
+ logging.info('Parsed URL for proxy is %r', url) |
+ if url.scheme == 'http': |
+ self._secure = False |
+ elif url.scheme == 'https': |
+ self._secure = True |
+ else: |
+ raise ValueError('gRPC proxy %s must use http[s], not %s' % ( |
+ proxy, url.scheme)) |
+ if url.netloc == '': |
+ raise ValueError('gRPC proxy is missing hostname: %s' % proxy) |
+ self._host = url.netloc |
+ self._prefix = url.path |
+ if self._prefix.endswith('/'): |
+ self._prefix = self._prefix[:-1] |
+ if self._prefix.startswith('/'): |
+ self._prefix = self._prefix[1:] |
+ if url.params != '' or url.fragment != '': |
+ raise ValueError('gRPC proxy may not contain params or fragments: %s' % |
+ proxy) |
+ self._debug_info = ['full proxy name: ' + proxy] |
+ self._channel = self._create_channel() |
+ self._stub = stub_class(self._channel) |
+ logging.info('%s: initialized', self.name) |
+ if self._verbose: |
+ self._dump_proxy_info() |
+ |
+ @property |
+ def prefix(self): |
+ return self._prefix |
+ |
+ @property |
+ def channel(self): |
+ return self._channel |
+ |
+ @property |
+ def stub(self): |
+ return self._stub |
+ |
+ @property |
+ def name(self): |
+ security = 'insecure' |
+ if self._secure: |
+ security = 'secure' |
+ return 'gRPC %s proxy %s/%s' % ( |
+ security, self._host, self._stub.__class__.__name__) |
+ |
+ def call_unary(self, name, request): |
+ """Calls a method, waiting if the service is not available. |
+ |
+ Usage: proto_output = proxy.call_unary('MyMethod', proto_input) |
+ """ |
+ for attempt in range(1, MAX_GRPC_ATTEMPTS+1): |
+ try: |
+ return self.call_no_retries(name, request) |
+ except grpc.RpcError as g: |
+ if g.code() is not grpc.StatusCode.UNAVAILABLE: |
+ raise |
+ logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', |
+ self.name, attempt, MAX_GRPC_ATTEMPTS) |
+ # Save the error in case we need to return it |
+ grpc_error = g |
+ time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) |
+ # If we get here, it must be because we got (and saved) an error |
+ assert grpc_error is not None |
+ raise grpc_error |
+ |
+ def get_stream(self, name, request): |
+ """Calls a server-side streaming method, returning an iterator. |
+ |
+ Usage: for resp in proxy.get_stream('MyMethod', proto_input'): |
+ """ |
+ stream = self.call_no_retries(name, request) |
+ while True: |
+ # The lambda "next(stream, 1)" will return a protobuf on success, or the |
+ # integer 1 if the stream has ended. This allows us to avoid attempting |
+ # to catch StopIteration, which gets logged by _wrap_grpc_operation. |
+ response = self._wrap_grpc_operation(name + ' pull from stream', |
+ lambda: next(stream, 1)) |
+ if isinstance(response, int): |
+ # Iteration is finished |
+ return |
+ yield response |
+ |
+ def call_no_retries(self, name, request): |
+ """Calls a method without any retries. |
+ |
+ Recommended for client-side streaming or nonidempotent unary calls. |
+ """ |
+ method = getattr(self._stub, name) |
+ if method is None: |
+ raise NameError('%s: "%s" is not a valid method name', self.name, name) |
+ return self._wrap_grpc_operation( |
+ name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC)) |
+ |
+ def _wrap_grpc_operation(self, name, fn): |
+ """Wraps a gRPC operation (call or iterator increment) for logging.""" |
+ if self._verbose: |
+ logging.info('%s/%s - starting gRPC operation', self.name, name) |
+ try: |
+ return fn() |
+ except grpc.RpcError as g: |
+ logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g) |
+ self._dump_proxy_info() |
+ raise g |
+ except Exception as e: |
+ logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e) |
+ self._dump_proxy_info() |
+ raise e |
+ |
+ def _dump_proxy_info(self): |
+ logging.warning('DETAILED PROXY INFO') |
+ logging.warning('prefix = %s', self.prefix) |
+ logging.warning('debug info:\n\t%s\n\n', |
+ '\n\t'.join(self._debug_info)) |
+ |
+ def _create_channel(self): |
+ # Make sure grpc was successfully imported |
+ assert available() |
+ |
+ if not self._secure: |
+ return grpc.insecure_channel(self._host) |
+ |
+ # Authenticate the host. |
+ # |
+ # You're allowed to override the root certs and server if necessary. For |
+ # example, if you're running your proxy on localhost, you'll need to set |
+ # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate |
+ # for the root CA that your localhost server has used to certify itself, and |
+ # the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to |
+ # identify itself. For example, the ROOTS env var might be |
+ # "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if |
+ # this is what's used by the server you're running. |
+ # |
+ # If you're connecting to a real server with real SSL, none of this should |
+ # be used. |
+ if not certifi: |
+ self._debug_info.append('CERTIFI IS NOT PRESENT;' + |
+ ' gRPC HTTPS CONNECTIONS MAY FAIL') |
+ root_certs = None |
+ roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS') |
+ if roots: |
+ self._debug_info.append('Overridden root CA: %s' % roots) |
+ with open(roots) as f: |
+ root_certs = f.read() |
+ else: |
+ self._debug_info.append('Using default root CAs from certifi') |
+ overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE') |
+ options = () |
+ if overd: |
+ options=(('grpc.ssl_target_name_override', overd),) |
+ ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) |
+ |
+ # Authenticate the user. |
+ scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) |
+ self._debug_info.append('Scopes are: %r' % scopes) |
+ user_creds, _ = google_auth.default(scopes=scopes) |
+ |
+ # Create the channel. |
+ request = google_auth_transport_requests.Request() |
+ self._debug_info.append('Options are: %r' % options) |
+ return google_auth_transport_grpc.secure_authorized_channel( |
+ user_creds, request, self._host, ssl_creds, options=options) |