Chromium Code Reviews| Index: testing/legion/client_lib.py |
| diff --git a/testing/legion/client_lib.py b/testing/legion/client_lib.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..4c69967d285735d57fb5443c471369f090ba2bc1 |
| --- /dev/null |
| +++ b/testing/legion/client_lib.py |
| @@ -0,0 +1,284 @@ |
| +# Copyright 2015 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| +import argparse |
|
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
In general we keep an empty line in between
Mike Meade
2015/01/30 21:04:04
Done.
|
| +import logging |
| +import os |
| +import socket |
| +import subprocess |
| +import tempfile |
| +import threading |
| +import uuid |
| +import xmlrpclib |
| + |
| +#pylint: disable=relative-import |
| +import common_lib |
| +import client_rpc_server |
| + |
| +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) |
| +SWARMING_DIR = os.path.join(THIS_DIR, '../../tools/swarming_client') |
|
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
if you plan this to work on windows, use os.path.j
Mike Meade
2015/01/30 21:04:04
Done.
|
| +ISOLATE_PY = os.path.join(SWARMING_DIR, 'isolate.py') |
| +SWARMING_PY = os.path.join(SWARMING_DIR, 'swarming.py') |
| +# ISOLATE_SERVER = 'omnibot-isolate-server.appspot.com' |
| +# SWARMING_SERVER = 'https://omnibot-swarming-server.appspot.com/' |
| +CLIENT_CONNECTION_TIMEOUT = 30 * 60 # 30 minutes |
| +args = None |
| + |
| + |
| +class Error(Exception): |
| + pass |
| + |
| + |
| +class ConnectionTimeoutError(Error): |
| + pass |
| + |
| + |
| +def GetArgs(): |
| + """Parse command line args args. |
| + |
| + Returns: |
| + Parsed command line args. |
| + """ |
| + global args |
|
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
not a fan
Mike Meade
2015/01/30 21:04:04
Moved into the class.
|
| + if not args: |
| + parser = argparse.ArgumentParser() |
| + parser.add_argument('--isolate-server') |
| + parser.add_argument('--swarming-server') |
| + parser.add_argument('--client-connection-timeout', |
| + default=CLIENT_CONNECTION_TIMEOUT) |
| + args, _ = parser.parse_known_args() |
| + return args |
| + |
| + |
| +class Client(object): |
| + """The main client class. |
| + |
| + This class is used to create clients, connect to their RPC servers, and |
| + run RPC commands. |
| + """ |
| + |
| + _client_count = 0 |
| + |
| + def __init__(self, isolate_file, discovery_server, name=None): |
| + """ctor. |
| + |
| + Args: |
| + isolate_file: The path to the client isolate file. |
| + discovery_server: The discovery server to register with |
| + name: A name for the client. |
| + """ |
| + self._IncreaseCount() |
| + self._discovery_server = discovery_server |
| + self._name = name or self._CreateName() |
| + self._priority = 100 |
| + self._isolate_file = isolate_file |
| + self._isolated_file = isolate_file + 'd' |
| + self._connected = False |
| + self._connect_event = threading.Event() |
| + self._ip_address = None |
| + self._config_vars = [] |
| + self._dimensions = [] |
| + self._rpc_timeout = None |
| + self._otp = str(uuid.uuid1()) |
|
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
eventually it'd be nice for this to be determinist
Mike Meade
2015/01/30 21:04:04
Maybe HOST_FQDN\CLIENT_NAME\SHA-1_LEGION_FILES?
T
|
| + self._rpc = None |
| + self._verbose = False |
| + |
| + @property |
| + def connected(self): |
| + """Return the value of self._connected.""" |
| + return self._connected |
| + |
| + @property |
| + def connect_event(self): |
| + return self._connect_event |
| + |
| + @property |
| + def rpc(self): |
| + """Return the rpc object.""" |
| + return self._rpc |
| + |
| + @property |
| + def priority(self): |
| + return self._priority |
| + |
| + @property |
| + def rpc_idle_timeout(self): |
| + return self._rpc_idle_timeout |
| + |
| + @property |
| + def name(self): |
| + return self._name |
| + |
| + @classmethod |
| + def _IncreaseCount(cls): |
| + """Increase the client_count parameter.""" |
| + cls._client_count += 1 |
| + |
| + @classmethod |
| + def _CreateName(cls): |
| + """Create a name for this client. |
| + |
| + By default the name is "Client%s" where %s is the number of clients that |
| + currently exist. |
| + """ |
| + return 'Client%d' % cls._client_count |
| + |
| + def SetPriority(self, priority): |
|
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
Why not
@priority.setter
def priority(self, value)
Mike Meade
2015/01/30 21:04:04
The only reason I added the method like this was f
|
| + """Sets the priority of the client task. |
| + |
| + Args: |
| + priority: The priority to set. |
| + """ |
| + logging.debug('Setting %s priority to %s', self._name, priority) |
| + self._priority = priority |
| + |
| + def SetVerbose(self): |
| + """Set the client verbosity to debug.""" |
| + logging.debug('Setting %s --verbosity', self._name) |
| + self._verbose = True |
| + |
| + def AddConfigVars(self, key, value): |
| + """Add a set of config vars to isolate.py. |
| + |
| + Args: |
| + key: The config vars key. |
| + value: The config vars value. |
| + """ |
| + logging.debug('Adding --config-var %s %s to %s', key, value, |
| + self._name) |
| + self._config_vars.append((key, value)) |
| + |
| + def AddDimension(self, key, value): |
| + """Add a set of dimensions to swarming.py. |
| + |
| + Args: |
| + key: The dimension key. |
| + value: The dimension value. |
| + """ |
| + logging.debug('Adding --dimension %s %s to %s', key, value, |
| + self._name) |
| + self._dimensions.append((key, value)) |
| + |
| + def SetRPCTimeout(self, timeout): |
| + """Sets the client's RPC timeout value. |
| + |
| + If the RPC server does not receive an RPC request within this time the |
| + client controller will exit. |
| + |
| + Args: |
| + timeout: The RPC server timeout in seconds. |
| + """ |
| + logging.debug('Setting %s RPC timeout to %s', self._name, timeout) |
| + self._rpc_timeout = timeout |
| + |
| + def Create(self, wait=False, timeout=None): |
| + """Create the client machine and wait for it to be created if specified. |
| + |
| + Args: |
| + wait: True to block until created, False to return immediately. |
| + timeout: The timeout to block before raising a ConnectionTimeoutError. |
| + """ |
| + logging.info('Creating %s', self._name) |
| + self._connect_event.clear() |
| + self._RegisterOnConnectCallback() |
| + self._ExecuteIsolate() |
| + self._ExecuteSwarming() |
| + if wait: |
| + self.WaitForConnection(timeout) |
| + |
| + def WaitForConnection(self, timeout=None): |
| + """Connect to the client machine. |
| + |
| + This method waits for the client machine to register itself |
| + with the discovery server. |
| + |
| + Args: |
| + timeout: The timeout in seconds. |
| + |
| + Raises: |
| + TimeoutError if the client doesn't connect in time. |
| + """ |
| + timeout = timeout or GetArgs().client_connection_timeout |
| + msg = ('Waiting for %s to connect with a timeout of %d seconds' % |
| + (self._name, timeout)) |
| + logging.info(msg) |
| + self._connect_event.wait(timeout) |
| + if not self._connect_event.is_set(): |
| + raise ConnectionTimeoutError() |
| + |
| + def Release(self): |
| + """Quit the client's RPC server so it can release the machine.""" |
| + if self._rpc is not None and self._connected: |
| + logging.info('Releasing %s', self._name) |
| + try: |
| + self._rpc.Quit() |
| + except (socket.error, xmlrpclib.Fault): |
| + logging.error('Unable to connect to %s to call Quit', self._name) |
| + self._rpc = None |
| + self._connected = False |
| + |
| + def _ExecuteIsolate(self): |
| + """Execute isolate.py with the given args.""" |
| + cmd = [ |
| + 'python', |
| + ISOLATE_PY, |
| + 'archive', |
| + '--isolate=' + self._isolate_file, |
| + '--isolated=' + self._isolated_file, |
| + ] |
| + |
| + if GetArgs().isolate_server: |
| + cmd += ['--isolate-server', GetArgs().isolate_server] |
| + for key, value in self._config_vars: |
| + cmd += ['--config-var', key, value] |
| + |
| + logging.debug('Running %s', ' '.join(cmd)) |
| + if subprocess.call(cmd, stdout=subprocess.PIPE) != 0: |
| + raise Error('Error calling isolate.py') |
| + |
| + def _ExecuteSwarming(self): |
| + """Execute swarming.py with the vars.""" |
| + cmd = [ |
| + 'python', |
| + SWARMING_PY, |
| + 'trigger', |
| + self._isolated_file, |
| + '--priority=' + str(self._priority) |
| + ] |
| + |
| + if GetArgs().isolate_server: |
| + cmd += ['--isolate-server', GetArgs().isolate_server] |
| + if GetArgs().swarming_server: |
| + cmd += ['--swarming', GetArgs().swarming_server] |
| + for key, value in self._dimensions: |
| + cmd += ['--dimension', key, value] |
| + |
| + cmd += ['--', '--host', str(common_lib.MY_IP), '--otp', self._otp] |
| + if self._rpc_timeout: |
| + cmd += ['--idle-timeout', str(self._rpc_timeout)] |
| + if self._verbose: |
| + cmd += ['--verbose'] |
| + |
| + logging.debug('Running %s', ' '.join(cmd)) |
| + if subprocess.call(cmd, stdout=subprocess.PIPE) != 0: |
| + raise Error('Error calling swarming.py') |
| + |
| + def _RegisterOnConnectCallback(self): |
| + """Register a callback with the discovery server. |
| + |
| + This callback is used to receive the client's IP address once it starts |
| + and contacts the discovery server. |
| + """ |
| + self._discovery_server.RegisterClientCallback(self._otp, self._OnConnect) |
| + |
| + def _OnConnect(self, ip_address): |
| + """The OnConnect callback method. |
| + |
| + This method receives the ip address received by the discovery server from |
| + the client and sets the object's connected state to True. |
| + """ |
| + self._ip_address = ip_address |
| + self._connected = True |
| + self._rpc = client_rpc_server.RPCServer.Connect(self._ip_address) |
| + logging.info('%s connected from %s', self._name, ip_address) |
| + self._connect_event.set() |