Index: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
diff --git a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
index 3ef861d0017d73dc99af579aa4cbc7c56ac65bf7..4bcfb736690b9350ec0f37b3318deb4dff9c7548 100644 |
--- a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
+++ b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py |
@@ -7,6 +7,9 @@ |
import json |
import logging |
+import math |
+import random |
+import time |
import grpc |
import google.protobuf.json_format |
@@ -19,6 +22,14 @@ from remote_client_errors import InternalError |
NET_CONNECTION_TIMEOUT_SEC = 5*60 |
+# How many times to retry a gRPC call |
+MAX_GRPC_ATTEMPTS = 30 |
+ |
+ |
+# Longest time to sleep between gRPC calls |
+MAX_GRPC_SLEEP = 10. |
+ |
+ |
class RemoteClientGrpc(object): |
"""RemoteClientGrpc knows how to make calls via gRPC. |
""" |
@@ -67,8 +78,7 @@ class RemoteClientGrpc(object): |
google.protobuf.json_format.ParseDict(params, request) |
# Perform update |
- response = self._stub.TaskUpdate(request, |
- timeout=NET_CONNECTION_TIMEOUT_SEC) |
+ response = call_grpc(self._stub.TaskUpdate, request) |
logging.debug('post_task_update() = %s', request) |
if response.error: |
raise InternalError(response.error) |
@@ -81,7 +91,7 @@ class RemoteClientGrpc(object): |
request.msg = message |
logging.error('post_task_error() = %s', request) |
- response = self._stub.TaskError(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
+ response = call_grpc(self._stub.TaskError, request) |
return response.ok |
def _attributes_json_to_proto(self, json_attr, msg): |
@@ -95,7 +105,7 @@ class RemoteClientGrpc(object): |
def do_handshake(self, attributes): |
request = swarming_bot_pb2.HandshakeRequest() |
self._attributes_json_to_proto(attributes, request.attributes) |
- response = self._stub.Handshake(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
+ response = call_grpc(self._stub.Handshake, request) |
resp = { |
'server_version': response.server_version, |
'bot_version': response.bot_version, |
@@ -113,7 +123,7 @@ class RemoteClientGrpc(object): |
request = swarming_bot_pb2.PollRequest() |
self._attributes_json_to_proto(attributes, request.attributes) |
# TODO(aludwin): gRPC-specific exception handling |
- response = self._stub.Poll(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
+ response = call_grpc(self._stub.Poll, request) |
if response.cmd == swarming_bot_pb2.PollResponse.UPDATE: |
return 'update', response.version |
@@ -173,7 +183,7 @@ class RemoteClientGrpc(object): |
logging.info('Updating to version: %s', bot_version) |
request = swarming_bot_pb2.BotUpdateRequest() |
request.bot_version = bot_version |
- response = self._stub.BotUpdate(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
+ response = call_grpc(self._stub.BotUpdate, request) |
with open(new_zip_fn, 'wb') as f: |
f.write(response.bot_code) |
@@ -221,3 +231,26 @@ def insert_dict_as_submessage(message, keyname, value): |
""" |
sub_msg = getattr(message, keyname) |
google.protobuf.json_format.Parse(json.dumps(value), sub_msg) |
+ |
+ |
+def call_grpc(method, request): |
+ """Retries a command a set number of times""" |
+ num_attempts = 0 |
+ while True: |
+ try: |
+ num_attempts += 1 |
+ return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC) |
+ except grpc.RpcError as rpc_error: |
+ logging.warning('call_grpc - gRPC error: %s', str(rpc_error)) |
+ if (rpc_error.code() is grpc.StatusCode.UNAVAILABLE |
+ and num_attempts < MAX_GRPC_ATTEMPTS): |
+ logging.warning('Swallowing UNAVAILABLE error (attempt %d/%d)', |
+ num_attempts, MAX_GRPC_ATTEMPTS) |
+ # random.random() returns [0.0, 1.0). Starts with relatively short |
M-A Ruel
2016/12/20 22:05:55
Please call net.calculate_sleep_before_retry(num_a
|
+ # waiting time by starting with 1.5/2+1.5^-1 median offset. |
+ duration = (random.random() * 1.5) + math.pow(1.5, (num_attempts - 1)) |
+ assert duration > 0.1 |
+ duration = min(MAX_GRPC_SLEEP, duration) |
+ time.sleep(duration) |
+ else: |
+ raise |