Index: server/site_bsd_router.py |
diff --git a/server/site_bsd_router.py b/server/site_bsd_router.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..52224243e1996c178ef9da7bd12a1ea5ea0c2656 |
--- /dev/null |
+++ b/server/site_bsd_router.py |
@@ -0,0 +1,227 @@ |
+# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import logging, re |
+ |
+class NotImplemented(Exception): |
+ def __init__(self, what): |
+ self.what = what |
+ def __str__(self): |
+ return repr("Test method '%s' not implemented" % self.what) |
+ |
+ |
+class BSDRouter(object): |
+ """ |
+ BSD-style WiFi Router support for WiFiTest class. |
+ |
+ This class implements test methods/steps that communicate with a |
+ router implemented with FreeBSD 8.0 and later. The router must |
+ be pre-configured to enable ssh access and have a net80211-based |
+ wireless device. We also assume hostapd is present for handling |
+ authenticator duties and any necessary modules are pre-loaded |
+ (e.g. wlan_ccmp, wlan_tkip, wlan_wep, wlan_xauth). |
+ """ |
+ |
+ |
+ def __init__(self, host, params, defssid): |
+ self.router = host |
+ # TODO(sleffler) default to 1st available wireless nic |
+ self.phydev = params['phydev'] |
+ # TODO(sleffler) default to 1st available wired nic |
+ self.wiredif = params['wiredev'] |
+ self.defssid = defssid; |
+ self.wlanif = None |
+ self.bridgeif = None |
+ |
+ self.hostapd_keys = ("wpa", "wpa_passphrase", "wpa_key_mgmt", |
+ "wpa_pairwise", "wpa_group_rekey", "wpa_strict_rekey", |
+ "wpa_gmk_rekey", "wpa_ptk_rekey", |
+ "rsn_pairwise", |
+ "rsn_preauth", "rsh_preauth_interfaces", |
+ "peerkey") |
+ self.hostapd_conf = None |
+ |
+ |
+ def create(self, params): |
+ """ Create a wifi device of the specified type """ |
+ |
+ phydev = params.get('phydev', self.phydev) |
+ result = self.router.run("ifconfig wlan create wlandev %s" \ |
+ " wlanmode %s" % (phydev, params['type'])) |
+ self.wlanif = result.stdout[:-1] |
+ |
+ # NB: can't create+addm together 'cuz of ifconfig bug |
+ result = self.router.run("ifconfig bridge create") |
+ self.bridgeif = result.stdout[:-1] |
+ result = self.router.run("ifconfig %s addm %s addm %s" % \ |
+ (self.bridgeif, self.wlanif, self.wiredif)) |
+ logging.info("Use '%s' for %s mode vap and '%s' for bridge" % \ |
+ (self.wlanif, params['type'], self.bridgeif)) |
+ |
+ |
+ def destroy(self, params): |
+ """ Destroy a previously created device """ |
+ |
+ if self.wlanif is not None: |
+ self.deconfig(params) |
+ self.router.run("ifconfig %s destroy" % self.wlanif, \ |
+ ignore_status=True) |
+ if self.bridgeif is not None: |
+ self.router.run("ifconfig %s destroy" % self.bridgeif, \ |
+ ignore_status=True) |
+ |
+ |
+ def __get_args(self, params): |
+ # |
+ # Convert test parameters to ifconfig arguments. These |
+ # mostly are passed through unchanged; the wep keys must |
+ # be mapped. |
+ # |
+ args = "" |
+ for (k, v) in params.items(): |
+ if v is None: |
+ args += " %s" % k |
+ elif k == "wep_key0": |
+ args += " wepkey 1:0x%s" % v |
+ elif k == "wep_key1": |
+ args += " wepkey 2:0x%s" % v |
+ elif k == "wep_key2": |
+ args += " wepkey 3:0x%s" % v |
+ elif k == "wep_key3": |
+ args += " wepkey 4:0x%s" % v |
+ elif k == "deftxkey": |
+ args += " deftxkey %s" % str(int(v)+1) |
+ else: |
+ args += " %s '%s'" % (k, v) |
+ return args |
+ |
+ |
+ def config(self, params): |
+ """ |
+ Configure the AP per test requirements. This can be done |
+ entirely with ifconfig unless we need an authenticator in |
+ which case we must also setup hostapd. |
+ """ |
+ |
+ if 'ssid' not in params: |
+ params['ssid'] = self.defssid |
+ |
+ args = "" |
+ hostapd_args = "" |
+ if "wpa" in params: |
+ # |
+ # WPA/RSN requires hostapd as an authenticator; split out |
+ # ifconfig args from hostapd configuration and setup to |
+ # construct the hostapd.conf file below. |
+ # |
+ for (k, v) in params.items(): |
+ if k in self.hostapd_keys: |
+ if v is None: |
+ hostapd_args += "%s\n" % k |
+ else: |
+ hostapd_args += "%s=%s\n" % (k, v) |
+ else: |
+ if v is None: |
+ args += " %s" % k |
+ else: # XXX wep_key? |
+ args += " %s %s" % (k, v) |
+ |
+ else: |
+ args += self.__get_args(params) |
+ |
+ # configure the interface and mark it up |
+ self.router.run("ifconfig %s %s up" % (self.wlanif, args)) |
+ |
+ if hostapd_args is not "": |
+ # |
+ # Construct the hostapd.conf file and start hostapd; |
+ # note this must come after the interface is configured |
+ # so hostapd can adopt information such as the ssid. |
+ # |
+ self.hostapd_conf = "/tmp/%s.conf" % self.wlanif |
+ self.router.run("cat<<'EOF' >%s\ninterface=%s\n%sEOF\n" % \ |
+ (self.hostapd_conf, self.wlanif, hostapd_args)) |
+ self.router.run("hostapd -B %s" % self.hostapd_conf) |
+ else: |
+ self.hostapd_conf = None |
+ |
+ # finally bring the bridge up |
+ self.router.run("ifconfig %s up" % self.bridgeif) |
+ |
+ |
+ def deconfig(self, params): |
+ """ De-configure the AP (typically marks wlanif down) """ |
+ |
+ self.router.run("ifconfig %s down" % self.wlanif) |
+ if self.hostapd_conf is not None: |
+ self.router.run("killall hostapd >/dev/null 2>&1") |
+ self.router.run("rm -f %s" % self.hostapd_conf) |
+ self.hostapd_conf = None |
+ |
+ |
+ def client_check_bintval(self, params): |
+ result = self.router.run("ifconfig %s" % self.wlanif) |
+ want = params[0] |
+ m = re.search('bintval ([0-9]*)', result.stdout) |
+ if m is None: |
+ raise NameError |
+ if m.group(1) != want: |
+ logging.error("client_check_bintval: wanted %s got %s" % \ |
+ (want, m.group(1))) |
+ raise AssertionError |
+ |
+ |
+ def client_check_dtimperiod(self, params): |
+ result = self.router.run("ifconfig %s" % self.wlanif) |
+ want = params[0] |
+ m = re.search('dtimperiod ([0-9]*)', result.stdout) |
+ if m is None: |
+ raise NameError |
+ if m.group(1) != want: |
+ logging.error("client_check_dtimperiod: wanted %s got %s" % \ |
+ (want, m.group(1))) |
+ raise AssertionError |
+ |
+ |
+ def client_check_rifs(self, params): |
+ result = self.router.run("ifconfig %s" % self.wlanif) |
+ m = re.search('[^-]rifs', result.stdout) |
+ if m is None: |
+ raise AssertionError |
+ |
+ |
+ def client_check_shortgi(self, params): |
+ result = self.router.run("ifconfig %s" % self.wlanif) |
+ m = re.search('[^-]shortgi', result.stdout) |
+ if m is None: |
+ raise AssertionError |
+ |
+ |
+ def monitor_start(self, params): |
+ """ Start monitoring system events """ |
+ raise NotImplemented("monitor_start") |
+ |
+ |
+ def monitor_stop(self, params): |
+ """ Stop monitoring system events """ |
+ raise NotImplemented("monitor_stop") |
+ |
+ |
+ def check_client_event_mic(self, params): |
+ """ Check for MIC error event """ |
+ raise NotImplemented("check_client_event_mic") |
+ |
+ |
+ def check_client_event_countermeasures(self, params): |
+ """ Check for WPA CounterMeasures event """ |
+ raise NotImplemented("check_client_event_countermeasures") |
+ |
+ |
+ def force_mic_error(self, params): |
+ """ |
+ Force a Michael MIC error on the next packet. Note this requires |
+ a driver that uses software crypto and a kernel with the support |
+ to fire oneshot MIC errors (first appeared in FreeBSD 8.1). |
+ """ |
+ raise NotImplemented("force_mic_error") |