OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import logging, re |
| 6 |
| 7 class NotImplemented(Exception): |
| 8 def __init__(self, what): |
| 9 self.what = what |
| 10 def __str__(self): |
| 11 return repr("Test method '%s' not implemented" % self.what) |
| 12 |
| 13 |
| 14 class BSDRouter(object): |
| 15 """ |
| 16 BSD-style WiFi Router support for WiFiTest class. |
| 17 |
| 18 This class implements test methods/steps that communicate with a |
| 19 router implemented with FreeBSD 8.0 and later. The router must |
| 20 be pre-configured to enable ssh access and have a net80211-based |
| 21 wireless device. We also assume hostapd is present for handling |
| 22 authenticator duties and any necessary modules are pre-loaded |
| 23 (e.g. wlan_ccmp, wlan_tkip, wlan_wep, wlan_xauth). |
| 24 """ |
| 25 |
| 26 |
| 27 def __init__(self, host, params, defssid): |
| 28 self.router = host |
| 29 # TODO(sleffler) default to 1st available wireless nic |
| 30 self.phydev = params['phydev'] |
| 31 # TODO(sleffler) default to 1st available wired nic |
| 32 self.wiredif = params['wiredev'] |
| 33 self.defssid = defssid; |
| 34 self.wlanif = None |
| 35 self.bridgeif = None |
| 36 |
| 37 self.hostapd_keys = ("wpa", "wpa_passphrase", "wpa_key_mgmt", |
| 38 "wpa_pairwise", "wpa_group_rekey", "wpa_strict_rekey", |
| 39 "wpa_gmk_rekey", "wpa_ptk_rekey", |
| 40 "rsn_pairwise", |
| 41 "rsn_preauth", "rsh_preauth_interfaces", |
| 42 "peerkey") |
| 43 self.hostapd_conf = None |
| 44 |
| 45 |
| 46 def create(self, params): |
| 47 """ Create a wifi device of the specified type """ |
| 48 |
| 49 phydev = params.get('phydev', self.phydev) |
| 50 result = self.router.run("ifconfig wlan create wlandev %s" \ |
| 51 " wlanmode %s" % (phydev, params['type'])) |
| 52 self.wlanif = result.stdout[:-1] |
| 53 |
| 54 # NB: can't create+addm together 'cuz of ifconfig bug |
| 55 result = self.router.run("ifconfig bridge create") |
| 56 self.bridgeif = result.stdout[:-1] |
| 57 result = self.router.run("ifconfig %s addm %s addm %s" % \ |
| 58 (self.bridgeif, self.wlanif, self.wiredif)) |
| 59 logging.info("Use '%s' for %s mode vap and '%s' for bridge" % \ |
| 60 (self.wlanif, params['type'], self.bridgeif)) |
| 61 |
| 62 |
| 63 def destroy(self, params): |
| 64 """ Destroy a previously created device """ |
| 65 |
| 66 if self.wlanif is not None: |
| 67 self.deconfig(params) |
| 68 self.router.run("ifconfig %s destroy" % self.wlanif, \ |
| 69 ignore_status=True) |
| 70 if self.bridgeif is not None: |
| 71 self.router.run("ifconfig %s destroy" % self.bridgeif, \ |
| 72 ignore_status=True) |
| 73 |
| 74 |
| 75 def __get_args(self, params): |
| 76 # |
| 77 # Convert test parameters to ifconfig arguments. These |
| 78 # mostly are passed through unchanged; the wep keys must |
| 79 # be mapped. |
| 80 # |
| 81 args = "" |
| 82 for (k, v) in params.items(): |
| 83 if v is None: |
| 84 args += " %s" % k |
| 85 elif k == "wep_key0": |
| 86 args += " wepkey 1:0x%s" % v |
| 87 elif k == "wep_key1": |
| 88 args += " wepkey 2:0x%s" % v |
| 89 elif k == "wep_key2": |
| 90 args += " wepkey 3:0x%s" % v |
| 91 elif k == "wep_key3": |
| 92 args += " wepkey 4:0x%s" % v |
| 93 elif k == "deftxkey": |
| 94 args += " deftxkey %s" % str(int(v)+1) |
| 95 else: |
| 96 args += " %s '%s'" % (k, v) |
| 97 return args |
| 98 |
| 99 |
| 100 def config(self, params): |
| 101 """ |
| 102 Configure the AP per test requirements. This can be done |
| 103 entirely with ifconfig unless we need an authenticator in |
| 104 which case we must also setup hostapd. |
| 105 """ |
| 106 |
| 107 if 'ssid' not in params: |
| 108 params['ssid'] = self.defssid |
| 109 |
| 110 args = "" |
| 111 hostapd_args = "" |
| 112 if "wpa" in params: |
| 113 # |
| 114 # WPA/RSN requires hostapd as an authenticator; split out |
| 115 # ifconfig args from hostapd configuration and setup to |
| 116 # construct the hostapd.conf file below. |
| 117 # |
| 118 for (k, v) in params.items(): |
| 119 if k in self.hostapd_keys: |
| 120 if v is None: |
| 121 hostapd_args += "%s\n" % k |
| 122 else: |
| 123 hostapd_args += "%s=%s\n" % (k, v) |
| 124 else: |
| 125 if v is None: |
| 126 args += " %s" % k |
| 127 else: # XXX wep_key? |
| 128 args += " %s %s" % (k, v) |
| 129 |
| 130 else: |
| 131 args += self.__get_args(params) |
| 132 |
| 133 # configure the interface and mark it up |
| 134 self.router.run("ifconfig %s %s up" % (self.wlanif, args)) |
| 135 |
| 136 if hostapd_args is not "": |
| 137 # |
| 138 # Construct the hostapd.conf file and start hostapd; |
| 139 # note this must come after the interface is configured |
| 140 # so hostapd can adopt information such as the ssid. |
| 141 # |
| 142 self.hostapd_conf = "/tmp/%s.conf" % self.wlanif |
| 143 self.router.run("cat<<'EOF' >%s\ninterface=%s\n%sEOF\n" % \ |
| 144 (self.hostapd_conf, self.wlanif, hostapd_args)) |
| 145 self.router.run("hostapd -B %s" % self.hostapd_conf) |
| 146 else: |
| 147 self.hostapd_conf = None |
| 148 |
| 149 # finally bring the bridge up |
| 150 self.router.run("ifconfig %s up" % self.bridgeif) |
| 151 |
| 152 |
| 153 def deconfig(self, params): |
| 154 """ De-configure the AP (typically marks wlanif down) """ |
| 155 |
| 156 self.router.run("ifconfig %s down" % self.wlanif) |
| 157 if self.hostapd_conf is not None: |
| 158 self.router.run("killall hostapd >/dev/null 2>&1") |
| 159 self.router.run("rm -f %s" % self.hostapd_conf) |
| 160 self.hostapd_conf = None |
| 161 |
| 162 |
| 163 def client_check_bintval(self, params): |
| 164 result = self.router.run("ifconfig %s" % self.wlanif) |
| 165 want = params[0] |
| 166 m = re.search('bintval ([0-9]*)', result.stdout) |
| 167 if m is None: |
| 168 raise NameError |
| 169 if m.group(1) != want: |
| 170 logging.error("client_check_bintval: wanted %s got %s" % \ |
| 171 (want, m.group(1))) |
| 172 raise AssertionError |
| 173 |
| 174 |
| 175 def client_check_dtimperiod(self, params): |
| 176 result = self.router.run("ifconfig %s" % self.wlanif) |
| 177 want = params[0] |
| 178 m = re.search('dtimperiod ([0-9]*)', result.stdout) |
| 179 if m is None: |
| 180 raise NameError |
| 181 if m.group(1) != want: |
| 182 logging.error("client_check_dtimperiod: wanted %s got %s" % \ |
| 183 (want, m.group(1))) |
| 184 raise AssertionError |
| 185 |
| 186 |
| 187 def client_check_rifs(self, params): |
| 188 result = self.router.run("ifconfig %s" % self.wlanif) |
| 189 m = re.search('[^-]rifs', result.stdout) |
| 190 if m is None: |
| 191 raise AssertionError |
| 192 |
| 193 |
| 194 def client_check_shortgi(self, params): |
| 195 result = self.router.run("ifconfig %s" % self.wlanif) |
| 196 m = re.search('[^-]shortgi', result.stdout) |
| 197 if m is None: |
| 198 raise AssertionError |
| 199 |
| 200 |
| 201 def monitor_start(self, params): |
| 202 """ Start monitoring system events """ |
| 203 raise NotImplemented("monitor_start") |
| 204 |
| 205 |
| 206 def monitor_stop(self, params): |
| 207 """ Stop monitoring system events """ |
| 208 raise NotImplemented("monitor_stop") |
| 209 |
| 210 |
| 211 def check_client_event_mic(self, params): |
| 212 """ Check for MIC error event """ |
| 213 raise NotImplemented("check_client_event_mic") |
| 214 |
| 215 |
| 216 def check_client_event_countermeasures(self, params): |
| 217 """ Check for WPA CounterMeasures event """ |
| 218 raise NotImplemented("check_client_event_countermeasures") |
| 219 |
| 220 |
| 221 def force_mic_error(self, params): |
| 222 """ |
| 223 Force a Michael MIC error on the next packet. Note this requires |
| 224 a driver that uses software crypto and a kernel with the support |
| 225 to fire oneshot MIC errors (first appeared in FreeBSD 8.1). |
| 226 """ |
| 227 raise NotImplemented("force_mic_error") |
OLD | NEW |