Index: third_party/gsutil/gslib/name_expansion.py |
diff --git a/third_party/gsutil/gslib/name_expansion.py b/third_party/gsutil/gslib/name_expansion.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..0d8b6cae9bbb4b1dec88942eeca0ed4c70328497 |
--- /dev/null |
+++ b/third_party/gsutil/gslib/name_expansion.py |
@@ -0,0 +1,530 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2012 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Name expansion iterator and result classes. |
+ |
+Name expansion support for the various ways gsutil lets users refer to |
+collections of data (via explicit wildcarding as well as directory, |
+bucket, and bucket subdir implicit wildcarding). This class encapsulates |
+the various rules for determining how these expansions are done. |
+""" |
+ |
+# Disable warnings for NameExpansionIteratorQueue functions; they implement |
+# an interface which does not follow lint guidelines. |
+# pylint: disable=invalid-name |
+ |
+from __future__ import absolute_import |
+ |
+import multiprocessing |
+import os |
+import sys |
+ |
+from gslib.exception import CommandException |
+from gslib.plurality_checkable_iterator import PluralityCheckableIterator |
+import gslib.wildcard_iterator |
+from gslib.wildcard_iterator import StorageUrlFromString |
+ |
+ |
+class NameExpansionResult(object): |
+  """Holds one fully expanded result from iterating over NameExpansionIterator. |
+ |
+  The member data in this class need to be pickleable because |
+  NameExpansionResult instances are passed through Multiprocessing.Queue. In |
+  particular, don't include any boto state like StorageUri, since that pulls |
+  in a big tree of objects, some of which aren't pickleable (and even if |
+  they were, pickling/unpickling such a large object tree would result in |
+  significant overhead). |
+ |
+  The state held in this object is needed for handling the various naming cases |
+  (e.g., copying from a single source URL to a directory generates different |
+  dest URL names than copying multiple URLs to a directory, to be consistent |
+  with naming rules used by the Unix cp command). For more details see comments |
+  in _NameExpansionIterator. |
+  """ |
+ |
+  def __init__(self, source_storage_url, is_multi_source_request, |
+               names_container, expanded_storage_url): |
+    """Instantiates a result from name expansion. |
+ |
+    Args: |
+      source_storage_url: StorageUrl that was being expanded. |
+      is_multi_source_request: Bool indicator whether the source URL (or the |
+          set of source URLs it came from) expanded to more than one |
+          BucketListingRef. |
+      names_container: Bool indicator whether the source URL names a |
+          container. |
+      expanded_storage_url: StorageUrl that was expanded. |
+    """ |
+    self.source_storage_url = source_storage_url |
+    self.is_multi_source_request = is_multi_source_request |
+    self.names_container = names_container |
+    self.expanded_storage_url = expanded_storage_url |
+ |
+  def __repr__(self): |
+    """Returns the string form of the expanded storage URL.""" |
+    # The attribute set in __init__ is expanded_storage_url (no leading |
+    # underscore); reading a non-existent _expanded_storage_url would raise |
+    # AttributeError whenever the result is printed or logged. |
+    return '%s' % self.expanded_storage_url |
+ |
+ |
+class _NameExpansionIterator(object): |
+  """Class that iterates over all source URLs passed to the iterator. |
+ |
+  Each URL string is expanded by __iter__ into NameExpansionResult objects. |
+  See details in __iter__ function doc. |
+  """ |
+ |
+  def __init__(self, command_name, debug, logger, gsutil_api, url_strs, |
+               recursion_requested, all_versions=False, |
+               cmd_supports_recursion=True, project_id=None, |
+               continue_on_error=False): |
+    """Creates a NameExpansionIterator. |
+ |
+    Args: |
+      command_name: name of command being run. |
+      debug: Debug level to pass to underlying iterators (range 0..3). |
+      logger: logging.Logger object. |
+      gsutil_api: Cloud storage interface.  Settable for testing/mocking. |
+      url_strs: PluralityCheckableIterator of URL strings needing expansion. |
+      recursion_requested: True if -r specified on command-line.  If so, |
+          listings will be flattened so mapped-to results contain objects |
+          spanning subdirectories. |
+      all_versions: Bool indicating whether to iterate over all object versions. |
+      cmd_supports_recursion: Bool indicating whether this command supports a |
+          '-r' flag. Useful for printing helpful error messages. |
+      project_id: Project id to use for bucket retrieval. |
+      continue_on_error: If true, yield no-match exceptions encountered during |
+                         iteration instead of raising them. |
+ |
+    Examples of _NameExpansionIterator with recursion_requested=True: |
+      - Calling with one of the url_strs being 'gs://bucket' will enumerate all |
+        top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'. |
+      - 'gs://bucket/**' will enumerate all objects in the bucket. |
+      - 'gs://bucket/abc' will enumerate either the single object abc or, if |
+         abc is a subdirectory, all objects under abc and any of its |
+         subdirectories. |
+      - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its |
+        subdirectories. |
+      - 'file:///tmp' will enumerate all files under /tmp, as will |
+        'file:///tmp/*' |
+      - 'file:///tmp/**' will enumerate all files under /tmp or any of its |
+        subdirectories. |
+ |
+    Example if recursion_requested=False: |
+      calling with gs://bucket/abc/* lists matching objects |
+      or subdirs, but not sub-subdirs or objects beneath subdirs. |
+ |
+    Note: In step-by-step comments below we give examples assuming there's a |
+    gs://bucket with object paths: |
+      abcd/o1.txt |
+      abcd/o2.txt |
+      xyz/o1.txt |
+      xyz/o2.txt |
+    and a directory file://dir with file paths: |
+      dir/a.txt |
+      dir/b.txt |
+      dir/c/ |
+    """ |
+    self.command_name = command_name |
+    self.debug = debug |
+    self.logger = logger |
+    self.gsutil_api = gsutil_api |
+    self.url_strs = url_strs |
+    self.recursion_requested = recursion_requested |
+    self.all_versions = all_versions |
+    # Check self.url_strs.HasPlurality() at start because its value can change |
+    # if url_strs is itself an iterator. The snapshot is stored as an |
+    # attribute on the url_strs object itself and read back in __iter__. |
+    self.url_strs.has_plurality = self.url_strs.HasPlurality() |
+    self.cmd_supports_recursion = cmd_supports_recursion |
+    self.project_id = project_id |
+    self.continue_on_error = continue_on_error |
+ |
+    # Map holding wildcard strings to use for flat vs subdir-by-subdir listings. |
+    # (A flat listing means show all objects expanded all the way down.) |
+    # Keyed by the recursion_requested flag. |
+    self._flatness_wildcard = {True: '**', False: '*'} |
+ |
+  def __iter__(self): |
+    """Iterates over all source URLs passed to the iterator. |
+ |
+    For each src url, expands wildcards, object-less bucket names, |
+    subdir bucket names, and directory names, and generates a flat listing of |
+    all the matching objects/files. |
+ |
+    You should instantiate this object using the static factory function |
+    NameExpansionIterator, because consumers of this iterator need the |
+    PluralityCheckableIterator wrapper built by that function. |
+ |
+    Yields: |
+      gslib.name_expansion.NameExpansionResult. When continue_on_error is set |
+      and a URL matches nothing, an (exception, stack_trace) tuple is yielded |
+      for that URL instead. |
+ |
+    Raises: |
+      CommandException: if errors encountered. |
+    """ |
+    for url_str in self.url_strs: |
+      storage_url = StorageUrlFromString(url_str) |
+ |
+      # Streaming URLs are not expanded: the URL is yielded as its own |
+      # expansion, and only when it is the sole URL string supplied. |
+      if storage_url.IsFileUrl() and storage_url.IsStream(): |
+        if self.url_strs.has_plurality: |
+          raise CommandException('Multiple URL strings are not supported ' |
+                                 'with streaming ("-") URLs.') |
+        yield NameExpansionResult(storage_url, False, False, storage_url) |
+        continue |
+ |
+      # Step 1: Expand any explicitly specified wildcards. The output from this |
+      # step is an iterator of BucketListingRef. |
+      # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd |
+ |
+      src_names_bucket = False |
+      if (storage_url.IsCloudUrl() and storage_url.IsBucket() |
+          and not self.recursion_requested): |
+        # UNIX commands like rm and cp will omit directory references. |
+        # If url_str refers only to buckets and we are not recursing, |
+        # then produce references of type BUCKET, because they are guaranteed |
+        # to pass through Step 2 and be omitted in Step 3. |
+        post_step1_iter = PluralityCheckableIterator( |
+            self.WildcardIterator(url_str).IterBuckets( |
+                bucket_fields=['id'])) |
+      else: |
+        # Get a list of objects and prefixes, expanding the top level for |
+        # any listed buckets.  If our source is a bucket, however, we need |
+        # to treat all of the top level expansions as names_container=True. |
+        post_step1_iter = PluralityCheckableIterator( |
+            self.WildcardIterator(url_str).IterAll( |
+                bucket_listing_fields=['name'], |
+                expand_top_level_buckets=True)) |
+        if storage_url.IsCloudUrl() and storage_url.IsBucket(): |
+          src_names_bucket = True |
+ |
+      # Step 2: Expand bucket subdirs. The output from this |
+      # step is an iterator of (names_container, BucketListingRef). |
+      # Starting with gs://bucket/abcd this step would expand to: |
+      #   iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]). |
+      subdir_exp_wildcard = self._flatness_wildcard[self.recursion_requested] |
+      if self.recursion_requested: |
+        post_step2_iter = _ImplicitBucketSubdirIterator( |
+            self, post_step1_iter, subdir_exp_wildcard) |
+      else: |
+        post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter) |
+      post_step2_iter = PluralityCheckableIterator(post_step2_iter) |
+ |
+      # Because we actually perform and check object listings here, this will |
+      # raise if url_args includes a non-existent object.  However, |
+      # plurality_checkable_iterator will buffer the exception for us, not |
+      # raising it until the iterator is actually asked to yield the first |
+      # result. |
+      if post_step2_iter.IsEmpty(): |
+        if self.continue_on_error: |
+          # Raise and immediately catch the exception so that sys.exc_info() |
+          # has a traceback object to pair with it. |
+          try: |
+            raise CommandException('No URLs matched: %s' % url_str) |
+          except CommandException, e: |
+            # Yield a specialized tuple of (exception, stack_trace) to |
+            # the wrapping PluralityCheckableIterator. |
+            yield (e, sys.exc_info()[2]) |
+            # NOTE(review): execution falls through to Steps 3 and 4 here; |
+            # presumably they yield nothing for an empty iterator - confirm. |
+        else: |
+          raise CommandException('No URLs matched: %s' % url_str) |
+ |
+      # Step 3. Omit any directories, buckets, or bucket subdirectories for |
+      # non-recursive expansions. |
+      post_step3_iter = PluralityCheckableIterator(_OmitNonRecursiveIterator( |
+          post_step2_iter, self.recursion_requested, self.command_name, |
+          self.cmd_supports_recursion, self.logger)) |
+ |
+      # A request counts as multi-source if more than one URL string was |
+      # supplied or if this URL string expanded to more than one result. |
+      src_url_expands_to_multi = post_step3_iter.HasPlurality() |
+      is_multi_source_request = (self.url_strs.has_plurality |
+                                 or src_url_expands_to_multi) |
+ |
+      # Step 4. Expand directories and buckets. This step yields the iterated |
+      # values. Starting with gs://bucket this step would expand to: |
+      #  [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt] |
+      # Starting with file://dir this step would expand to: |
+      #  [dir/a.txt, dir/b.txt, dir/c/] |
+      for (names_container, blr) in post_step3_iter: |
+        src_names_container = src_names_bucket or names_container |
+ |
+        if blr.IsObject(): |
+          yield NameExpansionResult( |
+              storage_url, is_multi_source_request, src_names_container, |
+              blr.storage_url) |
+        else: |
+          # Use implicit wildcarding to do the enumeration. |
+          # At this point we are guaranteed that: |
+          # - Recursion has been requested because non-object entries are |
+          #   filtered in step 3 otherwise. |
+          # - This is a prefix or bucket subdirectory because only |
+          #   non-recursive iterations produce bucket references. |
+          expanded_url = StorageUrlFromString(blr.url_string) |
+          if expanded_url.IsFileUrl(): |
+            # Convert dir to implicit recursive wildcard. |
+            url_to_iterate = '%s%s%s' % (blr, os.sep, subdir_exp_wildcard) |
+          else: |
+            # Convert subdir to implicit recursive wildcard. |
+            url_to_iterate = expanded_url.CreatePrefixUrl( |
+                wildcard_suffix=subdir_exp_wildcard) |
+ |
+          wc_iter = PluralityCheckableIterator( |
+              self.WildcardIterator(url_to_iterate).IterObjects( |
+                  bucket_listing_fields=['name'])) |
+          # Expanding the container may itself turn a single match into |
+          # several, so the multi-source flags are recomputed here. |
+          src_url_expands_to_multi = (src_url_expands_to_multi |
+                                      or wc_iter.HasPlurality()) |
+          is_multi_source_request = (self.url_strs.has_plurality |
+                                     or src_url_expands_to_multi) |
+          # This will be a flattened listing of all underlying objects in the |
+          # subdir. |
+          # Note: this loop rebinds blr; the outer loop's value is not read |
+          # again before the next outer iteration. |
+          for blr in wc_iter: |
+            yield NameExpansionResult( |
+                storage_url, is_multi_source_request, True, blr.storage_url) |
+ |
+  def WildcardIterator(self, url_string): |
+    """Helper to instantiate gslib.WildcardIterator. |
+ |
+    Args are same as gslib.WildcardIterator interface, but this method fills |
+    in most of the values from instance state (gsutil_api, debug, |
+    all_versions and project_id). |
+ |
+    Args: |
+      url_string: URL string naming wildcard objects to iterate. |
+ |
+    Returns: |
+      Wildcard iterator over URL string. |
+    """ |
+    return gslib.wildcard_iterator.CreateWildcardIterator( |
+        url_string, self.gsutil_api, debug=self.debug, |
+        all_versions=self.all_versions, |
+        project_id=self.project_id) |
+ |
+ |
+def NameExpansionIterator(command_name, debug, logger, gsutil_api, url_strs, |
+                          recursion_requested, all_versions=False, |
+                          cmd_supports_recursion=True, project_id=None, |
+                          continue_on_error=False): |
+  """Factory that builds a plurality-checkable _NameExpansionIterator. |
+ |
+  The URL strings are first wrapped in a PluralityCheckableIterator (so |
+  url_strs may be either an array or an iterator), then handed to a new |
+  _NameExpansionIterator, which is in turn wrapped in a |
+  PluralityCheckableIterator and verified to be non-empty before being |
+  returned. |
+ |
+  Args: |
+    command_name: name of command being run. |
+    debug: Debug level to pass to underlying iterators (range 0..3). |
+    logger: logging.Logger object. |
+    gsutil_api: Cloud storage interface.  Settable for testing/mocking. |
+    url_strs: Iterable URL strings needing expansion. |
+    recursion_requested: True if -r specified on command-line.  If so, |
+        listings will be flattened so mapped-to results contain objects |
+        spanning subdirectories. |
+    all_versions: Bool indicating whether to iterate over all object versions. |
+    cmd_supports_recursion: Bool indicating whether this command supports a '-r' |
+        flag. Useful for printing helpful error messages. |
+    project_id: Project id to use for the current command. |
+    continue_on_error: If true, yield no-match exceptions encountered during |
+                       iteration instead of raising them. |
+ |
+  Returns: |
+    PluralityCheckableIterator wrapping the name expansion iterator. |
+ |
+  Raises: |
+    CommandException if underlying iterator is empty. |
+ |
+  For example semantics, see comments in _NameExpansionIterator.__init__. |
+  """ |
+  checkable_url_strs = PluralityCheckableIterator(url_strs) |
+  raw_expansion_iter = _NameExpansionIterator( |
+      command_name, debug, logger, gsutil_api, checkable_url_strs, |
+      recursion_requested, all_versions=all_versions, |
+      cmd_supports_recursion=cmd_supports_recursion, project_id=project_id, |
+      continue_on_error=continue_on_error) |
+  checkable_results = PluralityCheckableIterator(raw_expansion_iter) |
+  if not checkable_results.IsEmpty(): |
+    return checkable_results |
+  raise CommandException('No URLs matched') |
+ |
+ |
+class NameExpansionIteratorQueue(object): |
+  """Exposes a name expansion iterator through a Multiprocessing.Queue API. |
+ |
+  get() is the only supported operation; it ignores its block and timeout |
+  arguments. Every other queue method raises NotImplementedError. |
+ |
+  Calls to get() are serialized with a lock obtained from a |
+  multiprocessing.Manager. |
+  """ |
+ |
+  def __init__(self, name_expansion_iterator, final_value): |
+    """Instantiates the queue wrapper. |
+ |
+    Args: |
+      name_expansion_iterator: iterator providing IsEmpty() and next(). |
+      final_value: value returned by get() once the iterator is empty. |
+    """ |
+    self.name_expansion_iterator = name_expansion_iterator |
+    self.final_value = final_value |
+    self.lock = multiprocessing.Manager().Lock() |
+ |
+  def _Unsupported(self, method_name): |
+    """Raises NotImplementedError naming the unsupported queue method.""" |
+    raise NotImplementedError( |
+        'NameExpansionIteratorQueue.%s() not implemented' % method_name) |
+ |
+  def qsize(self): |
+    self._Unsupported('qsize') |
+ |
+  def empty(self): |
+    self._Unsupported('empty') |
+ |
+  def full(self): |
+    self._Unsupported('full') |
+ |
+  # pylint: disable=unused-argument |
+  def put(self, obj=None, block=None, timeout=None): |
+    self._Unsupported('put') |
+ |
+  def put_nowait(self, obj): |
+    self._Unsupported('put_nowait') |
+ |
+  # pylint: disable=unused-argument |
+  def get(self, block=None, timeout=None): |
+    """Returns the next expansion result, or final_value when exhausted.""" |
+    # The lock is held across both the emptiness check and the fetch so the |
+    # two steps cannot be interleaved with another caller's get(). |
+    with self.lock: |
+      source = self.name_expansion_iterator |
+      if source.IsEmpty(): |
+        return self.final_value |
+      return source.next() |
+ |
+  def get_nowait(self): |
+    self._Unsupported('get_nowait') |
+ |
+  def get_no_wait(self): |
+    self._Unsupported('get_no_wait') |
+ |
+  def close(self): |
+    self._Unsupported('close') |
+ |
+  def join_thread(self): |
+    self._Unsupported('join_thread') |
+ |
+  def cancel_join_thread(self): |
+    self._Unsupported('cancel_join_thread') |
+ |
+ |
+class _NonContainerTuplifyIterator(object): |
+  """Pairs every iterated BucketListingRef with names_container=False. |
+ |
+  Intended for a blr_iter whose BucketListingRefs are already known not to |
+  name containers, so each value is emitted as the tuple (False, blr). |
+  """ |
+ |
+  def __init__(self, blr_iter): |
+    """Instantiates iterator. |
+ |
+    Args: |
+      blr_iter: iterator of BucketListingRef. |
+    """ |
+    self.blr_iter = blr_iter |
+ |
+  def __iter__(self): |
+    """Yields (False, blr) for each blr produced by the wrapped iterator.""" |
+    names_container = False |
+    for listing_ref in self.blr_iter: |
+      yield (names_container, listing_ref) |
+ |
+ |
+class _OmitNonRecursiveIterator(object): |
+  """Iterator wrapper that omits certain values for non-recursive requests. |
+ |
+  Consumes tuples of (names_container, BucketListingReference). When recursion |
+  was not requested, entries that are not objects (directories, prefixes, and |
+  buckets) are logged and dropped, so that we can properly calculate whether |
+  the source URL expands to multiple URLs. |
+ |
+  For example, if we have a bucket containing two objects: bucket/foo and |
+  bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be |
+  yielded. |
+  """ |
+ |
+  def __init__(self, tuple_iter, recursion_requested, command_name, |
+               cmd_supports_recursion, logger): |
+    """Instantiates the iterator. |
+ |
+    Args: |
+      tuple_iter: Iterator over names_container, BucketListingReference |
+                  from step 2 in the NameExpansionIterator |
+      recursion_requested: If false, omit buckets, dirs, and subdirs |
+      command_name: Command name for user messages |
+      cmd_supports_recursion: Command recursion support for user messages |
+      logger: Log object for user messages |
+    """ |
+    self.logger = logger |
+    self.command_name = command_name |
+    self.cmd_supports_recursion = cmd_supports_recursion |
+    self.recursion_requested = recursion_requested |
+    self.tuple_iter = tuple_iter |
+ |
+  def __iter__(self): |
+    """Yields the tuples that survive the non-recursive filter.""" |
+    for (names_container, blr) in self.tuple_iter: |
+      # Everything passes for recursive requests, and objects always pass. |
+      if self.recursion_requested or blr.IsObject(): |
+        yield (names_container, blr) |
+        continue |
+ |
+      # Remaining entries are buckets or prefixes of a non-recursive request: |
+      # tell the user what is being skipped and move on. |
+      if StorageUrlFromString(blr.url_string).IsFileUrl(): |
+        desc = 'directory' |
+      else: |
+        desc = blr.type_name |
+ |
+      if self.cmd_supports_recursion: |
+        message = 'Omitting %s "%s". (Did you mean to do %s -r?)' |
+        message_args = (desc, blr.url_string, self.command_name) |
+      else: |
+        message = 'Omitting %s "%s".' |
+        message_args = (desc, blr.url_string) |
+      self.logger.info(message, *message_args) |
+ |
+ |
+class _ImplicitBucketSubdirIterator(object): |
+  """Iterator wrapper that performs implicit bucket subdir expansion. |
+ |
+  Every iteration produces a tuple (names_container, expanded |
+  BucketListingRef), with names_container true when the URL names a |
+  directory, bucket, or bucket subdir. |
+ |
+  For example, iterating over [BucketListingRef("gs://abc")] would expand to: |
+    [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")] |
+  if those subdir objects exist, and [BucketListingRef("gs://abc")] otherwise. |
+  """ |
+ |
+  def __init__(self, name_exp_instance, blr_iter, subdir_exp_wildcard): |
+    """Instantiates the iterator. |
+ |
+    Args: |
+      name_exp_instance: calling instance of NameExpansion class. |
+      blr_iter: iterator over BucketListingRef prefixes and objects. |
+      subdir_exp_wildcard: wildcard for expanding subdirectories; |
+          expected values are ** if the mapped-to results should contain |
+          objects spanning subdirectories, or * if only one level should |
+          be listed. |
+    """ |
+    self.subdir_exp_wildcard = subdir_exp_wildcard |
+    self.name_exp_instance = name_exp_instance |
+    self.blr_iter = blr_iter |
+ |
+  def __iter__(self): |
+    """Yields (names_container, BucketListingRef) tuples. |
+ |
+    Raises: |
+      CommandException: if a reference is neither a prefix nor an object. |
+    """ |
+    for blr in self.blr_iter: |
+      if not blr.IsPrefix(): |
+        # Plain objects pass straight through; anything else is unexpected. |
+        if not blr.IsObject(): |
+          raise CommandException( |
+              '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr) |
+        yield (False, blr) |
+        continue |
+ |
+      # This is a bucket subdirectory, list objects according to the wildcard. |
+      subdir_url = StorageUrlFromString(blr.url_string) |
+      prefix_url = subdir_url.CreatePrefixUrl( |
+          wildcard_suffix=self.subdir_exp_wildcard) |
+      wildcard_iter = self.name_exp_instance.WildcardIterator(prefix_url) |
+      implicit_subdir_iterator = PluralityCheckableIterator( |
+          wildcard_iter.IterAll(bucket_listing_fields=['name'])) |
+      if implicit_subdir_iterator.IsEmpty(): |
+        # Prefix that contains no objects, for example in the $folder$ case |
+        # or an empty filesystem directory. |
+        yield (False, blr) |
+      else: |
+        for exp_blr in implicit_subdir_iterator: |
+          yield (True, exp_blr) |