OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # Copyright 2015 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """A host test module demonstrating interacting with remote subprocesses.""" | |
7 | |
8 import argparse | |
9 import logging | |
10 import os | |
11 import sys | |
12 import time | |
13 import xmlrpclib | |
14 | |
15 # Map the testing directory so we can import legion.legion_test. | |
16 TESTING_DIR = os.path.join( | |
17 os.path.dirname(os.path.abspath(__file__)), | |
18 '..', '..', '..', '..', 'testing') | |
19 sys.path.append(TESTING_DIR) | |
20 | |
21 from legion import legion_test_case | |
22 | |
23 | |
24 class ExampleTestController(legion_test_case.TestCase): | |
25 """An example controller using the remote subprocess functions.""" | |
26 | |
27 @classmethod | |
28 def setUpClass(cls): | |
29 """Creates the task machine and waits until it connects.""" | |
30 parser = argparse.ArgumentParser() | |
31 parser.add_argument('--task-hash') | |
32 parser.add_argument('--os', default='Ubuntu-14.04') | |
33 args, _ = parser.parse_known_args() | |
34 | |
35 cls.task = cls.CreateTask( | |
36 isolated_hash=args.task_hash, | |
37 dimensions={'os': args.os}, | |
38 idle_timeout_secs=90, | |
39 connection_timeout_secs=90, | |
40 verbosity=logging.DEBUG) | |
41 cls.task.Create() | |
42 cls.task.WaitForConnection() | |
43 | |
44 def testMultipleProcesses(self): | |
45 """Tests that processes can be run and controlled simultaneously.""" | |
46 start = time.time() | |
47 logging.info('Starting "sleep 10" and "sleep 20"') | |
48 sleep10 = self.task.Process(['sleep', '10']) | |
49 sleep20 = self.task.Process(['sleep', '20']) | |
50 | |
51 logging.info('Waiting for sleep 10 to finish and verifying timing') | |
52 sleep10.Wait() | |
53 elapsed = time.time() - start | |
54 self.assertGreaterEqual(elapsed, 10) | |
55 self.assertLess(elapsed, 11) | |
56 | |
57 logging.info('Waiting for sleep 20 to finish and verifying timing') | |
58 sleep20.Wait() | |
59 elapsed = time.time() - start | |
60 self.assertGreaterEqual(elapsed, 20) | |
61 | |
62 sleep10.Delete() | |
63 sleep20.Delete() | |
64 | |
65 def testTerminate(self): | |
66 """Tests that a process can be correctly terminated.""" | |
67 start = time.time() | |
68 | |
69 logging.info('Starting "sleep 20"') | |
70 sleep20 = self.task.Process(['sleep', '20']) | |
71 logging.info('Calling Terminate()') | |
72 sleep20.Terminate() | |
73 try: | |
74 logging.info('Trying to wait for sleep 20 to complete') | |
75 sleep20.Wait() | |
76 except xmlrpclib.Fault: | |
77 pass | |
78 finally: | |
79 sleep20.Delete() | |
80 logging.info('Checking to make sure sleep 20 was actually terminated') | |
81 self.assertLess(time.time() - start, 20) | |
82 | |
83 def testLs(self): | |
84 """Tests that the returned results from a process are correct.""" | |
85 logging.info('Calling "ls"') | |
86 ls = self.task.Process(['ls']) | |
87 logging.info('Trying to wait for ls to complete') | |
88 ls.Wait() | |
89 logging.info('Checking that ls completed and returned the correct results') | |
90 self.assertEqual(ls.GetReturncode(), 0) | |
91 self.assertIn('task.isolate', ls.ReadStdout()) | |
92 | |
93 | |
94 if __name__ == '__main__': | |
95 legion_test_case.main() | |
OLD | NEW |