Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(680)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py

Issue 2592683002: Retry non-streaming gRPC calls (Closed)
Patch Set: Response to comments Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLD | NEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 # This is a reimplementation of RemoteClientNative but it uses (will use) 5 # This is a reimplementation of RemoteClientNative but it uses (will use)
6 # a gRPC method to communicate with a server instead of REST. 6 # a gRPC method to communicate with a server instead of REST.
7 7
8 import json 8 import json
9 import logging 9 import logging
10 import math
11 import random
12 import time
10 13
11 import grpc 14 import grpc
12 import google.protobuf.json_format 15 import google.protobuf.json_format
13 from proto_bot import swarming_bot_pb2 16 from proto_bot import swarming_bot_pb2
14 from remote_client_errors import InternalError 17 from remote_client_errors import InternalError
15 18
16 19
17 # How long to wait for a response from the server. Keeping the same as 20 # How long to wait for a response from the server. Keeping the same as
18 # the equivalent in remote_client.py for now. 21 # the equivalent in remote_client.py for now.
19 NET_CONNECTION_TIMEOUT_SEC = 5*60 22 NET_CONNECTION_TIMEOUT_SEC = 5*60
20 23
21 24
25 # How many times to retry a gRPC call
26 MAX_GRPC_ATTEMPTS = 30
27
28
29 # Longest time to sleep between gRPC calls
30 MAX_GRPC_SLEEP = 10.
31
32
22 class RemoteClientGrpc(object): 33 class RemoteClientGrpc(object):
23 """RemoteClientGrpc knows how to make calls via gRPC. 34 """RemoteClientGrpc knows how to make calls via gRPC.
24 """ 35 """
25 36
26 def __init__(self, server): 37 def __init__(self, server):
27 logging.info('Communicating with host %s via gRPC', server) 38 logging.info('Communicating with host %s via gRPC', server)
28 self._server = server 39 self._server = server
29 self._channel = grpc.insecure_channel(server) 40 self._channel = grpc.insecure_channel(server)
30 self._stub = swarming_bot_pb2.BotServiceStub(self._channel) 41 self._stub = swarming_bot_pb2.BotServiceStub(self._channel)
31 self._log_is_asleep = False 42 self._log_is_asleep = False
(...skipping 28 matching lines...) Expand all
60 request.output_chunk.data = stdout_and_chunk[0] 71 request.output_chunk.data = stdout_and_chunk[0]
61 request.output_chunk.offset = stdout_and_chunk[1] 72 request.output_chunk.offset = stdout_and_chunk[1]
62 if exit_code != None: 73 if exit_code != None:
63 request.exit_status.code = exit_code 74 request.exit_status.code = exit_code
64 75
65 # Insert everything else. Note that the b64-encoded strings in the dict 76 # Insert everything else. Note that the b64-encoded strings in the dict
66 # are automatically decoded by ParseDict. 77 # are automatically decoded by ParseDict.
67 google.protobuf.json_format.ParseDict(params, request) 78 google.protobuf.json_format.ParseDict(params, request)
68 79
69 # Perform update 80 # Perform update
70 response = self._stub.TaskUpdate(request, 81 response = call_grpc(self._stub.TaskUpdate, request)
71 timeout=NET_CONNECTION_TIMEOUT_SEC)
72 logging.debug('post_task_update() = %s', request) 82 logging.debug('post_task_update() = %s', request)
73 if response.error: 83 if response.error:
74 raise InternalError(response.error) 84 raise InternalError(response.error)
75 return not response.must_stop 85 return not response.must_stop
76 86
77 def post_task_error(self, task_id, bot_id, message): 87 def post_task_error(self, task_id, bot_id, message):
78 request = swarming_bot_pb2.TaskErrorRequest() 88 request = swarming_bot_pb2.TaskErrorRequest()
79 request.bot_id = bot_id 89 request.bot_id = bot_id
80 request.task_id = task_id 90 request.task_id = task_id
81 request.msg = message 91 request.msg = message
82 logging.error('post_task_error() = %s', request) 92 logging.error('post_task_error() = %s', request)
83 93
84 response = self._stub.TaskError(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 94 response = call_grpc(self._stub.TaskError, request)
85 return response.ok 95 return response.ok
86 96
87 def _attributes_json_to_proto(self, json_attr, msg): 97 def _attributes_json_to_proto(self, json_attr, msg):
88 msg.version = json_attr['version'] 98 msg.version = json_attr['version']
89 for k, values in sorted(json_attr['dimensions'].iteritems()): 99 for k, values in sorted(json_attr['dimensions'].iteritems()):
90 pair = msg.dimensions.add() 100 pair = msg.dimensions.add()
91 pair.name = k 101 pair.name = k
92 pair.values.extend(values) 102 pair.values.extend(values)
93 create_state_proto(json_attr['state'], msg.state) 103 create_state_proto(json_attr['state'], msg.state)
94 104
95 def do_handshake(self, attributes): 105 def do_handshake(self, attributes):
96 request = swarming_bot_pb2.HandshakeRequest() 106 request = swarming_bot_pb2.HandshakeRequest()
97 self._attributes_json_to_proto(attributes, request.attributes) 107 self._attributes_json_to_proto(attributes, request.attributes)
98 response = self._stub.Handshake(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 108 response = call_grpc(self._stub.Handshake, request)
99 resp = { 109 resp = {
100 'server_version': response.server_version, 110 'server_version': response.server_version,
101 'bot_version': response.bot_version, 111 'bot_version': response.bot_version,
102 'bot_group_cfg_version': response.bot_group_cfg_version, 112 'bot_group_cfg_version': response.bot_group_cfg_version,
103 'bot_group_cfg': { 113 'bot_group_cfg': {
104 'dimensions': { 114 'dimensions': {
105 d.name: d.values for d in response.bot_group_cfg.dimensions 115 d.name: d.values for d in response.bot_group_cfg.dimensions
106 }, 116 },
107 }, 117 },
108 } 118 }
109 logging.info('Completed handshake: %s', resp) 119 logging.info('Completed handshake: %s', resp)
110 return resp 120 return resp
111 121
112 def poll(self, attributes): 122 def poll(self, attributes):
113 request = swarming_bot_pb2.PollRequest() 123 request = swarming_bot_pb2.PollRequest()
114 self._attributes_json_to_proto(attributes, request.attributes) 124 self._attributes_json_to_proto(attributes, request.attributes)
115 # TODO(aludwin): gRPC-specific exception handling 125 # TODO(aludwin): gRPC-specific exception handling
116 response = self._stub.Poll(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 126 response = call_grpc(self._stub.Poll, request)
117 127
118 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: 128 if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
119 return 'update', response.version 129 return 'update', response.version
120 130
121 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP: 131 if response.cmd == swarming_bot_pb2.PollResponse.SLEEP:
122 if not self._log_is_asleep: 132 if not self._log_is_asleep:
123 logging.info('Going to sleep') 133 logging.info('Going to sleep')
124 self._log_is_asleep = True 134 self._log_is_asleep = True
125 return 'sleep', response.sleep_time 135 return 'sleep', response.sleep_time
126 136
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
166 self._log_is_asleep = False 176 self._log_is_asleep = False
167 return 'run', manifest 177 return 'run', manifest
168 178
169 raise ValueError('Unknown command in response: %s' % response) 179 raise ValueError('Unknown command in response: %s' % response)
170 180
171 def get_bot_code(self, new_zip_fn, bot_version, _bot_id): 181 def get_bot_code(self, new_zip_fn, bot_version, _bot_id):
172 # TODO(aludwin): exception handling, pass bot_id 182 # TODO(aludwin): exception handling, pass bot_id
173 logging.info('Updating to version: %s', bot_version) 183 logging.info('Updating to version: %s', bot_version)
174 request = swarming_bot_pb2.BotUpdateRequest() 184 request = swarming_bot_pb2.BotUpdateRequest()
175 request.bot_version = bot_version 185 request.bot_version = bot_version
176 response = self._stub.BotUpdate(request, timeout=NET_CONNECTION_TIMEOUT_SEC) 186 response = call_grpc(self._stub.BotUpdate, request)
177 with open(new_zip_fn, 'wb') as f: 187 with open(new_zip_fn, 'wb') as f:
178 f.write(response.bot_code) 188 f.write(response.bot_code)
179 189
180 def ping(self): 190 def ping(self):
181 pass 191 pass
182 192
183 193
184 def create_state_proto(state_dict, message): 194 def create_state_proto(state_dict, message):
185 """ Constructs a State message out of a state dict. 195 """ Constructs a State message out of a state dict.
186 196
(...skipping 27 matching lines...) Expand all
214 def insert_dict_as_submessage(message, keyname, value): 224 def insert_dict_as_submessage(message, keyname, value):
215 """Encodes a dict as a Protobuf message. 225 """Encodes a dict as a Protobuf message.
216 226
217 The keyname for the message field is passed in to simplify the creation 227 The keyname for the message field is passed in to simplify the creation
218 of the submessage in the first place - you need to say getattr and not 228 of the submessage in the first place - you need to say getattr and not
219 simply refer to message.keyname since the former actually creates the 229 simply refer to message.keyname since the former actually creates the
220 submessage while the latter does not. 230 submessage while the latter does not.
221 """ 231 """
222 sub_msg = getattr(message, keyname) 232 sub_msg = getattr(message, keyname)
223 google.protobuf.json_format.Parse(json.dumps(value), sub_msg) 233 google.protobuf.json_format.Parse(json.dumps(value), sub_msg)
234
235
236 def call_grpc(method, request):
237 """Retries a command a set number of times"""
238 num_attempts = 0
239 while True:
240 try:
241 num_attempts += 1
242 return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
243 except grpc.RpcError as rpc_error:
244 logging.warning('call_grpc - gRPC error: %s', str(rpc_error))
245 if (rpc_error.code() is grpc.StatusCode.UNAVAILABLE
246 and num_attempts < MAX_GRPC_ATTEMPTS):
247 logging.warning('Swallowing UNAVAILABLE error (attempt %d/%d)',
248 num_attempts, MAX_GRPC_ATTEMPTS)
249 # random.random() returns [0.0, 1.0). Starts with relatively short
M-A Ruel 2016/12/20 22:05:55 Please call net.calculate_sleep_before_retry(num_a
250 # waiting time by starting with 1.5/2+1.5^-1 median offset.
251 duration = (random.random() * 1.5) + math.pow(1.5, (num_attempts - 1))
252 assert duration > 0.1
253 duration = min(MAX_GRPC_SLEEP, duration)
254 time.sleep(duration)
255 else:
256 raise
OLD | NEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698