| Index: client/site_tests/network_3GDisableWhileConnecting/network_3GDisableWhileConnecting.py
|
| diff --git a/client/site_tests/network_3GDisableWhileConnecting/network_3GDisableWhileConnecting.py b/client/site_tests/network_3GDisableWhileConnecting/network_3GDisableWhileConnecting.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5462d57d5e887ac2219d9318093bff949e29f37c
|
| --- /dev/null
|
| +++ b/client/site_tests/network_3GDisableWhileConnecting/network_3GDisableWhileConnecting.py
|
| @@ -0,0 +1,181 @@
|
| +#!/usr/bin/env python
|
| +# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +from autotest_lib.client.bin import test, utils
|
| +from autotest_lib.client.common_lib import error
|
| +
|
| +import logging, pprint, time, traceback, sys
|
| +import dbus, dbus.mainloop.glib, glib, gobject
|
| +
|
| +from autotest_lib.client.cros import flimflam_test_path
|
| +import mm, flimflam
|
| +
|
| +import os
|
| +
|
| +def ExceptionForward(func):
|
| + def wrapper(self, *args, **kwargs):
|
| + try:
|
| + return func(self, *args, **kwargs)
|
| + except Exception, e:
|
| + logging.warning('Saving exception: %s' % e)
|
| + logging.warning(''.join(traceback.format_exception(*sys.exc_info())))
|
| + self._forwarded_exception = e
|
| + self.main_loop.quit()
|
| + return False
|
| + return wrapper
|
| +
|
| +class DisconnnectTesterMainLoop(object):
|
| + version = 1
|
| +
|
| + def __init__(self, test, main_loop):
|
| + self._forwarded_exception = None
|
| + self.main_loop = main_loop
|
| + self.test = test
|
| +
|
| + def assert_(self, arg):
|
| + self.test.assert_(self, arg)
|
| +
|
| + @ExceptionForward
|
| + def timeout_main_loop(self):
|
| + logging.warning('Requirements unsatisfied upon timeout: %s' %
|
| + self.remaining_requirements)
|
| + self.main_loop.quit()
|
| + raise error.TestFail('Main loop timed out')
|
| +
|
| + def requirement_completed(self, requirement, warn_if_already_completed=True):
|
| + """Record that a requirement was completed. Exit if all are."""
|
| + should_log = True
|
| + try:
|
| + self.remaining_requirements.remove(requirement)
|
| + except KeyError:
|
| + if warn_if_already_completed:
|
| + logging.warning('requirement %s was not present to be completed',
|
| + requirement)
|
| + else:
|
| + should_log = False
|
| +
|
| + if not self.remaining_requirements:
|
| + logging.info('All requirements satisfied')
|
| + self.main_loop.quit()
|
| + else:
|
| + if should_log:
|
| + logging.info('Requirement %s satisfied. Remaining: %s' %
|
| + (requirement, self.remaining_requirements))
|
| +
|
| + def perform_one_test(self):
|
| + """Subclasses override this function to do their testing."""
|
| + raise Exception('perform_one_test must be overridden')
|
| +
|
| + @ExceptionForward
|
| + def generic_dbus_error_handler(self, e):
|
| + raise error.TestFail('Dbus call failed: %s' % e)
|
| +
|
| + def run(self, **kwargs):
|
| + self.test_args = kwargs
|
| + gobject.timeout_add(int(self.test_args.get('timeout_s', 10) * 1000),
|
| + self.timeout_main_loop)
|
| + gobject.idle_add(self.perform_one_test)
|
| + self.main_loop.run()
|
| + if self._forwarded_exception:
|
| + raise self._forwarded_exception
|
| + self.after_main_loop()
|
| +
|
| +class ModemDisconnectTester(DisconnnectTesterMainLoop):
|
| + def __init__(self, test, main_loop):
|
| + super(ModemDisconnectTester, self).__init__(test, main_loop)
|
| + self.remaining_requirements = set(['connect', 'disable', 'get_status'])
|
| +
|
| + def modem_enabled(self):
|
| + return self.modem_manager.Properties(self.modem_path).get('Enabled', -1)
|
| +
|
| + def configure_modem(self):
|
| + self.modem_manager, self.modem_path = mm.PickOneModem('')
|
| + self.modem = self.modem_manager.Modem(self.modem_path)
|
| + self.simple_modem = self.modem_manager.SimpleModem(self.modem_path)
|
| + self.gobi_modem = self.modem_manager.GobiModem(self.modem_path)
|
| +
|
| + if self.gobi_modem:
|
| + sleep_ms = self.test_args.get('async_connect_sleep_ms', 0)
|
| +
|
| + # Tell the modem manager to sleep this long before completing a
|
| + # connect
|
| + self.gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)
|
| +
|
| + self.modem.Enable(False)
|
| + self.modem.Enable(True)
|
| +
|
| + @ExceptionForward
|
| + def perform_one_test(self):
|
| + self.configure_modem()
|
| + logging.info('connecting')
|
| +
|
| + retval = self.simple_modem.Connect(
|
| + {},
|
| + reply_handler=self.connect_success_handler,
|
| + error_handler=self.connect_error_handler)
|
| + logging.info('connect call made. retval = %s', retval)
|
| +
|
| + disable_delay_ms = (
|
| + self.test_args.get('delay_before_disable_ms', 0) +
|
| + self.test.iteration *
|
| + self.test_args.get('disable_delay_per_iteration_ms', 0))
|
| + gobject.timeout_add(disable_delay_ms, self.start_disable)
|
| +
|
| + self.status_delay_ms = self.test_args.get('status_delay_ms', 200)
|
| + gobject.timeout_add(self.status_delay_ms, self.start_get_status)
|
| +
|
| + @ExceptionForward
|
| + def connect_success_handler(self, *ignored_args):
|
| + logging.info('Reply done')
|
| + self.requirement_completed('connect')
|
| +
|
| + @ExceptionForward
|
| + def connect_error_handler(self, *ignored_args):
|
| + logging.info('Reply errored.')
|
| + self.requirement_completed('connect')
|
| +
|
| + @ExceptionForward
|
| + def start_disable(self):
|
| + logging.info('disabling')
|
| + self.disable_start = time.time()
|
| + self.modem.Enable(False,
|
| + reply_handler=self.disable_success_handler,
|
| + error_handler=self.generic_dbus_error_handler)
|
| +
|
| + @ExceptionForward
|
| + def disable_success_handler(self):
|
| + disable_elapsed = time.time() - self.disable_start
|
| + self.assert_(disable_elapsed <
|
| + 1.0 + self.test_args.get('async_connect_sleep_ms', 0))
|
| + self.requirement_completed('disable')
|
| +
|
| + @ExceptionForward
|
| + def start_get_status(self):
|
| + # Keep on calling get_status to make sure it works at all times
|
| + self.simple_modem.GetStatus(reply_handler=self.get_status_success_handler,
|
| + error_handler=self.generic_dbus_error_handler)
|
| +
|
| + @ExceptionForward
|
| + def get_status_success_handler(self, status):
|
| + logging.info('Got status')
|
| + self.requirement_completed('get_status', warn_if_already_completed=False)
|
| + gobject.timeout_add(self.status_delay_ms, self.start_get_status)
|
| +
|
| + def after_main_loop(self):
|
| + enabled = self.modem_enabled()
|
| + logging.info('Modem enabled: %s', enabled)
|
| + self.assert_(enabled == 0)
|
| +
|
| +
|
| +class network_3GDisconnectWhileConnecting(test.test):
|
| + version = 1
|
| + def run_once(self, **kwargs):
|
| + logging.info('setting main loop')
|
| + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
| + self.main_loop = gobject.MainLoop()
|
| +
|
| + modem = ModemDisconnectTester(self, self.main_loop)
|
| +
|
| + modem.run(**kwargs)
|
|
|