Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Unified Diff: testing/legion/client_lib.py

Issue 890773003: Adding the initial code for Omnibot multi-machine support (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: testing/legion/client_lib.py
diff --git a/testing/legion/client_lib.py b/testing/legion/client_lib.py
new file mode 100644
index 0000000000000000000000000000000000000000..4c69967d285735d57fb5443c471369f090ba2bc1
--- /dev/null
+++ b/testing/legion/client_lib.py
@@ -0,0 +1,284 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+import argparse
Marc-Antoine Ruel (Google) 2015/01/30 18:00:21 In general we keep an empty line in between
Mike Meade 2015/01/30 21:04:04 Done.
+import logging
+import os
+import socket
+import subprocess
+import tempfile
+import threading
+import uuid
+import xmlrpclib
+
+#pylint: disable=relative-import
+import common_lib
+import client_rpc_server
+
+THIS_DIR = os.path.dirname(os.path.abspath(__file__))
+SWARMING_DIR = os.path.join(THIS_DIR, '../../tools/swarming_client')
Marc-Antoine Ruel (Google) 2015/01/30 18:00:21 if you plan this to work on windows, use os.path.j
Mike Meade 2015/01/30 21:04:04 Done.
+ISOLATE_PY = os.path.join(SWARMING_DIR, 'isolate.py')
+SWARMING_PY = os.path.join(SWARMING_DIR, 'swarming.py')
+# ISOLATE_SERVER = 'omnibot-isolate-server.appspot.com'
+# SWARMING_SERVER = 'https://omnibot-swarming-server.appspot.com/'
+CLIENT_CONNECTION_TIMEOUT = 30 * 60 # 30 minutes
+args = None
+
+
+class Error(Exception):
+ pass
+
+
+class ConnectionTimeoutError(Error):
+ pass
+
+
+def GetArgs():
+ """Parse command line args args.
+
+ Returns:
+ Parsed command line args.
+ """
+ global args
Marc-Antoine Ruel (Google) 2015/01/30 18:00:21 not a fan
Mike Meade 2015/01/30 21:04:04 Moved into the class.
+ if not args:
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--isolate-server')
+ parser.add_argument('--swarming-server')
+ parser.add_argument('--client-connection-timeout',
+ default=CLIENT_CONNECTION_TIMEOUT)
+ args, _ = parser.parse_known_args()
+ return args
+
+
+class Client(object):
+ """The main client class.
+
+ This class is used to create clients, connect to their RPC servers, and
+ run RPC commands.
+ """
+
+ _client_count = 0
+
+ def __init__(self, isolate_file, discovery_server, name=None):
+ """ctor.
+
+ Args:
+ isolate_file: The path to the client isolate file.
+ discovery_server: The discovery server to register with
+ name: A name for the client.
+ """
+ self._IncreaseCount()
+ self._discovery_server = discovery_server
+ self._name = name or self._CreateName()
+ self._priority = 100
+ self._isolate_file = isolate_file
+ self._isolated_file = isolate_file + 'd'
+ self._connected = False
+ self._connect_event = threading.Event()
+ self._ip_address = None
+ self._config_vars = []
+ self._dimensions = []
+ self._rpc_timeout = None
+ self._otp = str(uuid.uuid1())
Marc-Antoine Ruel (Google) 2015/01/30 18:00:21 eventually it'd be nice for this to be determinist
Mike Meade 2015/01/30 21:04:04 Maybe HOST_FQDN\CLIENT_NAME\SHA-1_LEGION_FILES? T
+ self._rpc = None
+ self._verbose = False
+
+ @property
+ def connected(self):
+ """Return the value of self._connected."""
+ return self._connected
+
+ @property
+ def connect_event(self):
+ return self._connect_event
+
+ @property
+ def rpc(self):
+ """Return the rpc object."""
+ return self._rpc
+
+ @property
+ def priority(self):
+ return self._priority
+
+ @property
+ def rpc_idle_timeout(self):
+ return self._rpc_idle_timeout
+
+ @property
+ def name(self):
+ return self._name
+
+ @classmethod
+ def _IncreaseCount(cls):
+ """Increase the client_count parameter."""
+ cls._client_count += 1
+
+ @classmethod
+ def _CreateName(cls):
+ """Create a name for this client.
+
+ By default the name is "Client%s" where %s is the number of clients that
+ currently exist.
+ """
+ return 'Client%d' % cls._client_count
+
+ def SetPriority(self, priority):
Marc-Antoine Ruel (Google) 2015/01/30 18:00:21 Why not @priority.setter def priority(self, value)
Mike Meade 2015/01/30 21:04:04 The only reason I added the method like this was f
+ """Sets the priority of the client task.
+
+ Args:
+ priority: The priority to set.
+ """
+ logging.debug('Setting %s priority to %s', self._name, priority)
+ self._priority = priority
+
+ def SetVerbose(self):
+ """Set the client verbosity to debug."""
+ logging.debug('Setting %s --verbosity', self._name)
+ self._verbose = True
+
+ def AddConfigVars(self, key, value):
+ """Add a set of config vars to isolate.py.
+
+ Args:
+ key: The config vars key.
+ value: The config vars value.
+ """
+ logging.debug('Adding --config-var %s %s to %s', key, value,
+ self._name)
+ self._config_vars.append((key, value))
+
+ def AddDimension(self, key, value):
+ """Add a set of dimensions to swarming.py.
+
+ Args:
+ key: The dimension key.
+ value: The dimension value.
+ """
+ logging.debug('Adding --dimension %s %s to %s', key, value,
+ self._name)
+ self._dimensions.append((key, value))
+
+ def SetRPCTimeout(self, timeout):
+ """Sets the client's RPC timeout value.
+
+ If the RPC server does not receive an RPC request within this time the
+ client controller will exit.
+
+ Args:
+ timeout: The RPC server timeout in seconds.
+ """
+ logging.debug('Setting %s RPC timeout to %s', self._name, timeout)
+ self._rpc_timeout = timeout
+
+ def Create(self, wait=False, timeout=None):
+ """Create the client machine and wait for it to be created if specified.
+
+ Args:
+ wait: True to block until created, False to return immediately.
+ timeout: The timeout to block before raising a ConnectionTimeoutError.
+ """
+ logging.info('Creating %s', self._name)
+ self._connect_event.clear()
+ self._RegisterOnConnectCallback()
+ self._ExecuteIsolate()
+ self._ExecuteSwarming()
+ if wait:
+ self.WaitForConnection(timeout)
+
+ def WaitForConnection(self, timeout=None):
+ """Connect to the client machine.
+
+ This method waits for the client machine to register itself
+ with the discovery server.
+
+ Args:
+ timeout: The timeout in seconds.
+
+ Raises:
+ TimeoutError if the client doesn't connect in time.
+ """
+ timeout = timeout or GetArgs().client_connection_timeout
+ msg = ('Waiting for %s to connect with a timeout of %d seconds' %
+ (self._name, timeout))
+ logging.info(msg)
+ self._connect_event.wait(timeout)
+ if not self._connect_event.is_set():
+ raise ConnectionTimeoutError()
+
+ def Release(self):
+ """Quit the client's RPC server so it can release the machine."""
+ if self._rpc is not None and self._connected:
+ logging.info('Releasing %s', self._name)
+ try:
+ self._rpc.Quit()
+ except (socket.error, xmlrpclib.Fault):
+ logging.error('Unable to connect to %s to call Quit', self._name)
+ self._rpc = None
+ self._connected = False
+
+ def _ExecuteIsolate(self):
+ """Execute isolate.py with the given args."""
+ cmd = [
+ 'python',
+ ISOLATE_PY,
+ 'archive',
+ '--isolate=' + self._isolate_file,
+ '--isolated=' + self._isolated_file,
+ ]
+
+ if GetArgs().isolate_server:
+ cmd += ['--isolate-server', GetArgs().isolate_server]
+ for key, value in self._config_vars:
+ cmd += ['--config-var', key, value]
+
+ logging.debug('Running %s', ' '.join(cmd))
+ if subprocess.call(cmd, stdout=subprocess.PIPE) != 0:
+ raise Error('Error calling isolate.py')
+
+ def _ExecuteSwarming(self):
+ """Execute swarming.py with the vars."""
+ cmd = [
+ 'python',
+ SWARMING_PY,
+ 'trigger',
+ self._isolated_file,
+ '--priority=' + str(self._priority)
+ ]
+
+ if GetArgs().isolate_server:
+ cmd += ['--isolate-server', GetArgs().isolate_server]
+ if GetArgs().swarming_server:
+ cmd += ['--swarming', GetArgs().swarming_server]
+ for key, value in self._dimensions:
+ cmd += ['--dimension', key, value]
+
+ cmd += ['--', '--host', str(common_lib.MY_IP), '--otp', self._otp]
+ if self._rpc_timeout:
+ cmd += ['--idle-timeout', str(self._rpc_timeout)]
+ if self._verbose:
+ cmd += ['--verbose']
+
+ logging.debug('Running %s', ' '.join(cmd))
+ if subprocess.call(cmd, stdout=subprocess.PIPE) != 0:
+ raise Error('Error calling swarming.py')
+
+ def _RegisterOnConnectCallback(self):
+ """Register a callback with the discovery server.
+
+ This callback is used to receive the client's IP address once it starts
+ and contacts the discovery server.
+ """
+ self._discovery_server.RegisterClientCallback(self._otp, self._OnConnect)
+
+ def _OnConnect(self, ip_address):
+ """The OnConnect callback method.
+
+ This method receives the ip address received by the discovery server from
+ the client and sets the object's connected state to True.
+ """
+ self._ip_address = ip_address
+ self._connected = True
+ self._rpc = client_rpc_server.RPCServer.Connect(self._ip_address)
+ logging.info('%s connected from %s', self._name, ip_address)
+ self._connect_event.set()

Powered by Google App Engine
This is Rietveld 408576698