OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2012 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Name expansion iterator and result classes. | |
16 | |
17 Name expansion support for the various ways gsutil lets users refer to | |
18 collections of data (via explicit wildcarding as well as directory, | |
19 bucket, and bucket subdir implicit wildcarding). This class encapsulates | |
20 the various rules for determining how these expansions are done. | |
21 """ | |
22 | |
23 # Disable warnings for NameExpansionIteratorQueue functions; they implement | |
24 # an interface which does not follow lint guidelines. | |
25 # pylint: disable=invalid-name | |
26 | |
27 from __future__ import absolute_import | |
28 | |
29 import multiprocessing | |
30 import os | |
31 import sys | |
32 | |
33 from gslib.exception import CommandException | |
34 from gslib.plurality_checkable_iterator import PluralityCheckableIterator | |
35 import gslib.wildcard_iterator | |
36 from gslib.wildcard_iterator import StorageUrlFromString | |
37 | |
38 | |
class NameExpansionResult(object):
  """Holds one fully expanded result from iterating over NameExpansionIterator.

  The member data in this class need to be pickleable because
  NameExpansionResult instances are passed through Multiprocessing.Queue. In
  particular, don't include any boto state like StorageUri, since that pulls
  in a big tree of objects, some of which aren't pickleable (and even if
  they were, pickling/unpickling such a large object tree would result in
  significant overhead).

  The state held in this object is needed for handling the various naming cases
  (e.g., copying from a single source URL to a directory generates different
  dest URL names than copying multiple URLs to a directory, to be consistent
  with naming rules used by the Unix cp command). For more details see comments
  in _NameExpansionIterator.
  """

  def __init__(self, source_storage_url, is_multi_source_request,
               names_container, expanded_storage_url):
    """Instantiates a result from name expansion.

    Args:
      source_storage_url: StorageUrl that was being expanded.
      is_multi_source_request: bool indicator whether src_url_str expanded to
          more than one BucketListingRef.
      names_container: Bool indicator whether src_url names a container.
      expanded_storage_url: StorageUrl that was expanded.
    """
    self.source_storage_url = source_storage_url
    self.is_multi_source_request = is_multi_source_request
    self.names_container = names_container
    self.expanded_storage_url = expanded_storage_url

  def __repr__(self):
    """Returns the string form of the expanded storage URL."""
    # The attribute is stored without a leading underscore in __init__;
    # reading self._expanded_storage_url would raise AttributeError.
    return '%s' % self.expanded_storage_url
74 | |
75 | |
class _NameExpansionIterator(object):
  """Class that iterates over all source URLs passed to the iterator.

  See details in __iter__ function doc.
  """

  def __init__(self, command_name, debug, logger, gsutil_api, url_strs,
               recursion_requested, all_versions=False,
               cmd_supports_recursion=True, project_id=None,
               continue_on_error=False):
    """Creates a NameExpansionIterator.

    Args:
      command_name: name of command being run.
      debug: Debug level to pass to underlying iterators (range 0..3).
      logger: logging.Logger object.
      gsutil_api: Cloud storage interface.  Settable for testing/mocking.
      url_strs: PluralityCheckableIterator of URL strings needing expansion.
      recursion_requested: True if -r specified on command-line.  If so,
          listings will be flattened so mapped-to results contain objects
          spanning subdirectories.
      all_versions: Bool indicating whether to iterate over all object versions.
      cmd_supports_recursion: Bool indicating whether this command supports a
          '-r' flag. Useful for printing helpful error messages.
      project_id: Project id to use for bucket retrieval.
      continue_on_error: If true, yield no-match exceptions encountered during
                         iteration instead of raising them.

    Examples of _NameExpansionIterator with recursion_requested=True:
      - Calling with one of the url_strs being 'gs://bucket' will enumerate all
        top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
      - 'gs://bucket/**' will enumerate all objects in the bucket.
      - 'gs://bucket/abc' will enumerate either the single object abc or, if
         abc is a subdirectory, all objects under abc and any of its
         subdirectories.
      - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
        subdirectories.
      - 'file:///tmp' will enumerate all files under /tmp, as will
        'file:///tmp/*'
      - 'file:///tmp/**' will enumerate all files under /tmp or any of its
        subdirectories.

    Example if recursion_requested=False:
      calling with gs://bucket/abc/* lists matching objects
      or subdirs, but not sub-subdirs or objects beneath subdirs.

    Note: In step-by-step comments below we give examples assuming there's a
    gs://bucket with object paths:
      abcd/o1.txt
      abcd/o2.txt
      xyz/o1.txt
      xyz/o2.txt
    and a directory file://dir with file paths:
      dir/a.txt
      dir/b.txt
      dir/c/
    """
    self.command_name = command_name
    self.debug = debug
    self.logger = logger
    self.gsutil_api = gsutil_api
    self.url_strs = url_strs
    self.recursion_requested = recursion_requested
    self.all_versions = all_versions
    # Check self.url_strs.HasPlurality() at start because its value can change
    # if url_strs is itself an iterator.
    self.url_strs.has_plurality = self.url_strs.HasPlurality()
    self.cmd_supports_recursion = cmd_supports_recursion
    self.project_id = project_id
    self.continue_on_error = continue_on_error

    # Map holding wildcard strings to use for flat vs subdir-by-subdir listings.
    # (A flat listing means show all objects expanded all the way down.)
    self._flatness_wildcard = {True: '**', False: '*'}

  def __iter__(self):
    """Iterates over all source URLs passed to the iterator.

    For each src url, expands wildcards, object-less bucket names,
    subdir bucket names, and directory names, and generates a flat listing of
    all the matching objects/files.

    You should instantiate this object using the static factory function
    NameExpansionIterator, because consumers of this iterator need the
    PluralityCheckableIterator wrapper built by that function.

    Yields:
      gslib.name_expansion.NameExpansionResult. When continue_on_error is
      set and a URL string matches nothing, a tuple of
      (CommandException, traceback) is yielded for that URL string instead.

    Raises:
      CommandException: if errors encountered.
    """
    for url_str in self.url_strs:
      storage_url = StorageUrlFromString(url_str)

      if storage_url.IsFileUrl() and storage_url.IsStream():
        if self.url_strs.has_plurality:
          raise CommandException('Multiple URL strings are not supported '
                                 'with streaming ("-") URLs.')
        yield NameExpansionResult(storage_url, False, False, storage_url)
        continue

      # Step 1: Expand any explicitly specified wildcards. The output from this
      # step is an iterator of BucketListingRef.
      # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd

      src_names_bucket = False
      if (storage_url.IsCloudUrl() and storage_url.IsBucket()
          and not self.recursion_requested):
        # UNIX commands like rm and cp will omit directory references.
        # If url_str refers only to buckets and we are not recursing,
        # then produce references of type BUCKET, because they are guaranteed
        # to pass through Step 2 and be omitted in Step 3.
        post_step1_iter = PluralityCheckableIterator(
            self.WildcardIterator(url_str).IterBuckets(
                bucket_fields=['id']))
      else:
        # Get a list of objects and prefixes, expanding the top level for
        # any listed buckets.  If our source is a bucket, however, we need
        # to treat all of the top level expansions as names_container=True.
        post_step1_iter = PluralityCheckableIterator(
            self.WildcardIterator(url_str).IterAll(
                bucket_listing_fields=['name'],
                expand_top_level_buckets=True))
        if storage_url.IsCloudUrl() and storage_url.IsBucket():
          src_names_bucket = True

      # Step 2: Expand bucket subdirs. The output from this
      # step is an iterator of (names_container, BucketListingRef).
      # Starting with gs://bucket/abcd this step would expand to:
      #   iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]).
      subdir_exp_wildcard = self._flatness_wildcard[self.recursion_requested]
      if self.recursion_requested:
        post_step2_iter = _ImplicitBucketSubdirIterator(
            self, post_step1_iter, subdir_exp_wildcard)
      else:
        post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter)
      post_step2_iter = PluralityCheckableIterator(post_step2_iter)

      # Because we actually perform and check object listings here, this will
      # raise if url_args includes a non-existent object.  However,
      # plurality_checkable_iterator will buffer the exception for us, not
      # raising it until the iterator is actually asked to yield the first
      # result.
      if post_step2_iter.IsEmpty():
        if self.continue_on_error:
          try:
            raise CommandException('No URLs matched: %s' % url_str)
          # 'as' form is valid on Python 2.6+ and Python 3; the older
          # 'except X, e' spelling is a SyntaxError on Python 3.
          except CommandException as e:
            # Yield a specialized tuple of (exception, stack_trace) to
            # the wrapping PluralityCheckableIterator.
            yield (e, sys.exc_info()[2])
        else:
          raise CommandException('No URLs matched: %s' % url_str)

      # Step 3. Omit any directories, buckets, or bucket subdirectories for
      # non-recursive expansions.
      post_step3_iter = PluralityCheckableIterator(_OmitNonRecursiveIterator(
          post_step2_iter, self.recursion_requested, self.command_name,
          self.cmd_supports_recursion, self.logger))

      src_url_expands_to_multi = post_step3_iter.HasPlurality()
      is_multi_source_request = (self.url_strs.has_plurality
                                 or src_url_expands_to_multi)

      # Step 4. Expand directories and buckets. This step yields the iterated
      # values. Starting with gs://bucket this step would expand to:
      #  [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt]
      # Starting with file://dir this step would expand to:
      #  [dir/a.txt, dir/b.txt, dir/c/]
      for (names_container, blr) in post_step3_iter:
        src_names_container = src_names_bucket or names_container

        if blr.IsObject():
          yield NameExpansionResult(
              storage_url, is_multi_source_request, src_names_container,
              blr.storage_url)
        else:
          # Use implicit wildcarding to do the enumeration.
          # At this point we are guaranteed that:
          # - Recursion has been requested because non-object entries are
          #   filtered in step 3 otherwise.
          # - This is a prefix or bucket subdirectory because only
          #   non-recursive iterations produce bucket references.
          expanded_url = StorageUrlFromString(blr.url_string)
          if expanded_url.IsFileUrl():
            # Convert dir to implicit recursive wildcard.
            url_to_iterate = '%s%s%s' % (blr, os.sep, subdir_exp_wildcard)
          else:
            # Convert subdir to implicit recursive wildcard.
            url_to_iterate = expanded_url.CreatePrefixUrl(
                wildcard_suffix=subdir_exp_wildcard)

          wc_iter = PluralityCheckableIterator(
              self.WildcardIterator(url_to_iterate).IterObjects(
                  bucket_listing_fields=['name']))
          src_url_expands_to_multi = (src_url_expands_to_multi
                                      or wc_iter.HasPlurality())
          is_multi_source_request = (self.url_strs.has_plurality
                                     or src_url_expands_to_multi)
          # This will be a flattened listing of all underlying objects in the
          # subdir.
          for blr in wc_iter:
            yield NameExpansionResult(
                storage_url, is_multi_source_request, True, blr.storage_url)

  def WildcardIterator(self, url_string):
    """Helper to instantiate gslib.WildcardIterator.

    Args are same as gslib.WildcardIterator interface, but this method fills
    in most of the values from instance state.

    Args:
      url_string: URL string naming wildcard objects to iterate.

    Returns:
      Wildcard iterator over URL string.
    """
    return gslib.wildcard_iterator.CreateWildcardIterator(
        url_string, self.gsutil_api, debug=self.debug,
        all_versions=self.all_versions,
        project_id=self.project_id)
298 | |
299 | |
def NameExpansionIterator(command_name, debug, logger, gsutil_api, url_strs,
                          recursion_requested, all_versions=False,
                          cmd_supports_recursion=True, project_id=None,
                          continue_on_error=False):
  """Builds a plurality-checkable name expansion iterator.

  Wraps url_strs in a PluralityCheckableIterator, hands it to a new
  _NameExpansionIterator, wraps that in a second PluralityCheckableIterator
  and verifies the wrapped iterator is non-empty before returning it.

  Args:
    command_name: name of command being run.
    debug: Debug level to pass to underlying iterators (range 0..3).
    logger: logging.Logger object.
    gsutil_api: Cloud storage interface.  Settable for testing/mocking.
    url_strs: Iterable URL strings needing expansion.
    recursion_requested: True if -r specified on command-line.  If so,
        listings will be flattened so mapped-to results contain objects
        spanning subdirectories.
    all_versions: Bool indicating whether to iterate over all object versions.
    cmd_supports_recursion: Bool indicating whether this command supports a
        '-r' flag. Useful for printing helpful error messages.
    project_id: Project id to use for the current command.
    continue_on_error: If true, yield no-match exceptions encountered during
                       iteration instead of raising them.

  Raises:
    CommandException if underlying iterator is empty.

  Returns:
    Name expansion iterator instance.

  For example semantics, see comments in _NameExpansionIterator.__init__.
  """
  checkable_url_strs = PluralityCheckableIterator(url_strs)
  expansion = PluralityCheckableIterator(
      _NameExpansionIterator(
          command_name, debug, logger, gsutil_api, checkable_url_strs,
          recursion_requested, all_versions=all_versions,
          cmd_supports_recursion=cmd_supports_recursion,
          project_id=project_id, continue_on_error=continue_on_error))
  if expansion.IsEmpty():
    raise CommandException('No URLs matched')
  return expansion
343 | |
344 | |
class NameExpansionIteratorQueue(object):
  """Queue-shaped facade over a name expansion iterator.

  Exposes the method names of a Multiprocessing.Queue, but only get() does
  anything: it hands out the next iterator value, ignoring its block and
  timeout arguments. Every other method raises NotImplementedError.

  Calls to get() are serialized through a lock created in __init__.
  """

  def __init__(self, name_expansion_iterator, final_value):
    """Stores the wrapped iterator and the end-of-iteration sentinel.

    Args:
      name_expansion_iterator: object providing IsEmpty() and next().
      final_value: value returned by get() once the iterator is empty.
    """
    self.name_expansion_iterator = name_expansion_iterator
    self.final_value = final_value
    self.lock = multiprocessing.Manager().Lock()

  def qsize(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.qsize() not implemented')

  def empty(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.empty() not implemented')

  def full(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.full() not implemented')

  # pylint: disable=unused-argument
  def put(self, obj=None, block=None, timeout=None):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.put() not implemented')

  def put_nowait(self, obj):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.put_nowait() not implemented')

  # pylint: disable=unused-argument
  def get(self, block=None, timeout=None):
    """Returns the next iterator value, or final_value when exhausted.

    The block and timeout arguments are accepted for interface compatibility
    and are not used.
    """
    # The lock is held across both the emptiness check and the fetch so the
    # two steps cannot interleave between callers.
    with self.lock:
      source = self.name_expansion_iterator
      if source.IsEmpty():
        return self.final_value
      return source.next()

  def get_nowait(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.get_nowait() not implemented')

  def get_no_wait(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.get_no_wait() not implemented')

  def close(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.close() not implemented')

  def join_thread(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.join_thread() not implemented')

  def cancel_join_thread(self):
    raise NotImplementedError(
        'NameExpansionIteratorQueue.cancel_join_thread() not implemented')
410 | |
411 | |
412 class _NonContainerTuplifyIterator(object): | |
413 """Iterator that produces the tuple (False, blr) for each iterated value. | |
414 | |
415 Used for cases where blr_iter iterates over a set of | |
416 BucketListingRefs known not to name containers. | |
417 """ | |
418 | |
419 def __init__(self, blr_iter): | |
420 """Instantiates iterator. | |
421 | |
422 Args: | |
423 blr_iter: iterator of BucketListingRef. | |
424 """ | |
425 self.blr_iter = blr_iter | |
426 | |
427 def __iter__(self): | |
428 for blr in self.blr_iter: | |
429 yield (False, blr) | |
430 | |
431 | |
432 class _OmitNonRecursiveIterator(object): | |
433 """Iterator wrapper for that omits certain values for non-recursive requests. | |
434 | |
435 This iterates over tuples of (names_container, BucketListingReference) and | |
436 omits directories, prefixes, and buckets from non-recurisve requests | |
437 so that we can properly calculate whether the source URL expands to multiple | |
438 URLs. | |
439 | |
440 For example, if we have a bucket containing two objects: bucket/foo and | |
441 bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be | |
442 yielded. | |
443 """ | |
444 | |
445 def __init__(self, tuple_iter, recursion_requested, command_name, | |
446 cmd_supports_recursion, logger): | |
447 """Instanties the iterator. | |
448 | |
449 Args: | |
450 tuple_iter: Iterator over names_container, BucketListingReference | |
451 from step 2 in the NameExpansionIterator | |
452 recursion_requested: If false, omit buckets, dirs, and subdirs | |
453 command_name: Command name for user messages | |
454 cmd_supports_recursion: Command recursion support for user messages | |
455 logger: Log object for user messages | |
456 """ | |
457 self.tuple_iter = tuple_iter | |
458 self.recursion_requested = recursion_requested | |
459 self.command_name = command_name | |
460 self.cmd_supports_recursion = cmd_supports_recursion | |
461 self.logger = logger | |
462 | |
463 def __iter__(self): | |
464 for (names_container, blr) in self.tuple_iter: | |
465 if not self.recursion_requested and not blr.IsObject(): | |
466 # At this point we either have a bucket or a prefix, | |
467 # so if recursion is not requested, we're going to omit it. | |
468 expanded_url = StorageUrlFromString(blr.url_string) | |
469 if expanded_url.IsFileUrl(): | |
470 desc = 'directory' | |
471 else: | |
472 desc = blr.type_name | |
473 if self.cmd_supports_recursion: | |
474 self.logger.info( | |
475 'Omitting %s "%s". (Did you mean to do %s -r?)', | |
476 desc, blr.url_string, self.command_name) | |
477 else: | |
478 self.logger.info('Omitting %s "%s".', desc, blr.url_string) | |
479 else: | |
480 yield (names_container, blr) | |
481 | |
482 | |
483 class _ImplicitBucketSubdirIterator(object): | |
484 """Iterator wrapper that performs implicit bucket subdir expansion. | |
485 | |
486 Each iteration yields tuple (names_container, expanded BucketListingRefs) | |
487 where names_container is true if URL names a directory, bucket, | |
488 or bucket subdir. | |
489 | |
490 For example, iterating over [BucketListingRef("gs://abc")] would expand to: | |
491 [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")] | |
492 if those subdir objects exist, and [BucketListingRef("gs://abc") otherwise. | |
493 """ | |
494 | |
495 def __init__(self, name_exp_instance, blr_iter, subdir_exp_wildcard): | |
496 """Instantiates the iterator. | |
497 | |
498 Args: | |
499 name_exp_instance: calling instance of NameExpansion class. | |
500 blr_iter: iterator over BucketListingRef prefixes and objects. | |
501 subdir_exp_wildcard: wildcard for expanding subdirectories; | |
502 expected values are ** if the mapped-to results should contain | |
503 objects spanning subdirectories, or * if only one level should | |
504 be listed. | |
505 """ | |
506 self.blr_iter = blr_iter | |
507 self.name_exp_instance = name_exp_instance | |
508 self.subdir_exp_wildcard = subdir_exp_wildcard | |
509 | |
510 def __iter__(self): | |
511 for blr in self.blr_iter: | |
512 if blr.IsPrefix(): | |
513 # This is a bucket subdirectory, list objects according to the wildcard. | |
514 prefix_url = StorageUrlFromString(blr.url_string).CreatePrefixUrl( | |
515 wildcard_suffix=self.subdir_exp_wildcard) | |
516 implicit_subdir_iterator = PluralityCheckableIterator( | |
517 self.name_exp_instance.WildcardIterator( | |
518 prefix_url).IterAll(bucket_listing_fields=['name'])) | |
519 if not implicit_subdir_iterator.IsEmpty(): | |
520 for exp_blr in implicit_subdir_iterator: | |
521 yield (True, exp_blr) | |
522 else: | |
523 # Prefix that contains no objects, for example in the $folder$ case | |
524 # or an empty filesystem directory. | |
525 yield (False, blr) | |
526 elif blr.IsObject(): | |
527 yield (False, blr) | |
528 else: | |
529 raise CommandException( | |
530 '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr) | |
OLD | NEW |