| Index: client/utils/grpc_proxy.py
|
| diff --git a/client/utils/grpc_proxy.py b/client/utils/grpc_proxy.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..0215df530426d5a08c25cb7f1d924f3d6adc8cd9
|
| --- /dev/null
|
| +++ b/client/utils/grpc_proxy.py
|
| @@ -0,0 +1,268 @@
|
| +# Copyright 2017 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed under the Apache License, Version 2.0
|
| +# that can be found in the LICENSE file.
|
| +
|
| +"""Common gRPC implementation for Swarming and Isolate"""
|
| +
|
| +import logging
|
| +import os
|
| +import re
|
| +import time
|
| +import urlparse
|
| +from utils import net
|
| +
|
| +# gRPC may not be installed on the worker machine. This is fine, as long as
|
| +# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
|
| +# Full external requirements are: grpcio, certifi.
|
| +try:
|
| + import grpc
|
| + from google import auth as google_auth
|
| + from google.auth.transport import grpc as google_auth_transport_grpc
|
| + from google.auth.transport import requests as google_auth_transport_requests
|
| +except ImportError as err:
|
| + grpc = None
|
| +
|
| +
|
| +# If gRPC was successfully imported, try to import certifi as well. This is not
|
| +# actually used anywhere in this module, but if certifi is missing,
|
| +# google.auth.transport will fail (see
|
| +# https://stackoverflow.com/questions/24973326). So checking it here allows us
|
| +# to print out a somewhat-sane error message.
|
| +certifi = None
|
| +if grpc is not None:
|
| + try:
|
| + import certifi
|
| + except ImportError:
|
| + # Will print out error messages later (ie when we have a logger)
|
| + pass
|
| +
|
| +
|
| +# How many times to retry a gRPC call
|
| +MAX_GRPC_ATTEMPTS = 30
|
| +
|
| +
|
| +# Longest time to sleep between gRPC calls
|
| +MAX_GRPC_SLEEP = 10.
|
| +
|
| +
|
| +# Start the timeout at three minutes.
|
| +GRPC_TIMEOUT_SEC = 3 * 60
|
| +
|
| +
|
| +def available():
|
| + """Returns true if gRPC can be used on this host."""
|
| + return grpc != None
|
| +
|
| +
|
| +class Proxy(object):
|
| + """Represents a gRPC proxy.
|
| +
|
| + If the proxy begins with 'https', the returned channel will be secure and
|
| + authorized using default application credentials - see
|
| + https://developers.google.com/identity/protocols/application-default-credentials.
|
| + Currently, we're using Cloud Container Builder scopes for testing; this may
|
| + change in the future to allow different scopes to be passed in for different
|
| + channels.
|
| +
|
| + To use the returned channel to call methods directly, say:
|
| +
|
| + proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
|
| + myapi_pb2.MyApiStub)
|
| +
|
| + To make a unary call with retries (recommended):
|
| +
|
| + proto_output = proxy.call_unary('MyMethod', proto_input)
|
| +
|
| + To make a unary call without retries, or to pass in a client side stream
|
| + (proto_input can be an iterator here):
|
| +
|
| + proto_output = proxy.call_no_retries('MyMethod', proto_input)
|
| +
|
| + You can also call the stub directly (not recommended, since no errors will be
|
| + caught or logged):
|
| +
|
| + proto_output = proxy.stub.MyMethod(proto_input)
|
| +
|
| + To make a call to a server-side streaming call (these are not retried):
|
| +
|
| + for response in proxy.get_stream('MyStreaminingMethod', proto_input):
|
| + <process response>
|
| +
|
| + To retrieve the prefix:
|
| +
|
| + prefix = proxy.prefix # returns "prefix/for/resource/names"
|
| +
|
| + All exceptions are logged using "logging.warning."
|
| + """
|
| +
|
| + def __init__(self, proxy, stub_class):
|
| + self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE')
|
| + if self._verbose:
|
| + logging.info('Enabled verbose mode for %s with stub %s',
|
| + proxy, stub_class.__name__)
|
| + # NB: everything in url is unicode; convert to strings where
|
| + # needed.
|
| + url = urlparse.urlparse(proxy)
|
| + if self._verbose:
|
| + logging.info('Parsed URL for proxy is %r', url)
|
| + if url.scheme == 'http':
|
| + self._secure = False
|
| + elif url.scheme == 'https':
|
| + self._secure = True
|
| + else:
|
| + raise ValueError('gRPC proxy %s must use http[s], not %s' % (
|
| + proxy, url.scheme))
|
| + if url.netloc == '':
|
| + raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
|
| + self._host = url.netloc
|
| + self._prefix = url.path
|
| + if self._prefix.endswith('/'):
|
| + self._prefix = self._prefix[:-1]
|
| + if self._prefix.startswith('/'):
|
| + self._prefix = self._prefix[1:]
|
| + if url.params != '' or url.fragment != '':
|
| + raise ValueError('gRPC proxy may not contain params or fragments: %s' %
|
| + proxy)
|
| + self._debug_info = ['full proxy name: ' + proxy]
|
| + self._channel = self._create_channel()
|
| + self._stub = stub_class(self._channel)
|
| + logging.info('%s: initialized', self.name)
|
| + if self._verbose:
|
| + self._dump_proxy_info()
|
| +
|
| + @property
|
| + def prefix(self):
|
| + return self._prefix
|
| +
|
| + @property
|
| + def channel(self):
|
| + return self._channel
|
| +
|
| + @property
|
| + def stub(self):
|
| + return self._stub
|
| +
|
| + @property
|
| + def name(self):
|
| + security = 'insecure'
|
| + if self._secure:
|
| + security = 'secure'
|
| + return 'gRPC %s proxy %s/%s' % (
|
| + security, self._host, self._stub.__class__.__name__)
|
| +
|
| + def call_unary(self, name, request):
|
| + """Calls a method, waiting if the service is not available.
|
| +
|
| + Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
|
| + """
|
| + for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
|
| + try:
|
| + return self.call_no_retries(name, request)
|
| + except grpc.RpcError as g:
|
| + if g.code() is not grpc.StatusCode.UNAVAILABLE:
|
| + raise
|
| + logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
|
| + self.name, attempt, MAX_GRPC_ATTEMPTS)
|
| + # Save the error in case we need to return it
|
| + grpc_error = g
|
| + time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
|
| + # If we get here, it must be because we got (and saved) an error
|
| + assert grpc_error is not None
|
| + raise grpc_error
|
| +
|
| + def get_stream(self, name, request):
|
| + """Calls a server-side streaming method, returning an iterator.
|
| +
|
| + Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
|
| + """
|
| + stream = self.call_no_retries(name, request)
|
| + while True:
|
| + # The lambda "next(stream, 1)" will return a protobuf on success, or the
|
| + # integer 1 if the stream has ended. This allows us to avoid attempting
|
| + # to catch StopIteration, which gets logged by _wrap_grpc_operation.
|
| + response = self._wrap_grpc_operation(name + ' pull from stream',
|
| + lambda: next(stream, 1))
|
| + if isinstance(response, int):
|
| + # Iteration is finished
|
| + return
|
| + yield response
|
| +
|
| + def call_no_retries(self, name, request):
|
| + """Calls a method without any retries.
|
| +
|
| + Recommended for client-side streaming or nonidempotent unary calls.
|
| + """
|
| + method = getattr(self._stub, name)
|
| + if method is None:
|
| + raise NameError('%s: "%s" is not a valid method name', self.name, name)
|
| + return self._wrap_grpc_operation(
|
| + name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
|
| +
|
| + def _wrap_grpc_operation(self, name, fn):
|
| + """Wraps a gRPC operation (call or iterator increment) for logging."""
|
| + if self._verbose:
|
| + logging.info('%s/%s - starting gRPC operation', self.name, name)
|
| + try:
|
| + return fn()
|
| + except grpc.RpcError as g:
|
| + logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g)
|
| + self._dump_proxy_info()
|
| + raise g
|
| + except Exception as e:
|
| + logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e)
|
| + self._dump_proxy_info()
|
| + raise e
|
| +
|
| + def _dump_proxy_info(self):
|
| + logging.warning('DETAILED PROXY INFO')
|
| + logging.warning('prefix = %s', self.prefix)
|
| + logging.warning('debug info:\n\t%s\n\n',
|
| + '\n\t'.join(self._debug_info))
|
| +
|
| + def _create_channel(self):
|
| + # Make sure grpc was successfully imported
|
| + assert available()
|
| +
|
| + if not self._secure:
|
| + return grpc.insecure_channel(self._host)
|
| +
|
| + # Authenticate the host.
|
| + #
|
| + # You're allowed to override the root certs and server if necessary. For
|
| + # example, if you're running your proxy on localhost, you'll need to set
|
| + # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate
|
| + # for the root CA that your localhost server has used to certify itself, and
|
| + # the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to
|
| + # identify itself. For example, the ROOTS env var might be
|
| + # "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if
|
| + # this is what's used by the server you're running.
|
| + #
|
| + # If you're connecting to a real server with real SSL, none of this should
|
| + # be used.
|
| + if not certifi:
|
| + self._debug_info.append('CERTIFI IS NOT PRESENT;' +
|
| + ' gRPC HTTPS CONNECTIONS MAY FAIL')
|
| + root_certs = None
|
| + roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS')
|
| + if roots:
|
| + self._debug_info.append('Overridden root CA: %s' % roots)
|
| + with open(roots) as f:
|
| + root_certs = f.read()
|
| + else:
|
| + self._debug_info.append('Using default root CAs from certifi')
|
| + overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE')
|
| + options = ()
|
| + if overd:
|
| + options=(('grpc.ssl_target_name_override', overd),)
|
| + ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
|
| +
|
| + # Authenticate the user.
|
| + scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
|
| + self._debug_info.append('Scopes are: %r' % scopes)
|
| + user_creds, _ = google_auth.default(scopes=scopes)
|
| +
|
| + # Create the channel.
|
| + request = google_auth_transport_requests.Request()
|
| + self._debug_info.append('Options are: %r' % options)
|
| + return google_auth_transport_grpc.secure_authorized_channel(
|
| + user_creds, request, self._host, ssl_creds, options=options)
|
|
|