Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1093)

Unified Diff: testing/legion/task_controller.py

Issue 951673002: Revert "Pull chromium at 2c3ffb2355a27c32f45e508ef861416b820c823b" (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « testing/legion/run_task.py ('k') | testing/legion/task_registration_server.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: testing/legion/task_controller.py
diff --git a/testing/legion/task_controller.py b/testing/legion/task_controller.py
deleted file mode 100644
index e0812b40f2c365d9d254952d89b7ba238db50936..0000000000000000000000000000000000000000
--- a/testing/legion/task_controller.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright 2015 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Defines the task controller library."""
-
-import argparse
-import datetime
-import logging
-import os
-import socket
-import subprocess
-import sys
-import tempfile
-import threading
-import xmlrpclib
-
-#pylint: disable=relative-import
-import common_lib
-
-ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py')
-SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py')
-
-
-class Error(Exception):
- pass
-
-
-class ConnectionTimeoutError(Error):
- pass
-
-
-class TaskController(object):
- """Provisions, configures, and controls a task machine.
-
- This class is an abstraction of a physical task machine. It provides an
- end to end API for controlling a task machine. Operations on the task machine
- are performed using the instance's "rpc" property. A simple end to end
- scenario is as follows:
-
- task = TaskController(...)
- task.Create()
- task.WaitForConnection()
- proc = task.rpc.subprocess.Popen(['ls'])
- print task.rpc.subprocess.GetStdout(proc)
- task.Release()
- """
-
- _task_count = 0
- _tasks = []
-
- def __init__(self, isolate_file, config_vars, dimensions, priority=100,
- idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
- connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
- verbosity='ERROR', name=None):
- assert isinstance(config_vars, dict)
- assert isinstance(dimensions, dict)
- type(self)._tasks.append(self)
- type(self)._task_count += 1
- self.verbosity = verbosity
- self._name = name or 'Task%d' % type(self)._task_count
- self._priority = priority
- self._isolate_file = isolate_file
- self._isolated_file = isolate_file + 'd'
- self._idle_timeout_secs = idle_timeout_secs
- self._config_vars = config_vars
- self._dimensions = dimensions
- self._connect_event = threading.Event()
- self._connected = False
- self._ip_address = None
- self._otp = self._CreateOTP()
- self._rpc = None
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--isolate-server')
- parser.add_argument('--swarming-server')
- parser.add_argument('--task-connection-timeout-secs',
- default=common_lib.DEFAULT_TIMEOUT_SECS)
- args, _ = parser.parse_known_args()
-
- self._isolate_server = args.isolate_server
- self._swarming_server = args.swarming_server
- self._connection_timeout_secs = (connection_timeout_secs or
- args.task_connection_timeout_secs)
-
- @property
- def name(self):
- return self._name
-
- @property
- def otp(self):
- return self._otp
-
- @property
- def connected(self):
- return self._connected
-
- @property
- def connect_event(self):
- return self._connect_event
-
- @property
- def rpc(self):
- return self._rpc
-
- @property
- def verbosity(self):
- return self._verbosity
-
- @verbosity.setter
- def verbosity(self, level):
- """Sets the verbosity level as a string.
-
- Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
- logging.DEBUG, etc) is allowed.
- """
- assert isinstance(level, (str, int))
- if isinstance(level, int):
- level = logging.getLevelName(level)
- self._verbosity = level #pylint: disable=attribute-defined-outside-init
-
- @classmethod
- def ReleaseAllTasks(cls):
- for task in cls._tasks:
- task.Release()
-
- def _CreateOTP(self):
- """Creates the OTP."""
- controller_name = socket.gethostname()
- test_name = os.path.basename(sys.argv[0])
- creation_time = datetime.datetime.utcnow()
- otp = 'task:%s controller:%s test:%s creation:%s' % (
- self._name, controller_name, test_name, creation_time)
- return otp
-
- def Create(self):
- """Creates the task machine."""
- logging.info('Creating %s', self.name)
- self._connect_event.clear()
- self._ExecuteIsolate()
- self._ExecuteSwarming()
-
- def WaitForConnection(self):
- """Waits for the task machine to connect.
-
- Raises:
- ConnectionTimeoutError if the task doesn't connect in time.
- """
- logging.info('Waiting for %s to connect with a timeout of %d seconds',
- self._name, self._connection_timeout_secs)
- self._connect_event.wait(self._connection_timeout_secs)
- if not self._connect_event.is_set():
- raise ConnectionTimeoutError('%s failed to connect' % self.name)
-
- def Release(self):
- """Quits the task's RPC server so it can release the machine."""
- if self._rpc is not None and self._connected:
- logging.info('Releasing %s', self._name)
- try:
- self._rpc.Quit()
- except (socket.error, xmlrpclib.Fault):
- logging.error('Unable to connect to %s to call Quit', self.name)
- self._rpc = None
- self._connected = False
-
- def _ExecuteIsolate(self):
- """Executes isolate.py."""
- cmd = [
- 'python',
- ISOLATE_PY,
- 'archive',
- '--isolate', self._isolate_file,
- '--isolated', self._isolated_file,
- ]
-
- if self._isolate_server:
- cmd.extend(['--isolate-server', self._isolate_server])
- for key, value in self._config_vars.iteritems():
- cmd.extend(['--config-var', key, value])
-
- self._ExecuteProcess(cmd)
-
- def _ExecuteSwarming(self):
- """Executes swarming.py."""
- cmd = [
- 'python',
- SWARMING_PY,
- 'trigger',
- self._isolated_file,
- '--priority', str(self._priority),
- ]
-
- if self._isolate_server:
- cmd.extend(['--isolate-server', self._isolate_server])
- if self._swarming_server:
- cmd.extend(['--swarming', self._swarming_server])
- for key, value in self._dimensions.iteritems():
- cmd.extend(['--dimension', key, value])
-
- cmd.extend([
- '--',
- '--controller', common_lib.MY_IP,
- '--otp', self._otp,
- '--verbosity', self._verbosity,
- '--idle-timeout', str(self._idle_timeout_secs),
- ])
-
- self._ExecuteProcess(cmd)
-
- def _ExecuteProcess(self, cmd):
- """Executes a process, waits for it to complete, and checks for success."""
- logging.debug('Running %s', ' '.join(cmd))
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- _, stderr = p.communicate()
- if p.returncode != 0:
- raise Error(stderr)
-
- def OnConnect(self, ip_address):
- """Receives task ip address on connection."""
- self._ip_address = ip_address
- self._connected = True
- self._rpc = common_lib.ConnectToServer(self._ip_address)
- logging.info('%s connected from %s', self._name, ip_address)
- self._connect_event.set()
« no previous file with comments | « testing/legion/run_task.py ('k') | testing/legion/task_registration_server.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698