OLD | NEW |
(Empty) | |
| 1 from ctypes import sizeof, windll, addressof, c_wchar, create_unicode_buffer |
| 2 from ctypes.wintypes import DWORD, HANDLE |
| 3 |
| 4 PROCESS_TERMINATE = 0x0001 |
| 5 PROCESS_QUERY_INFORMATION = 0x0400 |
| 6 PROCESS_VM_READ = 0x0010 |
| 7 |
| 8 def get_pids(process_name): |
| 9 BIG_ARRAY = DWORD * 4096 |
| 10 processes = BIG_ARRAY() |
| 11 needed = DWORD() |
| 12 |
| 13 pids = [] |
| 14 result = windll.psapi.EnumProcesses(processes, |
| 15 sizeof(processes), |
| 16 addressof(needed)) |
| 17 if not result: |
| 18 return pids |
| 19 |
| 20 num_results = needed.value / sizeof(DWORD) |
| 21 |
| 22 for i in range(num_results): |
| 23 pid = processes[i] |
| 24 process = windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | |
| 25 PROCESS_VM_READ, |
| 26 0, pid) |
| 27 if process: |
| 28 module = HANDLE() |
| 29 result = windll.psapi.EnumProcessModules(process, |
| 30 addressof(module), |
| 31 sizeof(module), |
| 32 addressof(needed)) |
| 33 if result: |
| 34 name = create_unicode_buffer(1024) |
| 35 result = windll.psapi.GetModuleBaseNameW(process, module, |
| 36 name, len(name)) |
| 37 # TODO: This might not be the best way to |
| 38 # match a process name; maybe use a regexp instead. |
| 39 if name.value.startswith(process_name): |
| 40 pids.append(pid) |
| 41 windll.kernel32.CloseHandle(module) |
| 42 windll.kernel32.CloseHandle(process) |
| 43 |
| 44 return pids |
| 45 |
| 46 def kill_pid(pid): |
| 47 process = windll.kernel32.OpenProcess(PROCESS_TERMINATE, 0, pid) |
| 48 if process: |
| 49 windll.kernel32.TerminateProcess(process, 0) |
| 50 windll.kernel32.CloseHandle(process) |
| 51 |
| 52 def kill_process_by_name(name): |
| 53 pids = get_pids(name) |
| 54 for pid in pids: |
| 55 kill_pid(pid) |
| 56 |
| 57 if __name__ == '__main__': |
| 58 import subprocess |
| 59 import time |
| 60 |
| 61 # This test just opens a new notepad instance and kills it. |
| 62 |
| 63 name = 'notepad' |
| 64 |
| 65 old_pids = set(get_pids(name)) |
| 66 subprocess.Popen([name]) |
| 67 time.sleep(0.25) |
| 68 new_pids = set(get_pids(name)).difference(old_pids) |
| 69 |
| 70 if len(new_pids) != 1: |
| 71 raise Exception('%s was not opened or get_pids() is ' |
| 72 'malfunctioning' % name) |
| 73 |
| 74 kill_pid(tuple(new_pids)[0]) |
| 75 |
| 76 newest_pids = set(get_pids(name)).difference(old_pids) |
| 77 |
| 78 if len(newest_pids) != 0: |
| 79 raise Exception('kill_pid() is malfunctioning') |
| 80 |
| 81 print "Test passed." |
OLD | NEW |