OLD | NEW |
| (Empty) |
1 # Copyright 2010 Google Inc. All Rights Reserved. | |
2 # | |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 # you may not use this file except in compliance with the License. | |
5 # You may obtain a copy of the License at | |
6 # | |
7 # http://www.apache.org/licenses/LICENSE-2.0 | |
8 # | |
9 # Unless required by applicable law or agreed to in writing, software | |
10 # distributed under the License is distributed on an "AS IS" BASIS, | |
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 # See the License for the specific language governing permissions and | |
13 # limitations under the License. | |
14 | |
15 """Base class for gsutil commands. | |
16 | |
17 In addition to base class code, this file contains helpers that depend on base | |
18 class state (such as GetAclCommandHelper, which depends on self.gsutil_bin_dir, | |
19 self.bucket_storage_uri_class, etc.) In general, functions that depend on class | |
20 state and that are used by multiple commands belong in this file. Functions that | |
21 don't depend on class state belong in util.py, and non-shared helpers belong in | |
22 individual subclasses. | |
23 """ | |
24 | |
25 import boto | |
26 import getopt | |
27 import gslib | |
28 import logging | |
29 import multiprocessing | |
30 import os | |
31 import platform | |
32 import re | |
33 import sys | |
34 import wildcard_iterator | |
35 import xml.dom.minidom | |
36 | |
37 from boto import handler | |
38 from boto.storage_uri import StorageUri | |
39 from getopt import GetoptError | |
40 from gslib import util | |
41 from gslib.exception import CommandException | |
42 from gslib.help_provider import HelpProvider | |
43 from gslib.name_expansion import NameExpansionIterator | |
44 from gslib.name_expansion import NameExpansionIteratorQueue | |
45 from gslib.project_id import ProjectIdHandler | |
46 from gslib.storage_uri_builder import StorageUriBuilder | |
47 from gslib.thread_pool import ThreadPool | |
48 from gslib.util import HAVE_OAUTH2 | |
49 from gslib.util import NO_MAX | |
50 | |
51 from gslib.wildcard_iterator import ContainsWildcard | |
52 | |
53 | |
54 def _ThreadedLogger(): | |
55 """Creates a logger that resembles 'print' output, but is thread safe. | |
56 | |
57 The logger will display all messages logged with level INFO or above. Log | |
58 propagation is disabled. | |
59 | |
60 Returns: | |
61 A logger object. | |
62 """ | |
63 log = logging.getLogger('threaded-logging') | |
64 log.propagate = False | |
65 log.setLevel(logging.INFO) | |
66 log_handler = logging.StreamHandler() | |
67 log_handler.setFormatter(logging.Formatter('%(message)s')) | |
68 log.addHandler(log_handler) | |
69 return log | |
70 | |
# command_spec key constants. These are the keys of the command_spec
# dictionary each Command subclass defines; see Command.command_spec
# below for the meaning of each entry.
COMMAND_NAME = 'command_name'
COMMAND_NAME_ALIASES = 'command_name_aliases'
MIN_ARGS = 'min_args'
MAX_ARGS = 'max_args'
SUPPORTED_SUB_ARGS = 'supported_sub_args'
FILE_URIS_OK = 'file_uri_ok'
PROVIDER_URIS_OK = 'provider_uri_ok'
URIS_START_ARG = 'uris_start_arg'
CONFIG_REQUIRED = 'config_required'

# Sentinel placed on the multiprocessing work queue (one per worker) to
# tell _ApplyThreads that no more NameExpansionResults are coming.
_EOF_NAME_EXPANSION_RESULT = ("EOF")
83 | |
84 | |
class Command(object):
  """Base class for gsutil commands; holds shared parsing state and helpers."""

  # Global instance of a threaded logger object, shared by all commands.
  THREADED_LOGGER = _ThreadedLogger()

  # command_spec keys that every subclass must define; enforced in __init__.
  REQUIRED_SPEC_KEYS = [COMMAND_NAME]

  # Each subclass must define the following map, minimally including the
  # keys in REQUIRED_SPEC_KEYS; other values below will be used as defaults,
  # although for readability subclasses should specify the complete map.
  command_spec = {
    # Name of command.
    COMMAND_NAME : None,
    # List of command name aliases.
    COMMAND_NAME_ALIASES : [],
    # Min number of args required by this command.
    MIN_ARGS : 0,
    # Max number of args required by this command, or NO_MAX.
    MAX_ARGS : NO_MAX,
    # Getopt-style string specifying acceptable sub args.
    SUPPORTED_SUB_ARGS : '',
    # True if file URIs are acceptable for this command.
    FILE_URIS_OK : False,
    # True if provider-only URIs are acceptable for this command.
    PROVIDER_URIS_OK : False,
    # Index in args of first URI arg.
    URIS_START_ARG : 0,
    # True if must configure gsutil before running command.
    CONFIG_REQUIRED : True,
  }
  # NOTE(review): this binds the SAME dict object as command_spec above, not
  # a copy; __init__ later merges the subclass's spec into it. Confirm that
  # mutating the shared default dict across instances is intended.
  _default_command_spec = command_spec
  # Default (empty) help specification, inherited from HelpProvider.
  help_spec = HelpProvider.help_spec

  """Define an empty test specification, which derived classes must populate.

  This is a list of tuples containing the following values:

    step_name - mnemonic name for test, displayed when test is run
    cmd_line - shell command line to run test
    expect_ret or None - expected return code from test (None means ignore)
    (result_file, expect_file) or None - tuple of result file and expected
                                         file to diff for additional test
                                         verification beyond the return code
                                         (None means no diff requested)
  Notes:

  - Setting expected_ret to None means there is no expectation and,
    hence, any returned value will pass.

  - Any occurrences of the string 'gsutil' in the cmd_line parameter
    are expanded to the full path to the gsutil command under test.

  - The cmd_line, result_file and expect_file parameters may
    contain the following special substrings:

    $Bn - converted to one of 10 unique-for-testing bucket names (n=0..9)
    $On - converted to one of 10 unique-for-testing object names (n=0..9)
    $Fn - converted to one of 10 unique-for-testing file names (n=0..9)
    $G  - converted to the directory where gsutil is installed. Useful for
          referencing test data.

  - The generated file names are full pathnames, whereas the generated
    bucket and object names are simple relative names.

  - Tests with a non-None result_file and expect_file automatically
    trigger an implicit diff of the two files.

  - These test specifications, in combination with the conversion strings
    allow tests to be constructed parametrically. For example, here's an
    annotated subset of a test_steps for the cp command:

    # Copy local file to object, verify 0 return code.
    ('simple cp', 'gsutil cp $F1 gs://$B1/$O1', 0, None, None),
    # Copy uploaded object back to local file and diff vs. orig file.
    ('verify cp', 'gsutil cp gs://$B1/$O1 $F2', 0, '$F2', '$F1'),

  - After pattern substitution, the specs are run sequentially, in the
    order in which they appear in the test_steps list.
  """
  test_steps = []

  # Define a convenience property for command name, since it's used many places.
  def _GetDefaultCommandName(self):
    return self.command_spec[COMMAND_NAME]
  command_name = property(_GetDefaultCommandName)
169 | |
170 def __init__(self, command_runner, args, headers, debug, parallel_operations, | |
171 gsutil_bin_dir, boto_lib_dir, config_file_list, gsutil_ver, | |
172 bucket_storage_uri_class, test_method=None, | |
173 bypass_prodaccess=True): | |
174 """ | |
175 Args: | |
176 command_runner: CommandRunner (for commands built atop other commands). | |
177 args: Command-line args (arg0 = actual arg, not command name ala bash). | |
178 headers: Dictionary containing optional HTTP headers to pass to boto. | |
179 debug: Debug level to pass in to boto connection (range 0..3). | |
180 parallel_operations: Should command operations be executed in parallel? | |
181 gsutil_bin_dir: Bin dir from which gsutil is running. | |
182 boto_lib_dir: Lib dir where boto runs. | |
183 config_file_list: Config file list returned by _GetBotoConfigFileList(). | |
184 gsutil_ver: Version string of currently running gsutil command. | |
185 bucket_storage_uri_class: Class to instantiate for cloud StorageUris. | |
186 Settable for testing/mocking. | |
187 test_method: Optional general purpose method for testing purposes. | |
188 Application and semantics of this method will vary by | |
189 command and test type. | |
190 bypass_prodaccess: Boolean to ignore the existance of prodaccess. | |
191 | |
192 Implementation note: subclasses shouldn't need to define an __init__ | |
193 method, and instead depend on the shared initialization that happens | |
194 here. If you do define an __init__ method in a subclass you'll need to | |
195 explicitly call super().__init__(). But you're encouraged not to do this, | |
196 because it will make changing the __init__ interface more painful. | |
197 """ | |
198 # Save class values from constructor params. | |
199 self.command_runner = command_runner | |
200 self.args = args | |
201 self.unparsed_args = args | |
202 self.headers = headers | |
203 self.debug = debug | |
204 self.parallel_operations = parallel_operations | |
205 self.gsutil_bin_dir = gsutil_bin_dir | |
206 self.boto_lib_dir = boto_lib_dir | |
207 self.config_file_list = config_file_list | |
208 self.gsutil_ver = gsutil_ver | |
209 self.bucket_storage_uri_class = bucket_storage_uri_class | |
210 self.test_method = test_method | |
211 self.exclude_symlinks = False | |
212 self.recursion_requested = False | |
213 self.all_versions = False | |
214 self.bypass_prodaccess = bypass_prodaccess | |
215 | |
216 # Process sub-command instance specifications. | |
217 # First, ensure subclass implementation sets all required keys. | |
218 for k in self.REQUIRED_SPEC_KEYS: | |
219 if k not in self.command_spec or self.command_spec[k] is None: | |
220 raise CommandException('"%s" command implementation is missing %s ' | |
221 'specification' % (self.command_name, k)) | |
222 # Now override default command_spec with subclass-specified values. | |
223 tmp = self._default_command_spec | |
224 tmp.update(self.command_spec) | |
225 self.command_spec = tmp | |
226 del tmp | |
227 | |
228 # Make sure command provides a test specification. | |
229 if not self.test_steps: | |
230 # TODO: Uncomment following lines when test feature is ready. | |
231 #raise CommandException('"%s" command implementation is missing test ' | |
232 #'specification' % self.command_name) | |
233 pass | |
234 | |
235 # Parse and validate args. | |
236 try: | |
237 (self.sub_opts, self.args) = getopt.getopt( | |
238 args, self.command_spec[SUPPORTED_SUB_ARGS]) | |
239 except GetoptError, e: | |
240 raise CommandException('%s for "%s" command.' % (e.msg, | |
241 self.command_name)) | |
242 if (len(self.args) < self.command_spec[MIN_ARGS] | |
243 or len(self.args) > self.command_spec[MAX_ARGS]): | |
244 raise CommandException('Wrong number of arguments for "%s" command.' % | |
245 self.command_name) | |
246 if (not self.command_spec[FILE_URIS_OK] | |
247 and self.HaveFileUris(self.args[self.command_spec[URIS_START_ARG]:])): | |
248 raise CommandException('"%s" command does not support "file://" URIs. ' | |
249 'Did you mean to use a gs:// URI?' % | |
250 self.command_name) | |
251 if (not self.command_spec[PROVIDER_URIS_OK] | |
252 and self._HaveProviderUris( | |
253 self.args[self.command_spec[URIS_START_ARG]:])): | |
254 raise CommandException('"%s" command does not support provider-only ' | |
255 'URIs.' % self.command_name) | |
256 if self.command_spec[CONFIG_REQUIRED]: | |
257 self._ConfigureNoOpAuthIfNeeded() | |
258 | |
259 self.proj_id_handler = ProjectIdHandler() | |
260 self.suri_builder = StorageUriBuilder(debug, bucket_storage_uri_class) | |
261 | |
262 # Cross-platform path to run gsutil binary. | |
263 self.gsutil_cmd = '' | |
264 # Cross-platform list containing gsutil path for use with subprocess. | |
265 self.gsutil_exec_list = [] | |
266 # If running on Windows, invoke python interpreter explicitly. | |
267 if platform.system() == "Windows": | |
268 self.gsutil_cmd += 'python ' | |
269 self.gsutil_exec_list += ['python'] | |
270 # Add full path to gsutil to make sure we test the correct version. | |
271 self.gsutil_path = os.path.join(self.gsutil_bin_dir, 'gsutil') | |
272 self.gsutil_cmd += self.gsutil_path | |
273 self.gsutil_exec_list += [self.gsutil_path] | |
274 | |
275 # We're treating recursion_requested like it's used by all commands, but | |
276 # only some of the commands accept the -R option. | |
277 if self.sub_opts: | |
278 for o, unused_a in self.sub_opts: | |
279 if o == '-r' or o == '-R': | |
280 self.recursion_requested = True | |
281 break | |
282 | |
283 def WildcardIterator(self, uri_or_str, all_versions=False): | |
284 """ | |
285 Helper to instantiate gslib.WildcardIterator. Args are same as | |
286 gslib.WildcardIterator interface, but this method fills in most of the | |
287 values from instance state. | |
288 | |
289 Args: | |
290 uri_or_str: StorageUri or URI string naming wildcard objects to iterate. | |
291 """ | |
292 return wildcard_iterator.wildcard_iterator( | |
293 uri_or_str, self.proj_id_handler, | |
294 bucket_storage_uri_class=self.bucket_storage_uri_class, | |
295 all_versions=all_versions, | |
296 headers=self.headers, debug=self.debug) | |
297 | |
298 def RunCommand(self): | |
299 """Abstract function in base class. Subclasses must implement this. The | |
300 return value of this function will be used as the exit status of the | |
301 process, so subclass commands should return an integer exit code (0 for | |
302 success, a value in [1,255] for failure). | |
303 """ | |
304 raise CommandException('Command %s is missing its RunCommand() ' | |
305 'implementation' % self.command_name) | |
306 | |
307 ############################################################ | |
308 # Shared helper functions that depend on base class state. # | |
309 ############################################################ | |
310 | |
311 def UrisAreForSingleProvider(self, uri_args): | |
312 """Tests whether the uris are all for a single provider. | |
313 | |
314 Returns: a StorageUri for one of the uris on success, None on failure. | |
315 """ | |
316 provider = None | |
317 uri = None | |
318 for uri_str in uri_args: | |
319 # validate=False because we allow wildcard uris. | |
320 uri = boto.storage_uri( | |
321 uri_str, debug=self.debug, validate=False, | |
322 bucket_storage_uri_class=self.bucket_storage_uri_class) | |
323 if not provider: | |
324 provider = uri.scheme | |
325 elif uri.scheme != provider: | |
326 return None | |
327 return uri | |
328 | |
  def SetAclCommandHelper(self):
    """
    Common logic for setting ACLs. Sets the standard ACL or the default
    object ACL depending on self.command_name.

    Expects self.args[0] to be either the name of a local file containing
    XML ACL text or the name of a canned ACL; remaining args are URIs.

    Raises:
      CommandException: if URIs span providers, the canned ACL name is
          invalid, or any ACL could not be set.
    """

    acl_arg = self.args[0]
    uri_args = self.args[1:]
    # Disallow multi-provider setacl requests, because there are differences in
    # the ACL models.
    storage_uri = self.UrisAreForSingleProvider(uri_args)
    if not storage_uri:
      raise CommandException('"%s" command spanning providers not allowed.' %
                             self.command_name)

    # Determine whether acl_arg names a file containing XML ACL text vs. the
    # string name of a canned ACL.
    if os.path.isfile(acl_arg):
      acl_file = open(acl_arg, 'r')
      acl_arg = acl_file.read()

      # TODO: Remove this workaround when GCS allows
      # whitespace in the Permission element on the server-side
      acl_arg = re.sub(r'<Permission>\s*(\S+)\s*</Permission>',
                       r'<Permission>\1</Permission>', acl_arg)

      acl_file.close()
      self.canned = False
    else:
      # No file exists, so expect a canned ACL string.
      canned_acls = storage_uri.canned_acls()
      if acl_arg not in canned_acls:
        raise CommandException('Invalid canned ACL "%s".' % acl_arg)
      self.canned = True

    # Used to track if any ACLs failed to be set.
    self.everything_set_okay = True

    def _SetAclExceptionHandler(e):
      """Simple exception handler to allow post-completion status."""
      self.THREADED_LOGGER.error(str(e))
      self.everything_set_okay = False

    def _SetAclFunc(name_expansion_result):
      # Per-object worker: applies the canned or XML ACL to one expanded URI.
      exp_src_uri = self.suri_builder.StorageUri(
          name_expansion_result.GetExpandedUriStr())
      # We don't do bucket operations multi-threaded (see comment below).
      assert self.command_name != 'setdefacl'
      self.THREADED_LOGGER.info('Setting ACL on %s...' %
                                name_expansion_result.expanded_uri_str)
      if self.canned:
        exp_src_uri.set_acl(acl_arg, exp_src_uri.object_name, False,
                            self.headers)
      else:
        exp_src_uri.set_xml_acl(acl_arg, exp_src_uri.object_name, False,
                                self.headers)

    # If user specified -R option, convert any bucket args to bucket wildcards
    # (e.g., gs://bucket/*), to prevent the operation from being applied to
    # the buckets themselves.
    if self.recursion_requested:
      for i in range(len(uri_args)):
        uri = self.suri_builder.StorageUri(uri_args[i])
        if uri.names_bucket():
          uri_args[i] = uri.clone_replace_name('*').uri
    else:
      # Handle bucket ACL setting operations single-threaded, because
      # our threading machinery currently assumes it's working with objects
      # (name_expansion_iterator), and normally we wouldn't expect users to need
      # to set ACLs on huge numbers of buckets at once anyway.
      for i in range(len(uri_args)):
        uri_str = uri_args[i]
        if self.suri_builder.StorageUri(uri_str).names_bucket():
          # Hand ALL uri_args to the single-threaded path and stop; it
          # processes every argument itself.
          self._RunSingleThreadedSetAcl(acl_arg, uri_args)
          return

    name_expansion_iterator = NameExpansionIterator(
        self.command_name, self.proj_id_handler, self.headers, self.debug,
        self.bucket_storage_uri_class, uri_args, self.recursion_requested,
        self.recursion_requested, all_versions=self.all_versions)

    # Perform requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    self.Apply(_SetAclFunc, name_expansion_iterator, _SetAclExceptionHandler)

    if not self.everything_set_okay:
      raise CommandException('ACLs for some objects could not be set.')
417 | |
418 def _RunSingleThreadedSetAcl(self, acl_arg, uri_args): | |
419 some_matched = False | |
420 for uri_str in uri_args: | |
421 for blr in self.WildcardIterator(uri_str): | |
422 if blr.HasPrefix(): | |
423 continue | |
424 some_matched = True | |
425 uri = blr.GetUri() | |
426 if self.command_name == 'setdefacl': | |
427 print 'Setting default object ACL on %s...' % uri | |
428 if self.canned: | |
429 uri.set_def_acl(acl_arg, uri.object_name, False, self.headers) | |
430 else: | |
431 uri.set_def_xml_acl(acl_arg, False, self.headers) | |
432 else: | |
433 print 'Setting ACL on %s...' % uri | |
434 if self.canned: | |
435 uri.set_acl(acl_arg, uri.object_name, False, self.headers) | |
436 else: | |
437 uri.set_xml_acl(acl_arg, uri.object_name, False, self.headers) | |
438 if not some_matched: | |
439 raise CommandException('No URIs matched') | |
440 | |
441 def GetAclCommandHelper(self): | |
442 """Common logic for getting ACLs. Gets the standard ACL or the default | |
443 object ACL depending on self.command_name.""" | |
444 | |
445 # Resolve to just one object. | |
446 # Handle wildcard-less URI specially in case this is a version-specific | |
447 # URI, because WildcardIterator().IterUris() would lose the versioning info. | |
448 if not ContainsWildcard(self.args[0]): | |
449 uri = self.suri_builder.StorageUri(self.args[0]) | |
450 else: | |
451 uris = list(self.WildcardIterator(self.args[0]).IterUris()) | |
452 if len(uris) == 0: | |
453 raise CommandException('No URIs matched') | |
454 if len(uris) != 1: | |
455 raise CommandException('%s matched more than one URI, which is not ' | |
456 'allowed by the %s command' % (self.args[0], self.command_name)) | |
457 uri = uris[0] | |
458 if not uri.names_bucket() and not uri.names_object(): | |
459 raise CommandException('"%s" command must specify a bucket or ' | |
460 'object.' % self.command_name) | |
461 if self.command_name == 'getdefacl': | |
462 acl = uri.get_def_acl(False, self.headers) | |
463 else: | |
464 acl = uri.get_acl(False, self.headers) | |
465 # Pretty-print the XML to make it more easily human editable. | |
466 parsed_xml = xml.dom.minidom.parseString(acl.to_xml().encode('utf-8')) | |
467 print parsed_xml.toprettyxml(indent=' ') | |
468 | |
469 def GetXmlSubresource(self, subresource, uri_arg): | |
470 """Print an xml subresource, e.g. logging, for a bucket/object. | |
471 | |
472 Args: | |
473 subresource: The subresource name. | |
474 uri_arg: URI for the bucket/object. Wildcards will be expanded. | |
475 | |
476 Raises: | |
477 CommandException: if errors encountered. | |
478 """ | |
479 # Wildcarding is allowed but must resolve to just one bucket. | |
480 uris = list(self.WildcardIterator(uri_arg).IterUris()) | |
481 if len(uris) != 1: | |
482 raise CommandException('Wildcards must resolve to exactly one item for ' | |
483 'get %s' % subresource) | |
484 uri = uris[0] | |
485 xml_str = uri.get_subresource(subresource, False, self.headers) | |
486 # Pretty-print the XML to make it more easily human editable. | |
487 parsed_xml = xml.dom.minidom.parseString(xml_str.encode('utf-8')) | |
488 print parsed_xml.toprettyxml(indent=' ') | |
489 | |
  def Apply(self, func, name_expansion_iterator, thr_exc_handler,
            shared_attrs=None):
    """Dispatch input URI assignments across a pool of parallel OS
    processes and/or Python threads, based on options (-m or not)
    and settings in the user's config file. If non-parallel mode
    or only one OS process requested, execute requests sequentially
    in the current OS process.

    Args:
      func: Function to call to process each URI.
      name_expansion_iterator: Iterator of NameExpansionResult.
      thr_exc_handler: Exception handler for ThreadPool class.
      shared_attrs: List of attributes to manage across sub-processes.

    Raises:
      CommandException if invalid config encountered.
    """

    # Set OS process and python thread count as a function of options
    # and config.
    # NOTE(review): gslib.commands.config is referenced via the bare
    # 'import gslib' above; presumably the submodule is imported as a side
    # effect elsewhere before Apply runs — confirm.
    if self.parallel_operations:
      process_count = boto.config.getint(
          'GSUtil', 'parallel_process_count',
          gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
      if process_count < 1:
        raise CommandException('Invalid parallel_process_count "%d".' %
                               process_count)
      thread_count = boto.config.getint(
          'GSUtil', 'parallel_thread_count',
          gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
      if thread_count < 1:
        raise CommandException('Invalid parallel_thread_count "%d".' %
                               thread_count)
    else:
      # If -m not specified, then assume 1 OS process and 1 Python thread.
      process_count = 1
      thread_count = 1

    if self.debug:
      self.THREADED_LOGGER.info('process count: %d', process_count)
      self.THREADED_LOGGER.info('thread count: %d', thread_count)

    if self.parallel_operations and process_count > 1:
      procs = []
      # If any shared attributes passed by caller, create a dictionary of
      # shared memory variables for every element in the list of shared
      # attributes. Each is an integer ('i') initialized to 0; workers add
      # their own attribute values into these in _ApplyThreads.
      shared_vars = None
      if shared_attrs:
        for name in shared_attrs:
          if not shared_vars:
            shared_vars = {}
          shared_vars[name] = multiprocessing.Value('i', 0)
      # Construct work queue for parceling out work to multiprocessing workers,
      # setting the max queue length of 50k so we will block if workers don't
      # empty the queue as fast as we can continue iterating over the bucket
      # listing. This number may need tuning; it should be large enough to
      # keep workers busy (overlapping bucket list next-page retrieval with
      # operations being fed from the queue) but small enough that we don't
      # overfill memory when running across a slow network link.
      work_queue = multiprocessing.Queue(50000)
      for shard in range(process_count):
        # Spawn a separate OS process for each shard.
        if self.debug:
          self.THREADED_LOGGER.info('spawning process for shard %d', shard)
        p = multiprocessing.Process(target=self._ApplyThreads,
                                    args=(func, work_queue, shard,
                                          thread_count, thr_exc_handler,
                                          shared_vars))
        procs.append(p)
        p.start()

      last_name_expansion_result = None
      try:
        # Feed all work into the queue being emptied by the workers.
        for name_expansion_result in name_expansion_iterator:
          last_name_expansion_result = name_expansion_result
          work_queue.put(name_expansion_result)
      except:
        # NOTE(review): bare except — the iteration failure is reported on
        # stderr and swallowed, so cleanup below still runs and Apply does
        # not re-raise. Confirm the swallow is intended.
        sys.stderr.write('Failed URI iteration. Last result (prior to '
                         'exception) was: %s\n'
                         % repr(last_name_expansion_result))
      finally:
        # We do all of the process cleanup in a finally clause in case the
        # name expansion iterator throws an exception. This will send EOF to
        # all the child processes and join them back into the parent process.

        # Send an EOF per worker.
        for shard in range(process_count):
          work_queue.put(_EOF_NAME_EXPANSION_RESULT)

        # Wait for all spawned OS processes to finish.
        failed_process_count = 0
        for p in procs:
          p.join()
          # Count number of procs that returned non-zero exit code.
          if p.exitcode != 0:
            failed_process_count += 1

        # Propagate shared variables back to caller's attributes.
        if shared_vars:
          for (name, var) in shared_vars.items():
            setattr(self, name, var.value)

      # Abort main process if one or more sub-processes failed. Note that this
      # is outside the finally clause, because we only want to raise a new
      # exception if an exception wasn't already raised in the try clause above.
      if failed_process_count:
        plural_str = ''
        if failed_process_count > 1:
          plural_str = 'es'
        raise Exception('unexpected failure in %d sub-process%s, '
                        'aborting...' % (failed_process_count, plural_str))

    else:
      # Using just 1 process, so funnel results to _ApplyThreads using facade
      # that makes NameExpansionIterator look like a Multiprocessing.Queue
      # that sends one EOF once the iterator empties.
      work_queue = NameExpansionIteratorQueue(name_expansion_iterator,
                                              _EOF_NAME_EXPANSION_RESULT)
      self._ApplyThreads(func, work_queue, 0, thread_count, thr_exc_handler,
                         None)
612 | |
613 def HaveFileUris(self, args_to_check): | |
614 """Checks whether args_to_check contain any file URIs. | |
615 | |
616 Args: | |
617 args_to_check: Command-line argument subset to check. | |
618 | |
619 Returns: | |
620 True if args_to_check contains any file URIs. | |
621 """ | |
622 for uri_str in args_to_check: | |
623 if uri_str.lower().startswith('file://') or uri_str.find(':') == -1: | |
624 return True | |
625 return False | |
626 | |
627 ###################### | |
628 # Private functions. # | |
629 ###################### | |
630 | |
631 def _HaveProviderUris(self, args_to_check): | |
632 """Checks whether args_to_check contains any provider URIs (like 'gs://'). | |
633 | |
634 Args: | |
635 args_to_check: Command-line argument subset to check. | |
636 | |
637 Returns: | |
638 True if args_to_check contains any provider URIs. | |
639 """ | |
640 for uri_str in args_to_check: | |
641 if re.match('^[a-z]+://$', uri_str): | |
642 return True | |
643 return False | |
644 | |
  def _ConfigureNoOpAuthIfNeeded(self):
    """Sets up no-op auth handler if no boto credentials are configured.

    Raises:
      CommandException: if a boto config file exists but contains no usable
          credentials (or OAuth2 credentials without OAuth2 support).
    """
    config = boto.config
    if not util.HasConfiguredCredentials(self.bypass_prodaccess):
      if self.config_file_list:
        # Config file(s) exist but hold no usable credentials; give the most
        # specific error we can.
        if (config.has_option('Credentials', 'gs_oauth2_refresh_token')
            and not HAVE_OAUTH2):
          raise CommandException(
              'Your gsutil is configured with OAuth2 authentication '
              'credentials.\nHowever, OAuth2 is only supported when running '
              'under Python 2.6 or later\n(unless additional dependencies are '
              'installed, see README for details); you are running Python %s.' %
              sys.version)
        raise CommandException('You have no storage service credentials in any '
                               'of the following boto config\nfiles. Please '
                               'add your credentials as described in the '
                               'gsutil README file, or else\nre-run '
                               '"gsutil config" to re-create a config '
                               'file:\n%s' % self.config_file_list)
      else:
        # With no boto config file the user can still access publicly readable
        # buckets and objects.
        # NOTE: imported purely for its side effect — presumably importing
        # no_op_auth_plugin registers a no-op auth handler with boto; the
        # module object itself is unused. Confirm against that module.
        from gslib import no_op_auth_plugin
668 | |
  def _ApplyThreads(self, func, work_queue, shard, num_threads,
                    thr_exc_handler=None, shared_vars=None):
    """
    Perform subset of required requests across a caller specified
    number of parallel Python threads, which may be one, in which
    case the requests are processed in the current thread.

    Args:
      func: Function to call for each request.
      work_queue: shared queue of NameExpansionResult to process.
      shard: Assigned subset (shard number) for this function.
      num_threads: Number of Python threads to spawn to process this shard.
      thr_exc_handler: Exception handler for ThreadPool class.
      shared_vars: Dict of shared memory variables to be managed.
                   (only relevant, and non-None, if this function is
                   run in a separate OS process).
    """
    # Each OS process needs to establish its own set of connections to
    # the server to avoid writes from different OS processes interleaving
    # onto the same socket (and garbling the underlying SSL session).
    # We ensure each process gets its own set of connections here by
    # closing all connections in the storage provider connection pool.
    connection_pool = StorageUri.provider_pool
    if connection_pool:
      for i in connection_pool:
        connection_pool[i].connection.close()

    # thread_pool is only created (and only referenced below) when
    # num_threads > 1; with one thread, func is called inline.
    if num_threads > 1:
      thread_pool = ThreadPool(num_threads, thr_exc_handler)
    try:
      while True:  # Loop until we hit EOF marker.
        name_expansion_result = work_queue.get()
        if name_expansion_result == _EOF_NAME_EXPANSION_RESULT:
          break
        exp_src_uri = self.suri_builder.StorageUri(
            name_expansion_result.GetExpandedUriStr())
        if self.debug:
          self.THREADED_LOGGER.info('process %d shard %d is handling uri %s',
                                    os.getpid(), shard, exp_src_uri)
        # Optionally skip symlinked local files (set by commands like cp -e).
        if (self.exclude_symlinks and exp_src_uri.is_file_uri()
            and os.path.islink(exp_src_uri.object_name)):
          self.THREADED_LOGGER.info('Skipping symbolic link %s...', exp_src_uri)
        elif num_threads > 1:
          thread_pool.AddTask(func, name_expansion_result)
        else:
          func(name_expansion_result)
      # If any Python threads created, wait here for them to finish.
      if num_threads > 1:
        thread_pool.WaitCompletion()
    finally:
      if num_threads > 1:
        thread_pool.Shutdown()
    # If any shared variables (which means we are running in a separate OS
    # process), increment value for each shared variable.
    if shared_vars:
      for (name, var) in shared_vars.items():
        var.value += getattr(self, name)
OLD | NEW |