OLD | NEW |
(Empty) | |
| 1 # Copyright 2010 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 """Base class for gsutil commands. |
| 16 |
| 17 In addition to base class code, this file contains helpers that depend on base |
| 18 class state (such as GetAclCommandHelper, which depends on self.gsutil_bin_dir, |
| 19 self.bucket_storage_uri_class, etc.) In general, functions that depend on class |
| 20 state and that are used by multiple commands belong in this file. Functions that |
| 21 don't depend on class state belong in util.py, and non-shared helpers belong in |
| 22 individual subclasses. |
| 23 """ |
| 24 |
| 25 import boto |
| 26 import getopt |
| 27 import gslib |
| 28 import logging |
| 29 import multiprocessing |
| 30 import os |
| 31 import platform |
| 32 import re |
| 33 import sys |
| 34 import wildcard_iterator |
| 35 import xml.dom.minidom |
| 36 |
| 37 from boto import handler |
| 38 from boto.storage_uri import StorageUri |
| 39 from getopt import GetoptError |
| 40 from gslib import util |
| 41 from gslib.exception import CommandException |
| 42 from gslib.help_provider import HelpProvider |
| 43 from gslib.name_expansion import NameExpansionIterator |
| 44 from gslib.name_expansion import NameExpansionIteratorQueue |
| 45 from gslib.project_id import ProjectIdHandler |
| 46 from gslib.storage_uri_builder import StorageUriBuilder |
| 47 from gslib.thread_pool import ThreadPool |
| 48 from gslib.util import HAVE_OAUTH2 |
| 49 from gslib.util import NO_MAX |
| 50 |
| 51 from gslib.wildcard_iterator import ContainsWildcard |
| 52 |
| 53 |
| 54 def _ThreadedLogger(): |
| 55 """Creates a logger that resembles 'print' output, but is thread safe. |
| 56 |
| 57 The logger will display all messages logged with level INFO or above. Log |
| 58 propagation is disabled. |
| 59 |
| 60 Returns: |
| 61 A logger object. |
| 62 """ |
| 63 log = logging.getLogger('threaded-logging') |
| 64 log.propagate = False |
| 65 log.setLevel(logging.INFO) |
| 66 log_handler = logging.StreamHandler() |
| 67 log_handler.setFormatter(logging.Formatter('%(message)s')) |
| 68 log.addHandler(log_handler) |
| 69 return log |
| 70 |
| 71 # command_spec key constants. |
| 72 COMMAND_NAME = 'command_name' |
| 73 COMMAND_NAME_ALIASES = 'command_name_aliases' |
| 74 MIN_ARGS = 'min_args' |
| 75 MAX_ARGS = 'max_args' |
| 76 SUPPORTED_SUB_ARGS = 'supported_sub_args' |
| 77 FILE_URIS_OK = 'file_uri_ok' |
| 78 PROVIDER_URIS_OK = 'provider_uri_ok' |
| 79 URIS_START_ARG = 'uris_start_arg' |
| 80 CONFIG_REQUIRED = 'config_required' |
| 81 |
| 82 _EOF_NAME_EXPANSION_RESULT = ("EOF") |
| 83 |
| 84 |
class Command(object):
  """Base class for all gsutil commands.

  Subclasses declare their interface via command_spec (and help_spec /
  test_steps) and implement RunCommand(); shared option parsing, URI
  validation, and sequential/parallel dispatch (Apply) live here.
  """

  # Global instance of a threaded logger object, shared by all commands.
  THREADED_LOGGER = _ThreadedLogger()

  # command_spec keys that every subclass must supply (enforced in __init__).
  REQUIRED_SPEC_KEYS = [COMMAND_NAME]

  # Each subclass must define the following map, minimally including the
  # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
  # although for readability subclasses should specify the complete map.
  command_spec = {
    # Name of command.
    COMMAND_NAME : None,
    # List of command name aliases.
    COMMAND_NAME_ALIASES : [],
    # Min number of args required by this command.
    MIN_ARGS : 0,
    # Max number of args required by this command, or NO_MAX.
    MAX_ARGS : NO_MAX,
    # Getopt-style string specifying acceptable sub args.
    SUPPORTED_SUB_ARGS : '',
    # True if file URIs are acceptable for this command.
    FILE_URIS_OK : False,
    # True if provider-only URIs are acceptable for this command.
    PROVIDER_URIS_OK : False,
    # Index in args of first URI arg.
    URIS_START_ARG : 0,
    # True if must configure gsutil before running command.
    CONFIG_REQUIRED : True,
  }
  # NOTE: this binds the SAME dict object as command_spec above, not a copy;
  # __init__ merges subclass specs against it.
  _default_command_spec = command_spec
  # Default (empty) help specification; subclasses override.
  help_spec = HelpProvider.help_spec

  """Define an empty test specification, which derived classes must populate.

  This is a list of tuples containing the following values:

    step_name - mnemonic name for test, displayed when test is run
    cmd_line - shell command line to run test
    expect_ret or None - expected return code from test (None means ignore)
    (result_file, expect_file) or None - tuple of result file and expected
                                         file to diff for additional test
                                         verification beyond the return code
                                         (None means no diff requested)
  Notes:

  - Setting expected_ret to None means there is no expectation and,
    hence, any returned value will pass.

  - Any occurrences of the string 'gsutil' in the cmd_line parameter
    are expanded to the full path to the gsutil command under test.

  - The cmd_line, result_file and expect_file parameters may
    contain the following special substrings:

    $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
    $On - converted to one of 10 unique-for-testing object names (n=0..9)
    $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
    $G  - converted to the directory where gsutil is installed. Useful for
          referencing test data.

  - The generated file names are full pathnames, whereas the generated
    bucket and object names are simple relative names.

  - Tests with a non-None result_file and expect_file automatically
    trigger an implicit diff of the two files.

  - These test specifications, in combination with the conversion strings
    allow tests to be constructed parametrically. For example, here's an
    annotated subset of a test_steps for the cp command:

    # Copy local file to object, verify 0 return code.
    ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
    # Copy uploaded object back to local file and diff vs. orig file.
    ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),

  - After pattern substitution, the specs are run sequentially, in the
    order in which they appear in the test_steps list.
  """
  test_steps = []
| 164 |
  # Define a convenience property for command name, since it's used many places.
  def _GetDefaultCommandName(self):
    # Reads the name out of the (possibly subclass-overridden) command_spec.
    return self.command_spec[COMMAND_NAME]
  # Read-only property: the command's canonical name (e.g. 'setacl').
  command_name = property(_GetDefaultCommandName)
| 169 |
| 170 def __init__(self, command_runner, args, headers, debug, parallel_operations, |
| 171 gsutil_bin_dir, boto_lib_dir, config_file_list, gsutil_ver, |
| 172 bucket_storage_uri_class, test_method=None): |
| 173 """ |
| 174 Args: |
| 175 command_runner: CommandRunner (for commands built atop other commands). |
| 176 args: Command-line args (arg0 = actual arg, not command name ala bash). |
| 177 headers: Dictionary containing optional HTTP headers to pass to boto. |
| 178 debug: Debug level to pass in to boto connection (range 0..3). |
| 179 parallel_operations: Should command operations be executed in parallel? |
| 180 gsutil_bin_dir: Bin dir from which gsutil is running. |
| 181 boto_lib_dir: Lib dir where boto runs. |
| 182 config_file_list: Config file list returned by _GetBotoConfigFileList(). |
| 183 gsutil_ver: Version string of currently running gsutil command. |
| 184 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. |
| 185 Settable for testing/mocking. |
| 186 test_method: Optional general purpose method for testing purposes. |
| 187 Application and semantics of this method will vary by |
| 188 command and test type. |
| 189 |
| 190 Implementation note: subclasses shouldn't need to define an __init__ |
| 191 method, and instead depend on the shared initialization that happens |
| 192 here. If you do define an __init__ method in a subclass you'll need to |
| 193 explicitly call super().__init__(). But you're encouraged not to do this, |
| 194 because it will make changing the __init__ interface more painful. |
| 195 """ |
| 196 # Save class values from constructor params. |
| 197 self.command_runner = command_runner |
| 198 self.args = args |
| 199 self.unparsed_args = args |
| 200 self.headers = headers |
| 201 self.debug = debug |
| 202 self.parallel_operations = parallel_operations |
| 203 self.gsutil_bin_dir = gsutil_bin_dir |
| 204 self.boto_lib_dir = boto_lib_dir |
| 205 self.config_file_list = config_file_list |
| 206 self.gsutil_ver = gsutil_ver |
| 207 self.bucket_storage_uri_class = bucket_storage_uri_class |
| 208 self.test_method = test_method |
| 209 self.exclude_symlinks = False |
| 210 self.recursion_requested = False |
| 211 self.all_versions = False |
| 212 |
| 213 # Process sub-command instance specifications. |
| 214 # First, ensure subclass implementation sets all required keys. |
| 215 for k in self.REQUIRED_SPEC_KEYS: |
| 216 if k not in self.command_spec or self.command_spec[k] is None: |
| 217 raise CommandException('"%s" command implementation is missing %s ' |
| 218 'specification' % (self.command_name, k)) |
| 219 # Now override default command_spec with subclass-specified values. |
| 220 tmp = self._default_command_spec |
| 221 tmp.update(self.command_spec) |
| 222 self.command_spec = tmp |
| 223 del tmp |
| 224 |
| 225 # Make sure command provides a test specification. |
| 226 if not self.test_steps: |
| 227 # TODO: Uncomment following lines when test feature is ready. |
| 228 #raise CommandException('"%s" command implementation is missing test ' |
| 229 #'specification' % self.command_name) |
| 230 pass |
| 231 |
| 232 # Parse and validate args. |
| 233 try: |
| 234 (self.sub_opts, self.args) = getopt.getopt( |
| 235 args, self.command_spec[SUPPORTED_SUB_ARGS]) |
| 236 except GetoptError, e: |
| 237 raise CommandException('%s for "%s" command.' % (e.msg, |
| 238 self.command_name)) |
| 239 if (len(self.args) < self.command_spec[MIN_ARGS] |
| 240 or len(self.args) > self.command_spec[MAX_ARGS]): |
| 241 raise CommandException('Wrong number of arguments for "%s" command.' % |
| 242 self.command_name) |
| 243 if (not self.command_spec[FILE_URIS_OK] |
| 244 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])): |
| 245 raise CommandException('"%s" command does not support "file://" URIs. ' |
| 246 'Did you mean to use a gs:// URI?' % |
| 247 self.command_name) |
| 248 if (not self.command_spec[PROVIDER_URIS_OK] |
| 249 and self._HaveProviderUris( |
| 250 self.args[self.command_spec[URIS_START_ARG]:])): |
| 251 raise CommandException('"%s" command does not support provider-only ' |
| 252 'URIs.' % self.command_name) |
| 253 if self.command_spec[CONFIG_REQUIRED]: |
| 254 self._ConfigureNoOpAuthIfNeeded() |
| 255 |
| 256 self.proj_id_handler = ProjectIdHandler() |
| 257 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class) |
| 258 |
| 259 # Cross-platform path to run gsutil binary. |
| 260 self.gsutil_cmd = '' |
| 261 # Cross-platform list containing gsutil path for use with subprocess. |
| 262 self.gsutil_exec_list = [] |
| 263 # If running on Windows, invoke python interpreter explicitly. |
| 264 if platform.system() == "Windows": |
| 265 self.gsutil_cmd += 'python ' |
| 266 self.gsutil_exec_list += ['python'] |
| 267 # Add full path to gsutil to make sure we test the correct version. |
| 268 self.gsutil_path = os.path.join(self.gsutil_bin_dir, 'gsutil') |
| 269 self.gsutil_cmd += self.gsutil_path |
| 270 self.gsutil_exec_list += [self.gsutil_path] |
| 271 |
| 272 # We're treating recursion_requested like it's used by all commands, but |
| 273 # only some of the commands accept the -R option. |
| 274 if self.sub_opts: |
| 275 for o, unused_a in self.sub_opts: |
| 276 if o == '-r' or o == '-R': |
| 277 self.recursion_requested = True |
| 278 break |
| 279 |
| 280 def WildcardIterator(self, uri_or_str, all_versions=False): |
| 281 """ |
| 282 Helper to instantiate gslib.WildcardIterator. Args are same as |
| 283 gslib.WildcardIterator interface, but this method fills in most of the |
| 284 values from instance state. |
| 285 |
| 286 Args: |
| 287 uri_or_str: StorageUri or URI string naming wildcard objects to iterate. |
| 288 """ |
| 289 return wildcard_iterator.wildcard_iterator( |
| 290 uri_or_str, self.proj_id_handler, |
| 291 bucket_storage_uri_class=self.bucket_storage_uri_class, |
| 292 all_versions=all_versions, |
| 293 headers=self.headers, debug=self.debug) |
| 294 |
| 295 def RunCommand(self): |
| 296 """Abstract function in base class. Subclasses must implement this. The |
| 297 return value of this function will be used as the exit status of the |
| 298 process, so subclass commands should return an integer exit code (0 for |
| 299 success, a value in [1,255] for failure). |
| 300 """ |
| 301 raise CommandException('Command %s is missing its RunCommand() ' |
| 302 'implementation' % self.command_name) |
| 303 |
| 304 ############################################################ |
| 305 # Shared helper functions that depend on base class state. # |
| 306 ############################################################ |
| 307 |
| 308 def UrisAreForSingleProvider(self, uri_args): |
| 309 """Tests whether the uris are all for a single provider. |
| 310 |
| 311 Returns: a StorageUri for one of the uris on success, None on failure. |
| 312 """ |
| 313 provider = None |
| 314 uri = None |
| 315 for uri_str in uri_args: |
| 316 # validate=False because we allow wildcard uris. |
| 317 uri = boto.storage_uri( |
| 318 uri_str, debug=self.debug, validate=False, |
| 319 bucket_storage_uri_class=self.bucket_storage_uri_class) |
| 320 if not provider: |
| 321 provider = uri.scheme |
| 322 elif uri.scheme != provider: |
| 323 return None |
| 324 return uri |
| 325 |
  def SetAclCommandHelper(self):
    """
    Common logic for setting ACLs. Sets the standard ACL or the default
    object ACL depending on self.command_name.

    Expects self.args[0] to be either a canned ACL name or a path to a file
    containing XML ACL text, and self.args[1:] to be the target URIs.

    Raises:
      CommandException: if the URIs span providers, the canned ACL name is
          invalid, or any ACL could not be set.
    """

    acl_arg = self.args[0]
    uri_args = self.args[1:]
    # Disallow multi-provider setacl requests, because there are differences in
    # the ACL models.
    storage_uri = self.UrisAreForSingleProvider(uri_args)
    if not storage_uri:
      raise CommandException('"%s" command spanning providers not allowed.' %
                             self.command_name)

    # Determine whether acl_arg names a file containing XML ACL text vs. the
    # string name of a canned ACL.
    if os.path.isfile(acl_arg):
      acl_file = open(acl_arg, 'r')
      # acl_arg is rebound from the file path to the file's XML contents.
      acl_arg = acl_file.read()

      # TODO: Remove this workaround when GCS allows
      # whitespace in the Permission element on the server-side
      acl_arg = re.sub(r'<Permission>\s*(\S+)\s*</Permission>',
                       r'<Permission>\1</Permission>', acl_arg)

      acl_file.close()
      self.canned = False
    else:
      # No file exists, so expect a canned ACL string.
      canned_acls = storage_uri.canned_acls()
      if acl_arg not in canned_acls:
        raise CommandException('Invalid canned ACL "%s".' % acl_arg)
      self.canned = True

    # Used to track if any ACLs failed to be set.
    self.everything_set_okay = True

    def _SetAclExceptionHandler(e):
      """Simple exception handler to allow post-completion status."""
      self.THREADED_LOGGER.error(str(e))
      self.everything_set_okay = False

    def _SetAclFunc(name_expansion_result):
      # Per-object worker: rebuilds the StorageUri in this process/thread and
      # applies either the canned or the XML ACL to it.
      exp_src_uri = self.suri_builder.StorageUri(
          name_expansion_result.GetExpandedUriStr())
      # We don't do bucket operations multi-threaded (see comment below).
      assert self.command_name != 'setdefacl'
      self.THREADED_LOGGER.info('Setting ACL on %s...' %
                                name_expansion_result.expanded_uri_str)
      if self.canned:
        exp_src_uri.set_acl(acl_arg, exp_src_uri.object_name, False,
                            self.headers)
      else:
        exp_src_uri.set_xml_acl(acl_arg, exp_src_uri.object_name, False,
                                self.headers)

    # If user specified -R option, convert any bucket args to bucket wildcards
    # (e.g., gs://bucket/*), to prevent the operation from being applied to
    # the buckets themselves.
    if self.recursion_requested:
      for i in range(len(uri_args)):
        uri = self.suri_builder.StorageUri(uri_args[i])
        if uri.names_bucket():
          uri_args[i] = uri.clone_replace_name('*').uri
    else:
      # Handle bucket ACL setting operations single-threaded, because
      # our threading machinery currently assumes it's working with objects
      # (name_expansion_iterator), and normally we wouldn't expect users to need
      # to set ACLs on huge numbers of buckets at once anyway.
      for i in range(len(uri_args)):
        uri_str = uri_args[i]
        if self.suri_builder.StorageUri(uri_str).names_bucket():
          # Note: a single bucket URI anywhere in the list routes the ENTIRE
          # request (all of uri_args) through the single-threaded path.
          self._RunSingleThreadedSetAcl(acl_arg, uri_args)
          return

    name_expansion_iterator = NameExpansionIterator(
        self.command_name, self.proj_id_handler, self.headers, self.debug,
        self.bucket_storage_uri_class, uri_args, self.recursion_requested,
        self.recursion_requested, all_versions=self.all_versions)

    # Perform requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    self.Apply(_SetAclFunc, name_expansion_iterator, _SetAclExceptionHandler)

    if not self.everything_set_okay:
      raise CommandException('ACLs for some objects could not be set.')
| 414 |
| 415 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args): |
| 416 some_matched = False |
| 417 for uri_str in uri_args: |
| 418 for blr in self.WildcardIterator(uri_str): |
| 419 if blr.HasPrefix(): |
| 420 continue |
| 421 some_matched = True |
| 422 uri = blr.GetUri() |
| 423 if self.command_name == 'setdefacl': |
| 424 print 'Setting default object ACL on %s...' % uri |
| 425 if self.canned: |
| 426 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers) |
| 427 else: |
| 428 uri.set_def_xml_acl(acl_arg, False, self.headers) |
| 429 else: |
| 430 print 'Setting ACL on %s...' % uri |
| 431 if self.canned: |
| 432 uri.set_acl(acl_arg, uri.object_name, False, self.headers) |
| 433 else: |
| 434 uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers) |
| 435 if not some_matched: |
| 436 raise CommandException('No URIs matched') |
| 437 |
| 438 def GetAclCommandHelper(self): |
| 439 """Common logic for getting ACLs. Gets the standard ACL or the default |
| 440 object ACL depending on self.command_name.""" |
| 441 |
| 442 # Resolve to just one object. |
| 443 # Handle wildcard-less URI specially in case this is a version-specific |
| 444 # URI, because WildcardIterator().IterUris() would lose the versioning info. |
| 445 if not ContainsWildcard(self.args[0]): |
| 446 uri = self.suri_builder.StorageUri(self.args[0]) |
| 447 else: |
| 448 uris = list(self.WildcardIterator(self.args[0]).IterUris()) |
| 449 if len(uris) == 0: |
| 450 raise CommandException('No URIs matched') |
| 451 if len(uris) != 1: |
| 452 raise CommandException('%s matched more than one URI, which is not ' |
| 453 'allowed by the %s command' % (self.args[0], self.command_name)) |
| 454 uri = uris[0] |
| 455 if not uri.names_bucket() and not uri.names_object(): |
| 456 raise CommandException('"%s" command must specify a bucket or ' |
| 457 'object.' % self.command_name) |
| 458 if self.command_name == 'getdefacl': |
| 459 acl = uri.get_def_acl(False, self.headers) |
| 460 else: |
| 461 acl = uri.get_acl(False, self.headers) |
| 462 # Pretty-print the XML to make it more easily human editable. |
| 463 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8')) |
| 464 print parsed_xml.toprettyxml(indent=' ') |
| 465 |
| 466 def GetXmlSubresource(self, subresource, uri_arg): |
| 467 """Print an xml subresource, e.g. logging, for a bucket/object. |
| 468 |
| 469 Args: |
| 470 subresource: The subresource name. |
| 471 uri_arg: URI for the bucket/object. Wildcards will be expanded. |
| 472 |
| 473 Raises: |
| 474 CommandException: if errors encountered. |
| 475 """ |
| 476 # Wildcarding is allowed but must resolve to just one bucket. |
| 477 uris = list(self.WildcardIterator(uri_arg).IterUris()) |
| 478 if len(uris) != 1: |
| 479 raise CommandException('Wildcards must resolve to exactly one item for ' |
| 480 'get %s' % subresource) |
| 481 uri = uris[0] |
| 482 xml_str = uri.get_subresource(subresource, False, self.headers) |
| 483 # Pretty-print the XML to make it more easily human editable. |
| 484 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8')) |
| 485 print parsed_xml.toprettyxml(indent=' ') |
| 486 |
  def Apply(self, func, name_expansion_iterator, thr_exc_handler,
            shared_attrs=None):
    """Dispatch input URI assignments across a pool of parallel OS
    processes and/or Python threads, based on options (-m or not)
    and settings in the user's config file. If non-parallel mode
    or only one OS process requested, execute requests sequentially
    in the current OS process.

    Args:
      func: Function to call to process each URI.
      name_expansion_iterator: Iterator of NameExpansionResult.
      thr_exc_handler: Exception handler for ThreadPool class.
      shared_attrs: List of attributes to manage across sub-processes.

    Raises:
      CommandException if invalid config encountered.
    """

    # Set OS process and python thread count as a function of options
    # and config.
    if self.parallel_operations:
      process_count = boto.config.getint(
          'GSUtil', 'parallel_process_count',
          gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
      if process_count < 1:
        raise CommandException('Invalid parallel_process_count "%d".' %
                               process_count)
      thread_count = boto.config.getint(
          'GSUtil', 'parallel_thread_count',
          gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
      if thread_count < 1:
        raise CommandException('Invalid parallel_thread_count "%d".' %
                               thread_count)
    else:
      # If -m not specified, then assume 1 OS process and 1 Python thread.
      process_count = 1
      thread_count = 1

    if self.debug:
      self.THREADED_LOGGER.info('process count: %d', process_count)
      self.THREADED_LOGGER.info('thread count: %d', thread_count)

    if self.parallel_operations and process_count > 1:
      procs = []
      # If any shared attributes passed by caller, create a dictionary of
      # shared memory variables for every element in the list of shared
      # attributes. 'i' gives each a C int initialized to 0.
      shared_vars = None
      if shared_attrs:
        for name in shared_attrs:
          if not shared_vars:
            shared_vars = {}
          shared_vars[name] = multiprocessing.Value('i', 0)
      # Construct work queue for parceling out work to multiprocessing workers,
      # setting the max queue length of 50k so we will block if workers don't
      # empty the queue as fast as we can continue iterating over the bucket
      # listing. This number may need tuning; it should be large enough to
      # keep workers busy (overlapping bucket list next-page retrieval with
      # operations being fed from the queue) but small enough that we don't
      # overfill memory when running across a slow network link.
      work_queue = multiprocessing.Queue(50000)
      for shard in range(process_count):
        # Spawn a separate OS process for each shard.
        if self.debug:
          self.THREADED_LOGGER.info('spawning process for shard %d', shard)
        p = multiprocessing.Process(target=self._ApplyThreads,
                                    args=(func, work_queue, shard,
                                          thread_count, thr_exc_handler,
                                          shared_vars))
        procs.append(p)
        p.start()

      last_name_expansion_result = None
      try:
        # Feed all work into the queue being emptied by the workers.
        for name_expansion_result in name_expansion_iterator:
          last_name_expansion_result = name_expansion_result
          work_queue.put(name_expansion_result)
      except:
        # NOTE(review): bare except reports the iteration failure but then
        # swallows the exception (including KeyboardInterrupt); consider
        # re-raising after the cleanup in the finally clause below.
        sys.stderr.write('Failed URI iteration. Last result (prior to '
                         'exception) was: %s\n'
                         % repr(last_name_expansion_result))
      finally:
        # We do all of the process cleanup in a finally clause in case the name
        # expansion iterator throws an exception. This will send EOF to all the
        # child processes and join them back into the parent process.

        # Send an EOF per worker.
        for shard in range(process_count):
          work_queue.put(_EOF_NAME_EXPANSION_RESULT)

        # Wait for all spawned OS processes to finish.
        failed_process_count = 0
        for p in procs:
          p.join()
          # Count number of procs that returned non-zero exit code.
          if p.exitcode != 0:
            failed_process_count += 1

        # Propagate shared variables back to caller's attributes.
        if shared_vars:
          for (name, var) in shared_vars.items():
            setattr(self, name, var.value)

      # Abort main process if one or more sub-processes failed. Note that this
      # is outside the finally clause, because we only want to raise a new
      # exception if an exception wasn't already raised in the try clause above.
      if failed_process_count:
        plural_str = ''
        if failed_process_count > 1:
          plural_str = 'es'
        raise Exception('unexpected failure in %d sub-process%s, '
                        'aborting...' % (failed_process_count, plural_str))

    else:
      # Using just 1 process, so funnel results to _ApplyThreads using facade
      # that makes NameExpansionIterator look like a Multiprocessing.Queue
      # that sends one EOF once the iterator empties.
      work_queue = NameExpansionIteratorQueue(name_expansion_iterator,
                                              _EOF_NAME_EXPANSION_RESULT)
      self._ApplyThreads(func, work_queue, 0, thread_count, thr_exc_handler,
                         None)
| 609 |
| 610 def HaveFileUris(self, args_to_check): |
| 611 """Checks whether args_to_check contain any file URIs. |
| 612 |
| 613 Args: |
| 614 args_to_check: Command-line argument subset to check. |
| 615 |
| 616 Returns: |
| 617 True if args_to_check contains any file URIs. |
| 618 """ |
| 619 for uri_str in args_to_check: |
| 620 if uri_str.lower().startswith('file://') or uri_str.find(':') == -1: |
| 621 return True |
| 622 return False |
| 623 |
| 624 ###################### |
| 625 # Private functions. # |
| 626 ###################### |
| 627 |
| 628 def _HaveProviderUris(self, args_to_check): |
| 629 """Checks whether args_to_check contains any provider URIs (like 'gs://'). |
| 630 |
| 631 Args: |
| 632 args_to_check: Command-line argument subset to check. |
| 633 |
| 634 Returns: |
| 635 True if args_to_check contains any provider URIs. |
| 636 """ |
| 637 for uri_str in args_to_check: |
| 638 if re.match('^[a-z]+://$', uri_str): |
| 639 return True |
| 640 return False |
| 641 |
  def _ConfigureNoOpAuthIfNeeded(self):
    """Sets up no-op auth handler if no boto credentials are configured.

    Raises:
      CommandException: if config file(s) exist but contain no usable storage
          credentials, or contain OAuth2 credentials without OAuth2 support.
    """
    config = boto.config
    if not util.HasConfiguredCredentials():
      if self.config_file_list:
        if (config.has_option('Credentials', 'gs_oauth2_refresh_token')
            and not HAVE_OAUTH2):
          raise CommandException(
              'Your gsutil is configured with OAuth2 authentication '
              'credentials.\nHowever, OAuth2 is only supported when running '
              'under Python 2.6 or later\n(unless additional dependencies are '
              'installed, see README for details); you are running Python %s.' %
              sys.version)
        raise CommandException('You have no storage service credentials in any '
                               'of the following boto config\nfiles. Please '
                               'add your credentials as described in the '
                               'gsutil README file, or else\nre-run '
                               '"gsutil config" to re-create a config '
                               'file:\n%s' % self.config_file_list)
      else:
        # With no boto config file the user can still access publicly readable
        # buckets and objects.
        # Importing this module registers the no-op auth plugin as a side
        # effect; the imported name is intentionally unused.
        from gslib import no_op_auth_plugin
| 665 |
  def _ApplyThreads(self, func, work_queue, shard, num_threads,
                    thr_exc_handler=None, shared_vars=None):
    """
    Perform subset of required requests across a caller specified
    number of parallel Python threads, which may be one, in which
    case the requests are processed in the current thread.

    Args:
      func: Function to call for each request.
      work_queue: shared queue of NameExpansionResult to process.
      shard: Assigned subset (shard number) for this function.
      num_threads: Number of Python threads to spawn to process this shard.
      thr_exc_handler: Exception handler for ThreadPool class.
      shared_vars: Dict of shared memory variables to be managed.
                   (only relevant, and non-None, if this function is
                   run in a separate OS process).
    """
    # Each OS process needs to establish its own set of connections to
    # the server to avoid writes from different OS processes interleaving
    # onto the same socket (and garbling the underlying SSL session).
    # We ensure each process gets its own set of connections here by
    # closing all connections in the storage provider connection pool.
    connection_pool = StorageUri.provider_pool
    if connection_pool:
      for i in connection_pool:
        connection_pool[i].connection.close()

    if num_threads > 1:
      thread_pool = ThreadPool(num_threads, thr_exc_handler)
    try:
      while True: # Loop until we hit EOF marker.
        name_expansion_result = work_queue.get()
        if name_expansion_result == _EOF_NAME_EXPANSION_RESULT:
          break
        exp_src_uri = self.suri_builder.StorageUri(
            name_expansion_result.GetExpandedUriStr())
        if self.debug:
          self.THREADED_LOGGER.info('process %d shard %d is handling uri %s',
                                    os.getpid(), shard, exp_src_uri)
        if (self.exclude_symlinks and exp_src_uri.is_file_uri()
            and os.path.islink(exp_src_uri.object_name)):
          self.THREADED_LOGGER.info('Skipping symbolic link %s...', exp_src_uri)
        elif num_threads > 1:
          thread_pool.AddTask(func, name_expansion_result)
        else:
          # Single-threaded: process the request inline.
          func(name_expansion_result)
      # If any Python threads created, wait here for them to finish.
      if num_threads > 1:
        thread_pool.WaitCompletion()
    finally:
      if num_threads > 1:
        thread_pool.Shutdown()
    # If any shared variables (which means we are running in a separate OS
    # process), increment value for each shared variable so the parent can
    # read this process's contribution back out after join (see Apply).
    if shared_vars:
      for (name, var) in shared_vars.items():
        var.value += getattr(self, name)
OLD | NEW |