OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 | 2 |
3 # pylint: disable=C0301 | 3 # pylint: disable=C0301 |
4 """ | 4 """ |
5 Copyright 2014 Google Inc. | 5 Copyright 2014 Google Inc. |
6 | 6 |
7 Use of this source code is governed by a BSD-style license that can be | 7 Use of this source code is governed by a BSD-style license that can be |
8 found in the LICENSE file. | 8 found in the LICENSE file. |
9 | 9 |
10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper | 10 Utilities for accessing Google Cloud Storage, using the boto library (wrapper |
11 for the XML API). | 11 for the XML API). |
12 | 12 |
13 API/library references: | 13 API/library references: |
14 - https://developers.google.com/storage/docs/reference-guide | 14 - https://developers.google.com/storage/docs/reference-guide |
15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html | 15 - http://googlecloudstorage.blogspot.com/2012/09/google-cloud-storage-tutorial-using-boto.html |
16 """ | 16 """ |
17 # pylint: enable=C0301 | 17 # pylint: enable=C0301 |
18 | 18 |
19 # System-level imports | 19 # System-level imports |
20 import errno | 20 import errno |
21 import hashlib | 21 import hashlib |
| 22 import logging |
22 import os | 23 import os |
23 import posixpath | 24 import posixpath |
| 25 import Queue |
24 import re | 26 import re |
25 import sys | 27 import sys |
| 28 import threading |
26 | 29 |
27 # Imports from third-party code | 30 # Imports from third-party code |
28 TRUNK_DIRECTORY = os.path.abspath(os.path.join( | 31 TRUNK_DIRECTORY = os.path.abspath(os.path.join( |
29 os.path.dirname(__file__), os.pardir, os.pardir)) | 32 os.path.dirname(__file__), os.pardir, os.pardir)) |
30 for import_subdir in ['boto']: | 33 for import_subdir in ['boto']: |
31 import_dirpath = os.path.join( | 34 import_dirpath = os.path.join( |
32 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) | 35 TRUNK_DIRECTORY, 'third_party', 'externals', import_subdir) |
33 if import_dirpath not in sys.path: | 36 if import_dirpath not in sys.path: |
34 # We need to insert at the beginning of the path, to make sure that our | 37 # We need to insert at the beginning of the path, to make sure that our |
35 # imported versions are favored over others that might be in the path. | 38 # imported versions are favored over others that might be in the path. |
36 sys.path.insert(0, import_dirpath) | 39 sys.path.insert(0, import_dirpath) |
37 from boto.exception import BotoServerError | 40 from boto.exception import BotoServerError |
38 from boto.gs import acl | 41 from boto.gs import acl |
39 from boto.gs.bucket import Bucket | 42 from boto.gs.bucket import Bucket |
40 from boto.gs.connection import GSConnection | 43 from boto.gs.connection import GSConnection |
41 from boto.gs.key import Key | 44 from boto.gs.key import Key |
42 from boto.s3.bucketlistresultset import BucketListResultSet | 45 from boto.s3.bucketlistresultset import BucketListResultSet |
43 from boto.s3.connection import SubdomainCallingFormat | 46 from boto.s3.connection import SubdomainCallingFormat |
44 from boto.s3.prefix import Prefix | 47 from boto.s3.prefix import Prefix |
45 | 48 |
| 49 # How many files to upload at once, by default. |
| 50 # TODO(epoger): Is there a way to compute this intelligently? To some extent |
| 51 # it is a function of how many cores are on the machine, and how many other |
| 52 # processes it is running; but it's probably more a function of how much time |
| 53 # each core sits idle waiting for network I/O to complete. |
| 54 DEFAULT_UPLOAD_THREADS = 10 |
| 55 |
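The TODO above asks whether the default could be derived rather than hard-coded. One rough heuristic, sketched here purely as an illustration (not part of this change), would scale the thread count with the machine's core count, on the theory that each worker spends most of its time idle waiting on network I/O:

    import multiprocessing

    # Illustrative only: a couple of I/O-bound workers per core, but never
    # fewer than the hard-coded default above.
    suggested_threads = max(DEFAULT_UPLOAD_THREADS, 2 * multiprocessing.cpu_count())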
46 | 56 |
47 class AnonymousGSConnection(GSConnection): | 57 class AnonymousGSConnection(GSConnection): |
48 """GSConnection class that allows anonymous connections. | 58 """GSConnection class that allows anonymous connections. |
49 | 59 |
50 The GSConnection class constructor in | 60 The GSConnection class constructor in |
51 https://github.com/boto/boto/blob/develop/boto/gs/connection.py doesn't allow | 61 https://github.com/boto/boto/blob/develop/boto/gs/connection.py doesn't allow |
52 for anonymous connections (connections without credentials), so we have to | 62 for anonymous connections (connections without credentials), so we have to |
53 override it. | 63 override it. |
54 """ | 64 """ |
55 def __init__(self): | 65 def __init__(self): |
(...skipping 53 matching lines...)
109 take longer than just uploading the file. | 119 take longer than just uploading the file. |
110 See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up | 120 See http://skbug.com/2778 ('gs_utils: when uploading IF_NEW, batch up |
111 checks for existing files within a single remote directory') | 121 checks for existing files within a single remote directory') |
112 """ | 122 """ |
113 ALWAYS = 1 # always upload the file | 123 ALWAYS = 1 # always upload the file |
114 IF_NEW = 2 # if there is an existing file with the same name, | 124 IF_NEW = 2 # if there is an existing file with the same name, |
115 # leave it alone | 125 # leave it alone |
116 IF_MODIFIED = 3 # if there is an existing file with the same name and | 126 IF_MODIFIED = 3 # if there is an existing file with the same name and |
117 # contents, leave it alone | 127 # contents, leave it alone |
118 | 128 |
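For context, a minimal calling sketch for these modes (the bucket and file names below are made up, and dest_bucket is assumed to also accept a bucket-name string):

    gs = GSUtils(boto_file_path=os.path.expanduser('~/.boto'))
    gs.upload_file(
        source_path='/tmp/report.html',       # hypothetical local file
        dest_bucket='example-bucket',         # hypothetical bucket name
        dest_path='reports/report.html',
        upload_if=gs.UploadIf.IF_MODIFIED)    # leave the remote copy alone if its MD5 matches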
119 def __init__(self, boto_file_path=None): | 129 def __init__(self, boto_file_path=None, logger=None): |
120 """Constructor. | 130 """Constructor. |
121 | 131 |
122 Params: | 132 Params: |
123 boto_file_path: full path (local-OS-style) on local disk where .boto | 133 boto_file_path: full path (local-OS-style) on local disk where .boto |
124 credentials file can be found. If None, then the GSUtils object | 134 credentials file can be found. If None, then the GSUtils object |
125 created will be able to access only public files in Google Storage. | 135 created will be able to access only public files in Google Storage. |
| 136 logger: a logging.Logger instance to use for logging output; if None, |
| 137 one will be created with default characteristics |
126 | 138 |
127 Raises an exception if no file is found at boto_file_path, or if the file | 139 Raises an exception if no file is found at boto_file_path, or if the file |
128 found there is malformed. | 140 found there is malformed. |
129 """ | 141 """ |
| 142 self.logger = logger or logging.getLogger(__name__) |
130 self._gs_access_key_id = None | 143 self._gs_access_key_id = None |
131 self._gs_secret_access_key = None | 144 self._gs_secret_access_key = None |
132 if boto_file_path: | 145 if boto_file_path: |
133 print 'Reading boto file from %s' % boto_file_path | 146 self.logger.info('Reading boto file from %s' % boto_file_path) |
134 boto_dict = _config_file_as_dict(filepath=boto_file_path) | 147 boto_dict = _config_file_as_dict(filepath=boto_file_path) |
135 self._gs_access_key_id = boto_dict['gs_access_key_id'] | 148 self._gs_access_key_id = boto_dict['gs_access_key_id'] |
136 self._gs_secret_access_key = boto_dict['gs_secret_access_key'] | 149 self._gs_secret_access_key = boto_dict['gs_secret_access_key'] |
137 # Which field we get/set in ACL entries, depending on IdType. | 150 # Which field we get/set in ACL entries, depending on IdType. |
138 self._field_by_id_type = { | 151 self._field_by_id_type = { |
139 self.IdType.GROUP_BY_DOMAIN: 'domain', | 152 self.IdType.GROUP_BY_DOMAIN: 'domain', |
140 self.IdType.GROUP_BY_EMAIL: 'email_address', | 153 self.IdType.GROUP_BY_EMAIL: 'email_address', |
141 self.IdType.GROUP_BY_ID: 'id', | 154 self.IdType.GROUP_BY_ID: 'id', |
142 self.IdType.USER_BY_EMAIL: 'email_address', | 155 self.IdType.USER_BY_EMAIL: 'email_address', |
143 self.IdType.USER_BY_ID: 'id', | 156 self.IdType.USER_BY_ID: 'id', |
(...skipping 64 matching lines...)
208 so that HTTP downloads of the file would be unzipped automatically. | 221 so that HTTP downloads of the file would be unzipped automatically. |
209 See https://developers.google.com/storage/docs/gsutil/addlhelp/ | 222 See https://developers.google.com/storage/docs/gsutil/addlhelp/ |
210 WorkingWithObjectMetadata#content-encoding | 223 WorkingWithObjectMetadata#content-encoding |
211 """ | 224 """ |
212 b = self._connect_to_bucket(bucket=dest_bucket) | 225 b = self._connect_to_bucket(bucket=dest_bucket) |
213 local_md5 = None # filled in lazily | 226 local_md5 = None # filled in lazily |
214 | 227 |
215 if upload_if == self.UploadIf.IF_NEW: | 228 if upload_if == self.UploadIf.IF_NEW: |
216 old_key = b.get_key(key_name=dest_path) | 229 old_key = b.get_key(key_name=dest_path) |
217 if old_key: | 230 if old_key: |
218 print 'Skipping upload of existing file gs://%s/%s' % ( | 231 self.logger.info('Skipping upload of existing file gs://%s/%s' % ( |
219 b.name, dest_path) | 232 b.name, dest_path)) |
220 return | 233 return |
221 elif upload_if == self.UploadIf.IF_MODIFIED: | 234 elif upload_if == self.UploadIf.IF_MODIFIED: |
222 old_key = b.get_key(key_name=dest_path) | 235 old_key = b.get_key(key_name=dest_path) |
223 if old_key: | 236 if old_key: |
224 if not local_md5: | 237 if not local_md5: |
225 local_md5 = _get_local_md5(path=source_path) | 238 local_md5 = _get_local_md5(path=source_path) |
226 if ('"%s"' % local_md5) == old_key.etag: | 239 if ('"%s"' % local_md5) == old_key.etag: |
227 print 'Skipping upload of unmodified file gs://%s/%s : %s' % ( | 240 self.logger.info( |
228 b.name, dest_path, local_md5) | 241 'Skipping upload of unmodified file gs://%s/%s : %s' % ( |
| 242 b.name, dest_path, local_md5)) |
229 return | 243 return |
230 elif upload_if != self.UploadIf.ALWAYS: | 244 elif upload_if != self.UploadIf.ALWAYS: |
231 raise Exception('unknown value of upload_if: %s' % upload_if) | 245 raise Exception('unknown value of upload_if: %s' % upload_if) |
232 | 246 |
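The quoting in the comparison above is deliberate: for an object uploaded in a single (non-composite) request, Google Cloud Storage reports its ETag as the file's MD5 hex digest wrapped in double quotes, so a plain string comparison is enough. Roughly (values hypothetical):

    local_md5 = _get_local_md5(path=source_path)   # e.g. 'd41d8cd98f00b204e9800998ecf8427e'
    remote_etag = old_key.etag                     # e.g. '"d41d8cd98f00b204e9800998ecf8427e"'
    unchanged = ('"%s"' % local_md5) == remote_etag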
233 # Upload the file using a temporary name at first, in case the transfer | 247 # Upload the file using a temporary name at first, in case the transfer |
234 # is interrupted partway through. | 248 # is interrupted partway through. |
235 if not local_md5: | 249 if not local_md5: |
236 local_md5 = _get_local_md5(path=source_path) | 250 local_md5 = _get_local_md5(path=source_path) |
237 initial_key = Key(b) | 251 initial_key = Key(b) |
238 initial_key.name = dest_path + '-uploading-' + local_md5 | 252 initial_key.name = dest_path + '-uploading-' + local_md5 |
(...skipping 34 matching lines...)
273 | 287 |
274 # Set ACLs on the file. | 288 # Set ACLs on the file. |
275 # We do this *after* copy_key(), because copy_key's preserve_acl | 289 # We do this *after* copy_key(), because copy_key's preserve_acl |
276 # functionality would incur a performance hit. | 290 # functionality would incur a performance hit. |
277 for (id_type, id_value, permission) in fine_grained_acl_list or []: | 291 for (id_type, id_value, permission) in fine_grained_acl_list or []: |
278 self.set_acl( | 292 self.set_acl( |
279 bucket=b, path=final_key.name, | 293 bucket=b, path=final_key.name, |
280 id_type=id_type, id_value=id_value, permission=permission) | 294 id_type=id_type, id_value=id_value, permission=permission) |
281 | 295 |
282 def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, | 296 def upload_dir_contents(self, source_dir, dest_bucket, dest_dir, |
| 297 num_threads=DEFAULT_UPLOAD_THREADS, |
283 upload_if=UploadIf.ALWAYS, **kwargs): | 298 upload_if=UploadIf.ALWAYS, **kwargs): |
284 """Recursively upload contents of a local directory to Google Storage. | 299 """Recursively upload contents of a local directory to Google Storage. |
285 | 300 |
286 params: | 301 params: |
287 source_dir: full path (local-OS-style) on local disk of directory to copy | 302 source_dir: full path (local-OS-style) on local disk of directory to copy |
288 contents of | 303 contents of |
289 dest_bucket: GS bucket to copy the files into | 304 dest_bucket: GS bucket to copy the files into |
290 dest_dir: full path (Posix-style) within that bucket; write the files into | 305 dest_dir: full path (Posix-style) within that bucket; write the files into |
291 this directory. If None, write into the root directory of the bucket. | 306 this directory. If None, write into the root directory of the bucket. |
| 307 num_threads: how many files to upload at once |
292 upload_if: one of the UploadIf values, describing in which cases we should | 308 upload_if: one of the UploadIf values, describing in which cases we should |
293 upload the file | 309 upload the file |
294 kwargs: any additional keyword arguments "inherited" from upload_file() | 310 kwargs: any additional keyword arguments "inherited" from upload_file() |
295 | 311 |
296 The copy operates as a merge: any files in source_dir will be "overlaid" on | 312 The copy operates as a merge: any files in source_dir will be "overlaid" on |
297 top of the existing content in dest_dir. Existing files with the same names | 313 top of the existing content in dest_dir. Existing files with the same names |
298 may or may not be overwritten, depending on the value of upload_if. | 314 may or may not be overwritten, depending on the value of upload_if. |
299 | 315 |
300 TODO(epoger): Upload multiple files simultaneously to reduce latency. | 316 TODO(epoger): Upload multiple files simultaneously to reduce latency. |
301 """ | 317 """ |
302 b = self._connect_to_bucket(bucket=dest_bucket) | 318 b = self._connect_to_bucket(bucket=dest_bucket) |
303 if not dest_dir: | 319 if not dest_dir: |
304 dest_dir = '' | 320 dest_dir = '' |
305 | 321 |
306 # Create a set of all files within source_dir. | 322 # Create a set of all files within source_dir. |
307 source_fileset = set() | 323 source_fileset = set() |
308 prefix_length = len(source_dir)+1 | 324 prefix_length = len(source_dir)+1 |
309 for dirpath, _, filenames in os.walk(source_dir): | 325 for dirpath, _, filenames in os.walk(source_dir): |
310 relative_dirpath = dirpath[prefix_length:] | 326 relative_dirpath = dirpath[prefix_length:] |
311 for filename in filenames: | 327 for filename in filenames: |
312 source_fileset.add(os.path.join(relative_dirpath, filename)) | 328 source_fileset.add(os.path.join(relative_dirpath, filename)) |
| 329 num_files_total = len(source_fileset) |
313 | 330 |
314 # If we are only uploading files conditionally, remove any unnecessary | 331 # If we are only uploading files conditionally, remove any unnecessary |
315 # files from source_fileset. | 332 # files from source_fileset. |
316 if upload_if == self.UploadIf.ALWAYS: | 333 if upload_if == self.UploadIf.ALWAYS: |
317 pass # there are no shortcuts... upload them all | 334 pass # there are no shortcuts... upload them all |
318 else: | 335 else: |
319 # Create a mapping of filename to Key for existing files within dest_dir | 336 # Create a mapping of filename to Key for existing files within dest_dir |
320 existing_dest_filemap = {} | 337 existing_dest_filemap = {} |
321 prefix = dest_dir | 338 prefix = dest_dir |
322 if prefix and not prefix.endswith('/'): | 339 if prefix and not prefix.endswith('/'): |
(...skipping 13 matching lines...)
336 for rel_path in files_in_common: | 353 for rel_path in files_in_common: |
337 local_md5 = '"%s"' % _get_local_md5(path=os.path.join( | 354 local_md5 = '"%s"' % _get_local_md5(path=os.path.join( |
338 source_dir, rel_path)) | 355 source_dir, rel_path)) |
339 key = existing_dest_filemap[rel_path] | 356 key = existing_dest_filemap[rel_path] |
340 if local_md5 == key.etag: | 357 if local_md5 == key.etag: |
341 source_fileset.remove(rel_path) | 358 source_fileset.remove(rel_path) |
342 else: | 359 else: |
343 raise Exception('unknown value of upload_if: %s' % upload_if) | 360 raise Exception('unknown value of upload_if: %s' % upload_if) |
344 | 361 |
345 # Upload any files still in source_fileset. | 362 # Upload any files still in source_fileset. |
346 for rel_path in sorted(source_fileset): | 363 num_files_to_upload = len(source_fileset) |
347 self.upload_file( | 364 self.logger.info('Uploading %d files, skipping %d ...' % ( |
348 source_path=os.path.join(source_dir, rel_path), | 365 num_files_to_upload, num_files_total - num_files_to_upload)) |
349 dest_bucket=b, | 366 if num_files_to_upload == 0: |
350 dest_path=posixpath.join(dest_dir, rel_path), | 367 return |
351 upload_if=self.UploadIf.ALWAYS, | 368 if num_threads > num_files_to_upload: |
352 **kwargs) | 369 num_threads = num_files_to_upload |
| 370 |
| 371 # Create a work queue with all files that need to be uploaded. |
| 372 q = Queue.Queue(maxsize=num_files_to_upload) |
| 373 for rel_path in source_fileset: |
| 374 q.put(rel_path) |
| 375 |
| 376 # Spin up worker threads to read from the task queue. |
| 377 def worker(): |
| 378 while True: |
| 379 try: |
| 380 rel_path = q.get(block=False) |
| 381 except Queue.Empty: |
| 382 return # no more tasks in the queue, so exit |
| 383 self.logger.info(' Uploading file %d/%d: %s' % ( |
| 384 num_files_to_upload - q.qsize(), num_files_to_upload, rel_path)) |
| 385 self.upload_file( |
| 386 source_path=os.path.join(source_dir, rel_path), |
| 387 dest_bucket=b, |
| 388 dest_path=posixpath.join(dest_dir, rel_path), |
| 389 upload_if=self.UploadIf.ALWAYS, |
| 390 **kwargs) |
| 391 q.task_done() |
| 392 for _ in range(num_threads): |
| 393 t = threading.Thread(target=worker) |
| 394 t.daemon = True |
| 395 t.start() |
| 396 |
| 397 # Block until all files have been uploaded and all workers have exited. |
| 398 q.join() |
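A note on the pattern above: each worker pulls from the queue with block=False and exits on Queue.Empty, so the pool winds down naturally once the queue drains; the threads are marked daemon so they cannot hang interpreter shutdown; and q.join() returns only once task_done() has been called for every queued item (so an exception escaping upload_file would skip task_done() and leave the join blocking). A caller-side sketch, with hypothetical directory and bucket names:

    gs = GSUtils(boto_file_path=os.path.expanduser('~/.boto'))
    gs.upload_dir_contents(
        source_dir='/tmp/render-output',      # hypothetical local directory
        dest_bucket='example-bucket',         # hypothetical bucket name
        dest_dir='renders/2014-08-01',
        num_threads=4,                        # cap parallelism below the default of 10
        upload_if=gs.UploadIf.IF_MODIFIED)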
353 | 399 |
354 def download_file(self, source_bucket, source_path, dest_path, | 400 def download_file(self, source_bucket, source_path, dest_path, |
355 create_subdirs_if_needed=False): | 401 create_subdirs_if_needed=False): |
356 """Downloads a single file from Google Cloud Storage to local disk. | 402 """Downloads a single file from Google Cloud Storage to local disk. |
357 | 403 |
358 Args: | 404 Args: |
359 source_bucket: GS bucket to download the file from | 405 source_bucket: GS bucket to download the file from |
360 source_path: full path (Posix-style) within that bucket | 406 source_path: full path (Posix-style) within that bucket |
361 dest_path: full path (local-OS-style) on local disk to copy the file to | 407 dest_path: full path (local-OS-style) on local disk to copy the file to |
362 create_subdirs_if_needed: boolean; whether to create subdirectories as | 408 create_subdirs_if_needed: boolean; whether to create subdirectories as |
(...skipping 240 matching lines...)
603 | 649 |
604 def _get_local_md5(path): | 650 def _get_local_md5(path): |
605 """Returns the MD5 hash of a file on local disk.""" | 651 """Returns the MD5 hash of a file on local disk.""" |
606 hasher = hashlib.md5() | 652 hasher = hashlib.md5() |
607 with open(path, 'rb') as f: | 653 with open(path, 'rb') as f: |
608 while True: | 654 while True: |
609 data = f.read(64*1024) | 655 data = f.read(64*1024) |
610 if not data: | 656 if not data: |
611 return hasher.hexdigest() | 657 return hasher.hexdigest() |
612 hasher.update(data) | 658 hasher.update(data) |
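Reading in fixed 64 KiB chunks keeps memory use bounded no matter how large the file is. An equivalent loop, shown only as an alternative sketch with the same behavior, uses iter() with a sentinel instead of the explicit break-out:

    def _get_local_md5(path):
      """Returns the MD5 hash of a file on local disk."""
      hasher = hashlib.md5()
      with open(path, 'rb') as f:
        # read() returns the empty string (the sentinel) at end-of-file
        for chunk in iter(lambda: f.read(64*1024), b''):
          hasher.update(chunk)
      return hasher.hexdigest()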