OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2014 The Chromium Authors. All rights reserved. | 2 # Copyright 2014 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Mutational ClusterFuzz fuzzer. A pre-built corpus of ipcdump files has | 6 """Mutational ClusterFuzz fuzzer. A pre-built corpus of ipcdump files has |
7 to be uploaded to ClusterFuzz along with this script. As chrome is being | 7 to be uploaded to ClusterFuzz along with this script. As chrome is being |
8 developed, the corpus will become out-of-date and needs to be updated. | 8 developed, the corpus will become out-of-date and needs to be updated. |
9 | 9 |
10 This fuzzer will pick some ipcdumps from the corpus, concatenate them with | 10 This fuzzer will pick some ipcdumps from the corpus, concatenate them with |
11 ipc_message_util and mutate the result with ipc_fuzzer_mutate. | 11 ipc_message_util and mutate the result with ipc_fuzzer_mutate. |
12 """ | 12 """ |
13 | 13 |
14 import argparse | |
15 import os | 14 import os |
16 import random | 15 import random |
17 import string | |
18 import subprocess | 16 import subprocess |
19 import sys | 17 import sys |
20 import tempfile | 18 import utils |
21 import time | |
22 | 19 |
23 # Number of ipcdumps to concatenate | 20 IPC_MESSAGE_UTIL_APPLICATION = 'ipc_message_util' |
24 NUM_IPCDUMPS = 50 | 21 IPC_MUTATE_APPLICATION = 'ipc_fuzzer_mutate' |
25 | 22 IPC_REPLAY_APPLICATION = 'ipc_fuzzer_replay' |
26 def platform(): | 23 IPCDUMP_MERGE_LIMIT = 50 |
27 if sys.platform.startswith('win'): | |
28 return 'WINDOWS' | |
29 if sys.platform.startswith('linux'): | |
30 return 'LINUX' | |
31 if sys.platform == 'darwin': | |
32 return 'MAC' | |
33 | |
34 assert False, 'Unknown platform' | |
35 | |
36 def create_temp_file(): | |
37 temp_file = tempfile.NamedTemporaryFile(delete=False) | |
38 temp_file.close() | |
39 return temp_file.name | |
40 | |
41 def random_id(size=16, chars=string.ascii_lowercase): | |
42 return ''.join(random.choice(chars) for x in range(size)) | |
43 | |
44 def random_ipcdump_path(ipcdump_dir): | |
45 return os.path.join(ipcdump_dir, 'fuzz-' + random_id() + '.ipcdump') | |
46 | 24 |
47 class MutationalFuzzer: | 25 class MutationalFuzzer: |
48 def parse_cf_args(self): | 26 def parse_arguments(self): |
49 parser = argparse.ArgumentParser() | 27 self.args = utils.parse_arguments() |
50 parser.add_argument('--input_dir') | |
51 parser.add_argument('--output_dir') | |
52 parser.add_argument('--no_of_files', type=int) | |
53 self.args = args = parser.parse_args(); | |
54 if not args.input_dir or not args.output_dir or not args.no_of_files: | |
55 parser.print_help() | |
56 sys.exit(1) | |
57 | 28 |
58 def get_paths(self): | 29 def set_application_paths(self): |
59 app_path_key = 'APP_PATH' | 30 chrome_application_path = utils.get_application_path() |
60 self.mutate_binary = 'ipc_fuzzer_mutate' | 31 chrome_application_directory = os.path.dirname(chrome_application_path) |
61 self.util_binary = 'ipc_message_util' | |
62 if platform() == 'WINDOWS': | |
63 self.mutate_binary += '.exe' | |
64 self.util_binary += '.exe' | |
65 | 32 |
66 if app_path_key not in os.environ: | 33 self.ipc_message_util_binary = utils.application_name_for_platform( |
67 sys.exit('Env var %s should be set to chrome path' % app_path_key) | 34 IPC_MESSAGE_UTIL_APPLICATION) |
68 chrome_path = os.environ[app_path_key] | 35 self.ipc_mutate_binary = utils.application_name_for_platform( |
69 out_dir = os.path.dirname(chrome_path) | 36 IPC_MUTATE_APPLICATION) |
70 self.util_path = os.path.join(out_dir, self.util_binary) | 37 self.ipc_replay_binary = utils.application_name_for_platform( |
71 self.mutate_path = os.path.join(out_dir, self.mutate_binary) | 38 IPC_REPLAY_APPLICATION) |
| 39 self.ipc_message_util_binary_path = os.path.join( |
| 40 chrome_application_directory, self.ipc_message_util_binary) |
| 41 self.ipc_mutate_binary_path = os.path.join( |
| 42 chrome_application_directory, self.ipc_mutate_binary) |
| 43 self.ipc_replay_binary_path = os.path.join( |
| 44 chrome_application_directory, self.ipc_replay_binary) |
72 | 45 |
73 def list_corpus(self): | 46 def set_corpus(self): |
74 input_dir = self.args.input_dir | 47 input_directory = self.args.input_dir |
75 entries = os.listdir(input_dir) | 48 entries = os.listdir(input_directory) |
76 entries = [i for i in entries if i.endswith('.ipcdump')] | 49 entries = [i for i in entries if i.endswith(utils.IPCDUMP_EXTENSION)] |
77 self.corpus = [os.path.join(input_dir, entry) for entry in entries] | 50 self.corpus = [os.path.join(input_directory, entry) for entry in entries] |
78 | 51 |
79 def create_mutated_ipcdump(self): | 52 def create_mutated_ipcdump_testcase(self): |
80 ipcdumps = ','.join(random.sample(self.corpus, NUM_IPCDUMPS)) | 53 ipcdumps = ','.join(random.sample(self.corpus, IPCDUMP_MERGE_LIMIT)) |
81 tmp_ipcdump = create_temp_file() | 54 tmp_ipcdump_testcase = utils.create_temp_file() |
82 mutated_ipcdump = random_ipcdump_path(self.args.output_dir) | 55 mutated_ipcdump_testcase = ( |
| 56 utils.random_ipcdump_testcase_path(self.args.output_dir)) |
83 | 57 |
84 # concatenate ipcdumps -> tmp_ipcdump | 58 # Concatenate ipcdumps -> tmp_ipcdump. |
85 cmd = [self.util_path, ipcdumps, tmp_ipcdump] | 59 cmd = [ |
| 60 self.ipc_message_util_binary_path, |
| 61 ipcdumps, |
| 62 tmp_ipcdump_testcase, |
| 63 ] |
86 if subprocess.call(cmd): | 64 if subprocess.call(cmd): |
87 sys.exit('%s failed' % self.util_binary) | 65 sys.exit('%s failed.' % self.ipc_message_util_binary) |
88 | 66 |
89 # mutate tmp_ipcdump -> mutated_ipcdump | 67 # Mutate tmp_ipcdump -> mutated_ipcdump. |
90 cmd = [self.mutate_path, tmp_ipcdump, mutated_ipcdump] | 68 cmd = [ |
| 69 self.ipc_mutate_binary_path, |
| 70 tmp_ipcdump_testcase, |
| 71 mutated_ipcdump_testcase, |
| 72 ] |
91 if subprocess.call(cmd): | 73 if subprocess.call(cmd): |
92 sys.exit('%s failed' % self.mutate_binary) | 74 sys.exit('%s failed.' % self.ipc_mutate_binary) |
93 os.remove(tmp_ipcdump) | 75 |
| 76 utils.create_flags_file( |
| 77 mutated_ipcdump_testcase, self.ipc_replay_binary_path) |
| 78 os.remove(tmp_ipcdump_testcase) |
94 | 79 |
95 def main(self): | 80 def main(self): |
96 self.parse_cf_args() | 81 self.parse_arguments() |
97 self.get_paths() | 82 self.set_application_paths() |
98 self.list_corpus() | 83 self.set_corpus() |
99 for i in xrange(self.args.no_of_files): | 84 for _ in xrange(self.args.no_of_files): |
100 self.create_mutated_ipcdump() | 85 self.create_mutated_ipcdump_testcase() |
| 86 |
101 return 0 | 87 return 0 |
102 | 88 |
103 if __name__ == "__main__": | 89 if __name__ == "__main__": |
104 fuzzer = MutationalFuzzer() | 90 fuzzer = MutationalFuzzer() |
105 sys.exit(fuzzer.main()) | 91 sys.exit(fuzzer.main()) |
OLD | NEW |