OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2013 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """Base test case class for unit and integration tests.""" |
| 16 |
| 17 from __future__ import absolute_import |
| 18 |
| 19 from functools import wraps |
| 20 import os.path |
| 21 import random |
| 22 import shutil |
| 23 import tempfile |
| 24 |
| 25 import boto |
| 26 import gslib.tests.util as util |
| 27 from gslib.tests.util import unittest |
| 28 from gslib.util import UTF8 |
| 29 |
| 30 MAX_BUCKET_LENGTH = 63 |
| 31 |
| 32 |
| 33 def NotParallelizable(func): |
| 34 """Wrapper function for cases that are not parallelizable.""" |
| 35 @wraps(func) |
| 36 def ParallelAnnotatedFunc(*args, **kwargs): |
| 37 return func(*args, **kwargs) |
| 38 ParallelAnnotatedFunc.is_parallelizable = False |
| 39 return ParallelAnnotatedFunc |
| 40 |
| 41 |
| 42 def RequiresIsolation(func): |
| 43 """Wrapper function for cases that require running in a separate process.""" |
| 44 @wraps(func) |
| 45 def RequiresIsolationFunc(*args, **kwargs): |
| 46 return func(*args, **kwargs) |
| 47 RequiresIsolationFunc.requires_isolation = True |
| 48 return RequiresIsolationFunc |
| 49 |
| 50 |
| 51 class GsUtilTestCase(unittest.TestCase): |
| 52 """Base test case class for unit and integration tests.""" |
| 53 |
| 54 def setUp(self): |
| 55 if util.RUN_S3_TESTS: |
| 56 self.test_api = 'XML' |
| 57 self.default_provider = 's3' |
| 58 self.provider_custom_meta = 'amz' |
| 59 else: |
| 60 self.test_api = boto.config.get('GSUtil', 'prefer_api', 'JSON').upper() |
| 61 self.default_provider = 'gs' |
| 62 self.provider_custom_meta = 'goog' |
| 63 self.tempdirs = [] |
| 64 |
| 65 def tearDown(self): |
| 66 while self.tempdirs: |
| 67 tmpdir = self.tempdirs.pop() |
| 68 shutil.rmtree(tmpdir, ignore_errors=True) |
| 69 |
| 70 def assertNumLines(self, text, numlines): |
| 71 self.assertEqual(text.count('\n'), numlines) |
| 72 |
| 73 def GetTestMethodName(self): |
| 74 if isinstance(self._testMethodName, unicode): |
| 75 return self._testMethodName.encode(UTF8) |
| 76 return self._testMethodName |
| 77 |
| 78 def MakeRandomTestString(self): |
| 79 """Creates a random string of hex characters 8 characters long.""" |
| 80 return '%08x' % random.randrange(256**4) |
| 81 |
| 82 def MakeTempName(self, kind, prefix=''): |
| 83 """Creates a temporary name that is most-likely unique. |
| 84 |
| 85 Args: |
| 86 kind: A string indicating what kind of test name this is. |
| 87 prefix: Prefix string to be used in the temporary name. |
| 88 |
| 89 Returns: |
| 90 The temporary name. |
| 91 """ |
| 92 name = '%sgsutil-test-%s-%s' % (prefix, self.GetTestMethodName(), kind) |
| 93 name = name[:MAX_BUCKET_LENGTH-9] |
| 94 name = '%s-%s' % (name, self.MakeRandomTestString()) |
| 95 return name |
| 96 |
| 97 def CreateTempDir(self, test_files=0): |
| 98 """Creates a temporary directory on disk. |
| 99 |
| 100 The directory and all of its contents will be deleted after the test. |
| 101 |
| 102 Args: |
| 103 test_files: The number of test files to place in the directory or a list |
| 104 of test file names. |
| 105 |
| 106 Returns: |
| 107 The path to the new temporary directory. |
| 108 """ |
| 109 tmpdir = tempfile.mkdtemp(prefix=self.MakeTempName('directory')) |
| 110 self.tempdirs.append(tmpdir) |
| 111 try: |
| 112 iter(test_files) |
| 113 except TypeError: |
| 114 test_files = [self.MakeTempName('file') for _ in range(test_files)] |
| 115 for i, name in enumerate(test_files): |
| 116 self.CreateTempFile(tmpdir=tmpdir, file_name=name, contents='test %d' % i) |
| 117 return tmpdir |
| 118 |
| 119 def CreateTempFile(self, tmpdir=None, contents=None, file_name=None): |
| 120 """Creates a temporary file on disk. |
| 121 |
| 122 Args: |
| 123 tmpdir: The temporary directory to place the file in. If not specified, a |
| 124 new temporary directory is created. |
| 125 contents: The contents to write to the file. If not specified, a test |
| 126 string is constructed and written to the file. |
| 127 file_name: The name to use for the file. If not specified, a temporary |
| 128 test file name is constructed. This can also be a tuple, where |
| 129 ('dir', 'foo') means to create a file named 'foo' inside a |
| 130 subdirectory named 'dir'. |
| 131 |
| 132 Returns: |
| 133 The path to the new temporary file. |
| 134 """ |
| 135 tmpdir = tmpdir or self.CreateTempDir() |
| 136 file_name = file_name or self.MakeTempName('file') |
| 137 if isinstance(file_name, basestring): |
| 138 fpath = os.path.join(tmpdir, file_name) |
| 139 else: |
| 140 fpath = os.path.join(tmpdir, *file_name) |
| 141 if not os.path.isdir(os.path.dirname(fpath)): |
| 142 os.makedirs(os.path.dirname(fpath)) |
| 143 |
| 144 with open(fpath, 'wb') as f: |
| 145 contents = (contents if contents is not None |
| 146 else self.MakeTempName('contents')) |
| 147 f.write(contents) |
| 148 return fpath |
OLD | NEW |