OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
M-A Ruel
2017/08/02 21:05:11
This file is not executable.
aludwin
2017/08/04 01:44:27
Done.
| |
2 # Copyright 2017 The LUCI Authors. All rights reserved. | |
3 # Use of this source code is governed under the Apache License, Version 2.0 | |
4 # that can be found in the LICENSE file. | |
5 | |
6 """Common gRPC implementation for Swarming and Isolate""" | |
7 | |
8 import logging | |
9 import os | |
10 import re | |
11 from utils import net | |
12 | |
13 # gRPC may not be installed on the worker machine. This is fine, as long as | |
14 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | |
15 # Full external requirements are: grpcio, certifi. | |
16 try: | |
17 import grpc | |
18 from google import auth as google_auth | |
19 from google.auth.transport import grpc as google_auth_transport_grpc | |
20 from google.auth.transport import requests as google_auth_transport_requests | |
21 except ImportError as err: | |
22 grpc = None | |
23 | |
24 | |
25 # If gRPC was successfully imported, try to import certifi as well. This is not | |
26 # actually used anywhere in this module, but if certifi is missing, | |
27 # google.auth.transport (https://stackoverflow.com/questions/24973326). So this | |
M-A Ruel
2017/08/02 21:05:11
the sentence is incomplete
aludwin
2017/08/04 01:44:26
Done.
| |
28 # allows us to print out a somewhat-sane error message. | |
29 certifi = None | |
30 if grpc is not None: | |
31 try: | |
32 import certifi | |
33 except ImportError: | |
34 # Will print out error messages later (ie when we have a logger) | |
35 pass | |
36 | |
37 | |
38 # How many times to retry a gRPC call | |
39 MAX_GRPC_ATTEMPTS = 30 | |
40 | |
41 | |
42 # Longest time to sleep between gRPC calls | |
43 MAX_GRPC_SLEEP = 10. | |
44 | |
45 | |
46 # Start the timeout at three minutes. | |
47 GRPC_TIMEOUT_SEC = 3 * 60 | |
48 | |
49 | |
50 def available(): | |
51 """Returns true if gRPC can be used on this host.""" | |
52 return grpc != None | |
53 | |
54 | |
55 class Proxy(object): | |
56 """Represents a gRPC proxy. | |
57 | |
58 If the proxy begins with 'https', the returned channel will be secure and | |
59 authorized using default application credentials | |
60 (https://developers.google.com/identity/protocols/application-default-credenti als). | |
M-A Ruel
2017/08/02 21:05:11
() are unnecessary
aludwin
2017/08/04 01:44:26
Done.
| |
61 Currently, we're using Cloud Container Builder scopes for testing; this may | |
62 change in the future to allow different scopes to be passed in for different | |
63 channels. | |
64 | |
65 To use the returned channel to call methods directly, say: | |
66 | |
67 proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix', | |
68 myapi_pb2.MyApiStub) | |
69 | |
70 To make a unary call with retries (recommended): | |
71 | |
72 proto_output = proxy.call_unary('MyMethod', proto_input) | |
73 | |
74 To make a unary call without retries, or to pass in a client side stream | |
75 (proto_input can be an iterator here): | |
76 | |
77 proto_output = proxy.call_no_retries('MyMethod', proto_input) | |
78 | |
79 You can also call the stub directly (not recommended, since no errors will be | |
80 caught or logged): | |
81 | |
82 proto_output = proxy.stub.MyMethod(proto_input) | |
83 | |
84 To make a call to a server-side streaming call (these are not retried): | |
85 | |
86 for response in proxy.get_stream('MyStreaminingMethod', proto_input): | |
87 <process response> | |
88 | |
89 To retrieve the prefix: | |
90 | |
91 prefix = proxy.prefix # returns "prefix/for/resource/names" | |
92 | |
93 All exceptions are logged using "logging.warning." | |
94 """ | |
95 | |
96 def __init__(self, proxy, stub_class): | |
97 # `proxy` must be of the form: http[s]://<server>[:port][/prefix] | |
98 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) | |
M-A Ruel
2017/08/02 21:05:11
I prefer to use urlparse + startswith(('http://',
aludwin
2017/08/04 01:44:26
Done.
| |
99 if not m: | |
100 raise ValueError(('gRPC proxy must have the form: ' | |
101 'http[s]://<server>[:port][/prefix] ' | |
102 '(given: %s)') % proxy) | |
103 transport = m.group(1) | |
104 if transport == 'http': | |
105 self._secure = False | |
106 elif transport == 'https': | |
107 self._secure = True | |
108 else: | |
109 raise ValueError('unknown transport %s (should be http[s])' % transport) | |
110 self._host = m.group(2) | |
111 self._prefix = m.group(3) | |
112 self._debug_info = [] | |
113 self._channel = self._create_channel() | |
114 self._stub = stub_class(self._channel) | |
115 logging.info('%s: initialized', self.name) | |
116 | |
117 @property | |
118 def host(self): | |
119 return self._host | |
120 | |
121 @property | |
122 def prefix(self): | |
123 return self._prefix | |
124 | |
125 @property | |
126 def secure(self): | |
127 return self._secure | |
128 | |
129 @property | |
130 def channel(self): | |
131 return self._channel | |
132 | |
133 @property | |
134 def stub(self): | |
135 return self._stub | |
136 | |
137 @property | |
138 def name(self): | |
139 security = 'insecure' | |
140 if self._secure: | |
141 security = 'secure' | |
142 return 'gRPC %s proxy %s/%s' % ( | |
143 security, self.host, self._stub.__class__.__name__) | |
144 | |
145 def call_unary(self, name, request): | |
146 """Calls a method, waiting if the service is not available. | |
147 | |
148 Usage: proto_output = proxy.call_unary('MyMethod', proto_input) | |
149 """ | |
150 grpc_error = None | |
151 for attempt in range(1, MAX_GRPC_ATTEMPTS+1): | |
152 try: | |
153 return self.call_no_retries(name, request) | |
154 except grpc.RpcError as g: | |
155 if g.code() is not grpc.StatusCode.UNAVAILABLE: | |
156 raise | |
157 logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)', | |
158 self.name, attempt, MAX_GRPC_ATTEMPTS) | |
159 # Save the error in case we need to return it | |
160 grpc_error = g | |
161 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) | |
162 # If we get here, it must be because we got (and saved) an error | |
163 assert grpc_error is not None | |
164 raise grpc_error | |
165 | |
166 def get_stream(self, name, request): | |
167 """Calls a server-side streaming method, returning an iterator. | |
168 | |
169 Usage: for resp in proxy.get_stream('MyMethod', proto_input'): | |
170 """ | |
171 def stream_proxy(stream, name): | |
172 while True: | |
173 # The lambda "next(stream, 1)" will return a protobuf on success, or the | |
174 # integer 1 if the stream has ended. This allows us to avoid attempting | |
175 # to catch StopIteration, which gets logged by _wrap_grpc_operation. | |
176 response = self._wrap_grpc_operation(name + ' pull from stream', | |
177 lambda: next(stream, 1)) | |
178 if isinstance(response, int): | |
179 # Iteration is finished | |
180 return | |
181 yield response | |
182 stream = self.call_no_retries(name, request) | |
183 sp = lambda x: stream_proxy(x, name) | |
M-A Ruel
2017/08/02 21:05:11
I don't understand why you create a lambda just to
aludwin
2017/08/04 01:44:26
I created a lambda because I forgot how generators
| |
184 return sp(stream) | |
185 | |
186 def call_no_retries(self, name, request): | |
187 """Calls a method without any retries. | |
188 | |
189 Recommended for client-side streaming or nonidempotent unary calls. | |
190 """ | |
191 method = getattr(self._stub, name) | |
192 if method is None: | |
193 raise NameError('%s: "%s" is not a valid method name', self.name, name) | |
194 return self._wrap_grpc_operation(name, lambda: method(request, timeout=GRPC_ TIMEOUT_SEC)) | |
M-A Ruel
2017/08/02 21:05:11
wrap (Everywhere)
aludwin
2017/08/04 01:44:27
Done.
| |
195 | |
196 def _wrap_grpc_operation(self, name, fn): | |
197 """Wraps a gRPC operation (call or iterator increment) for logging.""" | |
198 try: | |
199 return fn() | |
200 except grpc.RpcError as g: | |
201 logging.warning('%s/%s - gRPC error: %s', self.name, name, g) | |
202 self._dump_proxy_info() | |
203 raise g | |
204 except Exception as e: | |
205 logging.warning('caught exception %s; none is %s', repr(e), e is None) | |
206 logging.warning('%s/%s - generic error: %s', self.name, name, e) | |
207 self._dump_proxy_info() | |
208 raise e | |
209 | |
210 def _dump_proxy_info(self): | |
211 logging.warning('%s: DETAILED PROXY INFO', self.name) | |
212 logging.warning('%s: prefix = %s', self.name, self.prefix) | |
213 logging.warning('%s: debug info:\n\t%s', self.name, '\n\t'.join(self._debug_ info)) | |
214 | |
215 def _create_channel(self): | |
216 # Make sure grpc was successfully imported | |
217 assert available() | |
218 | |
219 if not self._secure: | |
220 return grpc.insecure_channel(self._host) | |
221 | |
222 # Authenticate the host. | |
223 # | |
224 # You're allowed to override the root certs and server if necessary. For | |
225 # example, if you're running your proxy on localhost, you'll need to set | |
226 # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate fo r | |
227 # the root CA that your localhost server has used to certify itself, and the | |
228 # GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to identify | |
229 # itself. For example, the ROOTS env var might be "/path/to/roots.crt" while | |
230 # the OVERRIDE env var might be "test_server," if this is what's used by the | |
231 # server you're running. | |
232 # | |
233 # If you're connecting to a real server with real SSL, none of this should b e | |
234 # used. | |
235 if not certifi: | |
236 self._debug_info.append('CERTIFI IS NOT PRESENT; gRPC HTTPS CONNECTIONS MA Y FAIL') | |
237 root_certs = None | |
238 roots = os.environ.get('GRPC_PROXY_TLS_ROOTS') | |
M-A Ruel
2017/08/02 21:05:11
We'd need to document these under doc/
aludwin
2017/08/04 01:44:26
Done.
| |
239 if roots is not None: | |
240 self._debug_info.append('Overridden root CA: %s' % roots) | |
241 with open(roots) as f: | |
242 root_certs = f.read() | |
243 else: | |
244 self._debug_info.append('Using default root CAs from certifi') | |
245 overd = os.environ.get('GRPC_PROXY_TLS_OVERRIDE') | |
246 options = () | |
247 if overd is not None: | |
248 options=(('grpc.ssl_target_name_override', overd),) | |
249 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) | |
250 | |
251 # Authenticate the user. | |
252 scopes = ('https://www.googleapis.com/auth/cloud-source-tools',) | |
253 self._debug_info.append('Scopes are: %s' % repr(scopes)) | |
254 user_creds, _ = google_auth.default(scopes=scopes) | |
255 | |
256 # Create the channel. | |
257 request = google_auth_transport_requests.Request() | |
258 self._debug_info.append('Options are: %s' % repr(options)) | |
259 return google_auth_transport_grpc.secure_authorized_channel( | |
260 user_creds, request, self._host, ssl_creds, options=options) | |
OLD | NEW |