| Index: third_party/gsutil/gslib/tracker_file.py
|
| diff --git a/third_party/gsutil/gslib/tracker_file.py b/third_party/gsutil/gslib/tracker_file.py
|
| index 4fddc8a13d5d81336fdc73d7e4077e3b68cb0690..55541c1737573fb8ffe733e5a49d459d7c7bba14 100644
|
| --- a/third_party/gsutil/gslib/tracker_file.py
|
| +++ b/third_party/gsutil/gslib/tracker_file.py
|
| @@ -16,6 +16,7 @@
|
|
|
| import errno
|
| import hashlib
|
| +import json
|
| import os
|
| import re
|
|
|
| @@ -40,7 +41,9 @@ TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = (
|
| class TrackerFileType(object):
|
| UPLOAD = 'upload'
|
| DOWNLOAD = 'download'
|
| + DOWNLOAD_COMPONENT = 'download_component'
|
| PARALLEL_UPLOAD = 'parallel_upload'
|
| + SLICED_DOWNLOAD = 'sliced_download'
|
| REWRITE = 'rewrite'
|
|
|
|
|
| @@ -110,7 +113,8 @@ def GetRewriteTrackerFilePath(src_bucket_name, src_obj_name, dst_bucket_name,
|
| return _HashAndReturnPath(res_tracker_file_name, TrackerFileType.REWRITE)
|
|
|
|
|
| -def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
|
| +def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None,
|
| + component_num=None):
|
| """Gets the tracker file name described by the arguments.
|
|
|
| Args:
|
| @@ -118,6 +122,7 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
|
| tracker_file_type: TrackerFileType for this operation.
|
| api_selector: API to use for this operation.
|
| src_url: Source URL for the source file name for parallel uploads.
|
| + component_num: Component number if this is a download component, else None.
|
|
|
| Returns:
|
| File path to tracker file.
|
| @@ -132,6 +137,13 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
|
| res_tracker_file_name = (
|
| re.sub('[/\\\\]', '_', 'resumable_download__%s__%s.etag' %
|
| (os.path.realpath(dst_url.object_name), api_selector)))
|
| + elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT:
|
| + # Encode the fully-qualified dest file name and the component number
|
| + # into the tracker file name.
|
| + res_tracker_file_name = (
|
| + re.sub('[/\\\\]', '_', 'resumable_download__%s__%s__%d.etag' %
|
| + (os.path.realpath(dst_url.object_name), api_selector,
|
| + component_num)))
|
| elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
|
| # Encode the dest bucket and object names as well as the source file name
|
| # into the tracker file name.
|
| @@ -139,6 +151,11 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
|
| re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s__%s.url' %
|
| (dst_url.bucket_name, dst_url.object_name,
|
| src_url, api_selector)))
|
| + elif tracker_file_type == TrackerFileType.SLICED_DOWNLOAD:
|
| + # Encode the fully-qualified dest file name into the tracker file name.
|
| + res_tracker_file_name = (
|
| + re.sub('[/\\\\]', '_', 'sliced_download__%s__%s.etag' %
|
| + (os.path.realpath(dst_url.object_name), api_selector)))
|
| elif tracker_file_type == TrackerFileType.REWRITE:
|
| # Should use GetRewriteTrackerFilePath instead.
|
| raise NotImplementedError()
|
| @@ -146,7 +163,73 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
|
| return _HashAndReturnPath(res_tracker_file_name, tracker_file_type)
|
|
|
|
|
| +def DeleteDownloadTrackerFiles(dst_url, api_selector):
|
| + """Deletes all tracker files corresponding to an object download.
|
| +
|
| + Args:
|
| + dst_url: StorageUrl describing the destination file.
|
| + api_selector: The Cloud API implementation used (part of the file names).
|
| + """
|
| + # Delete non-sliced download tracker file.
|
| + DeleteTrackerFile(GetTrackerFilePath(dst_url, TrackerFileType.DOWNLOAD,
|
| + api_selector))
|
| +
|
| + # Delete the sliced download parent tracker file and its component files.
|
| + tracker_files = GetSlicedDownloadTrackerFilePaths(dst_url, api_selector)
|
| + for tracker_file in tracker_files:
|
| + DeleteTrackerFile(tracker_file)
|
| +
|
| +
|
| +def GetSlicedDownloadTrackerFilePaths(dst_url, api_selector,
|
| + num_components=None):
|
| + """Gets a list of sliced download tracker file paths.
|
| +
|
| + The list consists of the parent tracker file path in index 0, and then
|
| + one path per download component in [1:].
|
| +
|
| + Args:
|
| + dst_url: Destination URL for tracker file.
|
| + api_selector: API to use for this operation.
|
| + num_components: The number of component tracker files, if already known.
|
| + If not known, it is read from the parent tracker file on disk;
|
| + if that fails, only the parent path is returned.
|
| + Returns:
|
| + List of tracker file paths, with the parent tracker file path first.
|
| + """
|
| + parallel_tracker_file_path = GetTrackerFilePath(
|
| + dst_url, TrackerFileType.SLICED_DOWNLOAD, api_selector)
|
| + tracker_file_paths = [parallel_tracker_file_path]
|
| +
|
| + # A missing or malformed parent file means no components can be listed.
|
| + if num_components is None:
|
| + tracker_file = None
|
| + try:
|
| + tracker_file = open(parallel_tracker_file_path, 'r')
|
| + num_components = json.load(tracker_file)['num_components']
|
| + except (IOError, ValueError, KeyError, TypeError):
|
| + return tracker_file_paths
|
| + finally:
|
| + if tracker_file:
|
| + tracker_file.close()
|
| +
|
| + for i in range(num_components):
|
| + tracker_file_paths.append(GetTrackerFilePath(
|
| + dst_url, TrackerFileType.DOWNLOAD_COMPONENT, api_selector,
|
| + component_num=i))
|
| +
|
| + return tracker_file_paths
|
| +
|
| +
|
| def _HashAndReturnPath(res_tracker_file_name, tracker_file_type):
|
| + """Hashes and returns a tracker file path.
|
| +
|
| + Args:
|
| + res_tracker_file_name: The tracker file name prior to it being hashed.
|
| + tracker_file_type: The TrackerFileType of res_tracker_file_name.
|
| +
|
| + Returns:
|
| + Final (hashed) tracker file path.
|
| + """
|
| resumable_tracker_dir = CreateTrackerDirIfNeeded()
|
| hashed_tracker_file_name = _HashFilename(res_tracker_file_name)
|
| tracker_file_name = '%s_%s' % (str(tracker_file_type).lower(),
|
| @@ -256,49 +339,104 @@ def WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash,
|
| rewrite_token))
|
|
|
|
|
| -def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url,
|
| - api_selector):
|
| +def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
|
| + api_selector, start_byte,
|
| + existing_file_size, component_num=None):
|
| """Checks for a download tracker file and creates one if it does not exist.
|
|
|
| + The methodology for determining the download start point differs between
|
| + normal and sliced downloads. For normal downloads, the existing bytes in
|
| + the file are presumed to be correct and have been previously downloaded from
|
| + the server (if a tracker file exists). In this case, the existing file size
|
| + is used to determine the download start point. For sliced downloads, the
|
| + number of bytes previously retrieved from the server cannot be determined
|
| + from the existing file size, and so the number of bytes known to have been
|
| + previously downloaded is retrieved from the tracker file.
|
| +
|
| Args:
|
| - src_obj_metadata: Metadata for the source object. Must include
|
| - etag and size.
|
| - dst_url: Destination file StorageUrl.
|
| - api_selector: API mode to use (for tracker file naming).
|
| + src_obj_metadata: Metadata for the source object. Must include etag, size
|
| + and generation.
|
| + dst_url: Destination URL for tracker file.
|
| + logger: For outputting log messages.
|
| + api_selector: API to use for this operation.
|
| + start_byte: The start byte of the byte range for this download.
|
| + existing_file_size: Size of existing file for this download on disk.
|
| + component_num: The component number, if this is a component of a parallel
|
| + download, else None.
|
|
|
| Returns:
|
| - True if the tracker file already exists (resume existing download),
|
| - False if we created a new tracker file (new download).
|
| + tracker_file_name: The name of the tracker file, if one was used.
|
| + download_start_byte: The first byte that still needs to be downloaded.
|
| """
|
| + assert src_obj_metadata.etag
|
| +
|
| + tracker_file_name = None
|
| if src_obj_metadata.size < ResumableThreshold():
|
| # Don't create a tracker file for small downloads; cross-process resumes
|
| # won't work, but restarting a small download is inexpensive.
|
| - return False
|
| + return tracker_file_name, start_byte
|
|
|
| - assert src_obj_metadata.etag
|
| - tracker_file_name = GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector)
|
| - tracker_file = None
|
| + download_name = dst_url.object_name
|
| + if component_num is None:
|
| + tracker_file_type = TrackerFileType.DOWNLOAD
|
| + else:
|
| + tracker_file_type = TrackerFileType.DOWNLOAD_COMPONENT
|
| + download_name += ' component %d' % component_num
|
|
|
| + tracker_file_name = GetTrackerFilePath(dst_url, tracker_file_type,
|
| + api_selector,
|
| + component_num=component_num)
|
| + tracker_file = None
|
| # Check to see if we already have a matching tracker file.
|
| try:
|
| tracker_file = open(tracker_file_name, 'r')
|
| - etag_value = tracker_file.readline().rstrip('\n')
|
| - if etag_value == src_obj_metadata.etag:
|
| - return True
|
| - except IOError as e:
|
| + if tracker_file_type is TrackerFileType.DOWNLOAD:
|
| + etag_value = tracker_file.readline().rstrip('\n')
|
| + if etag_value == src_obj_metadata.etag:
|
| + return tracker_file_name, existing_file_size
|
| + elif tracker_file_type is TrackerFileType.DOWNLOAD_COMPONENT:
|
| + component_data = json.loads(tracker_file.read())
|
| + if (component_data['etag'] == src_obj_metadata.etag and
|
| + component_data['generation'] == src_obj_metadata.generation):
|
| + return tracker_file_name, component_data['download_start_byte']
|
| +
|
| + logger.warn('Tracker file doesn\'t match for download of %s. Restarting '
|
| + 'download from scratch.' % download_name)
|
| +
|
| + except (IOError, ValueError) as e:
|
| # Ignore non-existent file (happens first time a download
|
| # is attempted on an object), but warn user for other errors.
|
| - if e.errno != errno.ENOENT:
|
| - print('Couldn\'t read URL tracker file (%s): %s. Restarting '
|
| - 'download from scratch.' %
|
| - (tracker_file_name, e.strerror))
|
| + if isinstance(e, ValueError) or e.errno != errno.ENOENT:
|
| + logger.warn('Couldn\'t read download tracker file (%s): %s. Restarting '
|
| + 'download from scratch.' % (tracker_file_name, str(e)))
|
| finally:
|
| if tracker_file:
|
| tracker_file.close()
|
|
|
| - # Otherwise, create a new tracker file and start from scratch.
|
| - _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag)
|
| + # There wasn't a matching tracker file, so create one and then start the
|
| + # download from scratch.
|
| + if tracker_file_type is TrackerFileType.DOWNLOAD:
|
| + _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag)
|
| + elif tracker_file_type is TrackerFileType.DOWNLOAD_COMPONENT:
|
| + WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
|
| + start_byte)
|
| + return tracker_file_name, start_byte
|
| +
|
| +
|
| +def WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
|
| + current_file_pos):
|
| + """Updates or creates a download component tracker file on disk.
|
| +
|
| + Args:
|
| + tracker_file_name: The name of the tracker file.
|
| + src_obj_metadata: Source object metadata. Must include etag and generation.
|
| + current_file_pos: Current file position, stored as 'download_start_byte'.
|
| + """
|
| + component_data = {'etag': src_obj_metadata.etag,
|
| + 'generation': src_obj_metadata.generation,
|
| + 'download_start_byte': current_file_pos}
|
| +
|
| + _WriteTrackerFile(tracker_file_name, json.dumps(component_data))
|
|
|
|
|
| def _WriteTrackerFile(tracker_file_name, data):
|
|
|