Chromium Code Reviews

Side by Side Diff: third_party/gsutil/gslib/command.py

Issue 2280023003: depot_tools: Remove third_party/gsutil (Closed)
Patch Set: Created 4 years, 3 months ago
1 # Copyright 2010 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Base class for gsutil commands.
16
17 In addition to base class code, this file contains helpers that depend on base
18 class state (such as GetAclCommandHelper, which depends on self.gsutil_bin_dir,
19 self.bucket_storage_uri_class, etc.) In general, functions that depend on class
20 state and that are used by multiple commands belong in this file. Functions that
21 don't depend on class state belong in util.py, and non-shared helpers belong in
22 individual subclasses.
23 """
24
25 import boto
26 import getopt
27 import gslib
28 import logging
29 import multiprocessing
30 import os
31 import platform
32 import re
33 import sys
34 import wildcard_iterator
35 import xml.dom.minidom
36
37 from boto import handler
38 from boto.storage_uri import StorageUri
39 from getopt import GetoptError
40 from gslib import util
41 from gslib.exception import CommandException
42 from gslib.help_provider import HelpProvider
43 from gslib.name_expansion import NameExpansionIterator
44 from gslib.name_expansion import NameExpansionIteratorQueue
45 from gslib.project_id import ProjectIdHandler
46 from gslib.storage_uri_builder import StorageUriBuilder
47 from gslib.thread_pool import ThreadPool
48 from gslib.util import HAVE_OAUTH2
49 from gslib.util import NO_MAX
50
51 from gslib.wildcard_iterator import ContainsWildcard
52
53
54 def _ThreadedLogger():
55 """Creates a logger that resembles 'print' output, but is thread safe.
56
57 The logger will display all messages logged with level INFO or above. Log
58 propagation is disabled.
59
60 Returns:
61 A logger object.
62 """
63 log = logging.getLogger('threaded-logging')
64 log.propagate = False
65 log.setLevel(logging.INFO)
66 log_handler = logging.StreamHandler()
67 log_handler.setFormatter(logging.Formatter('%(message)s'))
68 log.addHandler(log_handler)
69 return log
70
71 # command_spec key constants.
72 COMMAND_NAME = 'command_name'
73 COMMAND_NAME_ALIASES = 'command_name_aliases'
74 MIN_ARGS = 'min_args'
75 MAX_ARGS = 'max_args'
76 SUPPORTED_SUB_ARGS = 'supported_sub_args'
77 FILE_URIS_OK = 'file_uri_ok'
78 PROVIDER_URIS_OK = 'provider_uri_ok'
79 URIS_START_ARG = 'uris_start_arg'
80 CONFIG_REQUIRED = 'config_required'
81
82 _EOF_NAME_EXPANSION_RESULT = ("EOF")
83
84
85 class Command(object):
86 # Global instance of a threaded logger object.
87 THREADED_LOGGER = _ThreadedLogger()
88
89 REQUIRED_SPEC_KEYS = [COMMAND_NAME]
90
91 # Each subclass must define the following map, minimally including the
92 # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
93 # although for readability subclasses should specify the complete map.
94 command_spec = {
95 # Name of command.
96 COMMAND_NAME : None,
97 # List of command name aliases.
98 COMMAND_NAME_ALIASES : [],
99 # Min number of args required by this command.
100 MIN_ARGS : 0,
101 # Max number of args required by this command, or NO_MAX.
102 MAX_ARGS : NO_MAX,
103 # Getopt-style string specifying acceptable sub args.
104 SUPPORTED_SUB_ARGS : '',
105 # True if file URIs are acceptable for this command.
106 FILE_URIS_OK : False,
107 # True if provider-only URIs are acceptable for this command.
108 PROVIDER_URIS_OK : False,
109 # Index in args of first URI arg.
110 URIS_START_ARG : 0,
111 # True if must configure gsutil before running command.
112 CONFIG_REQUIRED : True,
113 }
114 _default_command_spec = command_spec
115 help_spec = HelpProvider.help_spec
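A minimal sketch of how a subclass fills in this map, using the keys and defaults defined above (the 'foo' command, its alias, and its option string are hypothetical, not taken from this CL):

    class FooCommand(Command):
      command_spec = {
        COMMAND_NAME : 'foo',
        COMMAND_NAME_ALIASES : ['fu'],
        MIN_ARGS : 1,
        MAX_ARGS : NO_MAX,
        SUPPORTED_SUB_ARGS : 'rR',
        FILE_URIS_OK : False,
        PROVIDER_URIS_OK : False,
        URIS_START_ARG : 0,
        CONFIG_REQUIRED : True,
      }
      help_spec = HelpProvider.help_spec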
116
117 """Define an empty test specification, which derived classes must populate.
118
119 This is a list of tuples containing the following values:
120
121 step_name - mnemonic name for test, displayed when test is run
122 cmd_line - shell command line to run test
123 expect_ret or None - expected return code from test (None means ignore)
124 (result_file, expect_file) or None - tuple of result file and expected
125 file to diff for additional test
126 verification beyond the return code
127 (None means no diff requested)
128 Notes:
129
130 - Setting expect_ret to None means there is no expectation and,
131 hence, any returned value will pass.
132
133 - Any occurrences of the string 'gsutil' in the cmd_line parameter
134 are expanded to the full path to the gsutil command under test.
135
136 - The cmd_line, result_file and expect_file parameters may
137 contain the following special substrings:
138
139 $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
140 $On - converted to one of 10 unique-for-testing object names (n=0..9)
141 $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
142 $G - converted to the directory where gsutil is installed. Useful for
143 referencing test data.
144
145 - The generated file names are full pathnames, whereas the generated
146 bucket and object names are simple relative names.
147
148 - Tests with a non-None result_file and expect_file automatically
149 trigger an implicit diff of the two files.
150
151 - These test specifications, in combination with the conversion strings,
152 allow tests to be constructed parametrically. For example, here's an
153 annotated subset of a test_steps for the cp command:
154
155 # Copy local file to object, verify 0 return code.
156 ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
157 # Copy uploaded object back to local file and diff vs. orig file.
158 ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
159
160 - After pattern substitution, the specs are run sequentially, in the
161 order in which they appear in the test_steps list.
162 """
163 test_steps = []
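For example, a subclass would populate test_steps with tuples like the ones shown in the specification above (a sketch reusing the cp steps from the docstring):

    test_steps = [
      ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
      ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),
    ]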
164
165 # Define a convenience property for command name, since it's used many places.
166 def _GetDefaultCommandName(self):
167 return self.command_spec[COMMAND_NAME]
168 command_name = property(_GetDefaultCommandName)
169
170 def __init__(self, command_runner, args, headers, debug, parallel_operations,
171 gsutil_bin_dir, boto_lib_dir, config_file_list, gsutil_ver,
172 bucket_storage_uri_class, test_method=None,
173 bypass_prodaccess=True):
174 """
175 Args:
176 command_runner: CommandRunner (for commands built atop other commands).
177 args: Command-line args (arg0 = actual arg, not command name ala bash).
178 headers: Dictionary containing optional HTTP headers to pass to boto.
179 debug: Debug level to pass in to boto connection (range 0..3).
180 parallel_operations: Should command operations be executed in parallel?
181 gsutil_bin_dir: Bin dir from which gsutil is running.
182 boto_lib_dir: Lib dir where boto runs.
183 config_file_list: Config file list returned by _GetBotoConfigFileList().
184 gsutil_ver: Version string of currently running gsutil command.
185 bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
186 Settable for testing/mocking.
187 test_method: Optional general purpose method for testing purposes.
188 Application and semantics of this method will vary by
189 command and test type.
190 bypass_prodaccess: Boolean to ignore the existence of prodaccess.
191
192 Implementation note: subclasses shouldn't need to define an __init__
193 method, and instead depend on the shared initialization that happens
194 here. If you do define an __init__ method in a subclass you'll need to
195 explicitly call super().__init__(). But you're encouraged not to do this,
196 because it will make changing the __init__ interface more painful.
197 """
198 # Save class values from constructor params.
199 self.command_runner = command_runner
200 self.args = args
201 self.unparsed_args = args
202 self.headers = headers
203 self.debug = debug
204 self.parallel_operations = parallel_operations
205 self.gsutil_bin_dir = gsutil_bin_dir
206 self.boto_lib_dir = boto_lib_dir
207 self.config_file_list = config_file_list
208 self.gsutil_ver = gsutil_ver
209 self.bucket_storage_uri_class = bucket_storage_uri_class
210 self.test_method = test_method
211 self.exclude_symlinks = False
212 self.recursion_requested = False
213 self.all_versions = False
214 self.bypass_prodaccess = bypass_prodaccess
215
216 # Process sub-command instance specifications.
217 # First, ensure subclass implementation sets all required keys.
218 for k in self.REQUIRED_SPEC_KEYS:
219 if k not in self.command_spec or self.command_spec[k] is None:
220 raise CommandException('"%s" command implementation is missing %s '
221 'specification' % (self.command_name, k))
222 # Now override default command_spec with subclass-specified values.
223 tmp = self._default_command_spec
224 tmp.update(self.command_spec)
225 self.command_spec = tmp
226 del tmp
227
228 # Make sure command provides a test specification.
229 if not self.test_steps:
230 # TODO: Uncomment following lines when test feature is ready.
231 #raise CommandException('"%s" command implementation is missing test '
232 #'specification' % self.command_name)
233 pass
234
235 # Parse and validate args.
236 try:
237 (self.sub_opts, self.args) = getopt.getopt(
238 args, self.command_spec[SUPPORTED_SUB_ARGS])
239 except GetoptError, e:
240 raise CommandException('%s for "%s" command.' % (e.msg,
241 self.command_name))
242 if (len(self.args) < self.command_spec[MIN_ARGS]
243 or len(self.args) > self.command_spec[MAX_ARGS]):
244 raise CommandException('Wrong number of arguments for "%s" command.' %
245 self.command_name)
246 if (not self.command_spec[FILE_URIS_OK]
247 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])):
248 raise CommandException('"%s" command does not support "file://" URIs. '
249 'Did you mean to use a gs:// URI?' %
250 self.command_name)
251 if (not self.command_spec[PROVIDER_URIS_OK]
252 and self._HaveProviderUris(
253 self.args[self.command_spec[URIS_START_ARG]:])):
254 raise CommandException('"%s" command does not support provider-only '
255 'URIs.' % self.command_name)
256 if self.command_spec[CONFIG_REQUIRED]:
257 self._ConfigureNoOpAuthIfNeeded()
258
259 self.proj_id_handler = ProjectIdHandler()
260 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class)
261
262 # Cross-platform path to run gsutil binary.
263 self.gsutil_cmd = ''
264 # Cross-platform list containing gsutil path for use with subprocess.
265 self.gsutil_exec_list = []
266 # If running on Windows, invoke python interpreter explicitly.
267 if platform.system() == "Windows":
268 self.gsutil_cmd += 'python '
269 self.gsutil_exec_list += ['python']
270 # Add full path to gsutil to make sure we test the correct version.
271 self.gsutil_path = os.path.join(self.gsutil_bin_dir, 'gsutil')
272 self.gsutil_cmd += self.gsutil_path
273 self.gsutil_exec_list += [self.gsutil_path]
274
275 # We're treating recursion_requested like it's used by all commands, but
276 # only some of the commands accept the -R option.
277 if self.sub_opts:
278 for o, unused_a in self.sub_opts:
279 if o == '-r' or o == '-R':
280 self.recursion_requested = True
281 break
282
283 def WildcardIterator(self, uri_or_str, all_versions=False):
284 """
285 Helper to instantiate gslib.WildcardIterator. Args are same as
286 gslib.WildcardIterator interface, but this method fills in most of the
287 values from instance state.
288
289 Args:
290 uri_or_str: StorageUri or URI string naming wildcard objects to iterate.
291 """
292 return wildcard_iterator.wildcard_iterator(
293 uri_or_str, self.proj_id_handler,
294 bucket_storage_uri_class=self.bucket_storage_uri_class,
295 all_versions=all_versions,
296 headers=self.headers, debug=self.debug)
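A brief usage sketch (the bucket name and pattern are hypothetical): command code typically expands a wildcard and iterates the resulting URIs, e.g.

    for uri in self.WildcardIterator('gs://some-bucket/*.txt').IterUris():
      self.THREADED_LOGGER.info('matched %s', uri)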
297
298 def RunCommand(self):
299 """Abstract function in base class. Subclasses must implement this. The
300 return value of this function will be used as the exit status of the
301 process, so subclass commands should return an integer exit code (0 for
302 success, a value in [1,255] for failure).
303 """
304 raise CommandException('Command %s is missing its RunCommand() '
305 'implementation' % self.command_name)
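A minimal sketch of the contract described in the docstring, using a hypothetical subclass (command_spec, help_spec and test_steps omitted for brevity):

    class NoopCommand(Command):
      def RunCommand(self):
        self.THREADED_LOGGER.info('nothing to do')
        return 0  # becomes the process exit status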
306
307 ############################################################
308 # Shared helper functions that depend on base class state. #
309 ############################################################
310
311 def UrisAreForSingleProvider(self, uri_args):
312 """Tests whether the uris are all for a single provider.
313
314 Returns: a StorageUri for one of the uris on success, None on failure.
315 """
316 provider = None
317 uri = None
318 for uri_str in uri_args:
319 # validate=False because we allow wildcard uris.
320 uri = boto.storage_uri(
321 uri_str, debug=self.debug, validate=False,
322 bucket_storage_uri_class=self.bucket_storage_uri_class)
323 if not provider:
324 provider = uri.scheme
325 elif uri.scheme != provider:
326 return None
327 return uri
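Illustrative behavior, assuming hypothetical bucket names:

    self.UrisAreForSingleProvider(['gs://b1/obj', 'gs://b2/*'])    # StorageUri (all gs)
    self.UrisAreForSingleProvider(['gs://b1/obj', 's3://b2/obj'])  # None (mixed providers)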
328
329 def SetAclCommandHelper(self):
330 """
331 Common logic for setting ACLs. Sets the standard ACL or the default
332 object ACL depending on self.command_name.
333 """
334
335 acl_arg = self.args[0]
336 uri_args = self.args[1:]
337 # Disallow multi-provider setacl requests, because there are differences in
338 # the ACL models.
339 storage_uri = self.UrisAreForSingleProvider(uri_args)
340 if not storage_uri:
341 raise CommandException('"%s" command spanning providers not allowed.' %
342 self.command_name)
343
344 # Determine whether acl_arg names a file containing XML ACL text vs. the
345 # string name of a canned ACL.
346 if os.path.isfile(acl_arg):
347 acl_file = open(acl_arg, 'r')
348 acl_arg = acl_file.read()
349
350 # TODO: Remove this workaround when GCS allows
351 # whitespace in the Permission element on the server-side
352 acl_arg = re.sub(r'<Permission>\s*(\S+)\s*</Permission>',
353 r'<Permission>\1</Permission>', acl_arg)
354
355 acl_file.close()
356 self.canned = False
357 else:
358 # No file exists, so expect a canned ACL string.
359 canned_acls = storage_uri.canned_acls()
360 if acl_arg not in canned_acls:
361 raise CommandException('Invalid canned ACL "%s".' % acl_arg)
362 self.canned = True
363
364 # Used to track if any ACLs failed to be set.
365 self.everything_set_okay = True
366
367 def _SetAclExceptionHandler(e):
368 """Simple exception handler to allow post-completion status."""
369 self.THREADED_LOGGER.error(str(e))
370 self.everything_set_okay = False
371
372 def _SetAclFunc(name_expansion_result):
373 exp_src_uri = self.suri_builder.StorageUri(
374 name_expansion_result.GetExpandedUriStr())
375 # We don't do bucket operations multi-threaded (see comment below).
376 assert self.command_name != 'setdefacl'
377 self.THREADED_LOGGER.info('Setting ACL on %s...' %
378 name_expansion_result.expanded_uri_str)
379 if self.canned:
380 exp_src_uri.set_acl(acl_arg, exp_src_uri.object_name, False,
381 self.headers)
382 else:
383 exp_src_uri.set_xml_acl(acl_arg, exp_src_uri.object_name, False,
384 self.headers)
385
386 # If user specified -R option, convert any bucket args to bucket wildcards
387 # (e.g., gs://bucket/*), to prevent the operation from being applied to
388 # the buckets themselves.
389 if self.recursion_requested:
390 for i in range(len(uri_args)):
391 uri = self.suri_builder.StorageUri(uri_args[i])
392 if uri.names_bucket():
393 uri_args[i] = uri.clone_replace_name('*').uri
394 else:
395 # Handle bucket ACL setting operations single-threaded, because
396 # our threading machinery currently assumes it's working with objects
397 # (name_expansion_iterator), and normally we wouldn't expect users to need
398 # to set ACLs on huge numbers of buckets at once anyway.
399 for i in range(len(uri_args)):
400 uri_str = uri_args[i]
401 if self.suri_builder.StorageUri(uri_str).names_bucket():
402 self._RunSingleThreadedSetAcl(acl_arg, uri_args)
403 return
404
405 name_expansion_iterator = NameExpansionIterator(
406 self.command_name, self.proj_id_handler, self.headers, self.debug,
407 self.bucket_storage_uri_class, uri_args, self.recursion_requested,
408 self.recursion_requested, all_versions=self.all_versions)
409
410 # Perform requests in parallel (-m) mode, if requested, using
411 # configured number of parallel processes and threads. Otherwise,
412 # perform requests with sequential function calls in current process.
413 self.Apply(_SetAclFunc, name_expansion_iterator, _SetAclExceptionHandler)
414
415 if not self.everything_set_okay:
416 raise CommandException('ACLs for some objects could not be set.')
417
418 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args):
419 some_matched = False
420 for uri_str in uri_args:
421 for blr in self.WildcardIterator(uri_str):
422 if blr.HasPrefix():
423 continue
424 some_matched = True
425 uri = blr.GetUri()
426 if self.command_name == 'setdefacl':
427 print 'Setting default object ACL on %s...' % uri
428 if self.canned:
429 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers)
430 else:
431 uri.set_def_xml_acl(acl_arg, False, self.headers)
432 else:
433 print 'Setting ACL on %s...' % uri
434 if self.canned:
435 uri.set_acl(acl_arg, uri.object_name, False, self.headers)
436 else:
437 uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers)
438 if not some_matched:
439 raise CommandException('No URIs matched')
440
441 def GetAclCommandHelper(self):
442 """Common logic for getting ACLs. Gets the standard ACL or the default
443 object ACL depending on self.command_name."""
444
445 # Resolve to just one object.
446 # Handle wildcard-less URI specially in case this is a version-specific
447 # URI, because WildcardIterator().IterUris() would lose the versioning info.
448 if not ContainsWildcard(self.args[0]):
449 uri = self.suri_builder.StorageUri(self.args[0])
450 else:
451 uris = list(self.WildcardIterator(self.args[0]).IterUris())
452 if len(uris) == 0:
453 raise CommandException('No URIs matched')
454 if len(uris) != 1:
455 raise CommandException('%s matched more than one URI, which is not '
456 'allowed by the %s command' % (self.args[0], self.command_name))
457 uri = uris[0]
458 if not uri.names_bucket() and not uri.names_object():
459 raise CommandException('"%s" command must specify a bucket or '
460 'object.' % self.command_name)
461 if self.command_name == 'getdefacl':
462 acl = uri.get_def_acl(False, self.headers)
463 else:
464 acl = uri.get_acl(False, self.headers)
465 # Pretty-print the XML to make it more easily human editable.
466 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8'))
467 print parsed_xml.toprettyxml(indent=' ')
468
469 def GetXmlSubresource(self, subresource, uri_arg):
470 """Print an xml subresource, e.g. logging, for a bucket/object.
471
472 Args:
473 subresource: The subresource name.
474 uri_arg: URI for the bucket/object. Wildcards will be expanded.
475
476 Raises:
477 CommandException: if errors encountered.
478 """
479 # Wildcarding is allowed but must resolve to just one bucket.
480 uris = list(self.WildcardIterator(uri_arg).IterUris())
481 if len(uris) != 1:
482 raise CommandException('Wildcards must resolve to exactly one item for '
483 'get %s' % subresource)
484 uri = uris[0]
485 xml_str = uri.get_subresource(subresource, False, self.headers)
486 # Pretty-print the XML to make it more easily human editable.
487 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8'))
488 print parsed_xml.toprettyxml(indent=' ')
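For example (hypothetical bucket name), a command built on this helper would call:

    self.GetXmlSubresource('logging', 'gs://some-bucket')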
489
490 def Apply(self, func, name_expansion_iterator, thr_exc_handler,
491 shared_attrs=None):
492 """Dispatch input URI assignments across a pool of parallel OS
493 processes and/or Python threads, based on options (-m or not)
494 and settings in the user's config file. If non-parallel mode
495 or only one OS process requested, execute requests sequentially
496 in the current OS process.
497
498 Args:
499 func: Function to call to process each URI.
500 name_expansion_iterator: Iterator of NameExpansionResult.
501 thr_exc_handler: Exception handler for ThreadPool class.
502 shared_attrs: List of attributes to manage across sub-processes.
503
504 Raises:
505 CommandException if invalid config encountered.
506 """
507
508 # Set OS process and python thread count as a function of options
509 # and config.
510 if self.parallel_operations:
511 process_count = boto.config.getint(
512 'GSUtil', 'parallel_process_count',
513 gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
514 if process_count < 1:
515 raise CommandException('Invalid parallel_process_count "%d".' %
516 process_count)
517 thread_count = boto.config.getint(
518 'GSUtil', 'parallel_thread_count',
519 gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
520 if thread_count < 1:
521 raise CommandException('Invalid parallel_thread_count "%d".' %
522 thread_count)
523 else:
524 # If -m not specified, then assume 1 OS process and 1 Python thread.
525 process_count = 1
526 thread_count = 1
527
528 if self.debug:
529 self.THREADED_LOGGER.info('process count: %d', process_count)
530 self.THREADED_LOGGER.info('thread count: %d', thread_count)
531
532 if self.parallel_operations and process_count > 1:
533 procs = []
534 # If any shared attributes passed by caller, create a dictionary of
535 # shared memory variables for every element in the list of shared
536 # attributes.
537 shared_vars = None
538 if shared_attrs:
539 for name in shared_attrs:
540 if not shared_vars:
541 shared_vars = {}
542 shared_vars[name] = multiprocessing.Value('i', 0)
543 # Construct work queue for parceling out work to multiprocessing workers,
544 # setting the max queue length of 50k so we will block if workers don't
545 # empty the queue as fast as we can continue iterating over the bucket
546 # listing. This number may need tuning; it should be large enough to
547 # keep workers busy (overlapping bucket list next-page retrieval with
548 # operations being fed from the queue) but small enough that we don't
549 # overfill memory when running across a slow network link.
550 work_queue = multiprocessing.Queue(50000)
551 for shard in range(process_count):
552 # Spawn a separate OS process for each shard.
553 if self.debug:
554 self.THREADED_LOGGER.info('spawning process for shard %d', shard)
555 p = multiprocessing.Process(target=self._ApplyThreads,
556 args=(func, work_queue, shard,
557 thread_count, thr_exc_handler,
558 shared_vars))
559 procs.append(p)
560 p.start()
561
562 last_name_expansion_result = None
563 try:
564 # Feed all work into the queue being emptied by the workers.
565 for name_expansion_result in name_expansion_iterator:
566 last_name_expansion_result = name_expansion_result
567 work_queue.put(name_expansion_result)
568 except:
569 sys.stderr.write('Failed URI iteration. Last result (prior to '
570 'exception) was: %s\n'
571 % repr(last_name_expansion_result))
572 finally:
573 # We do all of the process cleanup in a finally clause in case the name
574 # expansion iterator throws an exception. This will send EOF to all the
575 # child processes and join them back into the parent process.
576
577 # Send an EOF per worker.
578 for shard in range(process_count):
579 work_queue.put(_EOF_NAME_EXPANSION_RESULT)
580
581 # Wait for all spawned OS processes to finish.
582 failed_process_count = 0
583 for p in procs:
584 p.join()
585 # Count number of procs that returned non-zero exit code.
586 if p.exitcode != 0:
587 failed_process_count += 1
588
589 # Propagate shared variables back to caller's attributes.
590 if shared_vars:
591 for (name, var) in shared_vars.items():
592 setattr(self, name, var.value)
593
594 # Abort main process if one or more sub-processes failed. Note that this
595 # is outside the finally clause, because we only want to raise a new
596 # exception if an exception wasn't already raised in the try clause above.
597 if failed_process_count:
598 plural_str = ''
599 if failed_process_count > 1:
600 plural_str = 'es'
601 raise Exception('unexpected failure in %d sub-process%s, '
602 'aborting...' % (failed_process_count, plural_str))
603
604 else:
605 # Using just 1 process, so funnel results to _ApplyThreads using a facade
606 # that makes NameExpansionIterator look like a multiprocessing.Queue
607 # that sends one EOF once the iterator empties.
608 work_queue = NameExpansionIteratorQueue(name_expansion_iterator,
609 _EOF_NAME_EXPANSION_RESULT)
610 self._ApplyThreads(func, work_queue, 0, thread_count, thr_exc_handler,
611 None)
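For context, a sketch of the boto config settings Apply() consults when -m is used; the section and option names come from the code above, but the numbers are illustrative:

    [GSUtil]
    parallel_process_count = 4
    parallel_thread_count = 10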
612
613 def HaveFileUris(self, args_to_check):
614 """Checks whether args_to_check contains any file URIs.
615
616 Args:
617 args_to_check: Command-line argument subset to check.
618
619 Returns:
620 True if args_to_check contains any file URIs.
621 """
622 for uri_str in args_to_check:
623 if uri_str.lower().startswith('file://') or uri_str.find(':') == -1:
624 return True
625 return False
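For illustration, with hypothetical arguments:

    self.HaveFileUris(['file://tmp/x', 'gs://b/o'])  # True: 'file://' prefix
    self.HaveFileUris(['dir/local.txt'])             # True: no ':' so treated as a file path
    self.HaveFileUris(['gs://b/o'])                  # False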
626
627 ######################
628 # Private functions. #
629 ######################
630
631 def _HaveProviderUris(self, args_to_check):
632 """Checks whether args_to_check contains any provider URIs (like 'gs://').
633
634 Args:
635 args_to_check: Command-line argument subset to check.
636
637 Returns:
638 True if args_to_check contains any provider URIs.
639 """
640 for uri_str in args_to_check:
641 if re.match('^[a-z]+://$', uri_str):
642 return True
643 return False
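For illustration, the regex above only matches a bare scheme:

    self._HaveProviderUris(['gs://'])          # True: provider-only URI
    self._HaveProviderUris(['gs://bucket/o'])  # False: names a bucket/object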
644
645 def _ConfigureNoOpAuthIfNeeded(self):
646 """Sets up no-op auth handler if no boto credentials are configured."""
647 config = boto.config
648 if not util.HasConfiguredCredentials(self.bypass_prodaccess):
649 if self.config_file_list:
650 if (config.has_option('Credentials', 'gs_oauth2_refresh_token')
651 and not HAVE_OAUTH2):
652 raise CommandException(
653 'Your gsutil is configured with OAuth2 authentication '
654 'credentials.\nHowever, OAuth2 is only supported when running '
655 'under Python 2.6 or later\n(unless additional dependencies are '
656 'installed, see README for details); you are running Python %s.' %
657 sys.version)
658 raise CommandException('You have no storage service credentials in any '
659 'of the following boto config\nfiles. Please '
660 'add your credentials as described in the '
661 'gsutil README file, or else\nre-run '
662 '"gsutil config" to re-create a config '
663 'file:\n%s' % self.config_file_list)
664 else:
665 # With no boto config file the user can still access publicly readable
666 # buckets and objects.
667 from gslib import no_op_auth_plugin
668
669 def _ApplyThreads(self, func, work_queue, shard, num_threads,
670 thr_exc_handler=None, shared_vars=None):
671 """
672 Perform a subset of the required requests across a caller-specified
673 number of parallel Python threads, which may be one, in which
674 case the requests are processed in the current thread.
675
676 Args:
677 func: Function to call for each request.
678 work_queue: shared queue of NameExpansionResult to process.
679 shard: Assigned subset (shard number) for this function.
680 num_threads: Number of Python threads to spawn to process this shard.
681 thr_exc_handler: Exception handler for ThreadPool class.
682 shared_vars: Dict of shared memory variables to be managed.
683 (only relevant, and non-None, if this function is
684 run in a separate OS process).
685 """
686 # Each OS process needs to establish its own set of connections to
687 # the server to avoid writes from different OS processes interleaving
688 # onto the same socket (and garbling the underlying SSL session).
689 # We ensure each process gets its own set of connections here by
690 # closing all connections in the storage provider connection pool.
691 connection_pool = StorageUri.provider_pool
692 if connection_pool:
693 for i in connection_pool:
694 connection_pool[i].connection.close()
695
696 if num_threads > 1:
697 thread_pool = ThreadPool(num_threads, thr_exc_handler)
698 try:
699 while True: # Loop until we hit EOF marker.
700 name_expansion_result = work_queue.get()
701 if name_expansion_result == _EOF_NAME_EXPANSION_RESULT:
702 break
703 exp_src_uri = self.suri_builder.StorageUri(
704 name_expansion_result.GetExpandedUriStr())
705 if self.debug:
706 self.THREADED_LOGGER.info('process %d shard %d is handling uri %s',
707 os.getpid(), shard, exp_src_uri)
708 if (self.exclude_symlinks and exp_src_uri.is_file_uri()
709 and os.path.islink(exp_src_uri.object_name)):
710 self.THREADED_LOGGER.info('Skipping symbolic link %s...', exp_src_uri)
711 elif num_threads > 1:
712 thread_pool.AddTask(func, name_expansion_result)
713 else:
714 func(name_expansion_result)
715 # If any Python threads created, wait here for them to finish.
716 if num_threads > 1:
717 thread_pool.WaitCompletion()
718 finally:
719 if num_threads > 1:
720 thread_pool.Shutdown()
721 # If any shared variables (which means we are running in a separate OS
722 # process), increment value for each shared variable.
723 if shared_vars:
724 for (name, var) in shared_vars.items():
725 var.value += getattr(self, name)