Index: testing/legion/client_lib.py |
diff --git a/testing/legion/client_lib.py b/testing/legion/client_lib.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4c69967d285735d57fb5443c471369f090ba2bc1 |
--- /dev/null |
+++ b/testing/legion/client_lib.py |
@@ -0,0 +1,284 @@ |
+# Copyright 2015 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+import argparse |
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
In general we keep an empty line in between
Mike Meade
2015/01/30 21:04:04
Done.
|
+import logging |
+import os |
+import socket |
+import subprocess |
+import tempfile |
+import threading |
+import uuid |
+import xmlrpclib |
+ |
+#pylint: disable=relative-import |
+import common_lib |
+import client_rpc_server |
+ |
+THIS_DIR = os.path.dirname(os.path.abspath(__file__)) |
+SWARMING_DIR = os.path.join(THIS_DIR, '../../tools/swarming_client') |
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
if you plan this to work on windows, use os.path.j
Mike Meade
2015/01/30 21:04:04
Done.
|
+ISOLATE_PY = os.path.join(SWARMING_DIR, 'isolate.py') |
+SWARMING_PY = os.path.join(SWARMING_DIR, 'swarming.py') |
+# ISOLATE_SERVER = 'omnibot-isolate-server.appspot.com' |
+# SWARMING_SERVER = 'https://omnibot-swarming-server.appspot.com/' |
+CLIENT_CONNECTION_TIMEOUT = 30 * 60 # 30 minutes |
+args = None |
+ |
+ |
+class Error(Exception): |
+ pass |
+ |
+ |
+class ConnectionTimeoutError(Error): |
+ pass |
+ |
+ |
+def GetArgs(): |
+ """Parse command line args args. |
+ |
+ Returns: |
+ Parsed command line args. |
+ """ |
+ global args |
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
not a fan
Mike Meade
2015/01/30 21:04:04
Moved into the class.
|
+ if not args: |
+ parser = argparse.ArgumentParser() |
+ parser.add_argument('--isolate-server') |
+ parser.add_argument('--swarming-server') |
+ parser.add_argument('--client-connection-timeout', |
+ default=CLIENT_CONNECTION_TIMEOUT) |
+ args, _ = parser.parse_known_args() |
+ return args |
+ |
+ |
+class Client(object): |
+ """The main client class. |
+ |
+ This class is used to create clients, connect to their RPC servers, and |
+ run RPC commands. |
+ """ |
+ |
+ _client_count = 0 |
+ |
+ def __init__(self, isolate_file, discovery_server, name=None): |
+ """ctor. |
+ |
+ Args: |
+ isolate_file: The path to the client isolate file. |
+ discovery_server: The discovery server to register with |
+ name: A name for the client. |
+ """ |
+ self._IncreaseCount() |
+ self._discovery_server = discovery_server |
+ self._name = name or self._CreateName() |
+ self._priority = 100 |
+ self._isolate_file = isolate_file |
+ self._isolated_file = isolate_file + 'd' |
+ self._connected = False |
+ self._connect_event = threading.Event() |
+ self._ip_address = None |
+ self._config_vars = [] |
+ self._dimensions = [] |
+ self._rpc_timeout = None |
+ self._otp = str(uuid.uuid1()) |
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
eventually it'd be nice for this to be determinist
Mike Meade
2015/01/30 21:04:04
Maybe HOST_FQDN\CLIENT_NAME\SHA-1_LEGION_FILES?
T
|
+ self._rpc = None |
+ self._verbose = False |
+ |
+ @property |
+ def connected(self): |
+ """Return the value of self._connected.""" |
+ return self._connected |
+ |
+ @property |
+ def connect_event(self): |
+ return self._connect_event |
+ |
+ @property |
+ def rpc(self): |
+ """Return the rpc object.""" |
+ return self._rpc |
+ |
+ @property |
+ def priority(self): |
+ return self._priority |
+ |
+ @property |
+ def rpc_idle_timeout(self): |
+ return self._rpc_idle_timeout |
+ |
+ @property |
+ def name(self): |
+ return self._name |
+ |
+ @classmethod |
+ def _IncreaseCount(cls): |
+ """Increase the client_count parameter.""" |
+ cls._client_count += 1 |
+ |
+ @classmethod |
+ def _CreateName(cls): |
+ """Create a name for this client. |
+ |
+ By default the name is "Client%s" where %s is the number of clients that |
+ currently exist. |
+ """ |
+ return 'Client%d' % cls._client_count |
+ |
+ def SetPriority(self, priority): |
Marc-Antoine Ruel (Google)
2015/01/30 18:00:21
Why not
@priority.setter
def priority(self, value)
Mike Meade
2015/01/30 21:04:04
The only reason I added the method like this was f
|
+ """Sets the priority of the client task. |
+ |
+ Args: |
+ priority: The priority to set. |
+ """ |
+ logging.debug('Setting %s priority to %s', self._name, priority) |
+ self._priority = priority |
+ |
+ def SetVerbose(self): |
+ """Set the client verbosity to debug.""" |
+ logging.debug('Setting %s --verbosity', self._name) |
+ self._verbose = True |
+ |
+ def AddConfigVars(self, key, value): |
+ """Add a set of config vars to isolate.py. |
+ |
+ Args: |
+ key: The config vars key. |
+ value: The config vars value. |
+ """ |
+ logging.debug('Adding --config-var %s %s to %s', key, value, |
+ self._name) |
+ self._config_vars.append((key, value)) |
+ |
+ def AddDimension(self, key, value): |
+ """Add a set of dimensions to swarming.py. |
+ |
+ Args: |
+ key: The dimension key. |
+ value: The dimension value. |
+ """ |
+ logging.debug('Adding --dimension %s %s to %s', key, value, |
+ self._name) |
+ self._dimensions.append((key, value)) |
+ |
+ def SetRPCTimeout(self, timeout): |
+ """Sets the client's RPC timeout value. |
+ |
+ If the RPC server does not receive an RPC request within this time the |
+ client controller will exit. |
+ |
+ Args: |
+ timeout: The RPC server timeout in seconds. |
+ """ |
+ logging.debug('Setting %s RPC timeout to %s', self._name, timeout) |
+ self._rpc_timeout = timeout |
+ |
+ def Create(self, wait=False, timeout=None): |
+ """Create the client machine and wait for it to be created if specified. |
+ |
+ Args: |
+ wait: True to block until created, False to return immediately. |
+ timeout: The timeout to block before raising a ConnectionTimeoutError. |
+ """ |
+ logging.info('Creating %s', self._name) |
+ self._connect_event.clear() |
+ self._RegisterOnConnectCallback() |
+ self._ExecuteIsolate() |
+ self._ExecuteSwarming() |
+ if wait: |
+ self.WaitForConnection(timeout) |
+ |
+ def WaitForConnection(self, timeout=None): |
+ """Connect to the client machine. |
+ |
+ This method waits for the client machine to register itself |
+ with the discovery server. |
+ |
+ Args: |
+ timeout: The timeout in seconds. |
+ |
+ Raises: |
+ TimeoutError if the client doesn't connect in time. |
+ """ |
+ timeout = timeout or GetArgs().client_connection_timeout |
+ msg = ('Waiting for %s to connect with a timeout of %d seconds' % |
+ (self._name, timeout)) |
+ logging.info(msg) |
+ self._connect_event.wait(timeout) |
+ if not self._connect_event.is_set(): |
+ raise ConnectionTimeoutError() |
+ |
+ def Release(self): |
+ """Quit the client's RPC server so it can release the machine.""" |
+ if self._rpc is not None and self._connected: |
+ logging.info('Releasing %s', self._name) |
+ try: |
+ self._rpc.Quit() |
+ except (socket.error, xmlrpclib.Fault): |
+ logging.error('Unable to connect to %s to call Quit', self._name) |
+ self._rpc = None |
+ self._connected = False |
+ |
+ def _ExecuteIsolate(self): |
+ """Execute isolate.py with the given args.""" |
+ cmd = [ |
+ 'python', |
+ ISOLATE_PY, |
+ 'archive', |
+ '--isolate=' + self._isolate_file, |
+ '--isolated=' + self._isolated_file, |
+ ] |
+ |
+ if GetArgs().isolate_server: |
+ cmd += ['--isolate-server', GetArgs().isolate_server] |
+ for key, value in self._config_vars: |
+ cmd += ['--config-var', key, value] |
+ |
+ logging.debug('Running %s', ' '.join(cmd)) |
+ if subprocess.call(cmd, stdout=subprocess.PIPE) != 0: |
+ raise Error('Error calling isolate.py') |
+ |
+ def _ExecuteSwarming(self): |
+ """Execute swarming.py with the vars.""" |
+ cmd = [ |
+ 'python', |
+ SWARMING_PY, |
+ 'trigger', |
+ self._isolated_file, |
+ '--priority=' + str(self._priority) |
+ ] |
+ |
+ if GetArgs().isolate_server: |
+ cmd += ['--isolate-server', GetArgs().isolate_server] |
+ if GetArgs().swarming_server: |
+ cmd += ['--swarming', GetArgs().swarming_server] |
+ for key, value in self._dimensions: |
+ cmd += ['--dimension', key, value] |
+ |
+ cmd += ['--', '--host', str(common_lib.MY_IP), '--otp', self._otp] |
+ if self._rpc_timeout: |
+ cmd += ['--idle-timeout', str(self._rpc_timeout)] |
+ if self._verbose: |
+ cmd += ['--verbose'] |
+ |
+ logging.debug('Running %s', ' '.join(cmd)) |
+ if subprocess.call(cmd, stdout=subprocess.PIPE) != 0: |
+ raise Error('Error calling swarming.py') |
+ |
+ def _RegisterOnConnectCallback(self): |
+ """Register a callback with the discovery server. |
+ |
+ This callback is used to receive the client's IP address once it starts |
+ and contacts the discovery server. |
+ """ |
+ self._discovery_server.RegisterClientCallback(self._otp, self._OnConnect) |
+ |
+ def _OnConnect(self, ip_address): |
+ """The OnConnect callback method. |
+ |
+ This method receives the ip address received by the discovery server from |
+ the client and sets the object's connected state to True. |
+ """ |
+ self._ip_address = ip_address |
+ self._connected = True |
+ self._rpc = client_rpc_server.RPCServer.Connect(self._ip_address) |
+ logging.info('%s connected from %s', self._name, ip_address) |
+ self._connect_event.set() |