| Index: testing/legion/examples/subprocess/subprocess_test.py
|
| diff --git a/testing/legion/examples/subprocess/subprocess_test.py b/testing/legion/examples/subprocess/subprocess_test.py
|
| deleted file mode 100755
|
| index cea871a4051f9fb28b950ae461b50dc47573cdcf..0000000000000000000000000000000000000000
|
| --- a/testing/legion/examples/subprocess/subprocess_test.py
|
| +++ /dev/null
|
| @@ -1,95 +0,0 @@
|
| -#!/usr/bin/env python
|
| -# Copyright 2015 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""A host test module demonstrating interacting with remote subprocesses."""
|
| -
|
| -import argparse
|
| -import logging
|
| -import os
|
| -import sys
|
| -import time
|
| -import xmlrpclib
|
| -
|
| -# Map the testing directory so we can import legion.legion_test.
|
| -TESTING_DIR = os.path.join(
|
| - os.path.dirname(os.path.abspath(__file__)),
|
| - '..', '..', '..', '..', 'testing')
|
| -sys.path.append(TESTING_DIR)
|
| -
|
| -from legion import legion_test_case
|
| -
|
| -
|
| -class ExampleTestController(legion_test_case.TestCase):
|
| - """An example controller using the remote subprocess functions."""
|
| -
|
| - @classmethod
|
| - def setUpClass(cls):
|
| - """Creates the task machine and waits until it connects."""
|
| - parser = argparse.ArgumentParser()
|
| - parser.add_argument('--task-hash')
|
| - parser.add_argument('--os', default='Ubuntu-14.04')
|
| - args, _ = parser.parse_known_args()
|
| -
|
| - cls.task = cls.CreateTask(
|
| - isolated_hash=args.task_hash,
|
| - dimensions={'os': args.os},
|
| - idle_timeout_secs=90,
|
| - connection_timeout_secs=90,
|
| - verbosity=logging.DEBUG)
|
| - cls.task.Create()
|
| - cls.task.WaitForConnection()
|
| -
|
| - def testMultipleProcesses(self):
|
| - """Tests that processes can be run and controlled simultaneously."""
|
| - start = time.time()
|
| - logging.info('Starting "sleep 10" and "sleep 20"')
|
| - sleep10 = self.task.Process(['sleep', '10'])
|
| - sleep20 = self.task.Process(['sleep', '20'])
|
| -
|
| - logging.info('Waiting for sleep 10 to finish and verifying timing')
|
| - sleep10.Wait()
|
| - elapsed = time.time() - start
|
| - self.assertGreaterEqual(elapsed, 10)
|
| - self.assertLess(elapsed, 11)
|
| -
|
| - logging.info('Waiting for sleep 20 to finish and verifying timing')
|
| - sleep20.Wait()
|
| - elapsed = time.time() - start
|
| - self.assertGreaterEqual(elapsed, 20)
|
| -
|
| - sleep10.Delete()
|
| - sleep20.Delete()
|
| -
|
| - def testTerminate(self):
|
| - """Tests that a process can be correctly terminated."""
|
| - start = time.time()
|
| -
|
| - logging.info('Starting "sleep 20"')
|
| - sleep20 = self.task.Process(['sleep', '20'])
|
| - logging.info('Calling Terminate()')
|
| - sleep20.Terminate()
|
| - try:
|
| - logging.info('Trying to wait for sleep 20 to complete')
|
| - sleep20.Wait()
|
| - except xmlrpclib.Fault:
|
| - pass
|
| - finally:
|
| - sleep20.Delete()
|
| - logging.info('Checking to make sure sleep 20 was actually terminated')
|
| - self.assertLess(time.time() - start, 20)
|
| -
|
| - def testLs(self):
|
| - """Tests that the returned results from a process are correct."""
|
| - logging.info('Calling "ls"')
|
| - ls = self.task.Process(['ls'])
|
| - logging.info('Trying to wait for ls to complete')
|
| - ls.Wait()
|
| - logging.info('Checking that ls completed and returned the correct results')
|
| - self.assertEqual(ls.GetReturncode(), 0)
|
| - self.assertIn('task.isolate', ls.ReadStdout())
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - legion_test_case.main()
|
|
|