OLD | NEW |
1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 # This is a reimplementation of RemoteClientNative but it uses (will use) | 5 # This is a reimplementation of RemoteClientNative but it uses (will use) |
6 # a gRPC method to communicate with a server instead of REST. | 6 # a gRPC method to communicate with a server instead of REST. |
7 | 7 |
8 import json | 8 import json |
9 import logging | 9 import logging |
| 10 import math |
| 11 import random |
| 12 import time |
10 | 13 |
11 import grpc | 14 import grpc |
12 import google.protobuf.json_format | 15 import google.protobuf.json_format |
13 from proto_bot import swarming_bot_pb2 | 16 from proto_bot import swarming_bot_pb2 |
14 from remote_client_errors import InternalError | 17 from remote_client_errors import InternalError |
| 18 from utils import net |
15 | 19 |
16 | 20 |
17 # How long to wait for a response from the server. Keeping the same as | 21 # How long to wait for a response from the server. Keeping the same as |
18 # the equivalent in remote_client.py for now. | 22 # the equivalent in remote_client.py for now. |
19 NET_CONNECTION_TIMEOUT_SEC = 5*60 | 23 NET_CONNECTION_TIMEOUT_SEC = 5*60 |
20 | 24 |
21 | 25 |
 | 26 # Maximum number of times to attempt a gRPC call (including the first try) |
| 27 MAX_GRPC_ATTEMPTS = 30 |
| 28 |
| 29 |
| 30 # Longest time to sleep between gRPC calls |
| 31 MAX_GRPC_SLEEP = 10. |
| 32 |
| 33 |
22 class RemoteClientGrpc(object): | 34 class RemoteClientGrpc(object): |
23 """RemoteClientGrpc knows how to make calls via gRPC. | 35 """RemoteClientGrpc knows how to make calls via gRPC. |
24 """ | 36 """ |
25 | 37 |
26 def __init__(self, server): | 38 def __init__(self, server): |
27 logging.info('Communicating with host %s via gRPC', server) | 39 logging.info('Communicating with host %s via gRPC', server) |
28 self._server = server | 40 self._server = server |
29 self._channel = grpc.insecure_channel(server) | 41 self._channel = grpc.insecure_channel(server) |
30 self._stub = swarming_bot_pb2.BotServiceStub(self._channel) | 42 self._stub = swarming_bot_pb2.BotServiceStub(self._channel) |
31 self._log_is_asleep = False | 43 self._log_is_asleep = False |
(...skipping 28 matching lines...) Loading... |
60 request.output_chunk.data = stdout_and_chunk[0] | 72 request.output_chunk.data = stdout_and_chunk[0] |
61 request.output_chunk.offset = stdout_and_chunk[1] | 73 request.output_chunk.offset = stdout_and_chunk[1] |
62 if exit_code != None: | 74 if exit_code != None: |
63 request.exit_status.code = exit_code | 75 request.exit_status.code = exit_code |
64 | 76 |
65 # Insert everything else. Note that the b64-encoded strings in the dict | 77 # Insert everything else. Note that the b64-encoded strings in the dict |
66 # are automatically decoded by ParseDict. | 78 # are automatically decoded by ParseDict. |
67 google.protobuf.json_format.ParseDict(params, request) | 79 google.protobuf.json_format.ParseDict(params, request) |
68 | 80 |
69 # Perform update | 81 # Perform update |
70 response = self._stub.TaskUpdate(request, | 82 response = call_grpc(self._stub.TaskUpdate, request) |
71 timeout=NET_CONNECTION_TIMEOUT_SEC) | |
72 logging.debug('post_task_update() = %s', request) | 83 logging.debug('post_task_update() = %s', request) |
73 if response.error: | 84 if response.error: |
74 raise InternalError(response.error) | 85 raise InternalError(response.error) |
75 return not response.must_stop | 86 return not response.must_stop |
76 | 87 |
def post_task_error(self, task_id, bot_id, message):
  """Reports a task error to the server over gRPC.

  Builds a TaskErrorRequest from the three arguments, logs it at error
  level, and sends it through call_grpc().

  Returns:
    The `ok` field of the server's response.
  """
  error_req = swarming_bot_pb2.TaskErrorRequest()
  error_req.bot_id = bot_id
  error_req.task_id = task_id
  error_req.msg = message
  logging.error('post_task_error() = %s', error_req)

  return call_grpc(self._stub.TaskError, error_req).ok
86 | 97 |
def _attributes_json_to_proto(self, json_attr, msg):
  """Copies bot attributes from a dict into an attributes message, in place.

  Args:
    json_attr: dict read for its 'version', 'dimensions' and 'state' keys.
        'dimensions' must be a dict mapping a dimension name to an iterable
        of values.
    msg: message that is filled in; its `version`, `dimensions` and `state`
        fields are written.
  """
  msg.version = json_attr['version']
  # items() rather than iteritems(): the latter only exists on Python 2,
  # and sorted() materializes the pairs anyway so the result is the same.
  for name, values in sorted(json_attr['dimensions'].items()):
    pair = msg.dimensions.add()
    pair.name = name
    pair.values.extend(values)
  create_state_proto(json_attr['state'], msg.state)
94 | 105 |
def do_handshake(self, attributes):
  """Performs the initial handshake with the server over gRPC.

  Args:
    attributes: bot attributes dict, converted into the request by
        _attributes_json_to_proto().

  Returns:
    A dict with 'server_version', 'bot_version', 'bot_group_cfg_version'
    and 'bot_group_cfg' (holding a 'dimensions' name -> values mapping),
    all taken from the handshake response.
  """
  handshake_req = swarming_bot_pb2.HandshakeRequest()
  self._attributes_json_to_proto(attributes, handshake_req.attributes)
  reply = call_grpc(self._stub.Handshake, handshake_req)

  # Flatten the repeated name/values pairs into a plain mapping.
  group_dimensions = {}
  for dim in reply.bot_group_cfg.dimensions:
    group_dimensions[dim.name] = dim.values

  resp = {
    'server_version': reply.server_version,
    'bot_version': reply.bot_version,
    'bot_group_cfg_version': reply.bot_group_cfg_version,
    'bot_group_cfg': {
      'dimensions': group_dimensions,
    },
  }
  logging.info('Completed handshake: %s', resp)
  return resp
111 | 122 |
112 def poll(self, attributes): | 123 def poll(self, attributes): |
113 request = swarming_bot_pb2.PollRequest() | 124 request = swarming_bot_pb2.PollRequest() |
114 self._attributes_json_to_proto(attributes, request.attributes) | 125 self._attributes_json_to_proto(attributes, request.attributes) |
115 # TODO(aludwin): gRPC-specific exception handling | 126 # TODO(aludwin): gRPC-specific exception handling |
116 response = self._stub.Poll(request, timeout=NET_CONNECTION_TIMEOUT_SEC) | 127 response = call_grpc(self._stub.Poll, request) |
117 | 128 |
118 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: | 129 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: |
119 return 'update', response.version | 130 return 'update', response.version |
120 | 131 |
121 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP: | 132 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP: |
122 if not self._log_is_asleep: | 133 if not self._log_is_asleep: |
123 logging.info('Going to sleep') | 134 logging.info('Going to sleep') |
124 self._log_is_asleep = True | 135 self._log_is_asleep = True |
125 return 'sleep', response.sleep_time | 136 return 'sleep', response.sleep_time |
126 | 137 |
(...skipping 39 matching lines...) Loading... |
166 self._log_is_asleep = False | 177 self._log_is_asleep = False |
167 return 'run', manifest | 178 return 'run', manifest |
168 | 179 |
169 raise ValueError('Unknown command in response: %s' % response) | 180 raise ValueError('Unknown command in response: %s' % response) |
170 | 181 |
def get_bot_code(self, new_zip_fn, bot_version, _bot_id):
  """Fetches the bot code for `bot_version` and writes it to `new_zip_fn`.

  Args:
    new_zip_fn: path of the file to write; opened in binary mode and
        overwritten.
    bot_version: version string placed in the BotUpdateRequest.
    _bot_id: currently unused.
  """
  # TODO(aludwin): exception handling, pass bot_id
  logging.info('Updating to version: %s', bot_version)
  update_req = swarming_bot_pb2.BotUpdateRequest()
  update_req.bot_version = bot_version
  reply = call_grpc(self._stub.BotUpdate, update_req)
  with open(new_zip_fn, 'wb') as zip_file:
    zip_file.write(reply.bot_code)
179 | 190 |
def ping(self):
  """Does nothing and returns None; this client has no ping to perform."""
  return None
182 | 193 |
183 | 194 |
184 def create_state_proto(state_dict, message): | 195 def create_state_proto(state_dict, message): |
185 """ Constructs a State message out of a state dict. | 196 """ Constructs a State message out of a state dict. |
186 | 197 |
(...skipping 27 matching lines...) Loading... |
def insert_dict_as_submessage(message, keyname, value):
  """Fills the submessage field named `keyname` of `message` from a dict.

  The field name is passed separately from the message so that the
  submessage can be fetched here with getattr(). `value` is serialized to
  JSON and then parsed into that submessage.
  """
  target = getattr(message, keyname)
  encoded = json.dumps(value)
  google.protobuf.json_format.Parse(encoded, target)
| 235 |
| 236 |
def call_grpc(method, request):
  """Calls a gRPC method, retrying while the server reports UNAVAILABLE.

  Args:
    method: callable invoked as method(request, timeout=...).
    request: request message, passed to `method` unchanged on every attempt.

  Returns:
    Whatever `method` returns on the first attempt that succeeds.

  Raises:
    grpc.RpcError: immediately when the status code is anything other than
        UNAVAILABLE, or once MAX_GRPC_ATTEMPTS consecutive attempts have
        failed with UNAVAILABLE (the last error is re-raised).
    ValueError: if MAX_GRPC_ATTEMPTS is less than 1, so no call was made.
  """
  for attempt in range(1, MAX_GRPC_ATTEMPTS + 1):
    try:
      return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
    except grpc.RpcError as g:
      if g.code() is not grpc.StatusCode.UNAVAILABLE:
        raise
      logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)',
                      attempt, MAX_GRPC_ATTEMPTS)
      if attempt == MAX_GRPC_ATTEMPTS:
        # Out of attempts: give up now instead of sleeping one more time.
        # A bare raise keeps the original traceback.
        raise
    time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
  # Only reachable when the loop body never ran.
  raise ValueError('MAX_GRPC_ATTEMPTS must be at least 1')
OLD | NEW |