Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Unified Diff: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py

Issue 2592683002: Retry non-streaming gRPC calls (Closed)
Patch Set: Response to comments Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
diff --git a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
index 3ef861d0017d73dc99af579aa4cbc7c56ac65bf7..4bcfb736690b9350ec0f37b3318deb4dff9c7548 100644
--- a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
+++ b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
@@ -7,6 +7,9 @@
import json
import logging
+import math
+import random
+import time
import grpc
import google.protobuf.json_format
@@ -19,6 +22,14 @@ from remote_client_errors import InternalError
NET_CONNECTION_TIMEOUT_SEC = 5*60
+# Maximum number of attempts for a gRPC call (including the first try)
+MAX_GRPC_ATTEMPTS = 30
+
+
+# Longest time to sleep between gRPC call attempts, in seconds
+MAX_GRPC_SLEEP = 10.
+
+
class RemoteClientGrpc(object):
"""RemoteClientGrpc knows how to make calls via gRPC.
"""
@@ -67,8 +78,7 @@ class RemoteClientGrpc(object):
google.protobuf.json_format.ParseDict(params, request)
# Perform update
- response = self._stub.TaskUpdate(request,
- timeout=NET_CONNECTION_TIMEOUT_SEC)
+ response = call_grpc(self._stub.TaskUpdate, request)
logging.debug('post_task_update() = %s', request)
if response.error:
raise InternalError(response.error)
@@ -81,7 +91,7 @@ class RemoteClientGrpc(object):
request.msg = message
logging.error('post_task_error() = %s', request)
- response = self._stub.TaskError(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
+ response = call_grpc(self._stub.TaskError, request)
return response.ok
def _attributes_json_to_proto(self, json_attr, msg):
@@ -95,7 +105,7 @@ class RemoteClientGrpc(object):
def do_handshake(self, attributes):
request = swarming_bot_pb2.HandshakeRequest()
self._attributes_json_to_proto(attributes, request.attributes)
- response = self._stub.Handshake(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
+ response = call_grpc(self._stub.Handshake, request)
resp = {
'server_version': response.server_version,
'bot_version': response.bot_version,
@@ -113,7 +123,7 @@ class RemoteClientGrpc(object):
request = swarming_bot_pb2.PollRequest()
self._attributes_json_to_proto(attributes, request.attributes)
# TODO(aludwin): gRPC-specific exception handling
- response = self._stub.Poll(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
+ response = call_grpc(self._stub.Poll, request)
if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
return 'update', response.version
@@ -173,7 +183,7 @@ class RemoteClientGrpc(object):
logging.info('Updating to version: %s', bot_version)
request = swarming_bot_pb2.BotUpdateRequest()
request.bot_version = bot_version
- response = self._stub.BotUpdate(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
+ response = call_grpc(self._stub.BotUpdate, request)
with open(new_zip_fn, 'wb') as f:
f.write(response.bot_code)
@@ -221,3 +231,26 @@ def insert_dict_as_submessage(message, keyname, value):
"""
sub_msg = getattr(message, keyname)
google.protobuf.json_format.Parse(json.dumps(value), sub_msg)
+
+
+def call_grpc(method, request):
+ """Retries a command a set number of times"""
+ num_attempts = 0
+ while True:
+ try:
+ num_attempts += 1
+ return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
+ except grpc.RpcError as rpc_error:
+ logging.warning('call_grpc - gRPC error: %s', str(rpc_error))
+ if (rpc_error.code() is grpc.StatusCode.UNAVAILABLE
+ and num_attempts < MAX_GRPC_ATTEMPTS):
+ logging.warning('Swallowing UNAVAILABLE error (attempt %d/%d)',
+ num_attempts, MAX_GRPC_ATTEMPTS)
+ # random.random() returns [0.0, 1.0). Starts with relatively short
M-A Ruel 2016/12/20 22:05:55 Please call net.calculate_sleep_before_retry(num_a
+ # waiting time: the first retry sleeps a median of 1.5/2 + 1.5^0 = 1.75s.
+ duration = (random.random() * 1.5) + math.pow(1.5, (num_attempts - 1))
+ assert duration > 0.1
+ duration = min(MAX_GRPC_SLEEP, duration)
+ time.sleep(duration)
+ else:
+ raise
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698