Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Unified Diff: third_party/gsutil/gslib/name_expansion.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/gsutil/gslib/ls_helper.py ('k') | third_party/gsutil/gslib/no_op_auth_plugin.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/gsutil/gslib/name_expansion.py
diff --git a/third_party/gsutil/gslib/name_expansion.py b/third_party/gsutil/gslib/name_expansion.py
new file mode 100644
index 0000000000000000000000000000000000000000..0d8b6cae9bbb4b1dec88942eeca0ed4c70328497
--- /dev/null
+++ b/third_party/gsutil/gslib/name_expansion.py
@@ -0,0 +1,530 @@
+# -*- coding: utf-8 -*-
+# Copyright 2012 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Name expansion iterator and result classes.
+
+Name expansion support for the various ways gsutil lets users refer to
+collections of data (via explicit wildcarding as well as directory,
+bucket, and bucket subdir implicit wildcarding). This class encapsulates
+the various rules for determining how these expansions are done.
+"""
+
+# Disable warnings for NameExpansionIteratorQueue functions; they implement
+# an interface which does not follow lint guidelines.
+# pylint: disable=invalid-name
+
+from __future__ import absolute_import
+
+import multiprocessing
+import os
+import sys
+
+from gslib.exception import CommandException
+from gslib.plurality_checkable_iterator import PluralityCheckableIterator
+import gslib.wildcard_iterator
+from gslib.wildcard_iterator import StorageUrlFromString
+
+
+class NameExpansionResult(object):
+ """Holds one fully expanded result from iterating over NameExpansionIterator.
+
+ The member data in this class need to be pickleable because
+ NameExpansionResult instances are passed through Multiprocessing.Queue. In
+ particular, don't include any boto state like StorageUri, since that pulls
+ in a big tree of objects, some of which aren't pickleable (and even if
+ they were, pickling/unpickling such a large object tree would result in
+ significant overhead).
+
+ The state held in this object is needed for handling the various naming cases
+ (e.g., copying from a single source URL to a directory generates different
+ dest URL names than copying multiple URLs to a directory, to be consistent
+ with naming rules used by the Unix cp command). For more details see comments
+ in _NameExpansionIterator.
+ """
+
+ def __init__(self, source_storage_url, is_multi_source_request,
+ names_container, expanded_storage_url):
+ """Instantiates a result from name expansion.
+
+ Args:
+ source_storage_url: StorageUrl that was being expanded.
+ is_multi_source_request: bool indicator whether src_url_str expanded to
+ more than one BucketListingRef.
+ names_container: Bool indicator whether src_url names a container.
+ expanded_storage_url: StorageUrl that was expanded.
+ """
+ self.source_storage_url = source_storage_url
+ self.is_multi_source_request = is_multi_source_request
+ self.names_container = names_container
+ self.expanded_storage_url = expanded_storage_url
+
+ def __repr__(self):
+ return '%s' % self._expanded_storage_url
+
+
+class _NameExpansionIterator(object):
+ """Class that iterates over all source URLs passed to the iterator.
+
+ See details in __iter__ function doc.
+ """
+
+ def __init__(self, command_name, debug, logger, gsutil_api, url_strs,
+ recursion_requested, all_versions=False,
+ cmd_supports_recursion=True, project_id=None,
+ continue_on_error=False):
+ """Creates a NameExpansionIterator.
+
+ Args:
+ command_name: name of command being run.
+ debug: Debug level to pass to underlying iterators (range 0..3).
+ logger: logging.Logger object.
+ gsutil_api: Cloud storage interface. Settable for testing/mocking.
+ url_strs: PluralityCheckableIterator of URL strings needing expansion.
+ recursion_requested: True if -r specified on command-line. If so,
+ listings will be flattened so mapped-to results contain objects
+ spanning subdirectories.
+ all_versions: Bool indicating whether to iterate over all object versions.
+ cmd_supports_recursion: Bool indicating whether this command supports a
+ '-r' flag. Useful for printing helpful error messages.
+ project_id: Project id to use for bucket retrieval.
+ continue_on_error: If true, yield no-match exceptions encountered during
+ iteration instead of raising them.
+
+ Examples of _NameExpansionIterator with recursion_requested=True:
+ - Calling with one of the url_strs being 'gs://bucket' will enumerate all
+ top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
+ - 'gs://bucket/**' will enumerate all objects in the bucket.
+ - 'gs://bucket/abc' will enumerate either the single object abc or, if
+ abc is a subdirectory, all objects under abc and any of its
+ subdirectories.
+ - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
+ subdirectories.
+ - 'file:///tmp' will enumerate all files under /tmp, as will
+ 'file:///tmp/*'
+ - 'file:///tmp/**' will enumerate all files under /tmp or any of its
+ subdirectories.
+
+ Example if recursion_requested=False:
+ calling with gs://bucket/abc/* lists matching objects
+ or subdirs, but not sub-subdirs or objects beneath subdirs.
+
+ Note: In step-by-step comments below we give examples assuming there's a
+ gs://bucket with object paths:
+ abcd/o1.txt
+ abcd/o2.txt
+ xyz/o1.txt
+ xyz/o2.txt
+ and a directory file://dir with file paths:
+ dir/a.txt
+ dir/b.txt
+ dir/c/
+ """
+ self.command_name = command_name
+ self.debug = debug
+ self.logger = logger
+ self.gsutil_api = gsutil_api
+ self.url_strs = url_strs
+ self.recursion_requested = recursion_requested
+ self.all_versions = all_versions
+ # Check self.url_strs.HasPlurality() at start because its value can change
+ # if url_strs is itself an iterator.
+ self.url_strs.has_plurality = self.url_strs.HasPlurality()
+ self.cmd_supports_recursion = cmd_supports_recursion
+ self.project_id = project_id
+ self.continue_on_error = continue_on_error
+
+ # Map holding wildcard strings to use for flat vs subdir-by-subdir listings.
+ # (A flat listing means show all objects expanded all the way down.)
+ self._flatness_wildcard = {True: '**', False: '*'}
+
+ def __iter__(self):
+ """Iterates over all source URLs passed to the iterator.
+
+ For each src url, expands wildcards, object-less bucket names,
+ subdir bucket names, and directory names, and generates a flat listing of
+ all the matching objects/files.
+
+ You should instantiate this object using the static factory function
+ NameExpansionIterator, because consumers of this iterator need the
+ PluralityCheckableIterator wrapper built by that function.
+
+ Yields:
+ gslib.name_expansion.NameExpansionResult.
+
+ Raises:
+ CommandException: if errors encountered.
+ """
+ for url_str in self.url_strs:
+ storage_url = StorageUrlFromString(url_str)
+
+ if storage_url.IsFileUrl() and storage_url.IsStream():
+ if self.url_strs.has_plurality:
+ raise CommandException('Multiple URL strings are not supported '
+ 'with streaming ("-") URLs.')
+ yield NameExpansionResult(storage_url, False, False, storage_url)
+ continue
+
+ # Step 1: Expand any explicitly specified wildcards. The output from this
+ # step is an iterator of BucketListingRef.
+ # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd
+
+ src_names_bucket = False
+ if (storage_url.IsCloudUrl() and storage_url.IsBucket()
+ and not self.recursion_requested):
+ # UNIX commands like rm and cp will omit directory references.
+ # If url_str refers only to buckets and we are not recursing,
+ # then produce references of type BUCKET, because they are guaranteed
+ # to pass through Step 2 and be omitted in Step 3.
+ post_step1_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_str).IterBuckets(
+ bucket_fields=['id']))
+ else:
+ # Get a list of objects and prefixes, expanding the top level for
+ # any listed buckets. If our source is a bucket, however, we need
+ # to treat all of the top level expansions as names_container=True.
+ post_step1_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_str).IterAll(
+ bucket_listing_fields=['name'],
+ expand_top_level_buckets=True))
+ if storage_url.IsCloudUrl() and storage_url.IsBucket():
+ src_names_bucket = True
+
+ # Step 2: Expand bucket subdirs. The output from this
+ # step is an iterator of (names_container, BucketListingRef).
+ # Starting with gs://bucket/abcd this step would expand to:
+ # iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]).
+ subdir_exp_wildcard = self._flatness_wildcard[self.recursion_requested]
+ if self.recursion_requested:
+ post_step2_iter = _ImplicitBucketSubdirIterator(
+ self, post_step1_iter, subdir_exp_wildcard)
+ else:
+ post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter)
+ post_step2_iter = PluralityCheckableIterator(post_step2_iter)
+
+ # Because we actually perform and check object listings here, this will
+ # raise if url_args includes a non-existent object. However,
+ # plurality_checkable_iterator will buffer the exception for us, not
+ # raising it until the iterator is actually asked to yield the first
+ # result.
+ if post_step2_iter.IsEmpty():
+ if self.continue_on_error:
+ try:
+ raise CommandException('No URLs matched: %s' % url_str)
+ except CommandException, e:
+ # Yield a specialized tuple of (exception, stack_trace) to
+ # the wrapping PluralityCheckableIterator.
+ yield (e, sys.exc_info()[2])
+ else:
+ raise CommandException('No URLs matched: %s' % url_str)
+
+ # Step 3. Omit any directories, buckets, or bucket subdirectories for
+ # non-recursive expansions.
+ post_step3_iter = PluralityCheckableIterator(_OmitNonRecursiveIterator(
+ post_step2_iter, self.recursion_requested, self.command_name,
+ self.cmd_supports_recursion, self.logger))
+
+ src_url_expands_to_multi = post_step3_iter.HasPlurality()
+ is_multi_source_request = (self.url_strs.has_plurality
+ or src_url_expands_to_multi)
+
+ # Step 4. Expand directories and buckets. This step yields the iterated
+ # values. Starting with gs://bucket this step would expand to:
+ # [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt]
+ # Starting with file://dir this step would expand to:
+ # [dir/a.txt, dir/b.txt, dir/c/]
+ for (names_container, blr) in post_step3_iter:
+ src_names_container = src_names_bucket or names_container
+
+ if blr.IsObject():
+ yield NameExpansionResult(
+ storage_url, is_multi_source_request, src_names_container,
+ blr.storage_url)
+ else:
+ # Use implicit wildcarding to do the enumeration.
+ # At this point we are guaranteed that:
+ # - Recursion has been requested because non-object entries are
+ # filtered in step 3 otherwise.
+ # - This is a prefix or bucket subdirectory because only
+ # non-recursive iterations product bucket references.
+ expanded_url = StorageUrlFromString(blr.url_string)
+ if expanded_url.IsFileUrl():
+ # Convert dir to implicit recursive wildcard.
+ url_to_iterate = '%s%s%s' % (blr, os.sep, subdir_exp_wildcard)
+ else:
+ # Convert subdir to implicit recursive wildcard.
+ url_to_iterate = expanded_url.CreatePrefixUrl(
+ wildcard_suffix=subdir_exp_wildcard)
+
+ wc_iter = PluralityCheckableIterator(
+ self.WildcardIterator(url_to_iterate).IterObjects(
+ bucket_listing_fields=['name']))
+ src_url_expands_to_multi = (src_url_expands_to_multi
+ or wc_iter.HasPlurality())
+ is_multi_source_request = (self.url_strs.has_plurality
+ or src_url_expands_to_multi)
+ # This will be a flattened listing of all underlying objects in the
+ # subdir.
+ for blr in wc_iter:
+ yield NameExpansionResult(
+ storage_url, is_multi_source_request, True, blr.storage_url)
+
+ def WildcardIterator(self, url_string):
+ """Helper to instantiate gslib.WildcardIterator.
+
+ Args are same as gslib.WildcardIterator interface, but this method fills
+ in most of the values from instance state.
+
+ Args:
+ url_string: URL string naming wildcard objects to iterate.
+
+ Returns:
+ Wildcard iterator over URL string.
+ """
+ return gslib.wildcard_iterator.CreateWildcardIterator(
+ url_string, self.gsutil_api, debug=self.debug,
+ all_versions=self.all_versions,
+ project_id=self.project_id)
+
+
+def NameExpansionIterator(command_name, debug, logger, gsutil_api, url_strs,
+ recursion_requested, all_versions=False,
+ cmd_supports_recursion=True, project_id=None,
+ continue_on_error=False):
+ """Static factory function for instantiating _NameExpansionIterator.
+
+ This wraps the resulting iterator in a PluralityCheckableIterator and checks
+ that it is non-empty. Also, allows url_strs to be either an array or an
+ iterator.
+
+ Args:
+ command_name: name of command being run.
+ debug: Debug level to pass to underlying iterators (range 0..3).
+ logger: logging.Logger object.
+ gsutil_api: Cloud storage interface. Settable for testing/mocking.
+ url_strs: Iterable URL strings needing expansion.
+ recursion_requested: True if -r specified on command-line. If so,
+ listings will be flattened so mapped-to results contain objects
+ spanning subdirectories.
+ all_versions: Bool indicating whether to iterate over all object versions.
+ cmd_supports_recursion: Bool indicating whether this command supports a '-r'
+ flag. Useful for printing helpful error messages.
+ project_id: Project id to use for the current command.
+ continue_on_error: If true, yield no-match exceptions encountered during
+ iteration instead of raising them.
+
+ Raises:
+ CommandException if underlying iterator is empty.
+
+ Returns:
+ Name expansion iterator instance.
+
+ For example semantics, see comments in NameExpansionIterator.__init__.
+ """
+ url_strs = PluralityCheckableIterator(url_strs)
+ name_expansion_iterator = _NameExpansionIterator(
+ command_name, debug, logger, gsutil_api, url_strs, recursion_requested,
+ all_versions=all_versions, cmd_supports_recursion=cmd_supports_recursion,
+ project_id=project_id, continue_on_error=continue_on_error)
+ name_expansion_iterator = PluralityCheckableIterator(name_expansion_iterator)
+ if name_expansion_iterator.IsEmpty():
+ raise CommandException('No URLs matched')
+ return name_expansion_iterator
+
+
+class NameExpansionIteratorQueue(object):
+ """Wrapper around NameExpansionIterator with Multiprocessing.Queue interface.
+
+ Only a blocking get() function can be called, and the block and timeout
+ params on that function are ignored. All other class functions raise
+ NotImplementedError.
+
+ This class is thread safe.
+ """
+
+ def __init__(self, name_expansion_iterator, final_value):
+ self.name_expansion_iterator = name_expansion_iterator
+ self.final_value = final_value
+ self.lock = multiprocessing.Manager().Lock()
+
+ def qsize(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.qsize() not implemented')
+
+ def empty(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.empty() not implemented')
+
+ def full(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.full() not implemented')
+
+ # pylint: disable=unused-argument
+ def put(self, obj=None, block=None, timeout=None):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.put() not implemented')
+
+ def put_nowait(self, obj):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.put_nowait() not implemented')
+
+ # pylint: disable=unused-argument
+ def get(self, block=None, timeout=None):
+ self.lock.acquire()
+ try:
+ if self.name_expansion_iterator.IsEmpty():
+ return self.final_value
+ return self.name_expansion_iterator.next()
+ finally:
+ self.lock.release()
+
+ def get_nowait(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.get_nowait() not implemented')
+
+ def get_no_wait(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.get_no_wait() not implemented')
+
+ def close(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.close() not implemented')
+
+ def join_thread(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.join_thread() not implemented')
+
+ def cancel_join_thread(self):
+ raise NotImplementedError(
+ 'NameExpansionIteratorQueue.cancel_join_thread() not implemented')
+
+
+class _NonContainerTuplifyIterator(object):
+ """Iterator that produces the tuple (False, blr) for each iterated value.
+
+ Used for cases where blr_iter iterates over a set of
+ BucketListingRefs known not to name containers.
+ """
+
+ def __init__(self, blr_iter):
+ """Instantiates iterator.
+
+ Args:
+ blr_iter: iterator of BucketListingRef.
+ """
+ self.blr_iter = blr_iter
+
+ def __iter__(self):
+ for blr in self.blr_iter:
+ yield (False, blr)
+
+
+class _OmitNonRecursiveIterator(object):
+ """Iterator wrapper for that omits certain values for non-recursive requests.
+
+ This iterates over tuples of (names_container, BucketListingReference) and
+ omits directories, prefixes, and buckets from non-recurisve requests
+ so that we can properly calculate whether the source URL expands to multiple
+ URLs.
+
+ For example, if we have a bucket containing two objects: bucket/foo and
+ bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be
+ yielded.
+ """
+
+ def __init__(self, tuple_iter, recursion_requested, command_name,
+ cmd_supports_recursion, logger):
+ """Instanties the iterator.
+
+ Args:
+ tuple_iter: Iterator over names_container, BucketListingReference
+ from step 2 in the NameExpansionIterator
+ recursion_requested: If false, omit buckets, dirs, and subdirs
+ command_name: Command name for user messages
+ cmd_supports_recursion: Command recursion support for user messages
+ logger: Log object for user messages
+ """
+ self.tuple_iter = tuple_iter
+ self.recursion_requested = recursion_requested
+ self.command_name = command_name
+ self.cmd_supports_recursion = cmd_supports_recursion
+ self.logger = logger
+
+ def __iter__(self):
+ for (names_container, blr) in self.tuple_iter:
+ if not self.recursion_requested and not blr.IsObject():
+ # At this point we either have a bucket or a prefix,
+ # so if recursion is not requested, we're going to omit it.
+ expanded_url = StorageUrlFromString(blr.url_string)
+ if expanded_url.IsFileUrl():
+ desc = 'directory'
+ else:
+ desc = blr.type_name
+ if self.cmd_supports_recursion:
+ self.logger.info(
+ 'Omitting %s "%s". (Did you mean to do %s -r?)',
+ desc, blr.url_string, self.command_name)
+ else:
+ self.logger.info('Omitting %s "%s".', desc, blr.url_string)
+ else:
+ yield (names_container, blr)
+
+
+class _ImplicitBucketSubdirIterator(object):
+ """Iterator wrapper that performs implicit bucket subdir expansion.
+
+ Each iteration yields tuple (names_container, expanded BucketListingRefs)
+ where names_container is true if URL names a directory, bucket,
+ or bucket subdir.
+
+ For example, iterating over [BucketListingRef("gs://abc")] would expand to:
+ [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")]
+ if those subdir objects exist, and [BucketListingRef("gs://abc") otherwise.
+ """
+
+ def __init__(self, name_exp_instance, blr_iter, subdir_exp_wildcard):
+ """Instantiates the iterator.
+
+ Args:
+ name_exp_instance: calling instance of NameExpansion class.
+ blr_iter: iterator over BucketListingRef prefixes and objects.
+ subdir_exp_wildcard: wildcard for expanding subdirectories;
+ expected values are ** if the mapped-to results should contain
+ objects spanning subdirectories, or * if only one level should
+ be listed.
+ """
+ self.blr_iter = blr_iter
+ self.name_exp_instance = name_exp_instance
+ self.subdir_exp_wildcard = subdir_exp_wildcard
+
+ def __iter__(self):
+ for blr in self.blr_iter:
+ if blr.IsPrefix():
+ # This is a bucket subdirectory, list objects according to the wildcard.
+ prefix_url = StorageUrlFromString(blr.url_string).CreatePrefixUrl(
+ wildcard_suffix=self.subdir_exp_wildcard)
+ implicit_subdir_iterator = PluralityCheckableIterator(
+ self.name_exp_instance.WildcardIterator(
+ prefix_url).IterAll(bucket_listing_fields=['name']))
+ if not implicit_subdir_iterator.IsEmpty():
+ for exp_blr in implicit_subdir_iterator:
+ yield (True, exp_blr)
+ else:
+ # Prefix that contains no objects, for example in the $folder$ case
+ # or an empty filesystem directory.
+ yield (False, blr)
+ elif blr.IsObject():
+ yield (False, blr)
+ else:
+ raise CommandException(
+ '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr)
« no previous file with comments | « third_party/gsutil/gslib/ls_helper.py ('k') | third_party/gsutil/gslib/no_op_auth_plugin.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698