OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # Copyright 2013 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Uploads a ton of stuff to isolateserver to test its handling. | |
7 | |
Generates a histogram of the latencies to download a just-uploaded file.
9 | |
Note that it only looks at uploading and downloading and does not test
/content/contains, which is datastore read bound.
12 """ | |
13 | |
14 import functools | |
15 import hashlib | |
16 import json | |
17 import logging | |
18 import optparse | |
19 import os | |
20 import random | |
21 import sys | |
22 import time | |
23 import zlib | |
24 | |
25 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
26 | |
27 sys.path.insert(0, ROOT_DIR) | |
28 | |
29 from third_party import colorama | |
30 | |
31 import isolateserver | |
32 | |
33 from utils import graph | |
34 from utils import threading_utils | |
35 | |
36 | |
class Randomness(object):
  """Pre-generated pool of random data used to cheaply build payloads.

  Generating random bytes per request is slow; instead a pool of 1kb chunks
  is built once and payloads are assembled from it.
  """

  def __init__(self, random_pool_size=1024):
    """Creates 1mb of random data in a pool in 1kb chunks.

    Args:
      random_pool_size: number of 1kb chunks to pre-generate.
    """
    # range() iterates identically to the Python-2-only xrange() here and
    # keeps the class usable on Python 3.
    self.pool = [
      ''.join(chr(random.randrange(256)) for _ in range(1024))
      for _ in range(random_pool_size)
    ]

  def gen(self, size):
    """Returns a str containing random data from the pool of size |size|."""
    # Whole 1kb chunks first, then a partial chunk for the remainder.
    chunks = int(size / 1024)
    rest = size - (chunks*1024)
    data = ''.join(random.choice(self.pool) for _ in range(chunks))
    data += random.choice(self.pool)[:rest]
    return data
52 | |
53 | |
class Progress(threading_utils.Progress):
  def _render_columns(self):
    """Renders the counters, formatting the byte column as 'units'."""
    rendered = [
      str(self._columns[0]),
      graph.to_units(self._columns[1]).rjust(6),
      str(self._columns[2]),
    ]
    # Pad the item counters to the same width so the display stays stable.
    width = max(len(rendered[0]), len(rendered[2]))
    return '/'.join(cell.rjust(width) for cell in rendered)
64 | |
65 | |
def print_results(results, columns, buckets):
  """Prints histograms of transfer sizes and delays, then any failures.

  Args:
    results: list of (duration, size) pairs; duration is a float latency on
        success or an error string on failure.
    columns: console width used to render the histograms.
    buckets: number of histogram buckets.
  """
  # Split the results in one pass instead of three filtering passes.
  delays = []
  failures = []
  sizes = []
  for duration, size in results:
    sizes.append(size)
    if isinstance(duration, float):
      delays.append(duration)
    else:
      failures.append((duration, size))

  print('%sSIZES%s (bytes):' % (colorama.Fore.RED, colorama.Fore.RESET))
  graph.print_histogram(
      graph.generate_histogram(sizes, buckets), columns, '%d')
  print('')
  total_size = sum(sizes)
  print('Total size  : %s' % graph.to_units(total_size))
  print('Total items : %d' % len(sizes))
  print('Average size: %s' % graph.to_units(total_size / len(sizes)))
  print('Largest item: %s' % graph.to_units(max(sizes)))
  print('')
  print('%sDELAYS%s (seconds):' % (colorama.Fore.RED, colorama.Fore.RESET))
  graph.print_histogram(
      graph.generate_histogram(delays, buckets), columns, '%.3f')

  if failures:
    print('')
    print('%sFAILURES%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
    print(
        '\n'.join('  %s (%s)' % (e, graph.to_units(s)) for e, s in failures))
90 | |
91 | |
def gen_size(mid_size):
  """Draws a size from a skewed (non-gaussian) distribution.

  The gamma distribution yields a few very large files among many small
  ones.  Found via guessing on Wikipedia.  Module 'random' says it's
  threadsafe.
  """
  sample = random.gammavariate(3, 2)
  return int(sample * mid_size / 4)
98 | |
99 | |
def send_and_receive(random_pool, dry_run, zip_it, api, progress, size):
  """Sends one randomly generated file and fetches it back.

  Returns (delay, size) where delay is the measured download latency in
  seconds, or the error message string when the transfer failed.
  """
  start = time.time()
  # Build the payload out of the pre-generated pool.
  content = random_pool.gen(size)
  hash_value = hashlib.sha1(content).hexdigest()
  if zip_it:
    pack, unpack = zlib.compress, zlib.decompress
  else:
    pack = unpack = lambda x: x
  try:
    if dry_run:
      # Simulate a transfer whose duration scales with payload size.
      time.sleep(size / 10.)
    else:
      logging.info('contains')
      item = api.contains([isolateserver.Item(hash_value, len(content))])[0]

      logging.info('upload')
      api.push(item, [pack(content)])

      logging.info('download')
      # Reset the clock so only the download latency is measured.
      start = time.time()
      assert content == unpack(''.join(api.fetch(hash_value)))
    duration = max(0, time.time() - start)
  except isolateserver.MappingError as e:
    duration = str(e)
  # A float duration means success; only then is the data counter bumped.
  if isinstance(duration, float):
    progress.update_item('', index=1, data=size)
  else:
    progress.update_item('', index=1)
  return (duration, size)
133 | |
134 | |
def main():
  """Parses options, floods the isolate server and prints latency stats.

  Returns:
    0 on success.  Invalid usage exits via parser.error().
  """
  colorama.init()

  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default='',
      help='Isolate server to use')
  parser.add_option(
      '--namespace', default='temporary%d-gzip' % time.time(), metavar='XX',
      help='Namespace to use on the server, default: %default')
  parser.add_option(
      '--threads', type='int', default=16, metavar='N',
      help='Parallel worker threads to use, default:%default')
  graph.unit_option(
      parser, '--items', default=0, help='Number of items to upload')
  graph.unit_option(
      parser, '--max-size', default=0,
      help='Loop until this amount of data was transferred')
  graph.unit_option(
      parser, '--mid-size', default=100*1024,
      help='Rough average size of each item, default:%default')
  parser.add_option(
      '--columns', type='int', default=graph.get_console_width(), metavar='N',
      help='For histogram display, default:%default')
  parser.add_option(
      '--buckets', type='int', default=20, metavar='N',
      help='Number of buckets for histogram display, default:%default')
  parser.add_option(
      '--dump', metavar='FOO.JSON', help='Dumps to json file')
  parser.add_option(
      '--dry-run', action='store_true', help='Do not send anything')
  parser.add_option(
      '-v', '--verbose', action='store_true', help='Enable logging')
  options, args = parser.parse_args()

  logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
  if args:
    parser.error('Unsupported args: %s' % args)
  # Exactly one of --items / --max-size selects the stop condition.
  if bool(options.max_size) == bool(options.items):
    parser.error(
        'Use one of --max-size or --items.\n'
        '  Use --max-size if you want to run it until NN bytes were '
        'transferred.\n'
        '  Otherwise use --items to run it for NN items.')
  if not options.dry_run:
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      parser.error('--isolate-server is required.')

  print(
      ' - Using %d thread,  items=%d,  max-size=%d,  mid-size=%d' % (
      options.threads, options.items, options.max_size, options.mid_size))
  if options.dry_run:
    print(' - %sDRY RUN MODE%s' % (colorama.Fore.GREEN, colorama.Fore.RESET))

  start = time.time()

  random_pool = Randomness()
  print(' - Generated pool after %.1fs' % (time.time() - start))

  columns = [('index', 0), ('data', 0), ('size', options.items)]
  progress = Progress(columns)
  api = isolateserver.get_storage_api(options.isolate_server, options.namespace)
  do_item = functools.partial(
      send_and_receive,
      random_pool,
      options.dry_run,
      isolateserver.is_namespace_with_compression(options.namespace),
      api,
      progress)

  # TODO(maruel): Handle Ctrl-C should:
  # - Stop adding tasks.
  # - Stop scheduling tasks in ThreadPool.
  # - Wait for the remaining ungoing tasks to complete.
  # - Still print details and write the json file.
  with threading_utils.ThreadPoolWithProgress(
      progress, options.threads, options.threads, 0) as pool:
    if options.items:
      # range() iterates identically to the Python-2-only xrange() here.
      for _ in range(options.items):
        pool.add_task(0, do_item, gen_size(options.mid_size))
        progress.print_update()
    elif options.max_size:
      # This one is approximate: stop queueing once the cumulated size of
      # the queued items reaches the limit.
      total = 0
      while True:
        size = gen_size(options.mid_size)
        progress.update_item('', size=1)
        progress.print_update()
        pool.add_task(0, do_item, size)
        total += size
        if total >= options.max_size:
          break
    results = sorted(pool.join())

  print('')
  print(' - Took %.1fs.' % (time.time() - start))
  print('')
  print_results(results, options.columns, options.buckets)
  if options.dump:
    with open(options.dump, 'w') as f:
      json.dump(results, f, separators=(',',':'))
  return 0
239 | |
240 | |
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == '__main__':
  sys.exit(main())
OLD | NEW |