Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(91)

Side by Side Diff: third_party/gsutil/gslib/command.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2010 Google Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Base class for gsutil commands.
16
17 In addition to base class code, this file contains helpers that depend on base
18 class state (such as GetAclCommandHelper, which depends on self.gsutil_bin_dir,
19 self.bucket_storage_uri_class, etc.) In general, functions that depend on class
20 state and that are used by multiple commands belong in this file. Functions that
21 don't depend on class state belong in util.py, and non-shared helpers belong in
22 individual subclasses.
23 """
24
25 import boto
26 import getopt
27 import gslib
28 import logging
29 import multiprocessing
30 import os
31 import re
32 import sys
33 import wildcard_iterator
34 import xml.dom.minidom
35 import xml.sax.xmlreader
36
37 from boto import handler
38 from boto.storage_uri import StorageUri
39 from getopt import GetoptError
40 from gslib import util
41 from gslib.exception import CommandException
42 from gslib.help_provider import HelpProvider
43 from gslib.name_expansion import NameExpansionIterator
44 from gslib.name_expansion import NameExpansionIteratorQueue
45 from gslib.project_id import ProjectIdHandler
46 from gslib.storage_uri_builder import StorageUriBuilder
47 from gslib.thread_pool import ThreadPool
48 from gslib.util import HAVE_OAUTH2
49 from gslib.util import NO_MAX
50
51 from gslib.wildcard_iterator import ContainsWildcard
52
53
def _ThreadedLogger():
  """Creates a logger that resembles 'print' output, but is thread safe.

  The logger will display all messages logged with level INFO or above,
  formatted as the bare message text. Log propagation is disabled.

  logging.getLogger() hands back the same logger object for a given name on
  every call, so the stream handler is attached only if the logger does not
  have a handler yet. Without that guard each additional call would attach
  another handler and every message would be written once per handler.

  Returns:
    A logger object.
  """
  log = logging.getLogger('threaded-logging')
  log.propagate = False
  log.setLevel(logging.INFO)
  if not log.handlers:
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter('%(message)s'))
    log.addHandler(log_handler)
  return log
70
# Keys of the command_spec dictionary that each command class defines.
COMMAND_NAME = 'command_name'
COMMAND_NAME_ALIASES = 'command_name_aliases'
MIN_ARGS = 'min_args'
MAX_ARGS = 'max_args'
SUPPORTED_SUB_ARGS = 'supported_sub_args'
FILE_URIS_OK = 'file_uri_ok'
PROVIDER_URIS_OK = 'provider_uri_ok'
URIS_START_ARG = 'uris_start_arg'
CONFIG_REQUIRED = 'config_required'

# Sentinel placed on the work queue to tell a worker that no more name
# expansion results will follow. It is a plain string, not a tuple.
_EOF_NAME_EXPANSION_RESULT = 'EOF'
83
84
85 class Command(object):
86 # Global instance of a threaded logger object.
87 THREADED_LOGGER = _ThreadedLogger()
88
89 REQUIRED_SPEC_KEYS = [COMMAND_NAME]
90
91 # Each subclass must define the following map, minimally including the
92 # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
93 # although for readability subclasses should specify the complete map.
94 command_spec = {
95 # Name of command.
96 COMMAND_NAME : None,
97 # List of command name aliases.
98 COMMAND_NAME_ALIASES : [],
99 # Min number of args required by this command.
100 MIN_ARGS : 0,
101 # Max number of args required by this command, or NO_MAX.
102 MAX_ARGS : NO_MAX,
103 # Getopt-style string specifying acceptable sub args.
104 SUPPORTED_SUB_ARGS : '',
105 # True if file URIs are acceptable for this command.
106 FILE_URIS_OK : False,
107 # True if provider-only URIs are acceptable for this command.
108 PROVIDER_URIS_OK : False,
109 # Index in args of first URI arg.
110 URIS_START_ARG : 0,
111 # True if must configure gsutil before running command.
112 CONFIG_REQUIRED : True,
113 }
114 _default_command_spec = command_spec
115 help_spec = HelpProvider.help_spec
116
117 """Define an empty test specification, which derived classes must populate.
118
119 This is a list of tuples containing the following values:
120
121 step_name - mnemonic name for test, displayed when test is run
122 cmd_line - shell command line to run test
123 expect_ret or None - expected return code from test (None means ignore)
124 (result_file, expect_file) or None - tuple of result file and expected
125 file to diff for additional test
126 verification beyond the return code
127 (None means no diff requested)
128 Notes:
129
130 - Setting expected_ret to None means there is no expectation and,
131 hence, any returned value will pass.
132
133 - Any occurrences of the string 'gsutil' in the cmd_line parameter
134 are expanded to the full path to the gsutil command under test.
135
136 - The cmd_line, result_file and expect_file parameters may
137 contain the following special substrings:
138
139 $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
140 $On - converted to one of 10 unique-for-testing object names (n=0..9)
141 $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
142 $G - converted to the directory where gsutil is installed. Useful for
143 referencing test data.
144
145 - The generated file names are full pathnames, whereas the generated
146 bucket and object names are simple relative names.
147
148 - Tests with a non-None result_file and expect_file automatically
149 trigger an implicit diff of the two files.
150
151 - These test specifications, in combination with the conversion strings
152 allow tests to be constructed parametrically. For example, here's an
153 annotated subset of a test_steps for the cp command:
154
155 # Copy local file to object, verify 0 return code.
156 ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
157 # Copy uploaded object back to local file and diff vs. orig file.
158 ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
159
160 - After pattern substitution, the specs are run sequentially, in the
161 order in which they appear in the test_steps list.
162 """
163 test_steps = []
164
# Read-only convenience property for the command name, which is looked up in
# many places.
def _GetDefaultCommandName(self):
  """Returns the COMMAND_NAME entry of this command's command_spec."""
  spec = self.command_spec
  return spec[COMMAND_NAME]
command_name = property(fget=_GetDefaultCommandName)
169
170 def __init__(self, command_runner, args, headers, debug, parallel_operations,
171 gsutil_bin_dir, boto_lib_dir, config_file_list, gsutil_ver,
172 bucket_storage_uri_class, test_method=None):
173 """
174 Args:
175 command_runner: CommandRunner (for commands built atop other commands).
176 args: Command-line args (arg0 = actual arg, not command name ala bash).
177 headers: Dictionary containing optional HTTP headers to pass to boto.
178 debug: Debug level to pass in to boto connection (range 0..3).
179 parallel_operations: Should command operations be executed in parallel?
180 gsutil_bin_dir: Bin dir from which gsutil is running.
181 boto_lib_dir: Lib dir where boto runs.
182 config_file_list: Config file list returned by _GetBotoConfigFileList().
183 gsutil_ver: Version string of currently running gsutil command.
184 bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
185 Settable for testing/mocking.
186 test_method: Optional general purpose method for testing purposes.
187 Application and semantics of this method will vary by
188 command and test type.
189
190 Implementation note: subclasses shouldn't need to define an __init__
191 method, and instead depend on the shared initialization that happens
192 here. If you do define an __init__ method in a subclass you'll need to
193 explicitly call super().__init__(). But you're encouraged not to do this,
194 because it will make changing the __init__ interface more painful.
195 """
196 # Save class values from constructor params.
197 self.command_runner = command_runner
198 self.args = args
199 self.unparsed_args = args
200 self.headers = headers
201 self.debug = debug
202 self.parallel_operations = parallel_operations
203 self.gsutil_bin_dir = gsutil_bin_dir
204 self.boto_lib_dir = boto_lib_dir
205 self.config_file_list = config_file_list
206 self.gsutil_ver = gsutil_ver
207 self.bucket_storage_uri_class = bucket_storage_uri_class
208 self.test_method = test_method
209 self.exclude_symlinks = False
210 self.recursion_requested = False
211 self.all_versions = False
212 self.parse_versions = False
213
214 # Process sub-command instance specifications.
215 # First, ensure subclass implementation sets all required keys.
216 for k in self.REQUIRED_SPEC_KEYS:
217 if k not in self.command_spec or self.command_spec[k] is None:
218 raise CommandException('"%s" command implementation is missing %s '
219 'specification' % (self.command_name, k))
220 # Now override default command_spec with subclass-specified values.
221 tmp = self._default_command_spec
222 tmp.update(self.command_spec)
223 self.command_spec = tmp
224 del tmp
225
226 # Make sure command provides a test specification.
227 if not self.test_steps:
228 # TODO: Uncomment following lines when test feature is ready.
229 #raise CommandException('"%s" command implementation is missing test '
230 #'specification' % self.command_name)
231 pass
232
233 # Parse and validate args.
234 try:
235 (self.sub_opts, self.args) = getopt.getopt(
236 args, self.command_spec[SUPPORTED_SUB_ARGS])
237 except GetoptError, e:
238 raise CommandException('%s for "%s" command.' % (e.msg,
239 self.command_name))
240 if (len(self.args) < self.command_spec[MIN_ARGS]
241 or len(self.args) > self.command_spec[MAX_ARGS]):
242 raise CommandException('Wrong number of arguments for "%s" command.' %
243 self.command_name)
244 if (not self.command_spec[FILE_URIS_OK]
245 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])):
246 raise CommandException('"%s" command does not support "file://" URIs. '
247 'Did you mean to use a gs:// URI?' %
248 self.command_name)
249 if (not self.command_spec[PROVIDER_URIS_OK]
250 and self._HaveProviderUris(
251 self.args[self.command_spec[URIS_START_ARG]:])):
252 raise CommandException('"%s" command does not support provider-only '
253 'URIs.' % self.command_name)
254 if self.command_spec[CONFIG_REQUIRED]:
255 self._ConfigureNoOpAuthIfNeeded()
256
257 self.proj_id_handler = ProjectIdHandler()
258 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
259
260 # We're treating recursion_requested like it's used by all commands, but
261 # only some of the commands accept the -R option.
262 if self.sub_opts:
263 for o, unused_a in self.sub_opts:
264 if o == '-r' or o == '-R':
265 self.recursion_requested = True
266 break
267
def WildcardIterator(self, uri_or_str, all_versions=False):
  """Instantiates a wildcard iterator using state held on this command.

  Thin wrapper around wildcard_iterator.wildcard_iterator() that supplies
  the project id handler, bucket storage URI class, headers and debug level
  from instance state.

  Args:
    uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
    all_versions: Passed through as the all_versions keyword argument of
                  wildcard_iterator.wildcard_iterator().

  Returns:
    The object returned by wildcard_iterator.wildcard_iterator().
  """
  iterator_kwargs = {
      'bucket_storage_uri_class': self.bucket_storage_uri_class,
      'all_versions': all_versions,
      'headers': self.headers,
      'debug': self.debug,
  }
  return wildcard_iterator.wildcard_iterator(
      uri_or_str, self.proj_id_handler, **iterator_kwargs)
282
def RunCommand(self):
  """Abstract entry point; every subclass must override this method.

  The value returned by an override is used as the exit status of the
  process, so it should be an integer exit code (0 for success, a value in
  [1,255] for failure).

  Raises:
    CommandException: always, in this base class implementation.
  """
  message = ('Command %s is missing its RunCommand() '
             'implementation' % self.command_name)
  raise CommandException(message)
291
292 ############################################################
293 # Shared helper functions that depend on base class state. #
294 ############################################################
295
def UrisAreForSingleProvider(self, uri_args):
  """Tests whether the uris are all for a single provider.

  Args:
    uri_args: URI strings to check (wildcards are allowed).

  Returns:
    The StorageUri built for the last element of uri_args if every URI has
    the same scheme; None if the schemes differ or uri_args is empty.
  """
  first_scheme = None
  last_uri = None
  for arg in uri_args:
    # validate=False because we allow wildcard uris.
    last_uri = boto.storage_uri(
        arg, debug=self.debug, validate=False,
        bucket_storage_uri_class=self.bucket_storage_uri_class)
    scheme = last_uri.scheme
    if not first_scheme:
      first_scheme = scheme
      continue
    if scheme != first_scheme:
      return None
  return last_uri
313
def _CheckNoWildcardsForVersions(self, uri_args):
  """Rejects wildcard URIs when the -v flag is in effect.

  Args:
    uri_args: URI strings to check.

  Raises:
    CommandException: for the first URI string that contains a wildcard.
  """
  for candidate in uri_args:
    if not ContainsWildcard(candidate):
      continue
    # It's exceedingly unlikely that specifying a generation and wildcarding
    # an object name will match multiple objects, so we explicitly disallow
    # this behavior. Wildcarding generation numbers is slightly more useful,
    # but probably still a remote use-case.
    raise CommandException('Wildcard-ful URI (%s) disallowed with -v flag.'
                           % candidate)
323
324 def SetAclCommandHelper(self):
325 """
326 Common logic for setting ACLs. Sets the standard ACL or the default
327 object ACL depending on self.command_name.
328 """
329 acl_arg = self.args[0]
330 uri_args = self.args[1:]
331 if self.parse_versions:
332 self._CheckNoWildcardsForVersions(uri_args)
333 # Disallow multi-provider setacl requests, because there are differences in
334 # the ACL models.
335 storage_uri = self.UrisAreForSingleProvider(uri_args)
336 if not storage_uri:
337 raise CommandException('"%s" command spanning providers not allowed.' %
338 self.command_name)
339
340 # Get ACL object from connection for one URI, for interpreting the ACL.
341 # This won't fail because the main startup code insists on at least 1 arg
342 # for this command.
343 acl_class = storage_uri.acl_class()
344 canned_acls = storage_uri.canned_acls()
345
346 # Determine whether acl_arg names a file containing XML ACL text vs. the
347 # string name of a canned ACL.
348 if os.path.isfile(acl_arg):
349 acl_file = open(acl_arg, 'r')
350 acl_txt = acl_file.read()
351 acl_file.close()
352 acl_obj = acl_class()
353 # Handle wildcard-named bucket.
354 if ContainsWildcard(storage_uri.bucket_name):
355 try:
356 bucket_uri = self.WildcardIterator(
357 storage_uri.clone_replace_name('')).IterUris().next()
358 except StopIteration:
359 raise CommandException('No URIs matched')
360 else:
361 bucket_uri = storage_uri
362 h = handler.XmlHandler(acl_obj, bucket_uri.get_bucket())
363 try:
364 xml.sax.parseString(acl_txt, h)
365 except xml.sax._exceptions.SAXParseException, e:
366 raise CommandException('Requested ACL is invalid: %s at line %s, '
367 'column %s' % (e.getMessage(), e.getLineNumber(),
368 e.getColumnNumber()))
369 acl_arg = acl_obj
370 else:
371 # No file exists, so expect a canned ACL string.
372 if acl_arg not in canned_acls:
373 raise CommandException('Invalid canned ACL "%s".' % acl_arg)
374
375 # Used to track if any ACLs failed to be set.
376 self.everything_set_okay = True
377
378 def _SetAclExceptionHandler(e):
379 """Simple exception handler to allow post-completion status."""
380 self.THREADED_LOGGER.error(str(e))
381 self.everything_set_okay = False
382
383 def _SetAclFunc(name_expansion_result):
384 parse_version = self.parse_versions or self.all_versions
385 exp_src_uri = self.suri_builder.StorageUri(
386 name_expansion_result.GetExpandedUriStr(),
387 parse_version=parse_version)
388 # We don't do bucket operations multi-threaded (see comment below).
389 assert self.command_name != 'setdefacl'
390 self.THREADED_LOGGER.info('Setting ACL on %s...' %
391 name_expansion_result.expanded_uri_str)
392 exp_src_uri.set_acl(acl_arg, exp_src_uri.object_name, False,
393 self.headers)
394
395 # If user specified -R option, convert any bucket args to bucket wildcards
396 # (e.g., gs://bucket/*), to prevent the operation from being applied to
397 # the buckets themselves.
398 if self.recursion_requested:
399 for i in range(len(uri_args)):
400 uri = self.suri_builder.StorageUri(uri_args[i])
401 if uri.names_bucket():
402 uri_args[i] = uri.clone_replace_name('*').uri
403 else:
404 # Handle bucket ACL setting operations single-threaded, because
405 # our threading machinery currently assumes it's working with objects
406 # (name_expansion_iterator), and normally we wouldn't expect users to need
407 # to set ACLs on huge numbers of buckets at once anyway.
408 for i in range(len(uri_args)):
409 uri_str = uri_args[i]
410 if self.suri_builder.StorageUri(uri_str).names_bucket():
411 self._RunSingleThreadedSetAcl(acl_arg, uri_args)
412 return
413
414 name_expansion_iterator = NameExpansionIterator(
415 self.command_name, self.proj_id_handler, self.headers, self.debug,
416 self.bucket_storage_uri_class, uri_args, self.recursion_requested,
417 self.recursion_requested, all_versions=self.all_versions)
418
419 # Perform requests in parallel (-m) mode, if requested, using
420 # configured number of parallel processes and threads. Otherwise,
421 # perform requests with sequential function calls in current process.
422 self.Apply(_SetAclFunc, name_expansion_iterator, _SetAclExceptionHandler)
423
424 if not self.everything_set_okay:
425 raise CommandException('ACLs for some objects could not be set.')
426
427 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args):
428 some_matched = False
429 for uri_str in uri_args:
430 for blr in self.WildcardIterator(uri_str):
431 if blr.HasPrefix():
432 continue
433 some_matched = True
434 uri = blr.GetUri()
435 if self.command_name == 'setdefacl':
436 print 'Setting default object ACL on %s...' % uri
437 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
438 else:
439 print 'Setting ACL on %s...' % uri
440 uri.set_acl(acl_arg, uri.object_name, False, self.headers)
441 if not some_matched:
442 raise CommandException('No URIs matched')
443
444 def GetAclCommandHelper(self):
445 """Common logic for getting ACLs. Gets the standard ACL or the default
446 object ACL depending on self.command_name."""
447 parse_versions = False
448 if self.sub_opts:
449 for o, a in self.sub_opts:
450 if o == '-v':
451 parse_versions = True
452
453 if parse_versions:
454 uri_str = self.args[0]
455 if ContainsWildcard(uri_str):
456 raise CommandException('Wildcards disallowed with -v flag.')
457 uri = self.suri_builder.StorageUri(uri_str, parse_version=True)
458 else:
459 # Wildcarding is allowed but must resolve to just one object.
460 uris = list(self.WildcardIterator(self.args[0]).IterUris())
461 if len(uris) == 0:
462 raise CommandException('No URIs matched')
463 if len(uris) != 1:
464 raise CommandException('%s matched more than one URI, which is not '
465 'allowed by the %s command' % (self.args[0], self.command_name))
466 uri = uris[0]
467 if not uri.names_bucket() and not uri.names_object():
468 raise CommandException('"%s" command must specify a bucket or '
469 'object.' % self.command_name)
470 if self.command_name == 'getdefacl':
471 acl = uri.get_def_acl(False, self.headers)
472 else:
473 acl = uri.get_acl(False, self.headers)
474 # Pretty-print the XML to make it more easily human editable.
475 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
476 print parsed_xml.toprettyxml(indent=' ')
477
478 def GetXmlSubresource(self, subresource, uri_arg):
479 """Print an xml subresource, e.g. logging, for a bucket/object.
480
481 Args:
482 subresource: The subresource name.
483 uri_arg: URI for the bucket/object. Wildcards will be expanded.
484
485 Raises:
486 CommandException: if errors encountered.
487 """
488 # Wildcarding is allowed but must resolve to just one bucket.
489 uris = list(self.WildcardIterator(uri_arg).IterUris())
490 if len(uris) != 1:
491 raise CommandException('Wildcards must resolve to exactly one item for '
492 'get %s' % subresource)
493 uri = uris[0]
494 xml_str = uri.get_subresource(subresource, False, self.headers)
495 # Pretty-print the XML to make it more easily human editable.
496 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8'))
497 print parsed_xml.toprettyxml(indent=' ')
498
def Apply(self, func, name_expansion_iterator, thr_exc_handler,
          shared_attrs=None):
  """Dispatches input URI assignments across processes and/or threads.

  Work is spread across a pool of parallel OS processes and/or Python
  threads, based on options (-m or not) and settings in the user's config
  file. If non-parallel mode or only one OS process is requested, requests
  are executed in the current OS process.

  Args:
    func: Function to call to process each URI.
    name_expansion_iterator: Iterator of NameExpansionResult.
    thr_exc_handler: Exception handler for ThreadPool class.
    shared_attrs: List of attributes to manage across sub-processes.

  Raises:
    CommandException: if invalid config encountered.
    Exception: if one or more sub-processes exit with a non-zero exit code.
  """
  # Set OS process and python thread count as a function of options
  # and config.
  if self.parallel_operations:
    process_count = boto.config.getint(
        'GSUtil', 'parallel_process_count',
        gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
    if process_count < 1:
      raise CommandException('Invalid parallel_process_count "%d".' %
                             process_count)
    thread_count = boto.config.getint(
        'GSUtil', 'parallel_thread_count',
        gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
    if thread_count < 1:
      raise CommandException('Invalid parallel_thread_count "%d".' %
                             thread_count)
  else:
    # If -m not specified, then assume 1 OS process and 1 Python thread.
    process_count = 1
    thread_count = 1

  if self.debug:
    self.THREADED_LOGGER.info('process count: %d', process_count)
    self.THREADED_LOGGER.info('thread count: %d', thread_count)

  if self.parallel_operations and process_count > 1:
    procs = []
    # If any shared attributes passed by caller, create a dictionary of
    # shared memory variables for every element in the list of shared
    # attributes.
    shared_vars = None
    if shared_attrs:
      shared_vars = {}
      for name in shared_attrs:
        shared_vars[name] = multiprocessing.Value('i', 0)
    # Construct work queue for parceling out work to multiprocessing workers,
    # setting the max queue length of 50k so we will block if workers don't
    # empty the queue as fast as we can continue iterating over the bucket
    # listing. This number may need tuning; it should be large enough to
    # keep workers busy (overlapping bucket list next-page retrieval with
    # operations being fed from the queue) but small enough that we don't
    # overfill memory when running across a slow network link.
    work_queue = multiprocessing.Queue(50000)
    for shard in range(process_count):
      # Spawn a separate OS process for each shard.
      if self.debug:
        self.THREADED_LOGGER.info('spawning process for shard %d', shard)
      p = multiprocessing.Process(target=self._ApplyThreads,
                                  args=(func, work_queue, shard,
                                        thread_count, thr_exc_handler,
                                        shared_vars))
      procs.append(p)
      p.start()
    try:
      # Feed all work into the queue being emptied by the workers.
      for name_expansion_result in name_expansion_iterator:
        work_queue.put(name_expansion_result)
    finally:
      # Send an EOF per worker. This is done even when the iterator raised:
      # workers loop on work_queue.get() until they see the EOF marker, so
      # without it they would block forever and keep the program from
      # exiting.
      for unused_shard in range(process_count):
        work_queue.put(_EOF_NAME_EXPANSION_RESULT)

    # Wait for all spawned OS processes to finish.
    failed_process_count = 0
    for p in procs:
      p.join()
      # Count number of procs that returned non-zero exit code.
      if p.exitcode != 0:
        failed_process_count += 1
    # Abort main process if one or more sub-processes failed.
    if failed_process_count:
      plural_str = ''
      if failed_process_count > 1:
        plural_str = 'es'
      raise Exception('unexpected failure in %d sub-process%s, '
                      'aborting...' % (failed_process_count, plural_str))
    # Propagate shared variables back to caller's attributes.
    if shared_vars:
      for (name, var) in shared_vars.items():
        setattr(self, name, var.value)
  else:
    # Using just 1 process, so funnel results to _ApplyThreads using facade
    # that makes NameExpansionIterator look like a Multiprocessing.Queue
    # that sends one EOF once the iterator empties.
    work_queue = NameExpansionIteratorQueue(name_expansion_iterator,
                                            _EOF_NAME_EXPANSION_RESULT)
    self._ApplyThreads(func, work_queue, 0, thread_count, thr_exc_handler,
                       None)
602
def HaveFileUris(self, args_to_check):
  """Checks whether args_to_check contain any file URIs.

  An argument counts as a file URI when it has no ':' at all or when it
  starts with 'file://' (compared case-insensitively).

  Args:
    args_to_check: Command-line argument subset to check.

  Returns:
    True if args_to_check contains any file URIs.
  """
  for candidate in args_to_check:
    if ':' not in candidate:
      return True
    if candidate.lower().startswith('file://'):
      return True
  return False
616
617 ######################
618 # Private functions. #
619 ######################
620
621 def _HaveProviderUris(self, args_to_check):
622 """Checks whether args_to_check contains any provider URIs (like 'gs://').
623
624 Args:
625 args_to_check: Command-line argument subset to check.
626
627 Returns:
628 True if args_to_check contains any provider URIs.
629 """
630 for uri_str in args_to_check:
631 if re.match('^[a-z]+://$', uri_str):
632 return True
633 return False
634
def _ConfigureNoOpAuthIfNeeded(self):
  """Sets up no-op auth handler if no boto credentials are configured.

  Does nothing when util.HasConfiguredCredentials() is true. Otherwise, if
  there is no boto config file, gslib.no_op_auth_plugin is imported
  (presumably registering the no-op auth handler as an import side effect --
  confirm in that module); if config files exist, the missing credentials
  are reported as an error.

  Raises:
    CommandException: if config files exist but hold no usable credentials.
  """
  if util.HasConfiguredCredentials():
    return

  if not self.config_file_list:
    # With no boto config file the user can still access publicly readable
    # buckets and objects.
    from gslib import no_op_auth_plugin
    return

  oauth2_token_configured = boto.config.has_option(
      'Credentials', 'gs_oauth2_refresh_token')
  if oauth2_token_configured and not HAVE_OAUTH2:
    raise CommandException(
        'Your gsutil is configured with OAuth2 authentication '
        'credentials.\nHowever, OAuth2 is only supported when running '
        'under Python 2.6 or later\n(unless additional dependencies are '
        'installed, see README for details); you are running Python %s.' %
        sys.version)
  raise CommandException('You have no storage service credentials in any '
                         'of the following boto config\nfiles. Please '
                         'add your credentials as described in the '
                         'gsutil README file, or else\nre-run '
                         '"gsutil config" to re-create a config '
                         'file:\n%s' % self.config_file_list)
658
def _ApplyThreads(self, func, work_queue, shard, num_threads,
                  thr_exc_handler=None, shared_vars=None):
  """Processes queued requests in this process, optionally with threads.

  Items are taken from work_queue until the EOF marker is seen. With
  num_threads > 1 each item is handed to a ThreadPool; otherwise func is
  called directly in the current thread.

  Args:
    func: Function to call for each request.
    work_queue: shared queue of NameExpansionResult to process.
    shard: Assigned subset (shard number) for this function.
    num_threads: Number of Python threads to spawn to process this shard.
    thr_exc_handler: Exception handler for ThreadPool class.
    shared_vars: Dict of shared memory variables to be managed.
                 (only relevant, and non-None, if this function is
                 run in a separate OS process).
  """
  # Each OS process needs to establish its own set of connections to
  # the server to avoid writes from different OS processes interleaving
  # onto the same socket (and garbling the underlying SSL session).
  # We ensure each process gets its own set of connections here by
  # closing all connections in the storage provider connection pool.
  provider_connections = StorageUri.provider_pool
  if provider_connections:
    for provider_key in provider_connections:
      provider_connections[provider_key].connection.close()

  multi_threaded = num_threads > 1
  if multi_threaded:
    thread_pool = ThreadPool(num_threads, thr_exc_handler)
  try:
    while True:  # Loop until we hit EOF marker.
      work_item = work_queue.get()
      if work_item == _EOF_NAME_EXPANSION_RESULT:
        break
      exp_src_uri = self.suri_builder.StorageUri(
          work_item.GetExpandedUriStr())
      if self.debug:
        self.THREADED_LOGGER.info('process %d shard %d is handling uri %s',
                                  os.getpid(), shard, exp_src_uri)
      skip_symlink = (self.exclude_symlinks and exp_src_uri.is_file_uri()
                      and os.path.islink(exp_src_uri.object_name))
      if skip_symlink:
        self.THREADED_LOGGER.info('Skipping symbolic link %s...', exp_src_uri)
        continue
      if multi_threaded:
        thread_pool.AddTask(func, work_item)
      else:
        func(work_item)
    # If any Python threads created, wait here for them to finish.
    if multi_threaded:
      thread_pool.WaitCompletion()
  finally:
    if multi_threaded:
      thread_pool.Shutdown()
  # If any shared variables (which means we are running in a separate OS
  # process), increment value for each shared variable.
  if shared_vars:
    for (name, var) in shared_vars.items():
      var.value += getattr(self, name)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698