OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
2 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 from autotest_lib.client.bin import test, utils | |
7 from autotest_lib.client.common_lib import error | |
8 | |
9 import logging, pprint, time, traceback, sys | |
10 import dbus, dbus.mainloop.glib, glib, gobject | |
11 | |
12 from autotest_lib.client.cros import flimflam_test_path | |
13 import mm, flimflam | |
14 | |
15 import os | |
16 | |
17 def ExceptionForward(func): | |
18 def wrapper(self, *args, **kwargs): | |
19 try: | |
20 return func(self, *args, **kwargs) | |
21 except Exception, e: | |
22 logging.warning('Saving exception: %s' % e) | |
23 logging.warning(''.join(traceback.format_exception(*sys.exc_info()))) | |
24 self._forwarded_exception = e | |
25 self.main_loop.quit() | |
26 return False | |
27 return wrapper | |
28 | |
29 class DisconnnectTesterMainLoop(object): | |
30 version = 1 | |
31 | |
32 def __init__(self, test, main_loop): | |
33 self._forwarded_exception = None | |
34 self.main_loop = main_loop | |
35 self.test = test | |
36 | |
37 def assert_(self, arg): | |
38 self.test.assert_(self, arg) | |
39 | |
40 @ExceptionForward | |
41 def timeout_main_loop(self): | |
42 logging.warning('Requirements unsatisfied upon timeout: %s' % | |
43 self.remaining_requirements) | |
44 self.main_loop.quit() | |
45 raise error.TestFail('Main loop timed out') | |
46 | |
47 def requirement_completed(self, requirement, warn_if_already_completed=True): | |
48 """Record that a requirement was completed. Exit if all are.""" | |
49 should_log = True | |
50 try: | |
51 self.remaining_requirements.remove(requirement) | |
52 except KeyError: | |
53 if warn_if_already_completed: | |
54 logging.warning('requirement %s was not present to be completed', | |
55 requirement) | |
56 else: | |
57 should_log = False | |
58 | |
59 if not self.remaining_requirements: | |
60 logging.info('All requirements satisfied') | |
61 self.main_loop.quit() | |
62 else: | |
63 if should_log: | |
64 logging.info('Requirement %s satisfied. Remaining: %s' % | |
65 (requirement, self.remaining_requirements)) | |
66 | |
67 def perform_one_test(self): | |
68 """Subclasses override this function to do their testing.""" | |
69 raise Exception('perform_one_test must be overridden') | |
70 | |
71 @ExceptionForward | |
72 def generic_dbus_error_handler(self, e): | |
73 raise error.TestFail('Dbus call failed: %s' % e) | |
74 | |
75 def run(self, **kwargs): | |
76 self.test_args = kwargs | |
77 gobject.timeout_add(int(self.test_args.get('timeout_s', 10) * 1000), | |
78 self.timeout_main_loop) | |
79 gobject.idle_add(self.perform_one_test) | |
80 self.main_loop.run() | |
81 if self._forwarded_exception: | |
82 raise self._forwarded_exception | |
83 self.after_main_loop() | |
84 | |
85 class ModemDisconnectTester(DisconnnectTesterMainLoop): | |
86 def __init__(self, test, main_loop): | |
87 super(ModemDisconnectTester, self).__init__(test, main_loop) | |
88 self.remaining_requirements = set(['connect','disable', 'get_status']) | |
Jason Glasgow
2011/04/11 19:33:22
space after 'connect',
rochberg
2011/04/11 19:38:23
Done.
| |
89 | |
90 def modem_enabled(self): | |
91 return self.modem_manager.Properties(self.modem_path).get('Enabled', -1) | |
92 | |
93 def configure_modem(self): | |
94 self.modem_manager, self.modem_path = mm.PickOneModem('') | |
95 self.modem = self.modem_manager.Modem(self.modem_path) | |
96 self.simple_modem = self.modem_manager.SimpleModem(self.modem_path) | |
97 self.gobi_modem = self.modem_manager.GobiModem(self.modem_path) | |
98 | |
99 if self.gobi_modem: | |
100 sleep_ms = self.test_args.get('async_connect_sleep_ms', 0) | |
101 | |
102 # Tell the modem manager to sleep this long before completing a | |
103 # connect | |
104 self.gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms) | |
105 | |
106 self.modem.Enable(False) | |
107 self.modem.Enable(True) | |
108 | |
109 @ExceptionForward | |
110 def perform_one_test(self): | |
111 self.configure_modem() | |
112 logging.info('connecting') | |
113 | |
114 retval = self.simple_modem.Connect( | |
115 {}, | |
116 reply_handler=self.connect_success_handler, | |
117 error_handler=self.connect_error_handler) | |
118 logging.info('connect call made. retval = %s', retval) | |
119 | |
120 disable_delay_ms = ( | |
121 self.test_args.get('delay_before_disable_ms', 0) + | |
122 self.test.iteration * | |
123 self.test_args.get('disable_delay_per_iteration_ms', 0)) | |
124 gobject.timeout_add(disable_delay_ms, self.start_disable) | |
125 | |
126 self.status_delay_ms = self.test_args.get('status_delay_ms', 200) | |
127 gobject.timeout_add(self.status_delay_ms, self.start_get_status) | |
128 | |
129 | |
Jason Glasgow
2011/04/11 19:33:22
extra line
rochberg
2011/04/11 19:38:23
Done.
| |
130 @ExceptionForward | |
131 def connect_success_handler(self, *ignored_args): | |
132 logging.info('Reply done') | |
133 self.requirement_completed('connect') | |
134 | |
135 @ExceptionForward | |
136 def connect_error_handler(self, *ignored_args): | |
137 logging.info('Reply errored.') | |
138 self.requirement_completed('connect') | |
139 | |
140 @ExceptionForward | |
141 def start_disable(self): | |
142 logging.info('disabling') | |
143 self.disable_start = time.time() | |
144 self.modem.Enable(False, | |
145 reply_handler=self.disable_success_handler, | |
146 error_handler=self.generic_dbus_error_handler) | |
147 | |
148 @ExceptionForward | |
149 def disable_success_handler(self): | |
150 disable_elapsed = time.time() - self.disable_start | |
151 self.assert_(disable_elapsed < | |
152 1.0 + self.test_args.get('async_connect_sleep_ms', 0)) | |
153 self.requirement_completed('disable') | |
154 | |
155 @ExceptionForward | |
156 def start_get_status(self): | |
157 self.simple_modem.GetStatus(reply_handler=self.get_status_success_handler, | |
158 error_handler=self.generic_dbus_error_handler) | |
159 | |
160 @ExceptionForward | |
161 def get_status_success_handler(self, status): | |
162 logging.info('Got status') | |
163 self.requirement_completed('get_status', warn_if_already_completed=False) | |
164 gobject.timeout_add(self.status_delay_ms, self.start_get_status) | |
165 | |
166 def after_main_loop(self): | |
167 enabled = self.modem_enabled() | |
168 logging.info('Modem enabled: %s', enabled) | |
169 self.assert_(enabled == 0) | |
170 | |
171 | |
172 class network_3GDisconnectWhileConnecting(test.test): | |
173 version = 1 | |
174 def run_once(self, **kwargs): | |
175 logging.info('setting main loop') | |
176 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
177 self.main_loop = gobject.MainLoop() | |
178 | |
179 modem = ModemDisconnectTester(self, self.main_loop) | |
180 | |
181 modem.run(**kwargs) | |
OLD | NEW |