OLD | NEW |
---|---|
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 | 2 |
3 # pylint: disable=C0301 | 3 # pylint: disable=C0301 |
4 """ | 4 """ |
5 Copyright 2014 Google Inc. | 5 Copyright 2014 Google Inc. |
6 | 6 |
7 Use of this source code is governed by a BSD-style license that can be | 7 Use of this source code is governed by a BSD-style license that can be |
8 found in the LICENSE file. | 8 found in the LICENSE file. |
9 | 9 |
10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper | 10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper |
11 for the XML API). | 11 for the XML API). |
12 | 12 |
13 API/library references: | 13 API/library references: |
14 - https://developers.google.com/storage/docs/reference-guide | 14 - https://developers.google.com/storage/docs/reference-guide |
15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html | 15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html |
16 """ | 16 """ |
17 # pylint: enable=C0301 | 17 # pylint: enable=C0301 |
18 | 18 |
19 # System-level imports | 19 # System-level imports |
20 import errno | 20 import errno |
21 import hashlib | 21 import hashlib |
22 import logging | |
22 import os | 23 import os |
23 import posixpath | 24 import posixpath |
24 import re | 25 import re |
25 import sys | 26 import sys |
27 from Queue import Queue | |
28 from threading import Thread | |
26 | 29 |
27 # Imports from third-party code | 30 # Imports from third-party code |
28 TRUNK_DIRECTORY = os.path.abspath(os.path.join( | 31 TRUNK_DIRECTORY = os.path.abspath(os.path.join( |
29 os.path.dirname(__file__), os.pardir, os.pardir)) | 32 os.path.dirname(__file__), os.pardir, os.pardir)) |
30 for import_subdir in ['boto']: | 33 for import_subdir in ['boto']: |
31 import_dirpath = os.path.join( | 34 import_dirpath = os.path.join( |
32 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) | 35 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) |
33 if import_dirpath not in sys.path: | 36 if import_dirpath not in sys.path: |
34 # We need to insert at the beginning of the path, to make sure that our | 37 # We need to insert at the beginning of the path, to make sure that our |
35 # imported versions are favored over others that might be in the path. | 38 # imported versions are favored over others that might be in the path. |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
109 take longer than just uploading the file. | 112 take longer than just uploading the file. |
110 See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up | 113 See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up |
111 checks for existing files within a single remote directory') | 114 checks for existing files within a single remote directory') |
112 """ | 115 """ |
113 ALWAYS = 1 # always upload the file | 116 ALWAYS = 1 # always upload the file |
114 IF_NEW = 2 # if there is an existing file with the same name, | 117 IF_NEW = 2 # if there is an existing file with the same name, |
115 # leave it alone | 118 # leave it alone |
116 IF_MODIFIED = 3 # if there is an existing file with the same name and | 119 IF_MODIFIED = 3 # if there is an existing file with the same name and |
117 # contents, leave it alone | 120 # contents, leave it alone |
118 | 121 |
119 def __init__(self, boto_file_path=None): | 122 def __init__(self, boto_file_path=None, logger=None): |
120 """Constructor. | 123 """Constructor. |
121 | 124 |
122 Params: | 125 Params: |
123 boto_file_path: full path (local-OS-style) on local disk where .boto | 126 boto_file_path: full path (local-OS-style) on local disk where .boto |
124 credentials file can be found. If None, then the GSUtils object | 127 credentials file can be found. If None, then the GSUtils object |
125 created will be able to access only public files in Google Storage. | 128 created will be able to access only public files in Google Storage. |
129 logger: a logging.Logger instance to use for logging output; if None, | |
130 one will be created with default characteristics | |
126 | 131 |
127 Raises an exception if no file is found at boto_file_path, or if the file | 132 Raises an exception if no file is found at boto_file_path, or if the file |
128 found there is malformed. | 133 found there is malformed. |
129 """ | 134 """ |
135 self.logger = logger or logging.getLogger(__name__) | |
130 self._gs_access_key_id = None | 136 self._gs_access_key_id = None |
131 self._gs_secret_access_key = None | 137 self._gs_secret_access_key = None |
132 if boto_file_path: | 138 if boto_file_path: |
133 print 'Reading boto file from %s' % boto_file_path | 139 self.logger.info('Reading boto file from %s' % boto_file_path) |
134 boto_dict = _config_file_as_dict(filepath=boto_file_path) | 140 boto_dict = _config_file_as_dict(filepath=boto_file_path) |
135 self._gs_access_key_id = boto_dict['gs_access_key_id'] | 141 self._gs_access_key_id = boto_dict['gs_access_key_id'] |
136 self._gs_secret_access_key = boto_dict['gs_secret_access_key'] | 142 self._gs_secret_access_key = boto_dict['gs_secret_access_key'] |
137 # Which field we get/set in ACL entries, depending on IdType. | 143 # Which field we get/set in ACL entries, depending on IdType. |
138 self._field_by_id_type = { | 144 self._field_by_id_type = { |
139 self.IdType.GROUP_BY_DOMAIN: 'domain', | 145 self.IdType.GROUP_BY_DOMAIN: 'domain', |
140 self.IdType.GROUP_BY_EMAIL: 'email_address', | 146 self.IdType.GROUP_BY_EMAIL: 'email_address', |
141 self.IdType.GROUP_BY_ID: 'id', | 147 self.IdType.GROUP_BY_ID: 'id', |
142 self.IdType.USER_BY_EMAIL: 'email_address', | 148 self.IdType.USER_BY_EMAIL: 'email_address', |
143 self.IdType.USER_BY_ID: 'id', | 149 self.IdType.USER_BY_ID: 'id', |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
208 so that HTTP downloads of the file would be unzipped automatically. | 214 so that HTTP downloads of the file would be unzipped automatically. |
209 See https://developers.google.com/storage/docs/gsutil/addlhelp/ | 215 See https://developers.google.com/storage/docs/gsutil/addlhelp/ |
210 WorkingWithObjectMetadata#content-encoding | 216 WorkingWithObjectMetadata#content-encoding |
211 """ | 217 """ |
212 b = self._connect_to_bucket(bucket=dest_bucket) | 218 b = self._connect_to_bucket(bucket=dest_bucket) |
213 local_md5 = None # filled in lazily | 219 local_md5 = None # filled in lazily |
214 | 220 |
215 if upload_if == self.UploadIf.IF_NEW: | 221 if upload_if == self.UploadIf.IF_NEW: |
216 old_key = b.get_key(key_name=dest_path) | 222 old_key = b.get_key(key_name=dest_path) |
217 if old_key: | 223 if old_key: |
218 print 'Skipping upload of existing file gs://%s/%s' % ( | 224 self.logger.info('Skipping upload of existing file gs://%s/%s' % ( |
219 b.name, dest_path) | 225 b.name, dest_path)) |
220 return | 226 return |
221 elif upload_if == self.UploadIf.IF_MODIFIED: | 227 elif upload_if == self.UploadIf.IF_MODIFIED: |
222 old_key = b.get_key(key_name=dest_path) | 228 old_key = b.get_key(key_name=dest_path) |
223 if old_key: | 229 if old_key: |
224 if not local_md5: | 230 if not local_md5: |
225 local_md5 = _get_local_md5(path=source_path) | 231 local_md5 = _get_local_md5(path=source_path) |
226 if ('"%s"' % local_md5) == old_key.etag: | 232 if ('"%s"' % local_md5) == old_key.etag: |
227 print 'Skipping upload of unmodified file gs://%s/%s : %s' % ( | 233 self.logger.info( |
228 b.name, dest_path, local_md5) | 234 'Skipping upload of unmodified file gs://%s/%s : %s' % ( |
235 b.name, dest_path, local_md5)) | |
229 return | 236 return |
230 elif upload_if != self.UploadIf.ALWAYS: | 237 elif upload_if != self.UploadIf.ALWAYS: |
231 raise Exception('unknown value of upload_if: %s' % upload_if) | 238 raise Exception('unknown value of upload_if: %s' % upload_if) |
232 | 239 |
233 # Upload the file using a temporary name at first, in case the transfer | 240 # Upload the file using a temporary name at first, in case the transfer |
234 # is interrupted partway through. | 241 # is interrupted partway through. |
235 if not local_md5: | 242 if not local_md5: |
236 local_md5 = _get_local_md5(path=source_path) | 243 local_md5 = _get_local_md5(path=source_path) |
237 initial_key = Key(b) | 244 initial_key = Key(b) |
238 initial_key.name = dest_path + '-uploading-' + local_md5 | 245 initial_key.name = dest_path + '-uploading-' + local_md5 |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
273 | 280 |
274 # Set ACLs on the file. | 281 # Set ACLs on the file. |
275 # We do this *after* copy_key(), because copy_key's preserve_acl | 282 # We do this *after* copy_key(), because copy_key's preserve_acl |
276 # functionality would incur a performance hit. | 283 # functionality would incur a performance hit. |
277 for (id_type, id_value, permission) in fine_grained_acl_list or []: | 284 for (id_type, id_value, permission) in fine_grained_acl_list or []: |
278 self.set_acl( | 285 self.set_acl( |
279 bucket=b, path=final_key.name, | 286 bucket=b, path=final_key.name, |
280 id_type=id_type, id_value=id_value, permission=permission) | 287 id_type=id_type, id_value=id_value, permission=permission) |
281 | 288 |
282 def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, | 289 def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, |
283 upload_if=UploadIf.ALWAYS, **kwargs): | 290 num_threads=10, upload_if=UploadIf.ALWAYS, **kwargs): |
284 """Recursively upload contents of a local directory to Google Storage. | 291 """Recursively upload contents of a local directory to Google Storage. |
285 | 292 |
286 params: | 293 params: |
287 source_dir: full path (local-OS-style) on local disk of directory to copy | 294 source_dir: full path (local-OS-style) on local disk of directory to copy |
288 contents of | 295 contents of |
289 dest_bucket: GS bucket to copy the files into | 296 dest_bucket: GS bucket to copy the files into |
290 dest_dir: full path (Posix-style) within that bucket; write the files into | 297 dest_dir: full path (Posix-style) within that bucket; write the files into |
291 this directory. If None, write into the root directory of the bucket. | 298 this directory. If None, write into the root directory of the bucket. |
299 num_threads: how many files to upload at once | |
292 upload_if: one of the UploadIf values, describing in which cases we should | 300 upload_if: one of the UploadIf values, describing in which cases we should |
293 upload the file | 301 upload the file |
294 kwargs: any additional keyword arguments "inherited" from upload_file() | 302 kwargs: any additional keyword arguments "inherited" from upload_file() |
295 | 303 |
296 The copy operates as a merge: any files in source_dir will be "overlaid" on | 304 The copy operates as a merge: any files in source_dir will be "overlaid" on |
297 top of the existing content in dest_dir. Existing files with the same names | 305 top of the existing content in dest_dir. Existing files with the same names |
298 may or may not be overwritten, depending on the value of upload_if. | 306 may or may not be overwritten, depending on the value of upload_if. |
299 | 307 |
300 TODO(epoger): Upload multiple files simultaneously to reduce latency. | 308 TODO(epoger): Upload multiple files simultaneously to reduce latency. |
301 """ | 309 """ |
302 b = self._connect_to_bucket(bucket=dest_bucket) | 310 b = self._connect_to_bucket(bucket=dest_bucket) |
303 if not dest_dir: | 311 if not dest_dir: |
304 dest_dir = '' | 312 dest_dir = '' |
305 | 313 |
306 # Create a set of all files within source_dir. | 314 # Create a set of all files within source_dir. |
307 source_fileset = set() | 315 source_fileset = set() |
308 prefix_length = len(source_dir)+1 | 316 prefix_length = len(source_dir)+1 |
309 for dirpath, _, filenames in os.walk(source_dir): | 317 for dirpath, _, filenames in os.walk(source_dir): |
310 relative_dirpath = dirpath[prefix_length:] | 318 relative_dirpath = dirpath[prefix_length:] |
311 for filename in filenames: | 319 for filename in filenames: |
312 source_fileset.add(os.path.join(relative_dirpath, filename)) | 320 source_fileset.add(os.path.join(relative_dirpath, filename)) |
321 num_files_total = len(source_fileset) | |
313 | 322 |
314 # If we are only uploading files conditionally, remove any unnecessary | 323 # If we are only uploading files conditionally, remove any unnecessary |
315 # files from source_fileset. | 324 # files from source_fileset. |
316 if upload_if == self.UploadIf.ALWAYS: | 325 if upload_if == self.UploadIf.ALWAYS: |
317 pass # there are no shortcuts... upload them all | 326 pass # there are no shortcuts... upload them all |
318 else: | 327 else: |
319 # Create a mapping of filename to Key for existing files within dest_dir | 328 # Create a mapping of filename to Key for existing files within dest_dir |
320 existing_dest_filemap = {} | 329 existing_dest_filemap = {} |
321 prefix = dest_dir | 330 prefix = dest_dir |
322 if prefix and not prefix.endswith('/'): | 331 if prefix and not prefix.endswith('/'): |
(...skipping 12 matching lines...) Expand all Loading... | |
335 elif upload_if == self.UploadIf.IF_MODIFIED: | 344 elif upload_if == self.UploadIf.IF_MODIFIED: |
336 for rel_path in files_in_common: | 345 for rel_path in files_in_common: |
337 local_md5 = '"%s"' % _get_local_md5(path=os.path.join( | 346 local_md5 = '"%s"' % _get_local_md5(path=os.path.join( |
338 source_dir, rel_path)) | 347 source_dir, rel_path)) |
339 key = existing_dest_filemap[rel_path] | 348 key = existing_dest_filemap[rel_path] |
340 if local_md5 == key.etag: | 349 if local_md5 == key.etag: |
341 source_fileset.remove(rel_path) | 350 source_fileset.remove(rel_path) |
342 else: | 351 else: |
343 raise Exception('unknown value of upload_if: %s' % upload_if) | 352 raise Exception('unknown value of upload_if: %s' % upload_if) |
344 | 353 |
354 # Set up a counter of files that have been uploaded. | |
355 # Python's Global Interpreter Lock should make this thread-safe; see | |
356 # http://www.gossamer-threads.com/lists/python/dev/273403 | |
357 num_files_to_upload = len(source_fileset) | |
358 atomic_incrementer = iter(xrange(1, num_files_to_upload+1)).next | |
359 | |
345 # Upload any files still in source_fileset. | 360 # Upload any files still in source_fileset. |
346 for rel_path in sorted(source_fileset): | 361 self.logger.info('Uploading %d files, skipping %d ...' % ( |
347 self.upload_file( | 362 num_files_to_upload, num_files_total - num_files_to_upload)) |
348 source_path=os.path.join(source_dir, rel_path), | 363 if num_files_to_upload == 0: |
349 dest_bucket=b, | 364 return |
350 dest_path=posixpath.join(dest_dir, rel_path), | 365 if num_threads > num_files_to_upload: |
351 upload_if=self.UploadIf.ALWAYS, | 366 num_threads = num_files_to_upload |
borenet
2014/07/28 13:32:11
Maybe line 358 should be moved here so that it doesn't […]
epoger
2014/07/28 14:14:13
Done.  I don't think there is significant performance […]
| |
352 **kwargs) | 367 q = Queue(maxsize=2*num_threads) |
368 end_of_queue = object() | |
369 def worker(): | |
370 while True: | |
371 rel_path = q.get() | |
372 if rel_path is end_of_queue: | |
373 q.task_done() | |
374 return | |
borenet
2014/07/28 13:32:11
Why not use q.get(False) or q.get_nowait() and try […]
epoger
2014/07/28 14:14:14
The problem with checking for an empty queue is that […]
borenet
2014/07/28 14:20:07
I was thinking of #2, since you're dealing with the […]
| |
375 self.logger.info(' Uploading file %d/%d: %s' % ( | |
376 atomic_incrementer(), num_files_to_upload, rel_path)) | |
377 self.upload_file( | |
378 source_path=os.path.join(source_dir, rel_path), | |
379 dest_bucket=b, | |
380 dest_path=posixpath.join(dest_dir, rel_path), | |
381 upload_if=self.UploadIf.ALWAYS, | |
382 **kwargs) | |
383 q.task_done() | |
384 # Spin up the threads. | |
385 for _ in range(num_threads): | |
386 t = Thread(target=worker) | |
387 t.daemon = True | |
388 t.start() | |
389 # Start loading files into the work queue. | |
390 for rel_path in source_fileset: | |
391 q.put(rel_path) | |
392 # Tell all workers to go home. | |
393 for _ in range(num_threads): | |
394 q.put(end_of_queue) | |
395 # Block until all files have been uploaded and all workers have gone home. | |
396 q.join() | |
353 | 397 |
354 def download_file(self, source_bucket, source_path, dest_path, | 398 def download_file(self, source_bucket, source_path, dest_path, |
355 create_subdirs_if_needed=False): | 399 create_subdirs_if_needed=False): |
356 """Downloads a single file from Google Cloud Storage to local disk. | 400 """Downloads a single file from Google Cloud Storage to local disk. |
357 | 401 |
358 Args: | 402 Args: |
359 source_bucket: GS bucket to download the file from | 403 source_bucket: GS bucket to download the file from |
360 source_path: full path (Posix-style) within that bucket | 404 source_path: full path (Posix-style) within that bucket |
361 dest_path: full path (local-OS-style) on local disk to copy the file to | 405 dest_path: full path (local-OS-style) on local disk to copy the file to |
362 create_subdirs_if_needed: boolean; whether to create subdirectories as | 406 create_subdirs_if_needed: boolean; whether to create subdirectories as |
(...skipping 240 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
603 | 647 |
604 def _get_local_md5(path): | 648 def _get_local_md5(path): |
605 """Returns the MD5 hash of a file on local disk.""" | 649 """Returns the MD5 hash of a file on local disk.""" |
606 hasher = hashlib.md5() | 650 hasher = hashlib.md5() |
607 with open(path, 'rb') as f: | 651 with open(path, 'rb') as f: |
608 while True: | 652 while True: |
609 data = f.read(64*1024) | 653 data = f.read(64*1024) |
610 if not data: | 654 if not data: |
611 return hasher.hexdigest() | 655 return hasher.hexdigest() |
612 hasher.update(data) | 656 hasher.update(data) |
OLD | NEW |