Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1810)

Unified Diff: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py

Issue 2987333002: Refactor all gRPC proxy code into a single class. (Closed)
Patch Set: Fix pylint errors Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
diff --git a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
index 8990ef17e7af4532c3043e043b71938a57211804..6bc4a62cc99201f344f552ab3f5e4296512e7764 100644
--- a/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
+++ b/appengine/swarming/swarming_bot/bot_code/remote_client_grpc.py
@@ -10,26 +10,13 @@ import json
import logging
import time
-import grpc
import google.protobuf.json_format
from proto_bot import swarming_bot_pb2
from remote_client_errors import InternalError
from remote_client_errors import MintOAuthTokenError
from remote_client_errors import PollError
from utils import net
-
-
-# How long to wait for a response from the server. Keeping the same as
-# the equivalent in remote_client.py for now.
-NET_CONNECTION_TIMEOUT_SEC = 5*60
-
-
-# How many times to retry a gRPC call
-MAX_GRPC_ATTEMPTS = 30
-
-
-# Longest time to sleep between gRPC calls
-MAX_GRPC_SLEEP = 10.
+from utils import grpc_proxy
class RemoteClientGrpc(object):
@@ -40,11 +27,13 @@ class RemoteClientGrpc(object):
might be garbage-collected before the values are used.
"""
- def __init__(self, server):
+ def __init__(self, server, fake_proxy=None):
logging.info('Communicating with host %s via gRPC', server)
+ if fake_proxy:
+ self._proxy = fake_proxy
+ else:
+ self._proxy = grpc_proxy.Proxy(server, swarming_bot_pb2.BotServiceStub)
self._server = server
- self._channel = grpc.insecure_channel(server)
- self._stub = swarming_bot_pb2.BotServiceStub(self._channel)
self._log_is_asleep = False
def is_grpc(self):
@@ -84,7 +73,7 @@ class RemoteClientGrpc(object):
google.protobuf.json_format.ParseDict(params, request)
# Perform update
- response = call_grpc(self._stub.TaskUpdate, request)
+ response = self._proxy.call_unary('TaskUpdate', request)
logging.debug('post_task_update() = %s', request)
if response.error:
raise InternalError(response.error)
@@ -97,7 +86,7 @@ class RemoteClientGrpc(object):
request.msg = message
logging.error('post_task_error() = %s', request)
- response = call_grpc(self._stub.TaskError, request)
+ response = self._proxy.call_unary('TaskError', request)
return response.ok
def _attributes_json_to_proto(self, json_attr, msg):
@@ -111,7 +100,7 @@ class RemoteClientGrpc(object):
def do_handshake(self, attributes):
request = swarming_bot_pb2.HandshakeRequest()
self._attributes_json_to_proto(attributes, request.attributes)
- response = call_grpc(self._stub.Handshake, request)
+ response = self._proxy.call_unary('Handshake', request)
resp = {
'server_version': response.server_version,
'bot_version': response.bot_version,
@@ -129,7 +118,7 @@ class RemoteClientGrpc(object):
request = swarming_bot_pb2.PollRequest()
self._attributes_json_to_proto(attributes, request.attributes)
# TODO(aludwin): gRPC-specific exception handling (raise PollError).
- response = call_grpc(self._stub.Poll, request)
+ response = self._proxy.call_unary('Poll', request)
if response.cmd == swarming_bot_pb2.PollResponse.UPDATE:
return 'update', response.version
@@ -189,7 +178,7 @@ class RemoteClientGrpc(object):
logging.info('Updating to version: %s', bot_version)
request = swarming_bot_pb2.BotUpdateRequest()
request.bot_version = bot_version
- response = call_grpc(self._stub.BotUpdate, request)
+ response = self._proxy.call_unary('BotUpdate', request)
with open(new_zip_fn, 'wb') as f:
f.write(response.bot_code)
@@ -244,18 +233,3 @@ def insert_dict_as_submessage(message, keyname, value):
google.protobuf.json_format.Parse(json.dumps(value), sub_msg)
-def call_grpc(method, request):
- """Retries a command a set number of times"""
- for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
- try:
- return method(request, timeout=NET_CONNECTION_TIMEOUT_SEC)
- except grpc.RpcError as g:
- if g.code() is not grpc.StatusCode.UNAVAILABLE:
- raise
- logging.warning('call_grpc - proxy is unavailable (attempt %d/%d)',
- attempt, MAX_GRPC_ATTEMPTS)
- grpc_error = g
- time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
- # If we get here, it must be because we got (and saved) an error
- assert grpc_error is not None
- raise grpc_error

Powered by Google App Engine
This is Rietveld 408576698