OLD | NEW |
(Empty) | |
| 1 # Copyright 2017 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. |
| 4 |
| 5 """Common gRPC implementation for Swarming and Isolate""" |
| 6 |
| 7 import logging |
| 8 import os |
| 9 import re |
| 10 import time |
| 11 import urlparse |
| 12 from utils import net |
| 13 |
| 14 # gRPC may not be installed on the worker machine. This is fine, as long as |
| 15 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
| 16 # Full external requirements are: grpcio, certifi. |
| 17 try: |
| 18 import grpc |
| 19 from google import auth as google_auth |
| 20 from google.auth.transport import grpc as google_auth_transport_grpc |
| 21 from google.auth.transport import requests as google_auth_transport_requests |
| 22 except ImportError as err: |
| 23 grpc = None |
| 24 |
| 25 |
| 26 # If gRPC was successfully imported, try to import certifi as well. This is not |
| 27 # actually used anywhere in this module, but if certifi is missing, |
| 28 # google.auth.transport will fail (see |
| 29 # https://stackoverflow.com/questions/24973326). So checking it here allows us |
| 30 # to print out a somewhat-sane error message. |
| 31 certifi = None |
| 32 if grpc is not None: |
| 33 try: |
| 34 import certifi |
| 35 except ImportError: |
| 36 # Will print out error messages later (ie when we have a logger) |
| 37 pass |
| 38 |
| 39 |
| 40 # How many times to retry a gRPC call |
| 41 MAX_GRPC_ATTEMPTS = 30 |
| 42 |
| 43 |
| 44 # Longest time to sleep between gRPC calls |
| 45 MAX_GRPC_SLEEP = 10. |
| 46 |
| 47 |
| 48 # Start the timeout at three minutes. |
| 49 GRPC_TIMEOUT_SEC = 3 * 60 |
| 50 |
| 51 |
| 52 def available(): |
| 53 """Returns true if gRPC can be used on this host.""" |
| 54 return grpc != None |
| 55 |
| 56 |
| 57 class Proxy(object): |
| 58 """Represents a gRPC proxy. |
| 59 |
| 60 If the proxy begins with 'https', the returned channel will be secure and |
| 61 authorized using default application credentials - see |
| 62 https://developers.google.com/identity/protocols/application-default-credentia
ls. |
| 63 Currently, we're using Cloud Container Builder scopes for testing; this may |
| 64 change in the future to allow different scopes to be passed in for different |
| 65 channels. |
| 66 |
| 67 To use the returned channel to call methods directly, say: |
| 68 |
| 69 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', |
| 70 myapi_pb2.MyApiStub) |
| 71 |
| 72 To make a unary call with retries (recommended): |
| 73 |
| 74 proto_output = proxy.call_unary('MyMethod', proto_input) |
| 75 |
| 76 To make a unary call without retries, or to pass in a client side stream |
| 77 (proto_input can be an iterator here): |
| 78 |
| 79 proto_output = proxy.call_no_retries('MyMethod', proto_input) |
| 80 |
| 81 You can also call the stub directly (not recommended, since no errors will be |
| 82 caught or logged): |
| 83 |
| 84 proto_output = proxy.stub.MyMethod(proto_input) |
| 85 |
| 86 To make a call to a server-side streaming call (these are not retried): |
| 87 |
| 88 for response in proxy.get_stream('MyStreaminingMethod', proto_input): |
| 89 <process response> |
| 90 |
| 91 To retrieve the prefix: |
| 92 |
| 93 prefix = proxy.prefix # returns "prefix/for/resource/names" |
| 94 |
| 95 All exceptions are logged using "logging.warning." |
| 96 """ |
| 97 |
| 98 def __init__(self, proxy, stub_class): |
| 99 self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE') |
| 100 if self._verbose: |
| 101 logging.info('Enabled verbose mode for %s with stub %s', |
| 102 proxy, stub_class.__name__) |
| 103 # NB: everything in url is unicode; convert to strings where |
| 104 # needed. |
| 105 url = urlparse.urlparse(proxy) |
| 106 if self._verbose: |
| 107 logging.info('Parsed URL for proxy is %r', url) |
| 108 if url.scheme == 'http': |
| 109 self._secure = False |
| 110 elif url.scheme == 'https': |
| 111 self._secure = True |
| 112 else: |
| 113 raise ValueError('gRPC proxy %s must use http[s], not %s' % ( |
| 114 proxy, url.scheme)) |
| 115 if url.netloc == '': |
| 116 raise ValueError('gRPC proxy is missing hostname: %s' % proxy) |
| 117 self._host = url.netloc |
| 118 self._prefix = url.path |
| 119 if self._prefix.endswith('/'): |
| 120 self._prefix = self._prefix[:-1] |
| 121 if self._prefix.startswith('/'): |
| 122 self._prefix = self._prefix[1:] |
| 123 if url.params != '' or url.fragment != '': |
| 124 raise ValueError('gRPC proxy may not contain params or fragments: %s' % |
| 125 proxy) |
| 126 self._debug_info = ['full proxy name: ' + proxy] |
| 127 self._channel = self._create_channel() |
| 128 self._stub = stub_class(self._channel) |
| 129 logging.info('%s: initialized', self.name) |
| 130 if self._verbose: |
| 131 self._dump_proxy_info() |
| 132 |
| 133 @property |
| 134 def prefix(self): |
| 135 return self._prefix |
| 136 |
| 137 @property |
| 138 def channel(self): |
| 139 return self._channel |
| 140 |
| 141 @property |
| 142 def stub(self): |
| 143 return self._stub |
| 144 |
| 145 @property |
| 146 def name(self): |
| 147 security = 'insecure' |
| 148 if self._secure: |
| 149 security = 'secure' |
| 150 return 'gRPC %s proxy %s/%s' % ( |
| 151 security, self._host, self._stub.__class__.__name__) |
| 152 |
| 153 def call_unary(self, name, request): |
| 154 """Calls a method, waiting if the service is not available. |
| 155 |
| 156 Usage: proto_output = proxy.call_unary('MyMethod', proto_input) |
| 157 """ |
| 158 for attempt in range(1, MAX_GRPC_ATTEMPTS+1): |
| 159 try: |
| 160 return self.call_no_retries(name, request) |
| 161 except grpc.RpcError as g: |
| 162 if g.code() is not grpc.StatusCode.UNAVAILABLE: |
| 163 raise |
| 164 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', |
| 165 self.name, attempt, MAX_GRPC_ATTEMPTS) |
| 166 # Save the error in case we need to return it |
| 167 grpc_error = g |
| 168 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) |
| 169 # If we get here, it must be because we got (and saved) an error |
| 170 assert grpc_error is not None |
| 171 raise grpc_error |
| 172 |
| 173 def get_stream(self, name, request): |
| 174 """Calls a server-side streaming method, returning an iterator. |
| 175 |
| 176 Usage: for resp in proxy.get_stream('MyMethod', proto_input'): |
| 177 """ |
| 178 stream = self.call_no_retries(name, request) |
| 179 while True: |
| 180 # The lambda "next(stream, 1)" will return a protobuf on success, or the |
| 181 # integer 1 if the stream has ended. This allows us to avoid attempting |
| 182 # to catch StopIteration, which gets logged by _wrap_grpc_operation. |
| 183 response = self._wrap_grpc_operation(name + ' pull from stream', |
| 184 lambda: next(stream, 1)) |
| 185 if isinstance(response, int): |
| 186 # Iteration is finished |
| 187 return |
| 188 yield response |
| 189 |
| 190 def call_no_retries(self, name, request): |
| 191 """Calls a method without any retries. |
| 192 |
| 193 Recommended for client-side streaming or nonidempotent unary calls. |
| 194 """ |
| 195 method = getattr(self._stub, name) |
| 196 if method is None: |
| 197 raise NameError('%s: "%s" is not a valid method name', self.name, name) |
| 198 return self._wrap_grpc_operation( |
| 199 name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC)) |
| 200 |
| 201 def _wrap_grpc_operation(self, name, fn): |
| 202 """Wraps a gRPC operation (call or iterator increment) for logging.""" |
| 203 if self._verbose: |
| 204 logging.info('%s/%s - starting gRPC operation', self.name, name) |
| 205 try: |
| 206 return fn() |
| 207 except grpc.RpcError as g: |
| 208 logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g) |
| 209 self._dump_proxy_info() |
| 210 raise g |
| 211 except Exception as e: |
| 212 logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e) |
| 213 self._dump_proxy_info() |
| 214 raise e |
| 215 |
| 216 def _dump_proxy_info(self): |
| 217 logging.warning('DETAILED PROXY INFO') |
| 218 logging.warning('prefix = %s', self.prefix) |
| 219 logging.warning('debug info:\n\t%s\n\n', |
| 220 '\n\t'.join(self._debug_info)) |
| 221 |
| 222 def _create_channel(self): |
| 223 # Make sure grpc was successfully imported |
| 224 assert available() |
| 225 |
| 226 if not self._secure: |
| 227 return grpc.insecure_channel(self._host) |
| 228 |
| 229 # Authenticate the host. |
| 230 # |
| 231 # You're allowed to override the root certs and server if necessary. For |
| 232 # example, if you're running your proxy on localhost, you'll need to set |
| 233 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate |
| 234 # for the root CA that your localhost server has used to certify itself, and |
| 235 # the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to |
| 236 # identify itself. For example, the ROOTS env var might be |
| 237 # "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if |
| 238 # this is what's used by the server you're running. |
| 239 # |
| 240 # If you're connecting to a real server with real SSL, none of this should |
| 241 # be used. |
| 242 if not certifi: |
| 243 self._debug_info.append('CERTIFI IS NOT PRESENT;' + |
| 244 ' gRPC HTTPS CONNECTIONS MAY FAIL') |
| 245 root_certs = None |
| 246 roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS') |
| 247 if roots: |
| 248 self._debug_info.append('Overridden root CA: %s' % roots) |
| 249 with open(roots) as f: |
| 250 root_certs = f.read() |
| 251 else: |
| 252 self._debug_info.append('Using default root CAs from certifi') |
| 253 overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE') |
| 254 options = () |
| 255 if overd: |
| 256 options=(('grpc.ssl_target_name_override', overd),) |
| 257 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) |
| 258 |
| 259 # Authenticate the user. |
| 260 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) |
| 261 self._debug_info.append('Scopes are: %r' % scopes) |
| 262 user_creds, _ = google_auth.default(scopes=scopes) |
| 263 |
| 264 # Create the channel. |
| 265 request = google_auth_transport_requests.Request() |
| 266 self._debug_info.append('Options are: %r' % options) |
| 267 return google_auth_transport_grpc.secure_authorized_channel( |
| 268 user_creds, request, self._host, ssl_creds, options=options) |
OLD | NEW |