OLD | NEW |
1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 # This is a reimplementation of RemoteClientNative but it uses (will use) | 5 # This is a reimplementation of RemoteClientNative but it uses (will use) |
6 # a gRPC method to communicate with a server instead of REST. | 6 # a gRPC method to communicate with a server instead of REST. |
7 | 7 |
| 8 import copy |
8 import json | 9 import json |
9 import logging | 10 import logging |
10 import time | 11 import time |
11 | 12 |
12 import grpc | 13 import grpc |
13 import google.protobuf.json_format | 14 import google.protobuf.json_format |
14 from proto_bot import swarming_bot_pb2 | 15 from proto_bot import swarming_bot_pb2 |
15 from remote_client_errors import InternalError | 16 from remote_client_errors import InternalError |
16 from remote_client_errors import PollError | 17 from remote_client_errors import PollError |
17 from utils import net | 18 from utils import net |
18 | 19 |
19 | 20 |
20 # How long to wait for a response from the server. Keeping the same as | 21 # How long to wait for a response from the server. Keeping the same as |
21 # the equivalent in remote_client.py for now. | 22 # the equivalent in remote_client.py for now. |
22 NET_CONNECTION_TIMEOUT_SEC = 5*60 | 23 NET_CONNECTION_TIMEOUT_SEC = 5*60 |
23 | 24 |
24 | 25 |
25 # How many times to retry a gRPC call | 26 # How many times to retry a gRPC call |
26 MAX_GRPC_ATTEMPTS = 30 | 27 MAX_GRPC_ATTEMPTS = 30 |
27 | 28 |
28 | 29 |
29 # Longest time to sleep between gRPC calls | 30 # Longest time to sleep between gRPC calls |
30 MAX_GRPC_SLEEP = 10. | 31 MAX_GRPC_SLEEP = 10. |
31 | 32 |
32 | 33 |
33 class RemoteClientGrpc(object): | 34 class RemoteClientGrpc(object): |
34 """RemoteClientGrpc knows how to make calls via gRPC. | 35 """RemoteClientGrpc knows how to make calls via gRPC. |
| 36 |
| 37 Any non-scalar value that is returned that references values from the proto |
| 38 messages should be deepcopy'd since protos make use of weak references and |
| 39 might be garbage-collected before the values are used. |
35 """ | 40 """ |
36 | 41 |
37 def __init__(self, server): | 42 def __init__(self, server): |
38 logging.info('Communicating with host %s via gRPC', server) | 43 logging.info('Communicating with host %s via gRPC', server) |
39 self._server = server | 44 self._server = server |
40 self._channel = grpc.insecure_channel(server) | 45 self._channel = grpc.insecure_channel(server) |
41 self._stub = swarming_bot_pb2.BotServiceStub(self._channel) | 46 self._stub = swarming_bot_pb2.BotServiceStub(self._channel) |
42 self._log_is_asleep = False | 47 self._log_is_asleep = False |
43 | 48 |
44 def is_grpc(self): | 49 def is_grpc(self): |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 'server_version': response.server_version, | 115 'server_version': response.server_version, |
111 'bot_version': response.bot_version, | 116 'bot_version': response.bot_version, |
112 'bot_group_cfg_version': response.bot_group_cfg_version, | 117 'bot_group_cfg_version': response.bot_group_cfg_version, |
113 'bot_group_cfg': { | 118 'bot_group_cfg': { |
114 'dimensions': { | 119 'dimensions': { |
115 d.name: d.values for d in response.bot_group_cfg.dimensions | 120 d.name: d.values for d in response.bot_group_cfg.dimensions |
116 }, | 121 }, |
117 }, | 122 }, |
118 } | 123 } |
119 logging.info('Completed handshake: %s', resp) | 124 logging.info('Completed handshake: %s', resp) |
120 return resp | 125 return copy.deepcopy(resp) |
121 | 126 |
122 def poll(self, attributes): | 127 def poll(self, attributes): |
123 request = swarming_bot_pb2.PollRequest() | 128 request = swarming_bot_pb2.PollRequest() |
124 self._attributes_json_to_proto(attributes, request.attributes) | 129 self._attributes_json_to_proto(attributes, request.attributes) |
125 # TODO(aludwin): gRPC-specific exception handling (raise PollError). | 130 # TODO(aludwin): gRPC-specific exception handling (raise PollError). |
126 response = call_grpc(self._stub.Poll, request) | 131 response = call_grpc(self._stub.Poll, request) |
127 | 132 |
128 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: | 133 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: |
129 return 'update', response.version | 134 return 'update', response.version |
130 | 135 |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
167 'task_id': protoManifest.task_id, | 172 'task_id': protoManifest.task_id, |
168 | 173 |
169 # These keys are only needed by raw commands. While this method | 174 # These keys are only needed by raw commands. While this method |
170 # only supports isolated commands, the keys need to exist to avoid | 175 # only supports isolated commands, the keys need to exist to avoid |
171 # missing key errors. | 176 # missing key errors. |
172 'command': None, | 177 'command': None, |
173 'extra_args': None, | 178 'extra_args': None, |
174 } | 179 } |
175 logging.info('Received job manifest: %s', manifest) | 180 logging.info('Received job manifest: %s', manifest) |
176 self._log_is_asleep = False | 181 self._log_is_asleep = False |
177 return 'run', manifest | 182 return 'run', copy.deepcopy(manifest) |
178 | 183 |
179 raise PollError('Unknown command in response: %s' % response) | 184 raise PollError('Unknown command in response: %s' % response) |
180 | 185 |
181 def get_bot_code(self, new_zip_fn, bot_version, _bot_id): | 186 def get_bot_code(self, new_zip_fn, bot_version, _bot_id): |
182 # TODO(aludwin): exception handling, pass bot_id | 187 # TODO(aludwin): exception handling, pass bot_id |
183 logging.info('Updating to version: %s', bot_version) | 188 logging.info('Updating to version: %s', bot_version) |
184 request = swarming_bot_pb2.BotUpdateRequest() | 189 request = swarming_bot_pb2.BotUpdateRequest() |
185 request.bot_version = bot_version | 190 request.bot_version = bot_version |
186 response = call_grpc(self._stub.BotUpdate, request) | 191 response = call_grpc(self._stub.BotUpdate, request) |
187 with open(new_zip_fn, 'wb') as f: | 192 with open(new_zip_fn, 'wb') as f: |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
241 except grpc.RpcError as g: | 246 except grpc.RpcError as g: |
242 if g.code() is not grpc.StatusCode.UNAVAILABLE: | 247 if g.code() is not grpc.StatusCode.UNAVAILABLE: |
243 raise | 248 raise |
244 logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)', | 249 logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)', |
245 attempt, MAX_GRPC_ATTEMPTS) | 250 attempt, MAX_GRPC_ATTEMPTS) |
246 grpc_error = g | 251 grpc_error = g |
247 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) | 252 time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP)) |
248 # If we get here, it must be because we got (and saved) an error | 253 # If we get here, it must be because we got (and saved) an error |
249 assert grpc_error is not None | 254 assert grpc_error is not None |
250 raise grpc_error | 255 raise grpc_error |
OLD | NEW |