Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(602)

Unified Diff: appengine/swarming/swarming_bot/bot_code/bot_main.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/bot_main.py
diff --git a/appengine/swarming/swarming_bot/bot_code/bot_main.py b/appengine/swarming/swarming_bot/bot_code/bot_main.py
index a5ca317c21e072f5f79a65a8d6d6d0daff2247ca..60592c6338d052252e22a5ee5f5732810d09a850 100644
--- a/appengine/swarming/swarming_bot/bot_code/bot_main.py
+++ b/appengine/swarming/swarming_bot/bot_code/bot_main.py
@@ -15,12 +15,13 @@ server-provided bot_config.py. This permits safe load testing.
"""
import contextlib
+import functools
import json
import logging
import optparse
import os
+import Queue
import shutil
-import signal
import sys
import tempfile
import threading
@@ -29,6 +30,7 @@ import traceback
import zipfile
import common
+import remote_client
import singleton
from api import bot
from api import os_utilities
@@ -170,6 +172,16 @@ def setup_bot(skip_reboot):
botobj.restart('Starting new swarming bot: %s' % THIS_FILE)
+def get_authentication_headers(botobj):
+ """Calls bot_config.get_authentication_headers() if it is defined."""
+ if _in_load_test_mode():
+ return (None, None)
+ logging.info('get_authentication_headers()')
+ from config import bot_config
+ func = getattr(bot_config, 'get_authentication_headers', None)
+ return func(botobj) if func else (None, None)
M-A Ruel 2016/06/01 17:08:02 wrap that to catch all exceptions like the other f
Vadim Sh. 2016/06/03 01:15:32 Exceptions are handled in RemoteClient, where it k
+
+
### end of bot_config handler part.
@@ -228,8 +240,8 @@ def post_error_task(botobj, error, task_id):
'message': error,
'task_id': task_id,
}
- return net.url_read_json(
- botobj.server + '/swarming/api/v1/bot/task_error/%s' % task_id, data=data)
+ return botobj.remote.url_read_json(
+ '/swarming/api/v1/bot/task_error/%s' % task_id, data=data)
def on_shutdown_hook(b):
@@ -256,19 +268,30 @@ def get_bot():
config = get_config()
assert not config['server'].endswith('/'), config
- # Create a temporary object to call the hooks.
- botobj = bot.Bot(
+ # Use a temporary Bot object to call get_attributes. Attributes are needed to
+ # construct the "real" bot.Bot.
+ attributes = get_attributes(
+ bot.Bot(
+ remote_client.RemoteClient(config['server'], None),
attributes,
config['server'],
config['server_version'],
os.path.dirname(THIS_FILE),
- on_shutdown_hook)
- return bot.Bot(
- get_attributes(botobj),
+ on_shutdown_hook))
+
+ # Make remote client callback use the returned bot object. We assume here
+ # RemoteClient doesn't call its callback in the constructor (since 'botobj' is
+ # undefined during the construction).
+ botobj = bot.Bot(
+ remote_client.RemoteClient(
+ config['server'],
+ lambda: get_authentication_headers(botobj)),
+ attributes,
config['server'],
config['server_version'],
os.path.dirname(THIS_FILE),
on_shutdown_hook)
+ return botobj
def clean_isolated_cache(botobj):
@@ -335,16 +358,24 @@ def run_bot(arg_error):
# clause is kept there "just in case".
logging.exception('server_ping threw')
+ # Next we make sure the bot can make authenticated calls by grabbing
+ # the auth headers, retrying on errors a bunch of times. We don't give up
+ # if it fails though (maybe the bot will "fix itself" later).
+ botobj = get_bot()
+ try:
+ botobj.remote.initialize(quit_bit)
+ except remote_client.InitializationError as exc:
+ botobj.post_error('failed to grab auth headers: %s' % exc.last_error)
+ logging.error('Can\'t grab auth headers, continuing anyway...')
+
if quit_bit.is_set():
logging.info('Early quit 1')
return 0
# If this fails, there's hardly anything that can be done, the bot can't
# even get to the point to be able to self-update.
- botobj = get_bot()
- resp = net.url_read_json(
- botobj.server + '/swarming/api/v1/bot/handshake',
- data=botobj._attributes)
+ resp = botobj.remote.url_read_json(
+ '/swarming/api/v1/bot/handshake', data=botobj._attributes)
if not resp:
logging.error('Failed to contact for handshake')
else:
@@ -411,8 +442,8 @@ def poll_server(botobj, quit_bit):
"""
# Access to a protected member _XXX of a client class - pylint: disable=W0212
start = time.time()
- resp = net.url_read_json(
- botobj.server + '/swarming/api/v1/bot/poll', data=botobj._attributes)
+ resp = botobj.remote.url_read_json(
+ '/swarming/api/v1/bot/poll', data=botobj._attributes)
if not resp:
return False
logging.debug('Server response:\n%s', resp)
@@ -436,8 +467,8 @@ def poll_server(botobj, quit_bit):
'output_chunk_start': 0,
'task_id': resp['task_id'],
}
- net.url_read_json(
- botobj.server + '/swarming/api/v1/bot/task_update/%s' % resp['task_id'],
+ botobj.remote.url_read_json(
+ '/swarming/api/v1/bot/task_update/%s' % resp['task_id'],
data=params)
return False
@@ -494,6 +525,7 @@ def run_manifest(botobj, manifest, start):
failure = False
internal_failure = False
msg = None
+ headers_dumper = None
work_dir = os.path.join(botobj.base_dir, 'work')
try:
try:
@@ -521,6 +553,15 @@ def run_manifest(botobj, manifest, start):
task_result_file = os.path.join(work_dir, 'task_runner_out.json')
if os.path.exists(task_result_file):
os.remove(task_result_file)
+
+ # Start a thread that periodically writes the authentication headers to a
+ # file on disk. task_runner reads them from there before making HTTP calls
+ # to the swarming server.
+ headers_file = os.path.join(work_dir, 'bot_auth_headers.json')
+ if botobj.remote.uses_auth:
+ headers_dumper = AuthHeadersDumper(botobj.remote, headers_file)
+ headers_dumper.start()
+
command = [
sys.executable, THIS_FILE, 'task_runner',
'--swarming-server', url,
@@ -531,7 +572,10 @@ def run_manifest(botobj, manifest, start):
'--start', str(start),
'--min-free-space', str(get_min_free_space()),
]
+ if botobj.remote.uses_auth:
+ command.extend(['--auth-headers-file', headers_file])
logging.debug('Running command: %s', command)
+
# Put the output file into the current working directory, which should be
# the one containing swarming_bot.zip.
log_path = os.path.join(botobj.base_dir, 'logs', 'task_runner_stdout.log')
@@ -592,6 +636,8 @@ def run_manifest(botobj, manifest, start):
e, traceback.format_exc()[-2048:])
internal_failure = True
finally:
+ if headers_dumper:
+ headers_dumper.stop()
if internal_failure:
post_error_task(botobj, msg, task_id)
call_hook(
@@ -622,12 +668,12 @@ def update_bot(botobj, version):
new_zip = os.path.join(os.path.dirname(THIS_FILE), new_zip)
# Download as a new file.
- url = botobj.server + '/swarming/api/v1/bot/bot_code/%s' % version
- if not net.url_retrieve(new_zip, url):
+ url_path = '/swarming/api/v1/bot/bot_code/%s' % version
+ if not botobj.remote.url_retrieve(new_zip, url_path):
# It can happen when a server is rapidly updated multiple times in a row.
botobj.post_error(
'Unable to download %s from %s; first tried version %s' %
- (new_zip, url, version))
+ (new_zip, botobj.server + url_path, version))
# Poll again, this may work next time. To prevent busy-loop, sleep a little.
time.sleep(2)
return
@@ -692,6 +738,54 @@ def update_lkgbc(botobj):
botobj.post_error('Failed to update LKGBC: %s' % e)
+class AuthHeadersDumper(object):
+ """Represents a thread that dumps auth headers to a file on disk each minute.
+
+ 'task_runner' reads them from there when making HTTP calls.
+ """
+
+ def __init__(self, remote, path):
+ self._remote = remote
+ self._path = path
+ self._thread = None
+ self._signal = None
+
+ def start(self):
+ """Starts a thread that dumps headers to the file."""
+ assert self._thread is None
+ self._dump_headers() # initial dump
+ self._signal = Queue.Queue()
+ self._thread = threading.Thread(
+ target=self._run,
+ args=(self._signal,),
+ name='AuthHeadersDumper %s' % self._path)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def stop(self):
+ """Stops the dumping thread (if it is running)."""
+ if not self._thread:
+ return
+ self._signal.put(None)
+ self._thread.join(60) # don't wait forever
+ if self._thread.is_alive():
+ logging.error('AuthHeadersDumper thread failed to terminate in time')
+ self._signal = None
+ self._thread = None
+
+ def _dump_headers(self):
+ logging.info('Dumping auth headers to %s', self._path)
+ # TODO(vadimsh): Implement.
+
+ def _run(self, signal):
+ while True:
+ try:
+ signal.get(timeout=60)
+ return # the stop signal received, quit the thread
+ except Queue.Empty:
+ self._dump_headers()
+
+
def get_config():
"""Returns the data from config.json."""
global _ERROR_HANDLER_WAS_REGISTERED
« no previous file with comments | « appengine/swarming/swarming_bot/api/bot.py ('k') | appengine/swarming/swarming_bot/bot_code/remote_client.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698