| OLD | NEW |
| (Empty) |
| 1 # Copyright 2014 Google Inc. All Rights Reserved. | |
| 2 # | |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 # you may not use this file except in compliance with the License. | |
| 5 # You may obtain a copy of the License at | |
| 6 # | |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 # | |
| 9 # Unless required by applicable law or agreed to in writing, software | |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 # See the License for the specific language governing permissions and | |
| 13 # limitations under the License. | |
| 14 | |
| 15 """Installs certificate on phone with KitKat.""" | |
| 16 | |
| 17 import argparse | |
| 18 import logging | |
| 19 import os | |
| 20 import subprocess | |
| 21 import sys | |
| 22 | |
| 23 KEYCODE_ENTER = '66' | |
| 24 KEYCODE_TAB = '61' | |
| 25 | |
| 26 | |
| 27 class CertInstallError(Exception): | |
| 28 pass | |
| 29 | |
| 30 | |
| 31 class CertRemovalError(Exception): | |
| 32 pass | |
| 33 | |
| 34 | |
| 35 class AdbShellError(subprocess.CalledProcessError): | |
| 36 pass | |
| 37 | |
| 38 | |
| 39 _ANDROID_M_BUILD_VERSION = 23 | |
| 40 | |
| 41 | |
| 42 class AndroidCertInstaller(object): | |
| 43 """Certificate installer for phones with KitKat.""" | |
| 44 | |
| 45 def __init__(self, device_id, cert_name, cert_path, adb_path=None): | |
| 46 if not os.path.exists(cert_path): | |
| 47 raise ValueError('Not a valid certificate path') | |
| 48 self.adb_path = adb_path or 'adb' | |
| 49 self.android_cacerts_path = None | |
| 50 self.cert_name = cert_name | |
| 51 self.cert_path = cert_path | |
| 52 self.device_id = device_id | |
| 53 self.file_name = os.path.basename(self.cert_path) | |
| 54 self.reformatted_cert_fname = None | |
| 55 self.reformatted_cert_path = None | |
| 56 | |
| 57 @staticmethod | |
| 58 def _run_cmd(cmd, dirname=None): | |
| 59 return subprocess.check_output(cmd, cwd=dirname) | |
| 60 | |
| 61 def _get_adb_cmd(self, *args): | |
| 62 cmd = [self.adb_path] | |
| 63 if self.device_id: | |
| 64 cmd.extend(['-s', self.device_id]) | |
| 65 cmd.extend(args) | |
| 66 return cmd | |
| 67 | |
| 68 def _adb(self, *args): | |
| 69 """Runs the adb command.""" | |
| 70 return self._run_cmd(self._get_adb_cmd(*args)) | |
| 71 | |
| 72 def _adb_shell(self, *args): | |
| 73 """Runs the adb shell command.""" | |
| 74 # We are not using self._adb() because adb shell return 0 even if the | |
| 75 # command has failed. This method is taking care of checking the actual | |
| 76 # return code of the command line ran on the device. | |
| 77 RETURN_CODE_PREFIX = '%%%s%% ' % __file__ | |
| 78 adb_cmd = self._get_adb_cmd('shell', '(%s); echo %s$?' % ( | |
| 79 subprocess.list2cmdline(args), RETURN_CODE_PREFIX)) | |
| 80 process = subprocess.Popen(adb_cmd, stdout=subprocess.PIPE) | |
| 81 adb_stdout, _ = process.communicate() | |
| 82 if process.returncode != 0: | |
| 83 raise subprocess.CalledProcessError( | |
| 84 cmd=adb_cmd, returncode=process.returncode, output=adb_stdout) | |
| 85 assert adb_stdout[-1] == '\n' | |
| 86 prefix_pos = adb_stdout.rfind(RETURN_CODE_PREFIX) | |
| 87 assert prefix_pos != -1, \ | |
| 88 'Couldn\'t find "%s" at the end of the output of %s' % ( | |
| 89 RETURN_CODE_PREFIX, subprocess.list2cmdline(adb_cmd)) | |
| 90 returncode = int(adb_stdout[prefix_pos + len(RETURN_CODE_PREFIX):]) | |
| 91 stdout = adb_stdout[:prefix_pos] | |
| 92 if returncode != 0: | |
| 93 raise AdbShellError(cmd=args, returncode=returncode, output=stdout) | |
| 94 return stdout | |
| 95 | |
| 96 def _adb_su_shell(self, *args): | |
| 97 """Runs command as root.""" | |
| 98 build_version_sdk = int(self._get_property('ro.build.version.sdk')) | |
| 99 if build_version_sdk >= _ANDROID_M_BUILD_VERSION: | |
| 100 cmd = ['su', '0'] | |
| 101 else: | |
| 102 cmd = ['su', '-c'] | |
| 103 cmd.extend(args) | |
| 104 return self._adb_shell(*cmd) | |
| 105 | |
| 106 def _get_property(self, prop): | |
| 107 return self._adb_shell('getprop', prop).strip() | |
| 108 | |
| 109 def check_device(self): | |
| 110 install_warning = False | |
| 111 if self._get_property('ro.product.device') != 'hammerhead': | |
| 112 logging.warning('Device is not hammerhead') | |
| 113 install_warning = True | |
| 114 if self._get_property('ro.build.version.release') != '4.4.2': | |
| 115 logging.warning('Version is not 4.4.2') | |
| 116 install_warning = True | |
| 117 if install_warning: | |
| 118 logging.warning('Certificate may not install properly') | |
| 119 | |
| 120 def _input_key(self, key): | |
| 121 """Inputs a keyevent.""" | |
| 122 self._adb_shell('input', 'keyevent', key) | |
| 123 | |
| 124 def _input_text(self, text): | |
| 125 """Inputs text.""" | |
| 126 self._adb_shell('input', 'text', text) | |
| 127 | |
| 128 @staticmethod | |
| 129 def _remove(file_name): | |
| 130 """Deletes file.""" | |
| 131 if os.path.exists(file_name): | |
| 132 os.remove(file_name) | |
| 133 | |
| 134 def _format_hashed_cert(self): | |
| 135 """Makes a certificate file that follows the format of files in cacerts.""" | |
| 136 self._remove(self.reformatted_cert_path) | |
| 137 contents = self._run_cmd(['openssl', 'x509', '-inform', 'PEM', '-text', | |
| 138 '-in', self.cert_path]) | |
| 139 description, begin_cert, cert_body = contents.rpartition('-----BEGIN ' | |
| 140 'CERTIFICATE') | |
| 141 contents = ''.join([begin_cert, cert_body, description]) | |
| 142 with open(self.reformatted_cert_path, 'w') as cert_file: | |
| 143 cert_file.write(contents) | |
| 144 | |
| 145 def _remove_cert_from_cacerts(self): | |
| 146 self._adb_su_shell('mount', '-o', 'remount,rw', '/system') | |
| 147 self._adb_su_shell('rm', '-f', self.android_cacerts_path) | |
| 148 | |
| 149 def _is_cert_installed(self): | |
| 150 try: | |
| 151 return (self._adb_su_shell('ls', self.android_cacerts_path).strip() == | |
| 152 self.android_cacerts_path) | |
| 153 except AdbShellError: | |
| 154 return False | |
| 155 | |
| 156 def _generate_reformatted_cert_path(self): | |
| 157 # Determine OpenSSL version, string is of the form | |
| 158 # 'OpenSSL 0.9.8za 5 Jun 2014' . | |
| 159 openssl_version = self._run_cmd(['openssl', 'version']).split() | |
| 160 | |
| 161 if len(openssl_version) < 2: | |
| 162 raise ValueError('Unexpected OpenSSL version string: ', openssl_version) | |
| 163 | |
| 164 # subject_hash flag name changed as of OpenSSL version 1.0.0 . | |
| 165 is_old_openssl_version = openssl_version[1].startswith('0') | |
| 166 subject_hash_flag = ( | |
| 167 '-subject_hash' if is_old_openssl_version else '-subject_hash_old') | |
| 168 | |
| 169 output = self._run_cmd(['openssl', 'x509', '-inform', 'PEM', | |
| 170 subject_hash_flag, '-in', self.cert_path], | |
| 171 os.path.dirname(self.cert_path)) | |
| 172 self.reformatted_cert_fname = output.partition('\n')[0].strip() + '.0' | |
| 173 self.reformatted_cert_path = os.path.join(os.path.dirname(self.cert_path), | |
| 174 self.reformatted_cert_fname) | |
| 175 self.android_cacerts_path = ('/system/etc/security/cacerts/%s' % | |
| 176 self.reformatted_cert_fname) | |
| 177 | |
| 178 def remove_cert(self): | |
| 179 self._generate_reformatted_cert_path() | |
| 180 | |
| 181 if self._is_cert_installed(): | |
| 182 self._remove_cert_from_cacerts() | |
| 183 | |
| 184 if self._is_cert_installed(): | |
| 185 raise CertRemovalError('Cert Removal Failed') | |
| 186 | |
| 187 def install_cert(self, overwrite_cert=False): | |
| 188 """Installs a certificate putting it in /system/etc/security/cacerts.""" | |
| 189 self._generate_reformatted_cert_path() | |
| 190 | |
| 191 if self._is_cert_installed(): | |
| 192 if overwrite_cert: | |
| 193 self._remove_cert_from_cacerts() | |
| 194 else: | |
| 195 logging.info('cert is already installed') | |
| 196 return | |
| 197 | |
| 198 self._format_hashed_cert() | |
| 199 self._adb('push', self.reformatted_cert_path, '/sdcard/') | |
| 200 self._remove(self.reformatted_cert_path) | |
| 201 self._adb_su_shell('mount', '-o', 'remount,rw', '/system') | |
| 202 self._adb_su_shell( | |
| 203 'cp', '/sdcard/%s' % self.reformatted_cert_fname, | |
| 204 '/system/etc/security/cacerts/%s' % self.reformatted_cert_fname) | |
| 205 self._adb_su_shell('chmod', '644', self.android_cacerts_path) | |
| 206 if not self._is_cert_installed(): | |
| 207 raise CertInstallError('Cert Install Failed') | |
| 208 | |
| 209 def install_cert_using_gui(self): | |
| 210 """Installs certificate on the device using adb commands.""" | |
| 211 self.check_device() | |
| 212 # TODO(mruthven): Add a check to see if the certificate is already installed | |
| 213 # Install the certificate. | |
| 214 logging.info('Installing %s on %s', self.cert_path, self.device_id) | |
| 215 self._adb('push', self.cert_path, '/sdcard/') | |
| 216 | |
| 217 # Start credential install intent. | |
| 218 self._adb_shell('am', 'start', '-W', '-a', 'android.credentials.INSTALL') | |
| 219 | |
| 220 # Move to and click search button. | |
| 221 self._input_key(KEYCODE_TAB) | |
| 222 self._input_key(KEYCODE_TAB) | |
| 223 self._input_key(KEYCODE_ENTER) | |
| 224 | |
| 225 # Search for certificate and click it. | |
| 226 # Search only works with lower case letters | |
| 227 self._input_text(self.file_name.lower()) | |
| 228 self._input_key(KEYCODE_ENTER) | |
| 229 | |
| 230 # These coordinates work for hammerhead devices. | |
| 231 self._adb_shell('input', 'tap', '300', '300') | |
| 232 | |
| 233 # Name the certificate and click enter. | |
| 234 self._input_text(self.cert_name) | |
| 235 self._input_key(KEYCODE_TAB) | |
| 236 self._input_key(KEYCODE_TAB) | |
| 237 self._input_key(KEYCODE_TAB) | |
| 238 self._input_key(KEYCODE_ENTER) | |
| 239 | |
| 240 # Remove the file. | |
| 241 self._adb_shell('rm', '/sdcard/' + self.file_name) | |
| 242 | |
| 243 | |
| 244 def parse_args(): | |
| 245 """Parses command line arguments.""" | |
| 246 parser = argparse.ArgumentParser(description='Install cert on device.') | |
| 247 parser.add_argument( | |
| 248 '-n', '--cert-name', default='dummycert', help='certificate name') | |
| 249 parser.add_argument( | |
| 250 '--overwrite', default=False, action='store_true', | |
| 251 help='Overwrite certificate file if it is already installed') | |
| 252 parser.add_argument( | |
| 253 '--remove', default=False, action='store_true', | |
| 254 help='Remove certificate file if it is installed') | |
| 255 parser.add_argument( | |
| 256 '--device-id', help='device serial number') | |
| 257 parser.add_argument( | |
| 258 '--adb-path', help='adb binary path') | |
| 259 parser.add_argument( | |
| 260 'cert_path', help='Certificate file path') | |
| 261 return parser.parse_args() | |
| 262 | |
| 263 | |
| 264 def main(): | |
| 265 args = parse_args() | |
| 266 cert_installer = AndroidCertInstaller(args.device_id, args.cert_name, | |
| 267 args.cert_path, adb_path=args.adb_path) | |
| 268 if args.remove: | |
| 269 cert_installer.remove_cert() | |
| 270 else: | |
| 271 cert_installer.install_cert(args.overwrite) | |
| 272 | |
| 273 | |
| 274 if __name__ == '__main__': | |
| 275 sys.exit(main()) | |
| OLD | NEW |