Index: tools/telemetry/third_party/gsutilz/gslib/tracker_file.py |
diff --git a/tools/telemetry/third_party/gsutilz/gslib/tracker_file.py b/tools/telemetry/third_party/gsutilz/gslib/tracker_file.py |
index 4fddc8a13d5d81336fdc73d7e4077e3b68cb0690..55541c1737573fb8ffe733e5a49d459d7c7bba14 100644 |
--- a/tools/telemetry/third_party/gsutilz/gslib/tracker_file.py |
+++ b/tools/telemetry/third_party/gsutilz/gslib/tracker_file.py |
@@ -16,6 +16,7 @@ |
import errno |
import hashlib |
+import json |
import os |
import re |
@@ -40,7 +41,9 @@ TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = ( |
class TrackerFileType(object): |
UPLOAD = 'upload' |
DOWNLOAD = 'download' |
+ DOWNLOAD_COMPONENT = 'download_component' |
PARALLEL_UPLOAD = 'parallel_upload' |
+ SLICED_DOWNLOAD = 'sliced_download' |
REWRITE = 'rewrite' |
@@ -110,7 +113,8 @@ def GetRewriteTrackerFilePath(src_bucket_name, src_obj_name, dst_bucket_name, |
return _HashAndReturnPath(res_tracker_file_name, TrackerFileType.REWRITE) |
-def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None): |
+def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None, |
+ component_num=None): |
"""Gets the tracker file name described by the arguments. |
Args: |
@@ -118,6 +122,7 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None): |
tracker_file_type: TrackerFileType for this operation. |
api_selector: API to use for this operation. |
src_url: Source URL for the source file name for parallel uploads. |
+ component_num: Component number if this is a download component, else None. |
Returns: |
File path to tracker file. |
@@ -132,6 +137,13 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None): |
res_tracker_file_name = ( |
re.sub('[/\\\\]', '_', 'resumable_download__%s__%s.etag' % |
(os.path.realpath(dst_url.object_name), api_selector))) |
+ elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT: |
+ # Encode the fully-qualified dest file name and the component number |
+ # into the tracker file name. |
+ res_tracker_file_name = ( |
+ re.sub('[/\\\\]', '_', 'resumable_download__%s__%s__%d.etag' % |
+ (os.path.realpath(dst_url.object_name), api_selector, |
+ component_num))) |
elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD: |
# Encode the dest bucket and object names as well as the source file name |
# into the tracker file name. |
@@ -139,6 +151,11 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None): |
re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s__%s.url' % |
(dst_url.bucket_name, dst_url.object_name, |
src_url, api_selector))) |
+ elif tracker_file_type == TrackerFileType.SLICED_DOWNLOAD: |
+ # Encode the fully-qualified dest file name into the tracker file name. |
+ res_tracker_file_name = ( |
+ re.sub('[/\\\\]', '_', 'sliced_download__%s__%s.etag' % |
+ (os.path.realpath(dst_url.object_name), api_selector))) |
elif tracker_file_type == TrackerFileType.REWRITE: |
# Should use GetRewriteTrackerFilePath instead. |
raise NotImplementedError() |
@@ -146,7 +163,73 @@ def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None): |
return _HashAndReturnPath(res_tracker_file_name, tracker_file_type) |
+def DeleteDownloadTrackerFiles(dst_url, api_selector): |
+ """Deletes all tracker files corresponding to an object download. |
+ |
+ Args: |
+ dst_url: StorageUrl describing the destination file. |
+ api_selector: The Cloud API implementation used. |
+ """ |
+ # Delete non-sliced download tracker file. |
+ DeleteTrackerFile(GetTrackerFilePath(dst_url, TrackerFileType.DOWNLOAD, |
+ api_selector)) |
+ |
+ # Delete all sliced download tracker files. |
+ tracker_files = GetSlicedDownloadTrackerFilePaths(dst_url, api_selector) |
+ for tracker_file in tracker_files: |
+ DeleteTrackerFile(tracker_file) |
+ |
+ |
+def GetSlicedDownloadTrackerFilePaths(dst_url, api_selector, |
+ num_components=None): |
+ """Gets a list of sliced download tracker file paths. |
+ |
+ The list consists of the parent tracker file path in index 0, and then |
+ any existing component tracker files in [1:]. |
+ |
+ Args: |
+ dst_url: Destination URL for tracker file. |
+ api_selector: API to use for this operation. |
+ num_components: The number of component tracker files, if already known. |
+ If not known, the number will be retrieved from the parent |
+ tracker file on disk. |
+ Returns: |
+ File path to tracker file. |
+ """ |
+ parallel_tracker_file_path = GetTrackerFilePath( |
+ dst_url, TrackerFileType.SLICED_DOWNLOAD, api_selector) |
+ tracker_file_paths = [parallel_tracker_file_path] |
+ |
+ # If we don't know the number of components, check the tracker file. |
+ if num_components is None: |
+ tracker_file = None |
+ try: |
+ tracker_file = open(parallel_tracker_file_path, 'r') |
+ num_components = json.load(tracker_file)['num_components'] |
+ except (IOError, ValueError): |
+ return tracker_file_paths |
+ finally: |
+ if tracker_file: |
+ tracker_file.close() |
+ |
+ for i in range(num_components): |
+ tracker_file_paths.append(GetTrackerFilePath( |
+ dst_url, TrackerFileType.DOWNLOAD_COMPONENT, api_selector, |
+ component_num=i)) |
+ |
+ return tracker_file_paths |
+ |
+ |
def _HashAndReturnPath(res_tracker_file_name, tracker_file_type): |
+ """Hashes and returns a tracker file path. |
+ |
+ Args: |
+ res_tracker_file_name: The tracker file name prior to it being hashed. |
+ tracker_file_type: The TrackerFileType of res_tracker_file_name. |
+ |
+ Returns: |
+ Final (hashed) tracker file path. |
+ """ |
resumable_tracker_dir = CreateTrackerDirIfNeeded() |
hashed_tracker_file_name = _HashFilename(res_tracker_file_name) |
tracker_file_name = '%s_%s' % (str(tracker_file_type).lower(), |
@@ -256,49 +339,104 @@ def WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash, |
rewrite_token)) |
-def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, |
- api_selector): |
+def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger, |
+ api_selector, start_byte, |
+ existing_file_size, component_num=None): |
"""Checks for a download tracker file and creates one if it does not exist. |
+ The methodology for determining the download start point differs between |
+ normal and sliced downloads. For normal downloads, the existing bytes in |
+ the file are presumed to be correct and have been previously downloaded from |
+ the server (if a tracker file exists). In this case, the existing file size |
+ is used to determine the download start point. For sliced downloads, the |
+ number of bytes previously retrieved from the server cannot be determined |
+ from the existing file size, and so the number of bytes known to have been |
+ previously downloaded is retrieved from the tracker file. |
+ |
Args: |
- src_obj_metadata: Metadata for the source object. Must include |
- etag and size. |
- dst_url: Destination file StorageUrl. |
- api_selector: API mode to use (for tracker file naming). |
+ src_obj_metadata: Metadata for the source object. Must include etag and |
+ generation. |
+ dst_url: Destination URL for tracker file. |
+ logger: For outputting log messages. |
+ api_selector: API to use for this operation. |
+ start_byte: The start byte of the byte range for this download. |
+ existing_file_size: Size of existing file for this download on disk. |
+ component_num: The component number, if this is a component of a parallel |
+ download, else None. |
Returns: |
- True if the tracker file already exists (resume existing download), |
- False if we created a new tracker file (new download). |
+ tracker_file_name: The name of the tracker file, if one was used. |
+ download_start_byte: The first byte that still needs to be downloaded. |
""" |
+ assert src_obj_metadata.etag |
+ |
+ tracker_file_name = None |
if src_obj_metadata.size < ResumableThreshold(): |
# Don't create a tracker file for a small downloads; cross-process resumes |
# won't work, but restarting a small download is inexpensive. |
- return False |
+ return tracker_file_name, start_byte |
- assert src_obj_metadata.etag |
- tracker_file_name = GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector) |
- tracker_file = None |
+ download_name = dst_url.object_name |
+ if component_num is None: |
+ tracker_file_type = TrackerFileType.DOWNLOAD |
+ else: |
+ tracker_file_type = TrackerFileType.DOWNLOAD_COMPONENT |
+ download_name += ' component %d' % component_num |
+ tracker_file_name = GetTrackerFilePath(dst_url, tracker_file_type, |
+ api_selector, |
+ component_num=component_num) |
+ tracker_file = None |
# Check to see if we already have a matching tracker file. |
try: |
tracker_file = open(tracker_file_name, 'r') |
- etag_value = tracker_file.readline().rstrip('\n') |
- if etag_value == src_obj_metadata.etag: |
- return True |
- except IOError as e: |
+ if tracker_file_type is TrackerFileType.DOWNLOAD: |
+ etag_value = tracker_file.readline().rstrip('\n') |
+ if etag_value == src_obj_metadata.etag: |
+ return tracker_file_name, existing_file_size |
+ elif tracker_file_type is TrackerFileType.DOWNLOAD_COMPONENT: |
+ component_data = json.loads(tracker_file.read()) |
+ if (component_data['etag'] == src_obj_metadata.etag and |
+ component_data['generation'] == src_obj_metadata.generation): |
+ return tracker_file_name, component_data['download_start_byte'] |
+ |
+ logger.warn('Tracker file doesn\'t match for download of %s. Restarting ' |
+ 'download from scratch.' % download_name) |
+ |
+ except (IOError, ValueError) as e: |
# Ignore non-existent file (happens first time a download |
# is attempted on an object), but warn user for other errors. |
- if e.errno != errno.ENOENT: |
- print('Couldn\'t read URL tracker file (%s): %s. Restarting ' |
- 'download from scratch.' % |
- (tracker_file_name, e.strerror)) |
+ if isinstance(e, ValueError) or e.errno != errno.ENOENT: |
+ logger.warn('Couldn\'t read download tracker file (%s): %s. Restarting ' |
+ 'download from scratch.' % (tracker_file_name, str(e))) |
finally: |
if tracker_file: |
tracker_file.close() |
- # Otherwise, create a new tracker file and start from scratch. |
- _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag) |
+ # There wasn't a matching tracker file, so create one and then start the |
+ # download from scratch. |
+ if tracker_file_type is TrackerFileType.DOWNLOAD: |
+ _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag) |
+ elif tracker_file_type is TrackerFileType.DOWNLOAD_COMPONENT: |
+ WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata, |
+ start_byte) |
+ return tracker_file_name, start_byte |
+ |
+ |
+def WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata, |
+ current_file_pos): |
+ """Updates or creates a download component tracker file on disk. |
+ |
+ Args: |
+ tracker_file_name: The name of the tracker file. |
+ src_obj_metadata: Metadata for the source object. Must include etag. |
+ current_file_pos: The current position in the file. |
+ """ |
+ component_data = {'etag': src_obj_metadata.etag, |
+ 'generation': src_obj_metadata.generation, |
+ 'download_start_byte': current_file_pos} |
+ |
+ _WriteTrackerFile(tracker_file_name, json.dumps(component_data)) |
def _WriteTrackerFile(tracker_file_name, data): |