Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(45)

Side by Side Diff: third_party/gsutil/gslib/name_expansion.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2012 Google Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import copy
16 import threading
17 import wildcard_iterator
18
19 from bucket_listing_ref import BucketListingRef
20 from gslib.exception import CommandException
21 from gslib.plurality_checkable_iterator import PluralityCheckableIterator
22 from gslib.storage_uri_builder import StorageUriBuilder
23 from wildcard_iterator import ContainsWildcard
24
25 """
26 Name expansion support for the various ways gsutil lets users refer to
27 collections of data (via explicit wildcarding as well as directory,
28 bucket, and bucket subdir implicit wildcarding). This class encapsulates
29 the various rules for determining how these expansions are done.
30 """
31
32
class NameExpansionResult(object):
  """
  Holds one fully expanded result from iterating over NameExpansionIterator.

  The member data in this class need to be pickleable because
  NameExpansionResult instances are passed through Multiprocessing.Queue. In
  particular, don't include any boto state like StorageUri, since that pulls
  in a big tree of objects, some of which aren't pickleable (and even if
  they were, pickling/unpickling such a large object tree would result in
  significant overhead).

  The state held in this object is needed for handling the various naming cases
  (e.g., copying from a single source URI to a directory generates different
  dest URI names than copying multiple URIs to a directory, to be consistent
  with naming rules used by the Unix cp command). For more details see comments
  in _NameExpansionIterator.
  """

  # Class-level default shared by all instances; an individual result gets an
  # instance attribute of True only when something assigns one explicitly.
  is_current_version = False

  def __init__(self, src_uri_str, is_multi_src_request,
               src_uri_expands_to_multi, names_container, expanded_uri_str,
               have_existing_dst_container=None, parse_version=False):
    """
    Args:
      src_uri_str: string representation of StorageUri that was expanded.
      is_multi_src_request: bool indicator whether the request as a whole
          involves more than 1 source.
      src_uri_expands_to_multi: bool indicator whether the current src_uri
          expanded to more than 1 BucketListingRef.
      names_container: Bool indicator whether src_uri names a container.
      expanded_uri_str: string representation of StorageUri to which src_uri_str
          expands.
      have_existing_dst_container: bool indicator whether this is a copy
          request to an existing bucket, bucket subdir, or directory. Default
          None value should be used in cases where this is not needed (commands
          other than cp).
      parse_version: Bool indicating that the result is version-ful and should
          be parsed accordingly.
    """
    self.src_uri_str = src_uri_str
    self.is_multi_src_request = is_multi_src_request
    self.src_uri_expands_to_multi = src_uri_expands_to_multi
    self.names_container = names_container
    self.expanded_uri_str = expanded_uri_str
    self.have_existing_dst_container = have_existing_dst_container
    self.parse_version = parse_version

  def __repr__(self):
    return '%s' % self.expanded_uri_str

  def IsEmpty(self):
    """Returns True if name expansion yielded no matches."""
    # Only string state is stored on this object (no BucketListingRef), so
    # emptiness is judged from the expanded URI string.
    return self.expanded_uri_str is None

  def GetSrcUriStr(self):
    """Returns the string representation of the StorageUri that was expanded."""
    return self.src_uri_str

  def IsMultiSrcRequest(self):
    """
    Returns bool indicator whether name expansion resulted in more than 1
    BucketListingRef.
    """
    return self.is_multi_src_request

  def SrcUriExpandsToMulti(self):
    """
    Returns bool indicator whether the current src_uri expanded to more than
    1 BucketListingRef.
    """
    return self.src_uri_expands_to_multi

  def NamesContainer(self):
    """
    Returns bool indicator of whether src_uri names a directory, bucket, or
    bucket subdir.
    """
    return self.names_container

  def GetExpandedUriStr(self):
    """
    Returns the string representation of StorageUri to which src_uri_str
    expands.
    """
    return self.expanded_uri_str

  def HaveExistingDstContainer(self):
    """
    Returns bool indicator whether this is a copy request to an existing
    bucket, bucket subdir, or directory, or None if not relevant.
    """
    return self.have_existing_dst_container
125
126
class _NameExpansionIterator(object):
  """
  Iterates over all src_uris, expanding wildcards, object-less bucket names,
  subdir bucket names, and directory names, generating a flat listing of all
  the matching objects/files.

  You should instantiate this object using the static factory function
  NameExpansionIterator, because consumers of this iterator need the
  PluralityCheckableIterator wrapper built by that function.

  Yields:
    gslib.name_expansion.NameExpansionResult.

  Raises:
    CommandException: if errors encountered.
  """

  def __init__(self, command_name, proj_id_handler, headers, debug,
               bucket_storage_uri_class, uri_strs, recursion_requested,
               have_existing_dst_container=None, flat=True,
               all_versions=False, for_all_version_delete=False,
               parse_versions=False):
    """
    Args:
      command_name: name of command being run.
      proj_id_handler: ProjectIdHandler to use for current command.
      headers: Dictionary containing optional HTTP headers to pass to boto.
      debug: Debug level to pass in to boto connection (range 0..3).
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
          Settable for testing/mocking.
      uri_strs: PluralityCheckableIterator of URI strings needing expansion.
      recursion_requested: True if -R specified on command-line.
      have_existing_dst_container: Bool indicator whether this is a copy
          request to an existing bucket, bucket subdir, or directory. Default
          None value should be used in cases where this is not needed (commands
          other than cp).
      flat: Bool indicating whether bucket listings should be flattened, i.e.,
          so the mapped-to results contain objects spanning subdirectories.
      all_versions: Bool indicating whether to iterate over all object versions.
      for_all_version_delete: Bool indicating whether this is for an all-version
          delete.
      parse_versions: Bool indicating that the uri_strs are version-ful.

    Examples of ExpandWildcardsAndContainers with flat=True:
      - Calling with one of the uri_strs being 'gs://bucket' will enumerate all
        top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
      - 'gs://bucket/**' will enumerate all objects in the bucket.
      - 'gs://bucket/abc' will enumerate all next-level objects under directory
        abc (i.e., not including subdirectories of abc) if gs://bucket/abc/*
        matches any objects; otherwise it will enumerate the single name
        gs://bucket/abc
      - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
        subdirectories.
      - 'file:///tmp' will enumerate all files under /tmp, as will
        'file:///tmp/*'
      - 'file:///tmp/**' will enumerate all files under /tmp or any of its
        subdirectories.

    Example if flat=False: calling with gs://bucket/abc/* lists matching objects
    or subdirs, but not sub-subdirs or objects beneath subdirs.

    Note: In step-by-step comments below we give examples assuming there's a
    gs://bucket with object paths:
      abcd/o1.txt
      abcd/o2.txt
      xyz/o1.txt
      xyz/o2.txt
    and a directory file://dir with file paths:
      dir/a.txt
      dir/b.txt
      dir/c/
    """
    self.command_name = command_name
    self.proj_id_handler = proj_id_handler
    self.headers = headers
    self.debug = debug
    self.bucket_storage_uri_class = bucket_storage_uri_class
    self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
    self.uri_strs = uri_strs
    self.recursion_requested = recursion_requested
    self.have_existing_dst_container = have_existing_dst_container
    self.flat = flat
    self.all_versions = all_versions
    self.for_all_version_delete = for_all_version_delete
    self.parse_versions = parse_versions

    # Map holding wildcard strings to use for flat vs subdir-by-subdir listings.
    # (A flat listing means show all objects expanded all the way down.)
    self._flatness_wildcard = {True: '**', False: '*'}

  def __iter__(self):
    """Returns the per-version generator when all_versions is set, otherwise
    the version-agnostic generator."""
    if self.all_versions:
      return self._VersionedIter()
    return self._VersionAgnosticIter()

  def _VersionedIter(self):
    """
    Generator that re-expands each version-agnostic result into one
    NameExpansionResult per listed object version, with expanded_uri_str
    rewritten to '<scheme>://<bucket>/<name>#<version>' and parse_version set.
    """
    for ne_result in self._VersionAgnosticIter():
      exp_src_uri = self.suri_builder.StorageUri(ne_result.GetExpandedUriStr())

      # If a current version exists, hold onto its URI.
      current_version_str = None
      if self.for_all_version_delete and exp_src_uri.exists():
        current_key = exp_src_uri.get_key()
        current_version_str = (
            '%s://%s/%s#%s' % (exp_src_uri.scheme, current_key.bucket.name,
                               current_key.name,
                               current_key.version_id
                               or current_key.generation))

      for key in exp_src_uri.list_bucket(prefix=exp_src_uri.object_name,
                                         headers=self.headers,
                                         all_versions=True):
        if key.name != exp_src_uri.object_name:
          # The desired entries will be alphabetically first in this listing.
          break
        # Copy so that every yielded result is an independent object.
        versioned_ne_result = copy.deepcopy(ne_result)
        versioned_ne_result.expanded_uri_str = (
            '%s://%s/%s#%s' % (exp_src_uri.scheme, key.bucket.name, key.name,
                               key.version_id or key.generation))
        versioned_ne_result.parse_version = True
        # If this is the current version, and we're doing an "rm -a", then set
        # the is_current_version flag, so the remove function will delete it
        # twice (in versioned buckets, the first delete just marks the object
        # deleted so it won't show up in bucket listings without deleting data).
        if (self.for_all_version_delete and
            current_version_str == versioned_ne_result.expanded_uri_str):
          versioned_ne_result.is_current_version = True
        yield versioned_ne_result

  def _VersionAgnosticIter(self):
    """
    Generator that expands every entry of self.uri_strs in three steps
    (explicit wildcards, implicit bucket subdirs, directories/buckets) and
    yields a NameExpansionResult for each match.
    """
    for uri_str in self.uri_strs:

      # Step 1: Expand any explicitly specified wildcards. The output from this
      # step is an iterator of BucketListingRef.
      # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd
      if ContainsWildcard(uri_str):
        post_step1_iter = self._WildcardIterator(uri_str)
      else:
        post_step1_iter = iter([
            BucketListingRef(self.suri_builder.StorageUri(uri_str))])
      post_step1_iter = PluralityCheckableIterator(post_step1_iter)

      # Step 2: Expand bucket subdirs. The output from this
      # step is an iterator of (names_container, BucketListingRef).
      # Starting with gs://bucket/abcd this step would expand to:
      #   iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]).
      if self.flat and self.recursion_requested:
        post_step2_iter = _ImplicitBucketSubdirIterator(self,
            post_step1_iter, self.flat)
      else:
        post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter)
      post_step2_iter = PluralityCheckableIterator(post_step2_iter)

      # Step 3. Expand directories and buckets. This step yields the iterated
      # values. Starting with gs://bucket this step would expand to:
      #  [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt]
      # Starting with file://dir this step would expand to:
      #  [dir/a.txt, dir/b.txt, dir/c/]
      wc = self._flatness_wildcard[self.flat]
      src_uri_expands_to_multi = (post_step1_iter.has_plurality()
                                  or post_step2_iter.has_plurality())
      is_multi_src_request = (self.uri_strs.has_plurality()
                              or src_uri_expands_to_multi)
      for (names_container, blr) in post_step2_iter:
        if (not blr.GetUri().names_container()
            and (self.flat or not blr.HasPrefix())):
          # Plain object/file: emit it directly.
          yield NameExpansionResult(uri_str, is_multi_src_request,
                                    src_uri_expands_to_multi, names_container,
                                    blr.GetUriString(),
                                    self.have_existing_dst_container,
                                    parse_version=self.parse_versions)
          continue
        if not self.recursion_requested:
          if blr.GetUri().is_file_uri():
            desc = 'directory'
          else:
            desc = 'bucket'
          # Single parenthesised argument: prints the same text under the
          # Python 2 print statement and the Python 3 print function.
          print('Omitting %s "%s". (Did you mean to do %s -R?)' % (
              desc, blr.GetUri(), self.command_name))
          continue
        if blr.GetUri().is_file_uri():
          # Convert dir to implicit recursive wildcard.
          uri_to_iterate = '%s/%s' % (blr.GetUriString(), wc)
        else:
          # Convert bucket to implicit recursive wildcard.
          uri_to_iterate = blr.GetUri().clone_replace_name(wc)
        wc_iter = PluralityCheckableIterator(
            self._WildcardIterator(uri_to_iterate))
        # A container that itself expands to several entries also makes this
        # a multi-source expansion, so refresh both flags before yielding.
        src_uri_expands_to_multi = (src_uri_expands_to_multi
                                    or wc_iter.has_plurality())
        is_multi_src_request = (self.uri_strs.has_plurality()
                                or src_uri_expands_to_multi)
        for blr in wc_iter:
          yield NameExpansionResult(uri_str, is_multi_src_request,
                                    src_uri_expands_to_multi, True,
                                    blr.GetUriString(),
                                    self.have_existing_dst_container)

  def _WildcardIterator(self, uri_or_str):
    """
    Helper to instantiate gslib.WildcardIterator. Args are same as
    gslib.WildcardIterator interface, but this method fills in most of the
    values from instance state.

    Args:
      uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
    """
    return wildcard_iterator.wildcard_iterator(
        uri_or_str, self.proj_id_handler,
        bucket_storage_uri_class=self.bucket_storage_uri_class,
        headers=self.headers, debug=self.debug,
        all_versions=self.all_versions)
339
340
def NameExpansionIterator(command_name, proj_id_handler, headers, debug,
                          bucket_storage_uri_class, uri_strs,
                          recursion_requested,
                          have_existing_dst_container=None, flat=True,
                          all_versions=False,
                          for_all_version_delete=False,
                          parse_versions=False):
  """
  Static factory function for _NameExpansionIterator. It wraps uri_strs and
  the resulting iterator in PluralityCheckableIterator instances and checks
  that the result is non-empty. Because of that wrapping, uri_strs may be
  either an array or an iterator.

  Args:
    command_name: name of command being run.
    proj_id_handler: ProjectIdHandler to use for current command.
    headers: Dictionary containing optional HTTP headers to pass to boto.
    debug: Debug level to pass in to boto connection (range 0..3).
    bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
        Settable for testing/mocking.
    uri_strs: URI strings needing expansion (wrapped here in a
        PluralityCheckableIterator).
    recursion_requested: True if -R specified on command-line.
    have_existing_dst_container: Bool indicator whether this is a copy
        request to an existing bucket, bucket subdir, or directory. Default
        None value should be used in cases where this is not needed (commands
        other than cp).
    flat: Bool indicating whether bucket listings should be flattened, i.e.,
        so the mapped-to results contain objects spanning subdirectories.
    all_versions: Bool indicating whether to iterate over all object versions.
    for_all_version_delete: Bool indicating whether this is for an all-version
        delete.
    parse_versions: Bool indicating that the uri_strs are version-ful.

  Returns:
    PluralityCheckableIterator wrapping a _NameExpansionIterator.

  Raises:
    CommandException: if the expansion yields no results.

  Examples of ExpandWildcardsAndContainers with flat=True:
    - Calling with one of the uri_strs being 'gs://bucket' will enumerate all
      top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
    - 'gs://bucket/**' will enumerate all objects in the bucket.
    - 'gs://bucket/abc' will enumerate all next-level objects under directory
      abc (i.e., not including subdirectories of abc) if gs://bucket/abc/*
      matches any objects; otherwise it will enumerate the single name
      gs://bucket/abc
    - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
      subdirectories.
    - 'file:///tmp' will enumerate all files under /tmp, as will
      'file:///tmp/*'
    - 'file:///tmp/**' will enumerate all files under /tmp or any of its
      subdirectories.

  Example if flat=False: calling with gs://bucket/abc/* lists matching objects
  or subdirs, but not sub-subdirs or objects beneath subdirs.

  Note: the examples above assume there's a gs://bucket with object paths:
    abcd/o1.txt
    abcd/o2.txt
    xyz/o1.txt
    xyz/o2.txt
  and a directory file://dir with file paths:
    dir/a.txt
    dir/b.txt
    dir/c/
  """
  expander = _NameExpansionIterator(
      command_name, proj_id_handler, headers, debug, bucket_storage_uri_class,
      PluralityCheckableIterator(uri_strs), recursion_requested,
      have_existing_dst_container=have_existing_dst_container, flat=flat,
      all_versions=all_versions,
      for_all_version_delete=for_all_version_delete,
      parse_versions=parse_versions)
  checked_results = PluralityCheckableIterator(expander)
  if not checked_results.is_empty():
    return checked_results
  raise CommandException('No URIs matched')
413
414
class NameExpansionIteratorQueue(object):
  """
  Multiprocessing.Queue look-alike backed by a NameExpansionIterator.

  get() is the only supported operation; it ignores its block and timeout
  arguments. Every other Queue method raises NotImplementedError.

  Access to the wrapped iterator is serialized with a lock, so get() may be
  called from several threads.
  """

  def __init__(self, name_expansion_iterator, final_value):
    """
    Args:
      name_expansion_iterator: iterator to drain; must offer is_empty() and
          next().
      final_value: value handed back by get() once the iterator is exhausted.
    """
    self.lock = threading.Lock()
    self.final_value = final_value
    self.name_expansion_iterator = name_expansion_iterator

  def get(self, block=None, timeout=None):
    """Returns the next expansion result, or final_value when none remain."""
    with self.lock:
      source = self.name_expansion_iterator
      if not source.is_empty():
        return source.next()
      return self.final_value

  def get_nowait(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.get_nowait() not implemented")

  def get_no_wait(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.get_no_wait() not implemented")

  def put(self, obj=None, block=None, timeout=None):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.put() not implemented")

  def put_nowait(self, obj):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.put_nowait() not implemented")

  def qsize(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.qsize() not implemented")

  def empty(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.empty() not implemented")

  def full(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.full() not implemented")

  def close(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.close() not implemented")

  def join_thread(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.join_thread() not implemented")

  def cancel_join_thread(self):
    raise NotImplementedError(
        "NameExpansionIteratorQueue.cancel_join_thread() not implemented")
480
481
class _NonContainerTuplifyIterator(object):
  """
  Adapts an iterator of BucketListingRefs into an iterator of
  (names_container, blr) tuples where names_container is always False.
  Meant for inputs whose BucketListingRefs are known not to name containers.
  """

  def __init__(self, blr_iter):
    """
    Args:
      blr_iter: iterator of BucketListingRef.
    """
    self.blr_iter = blr_iter

  def __iter__(self):
    """Yields (False, blr) for every blr produced by blr_iter."""
    names_container = False
    for listing_ref in self.blr_iter:
      yield names_container, listing_ref
499
500
class _ImplicitBucketSubdirIterator(object):
  """
  Wraps blr_iter and performs implicit bucket subdir expansion on each entry.

  Every iteration yields a tuple (names_container, expanded BucketListingRef),
  where names_container is true if the URI names a directory, bucket, or
  bucket subdir (StorageUri.names_container() does not handle the latter
  case).

  For example, iterating over [BucketListingRef("gs://abc")] would expand to:
    [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")]
  if those subdir objects exist, and [BucketListingRef("gs://abc")] otherwise.
  """

  def __init__(self, name_expansion_instance, blr_iter, flat):
    """
    Args:
      name_expansion_instance: calling instance of NameExpansion class.
      blr_iter: iterator of BucketListingRef.
      flat: bool indicating whether bucket listings should be flattened, i.e.,
          so the mapped-to results contain objects spanning subdirectories.
    """
    self.name_expansion_instance = name_expansion_instance
    self.blr_iter = blr_iter
    self.flat = flat

  def __iter__(self):
    """Yields (names_container, blr) tuples, expanding implicit subdirs."""
    for blr in self.blr_iter:
      uri = blr.GetUri()
      if not uri.names_object():
        # Nothing to expand; pass the entry through untouched.
        yield (False, blr)
        continue

      # URI could be a bucket subdir: probe "<uri>/<wildcard>" for children.
      expander = self.name_expansion_instance
      wildcard_iter = expander._WildcardIterator(
          expander.suri_builder.StorageUri(
              '%s/%s' % (uri.uri.rstrip('/'),
                         expander._flatness_wildcard[self.flat])))
      children = PluralityCheckableIterator(wildcard_iter)
      if children.is_empty():
        # No objects beneath it, so treat the URI as a plain object.
        yield (False, blr)
        continue
      for exp_blr in children:
        yield (True, exp_blr)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698