Chromium Code Reviews| Index: client/utils/grpc_proxy.py |
| diff --git a/client/utils/grpc_proxy.py b/client/utils/grpc_proxy.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a3eb165f48b6d17b053e5dd2a3f1820717bba842 |
| --- /dev/null |
| +++ b/client/utils/grpc_proxy.py |
| @@ -0,0 +1,260 @@ |
| +#!/usr/bin/env python |
|
M-A Ruel
2017/08/02 21:05:11
This file is not executable.
aludwin
2017/08/04 01:44:27
Done.
|
| +# Copyright 2017 The LUCI Authors. All rights reserved. |
| +# Use of this source code is governed under the Apache License, Version 2.0 |
| +# that can be found in the LICENSE file. |
| + |
| +"""Common gRPC implementation for Swarming and Isolate""" |
| + |
| +import logging |
| +import os |
| +import re |
| +from utils import net |
| + |
| +# gRPC may not be installed on the worker machine. This is fine, as long as |
| +# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
| +# Full external requirements are: grpcio, certifi. |
| +try: |
| + import grpc |
| + from google import auth as google_auth |
| + from google.auth.transport import grpc as google_auth_transport_grpc |
| + from google.auth.transport import requests as google_auth_transport_requests |
| +except ImportError as err: |
| + grpc = None |
| + |
| + |
| +# If gRPC was successfully imported, try to import certifi as well. This is not |
| +# actually used anywhere in this module, but if certifi is missing, |
| +# google.auth.transport (https://stackoverflow.com/questions/24973326). So this |
|
M-A Ruel
2017/08/02 21:05:11
the sentence is incomplete
aludwin
2017/08/04 01:44:26
Done.
|
| +# allows us to print out a somewhat-sane error message. |
| +certifi = None |
| +if grpc is not None: |
| + try: |
| + import certifi |
| + except ImportError: |
| + # Will print out error messages later (ie when we have a logger) |
| + pass |
| + |
| + |
| +# How many times to retry a gRPC call |
| +MAX_GRPC_ATTEMPTS = 30 |
| + |
| + |
| +# Longest time to sleep between gRPC calls |
| +MAX_GRPC_SLEEP = 10. |
| + |
| + |
| +# Start the timeout at three minutes. |
| +GRPC_TIMEOUT_SEC = 3 * 60 |
| + |
| + |
| +def available(): |
| + """Returns true if gRPC can be used on this host.""" |
| + return grpc != None |
| + |
| + |
| +class Proxy(object): |
| + """Represents a gRPC proxy. |
| + |
| + If the proxy begins with 'https', the returned channel will be secure and |
| + authorized using default application credentials |
| + (https://developers.google.com/identity/protocols/application-default-credentials). |
|
M-A Ruel
2017/08/02 21:05:11
() are unnecessary
aludwin
2017/08/04 01:44:26
Done.
|
| + Currently, we're using Cloud Container Builder scopes for testing; this may |
| + change in the future to allow different scopes to be passed in for different |
| + channels. |
| + |
| + To use the returned channel to call methods directly, say: |
| + |
| + proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', |
| + myapi_pb2.MyApiStub) |
| + |
| + To make a unary call with retries (recommended): |
| + |
| + proto_output = proxy.call_unary('MyMethod', proto_input) |
| + |
| + To make a unary call without retries, or to pass in a client side stream |
| + (proto_input can be an iterator here): |
| + |
| + proto_output = proxy.call_no_retries('MyMethod', proto_input) |
| + |
| + You can also call the stub directly (not recommended, since no errors will be |
| + caught or logged): |
| + |
| + proto_output = proxy.stub.MyMethod(proto_input) |
| + |
| + To make a call to a server-side streaming call (these are not retried): |
| + |
| + for response in proxy.get_stream('MyStreaminingMethod', proto_input): |
| + <process response> |
| + |
| + To retrieve the prefix: |
| + |
| + prefix = proxy.prefix # returns "prefix/for/resource/names" |
| + |
| + All exceptions are logged using "logging.warning." |
| + """ |
| + |
| + def __init__(self, proxy, stub_class): |
| + # `proxy` must be of the form: http[s]://<server>[:port][/prefix] |
| + m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) |
|
M-A Ruel
2017/08/02 21:05:11
I prefer to use urlparse + startswith(('http://',
aludwin
2017/08/04 01:44:26
Done.
|
| + if not m: |
| + raise ValueError(('gRPC proxy must have the form: ' |
| + 'http[s]://<server>[:port][/prefix] ' |
| + '(given: %s)') % proxy) |
| + transport = m.group(1) |
| + if transport == 'http': |
| + self._secure = False |
| + elif transport == 'https': |
| + self._secure = True |
| + else: |
| + raise ValueError('unknown transport %s (should be http[s])' % transport) |
| + self._host = m.group(2) |
| + self._prefix = m.group(3) |
| + self._debug_info = [] |
| + self._channel = self._create_channel() |
| + self._stub = stub_class(self._channel) |
| + logging.info('%s: initialized', self.name) |
| + |
| + @property |
| + def host(self): |
| + return self._host |
| + |
| + @property |
| + def prefix(self): |
| + return self._prefix |
| + |
| + @property |
| + def secure(self): |
| + return self._secure |
| + |
| + @property |
| + def channel(self): |
| + return self._channel |
| + |
| + @property |
| + def stub(self): |
| + return self._stub |
| + |
| + @property |
| + def name(self): |
| + security = 'insecure' |
| + if self._secure: |
| + security = 'secure' |
| + return 'gRPC %s proxy %s/%s' % ( |
| + security, self.host, self._stub.__class__.__name__) |
| + |
| + def call_unary(self, name, request): |
| + """Calls a method, waiting if the service is not available. |
| + |
| + Usage: proto_output = proxy.call_unary('MyMethod', proto_input) |
| + """ |
| + grpc_error = None |
| + for attempt in range(1, MAX_GRPC_ATTEMPTS+1): |
| + try: |
| + return self.call_no_retries(name, request) |
| + except grpc.RpcError as g: |
| + if g.code() is not grpc.StatusCode.UNAVAILABLE: |
| + raise |
| + logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', |
| + self.name, attempt, MAX_GRPC_ATTEMPTS) |
| + # Save the error in case we need to return it |
| + grpc_error = g |
| + time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) |
| + # If we get here, it must be because we got (and saved) an error |
| + assert grpc_error is not None |
| + raise grpc_error |
| + |
| + def get_stream(self, name, request): |
| + """Calls a server-side streaming method, returning an iterator. |
| + |
| + Usage: for resp in proxy.get_stream('MyMethod', proto_input'): |
| + """ |
| + def stream_proxy(stream, name): |
| + while True: |
| + # The lambda "next(stream, 1)" will return a protobuf on success, or the |
| + # integer 1 if the stream has ended. This allows us to avoid attempting |
| + # to catch StopIteration, which gets logged by _wrap_grpc_operation. |
| + response = self._wrap_grpc_operation(name + ' pull from stream', |
| + lambda: next(stream, 1)) |
| + if isinstance(response, int): |
| + # Iteration is finished |
| + return |
| + yield response |
| + stream = self.call_no_retries(name, request) |
| + sp = lambda x: stream_proxy(x, name) |
|
M-A Ruel
2017/08/02 21:05:11
I don't understand why you create a lambda just to
aludwin
2017/08/04 01:44:26
I created a lambda because I forgot how generators
|
| + return sp(stream) |
| + |
| + def call_no_retries(self, name, request): |
| + """Calls a method without any retries. |
| + |
| + Recommended for client-side streaming or nonidempotent unary calls. |
| + """ |
| + method = getattr(self._stub, name) |
| + if method is None: |
| + raise NameError('%s: "%s" is not a valid method name', self.name, name) |
| + return self._wrap_grpc_operation(name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC)) |
|
M-A Ruel
2017/08/02 21:05:11
wrap (Everywhere)
aludwin
2017/08/04 01:44:27
Done.
|
| + |
| + def _wrap_grpc_operation(self, name, fn): |
| + """Wraps a gRPC operation (call or iterator increment) for logging.""" |
| + try: |
| + return fn() |
| + except grpc.RpcError as g: |
| + logging.warning('%s/%s - gRPC error: %s', self.name, name, g) |
| + self._dump_proxy_info() |
| + raise g |
| + except Exception as e: |
| + logging.warning('caught exception %s; none is %s', repr(e), e is None) |
| + logging.warning('%s/%s - generic error: %s', self.name, name, e) |
| + self._dump_proxy_info() |
| + raise e |
| + |
| + def _dump_proxy_info(self): |
| + logging.warning('%s: DETAILED PROXY INFO', self.name) |
| + logging.warning('%s: prefix = %s', self.name, self.prefix) |
| + logging.warning('%s: debug info:\n\t%s', self.name, '\n\t'.join(self._debug_info)) |
| + |
| + def _create_channel(self): |
| + # Make sure grpc was successfully imported |
| + assert available() |
| + |
| + if not self._secure: |
| + return grpc.insecure_channel(self._host) |
| + |
| + # Authenticate the host. |
| + # |
| + # You're allowed to override the root certs and server if necessary. For |
| + # example, if you're running your proxy on localhost, you'll need to set |
| + # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate for |
| + # the root CA that your localhost server has used to certify itself, and the |
| + # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify |
| + # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while |
| + # the OVERRIDE env var might be "test_server," if this is what's used by the |
| + # server you're running. |
| + # |
| + # If you're connecting to a real server with real SSL, none of this should be |
| + # used. |
| + if not certifi: |
| + self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MAY FAIL') |
| + root_certs = None |
| + roots = os.environ.get('GRPC_PROXY_TLS_ROOTS') |
|
M-A Ruel
2017/08/02 21:05:11
We'd need to document these under doc/
aludwin
2017/08/04 01:44:26
Done.
|
| + if roots is not None: |
| + self._debug_info.append('Overridden root CA: %s' % roots) |
| + with open(roots) as f: |
| + root_certs = f.read() |
| + else: |
| + self._debug_info.append('Using default root CAs from certifi') |
| + overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE') |
| + options = () |
| + if overd is not None: |
| + options=(('grpc.ssl_target_name_override', overd),) |
| + ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) |
| + |
| + # Authenticate the user. |
| + scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) |
| + self._debug_info.append('Scopes are: %s' % repr(scopes)) |
| + user_creds, _ = google_auth.default(scopes=scopes) |
| + |
| + # Create the channel. |
| + request = google_auth_transport_requests.Request() |
| + self._debug_info.append('Options are: %s' % repr(options)) |
| + return google_auth_transport_grpc.secure_authorized_channel( |
| + user_creds, request, self._host, ssl_creds, options=options) |