OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2013 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Base test case class for unit and integration tests.""" | |
16 | |
17 from __future__ import absolute_import | |
18 | |
19 from functools import wraps | |
20 import os.path | |
21 import random | |
22 import shutil | |
23 import tempfile | |
24 | |
25 import boto | |
26 import gslib.tests.util as util | |
27 from gslib.tests.util import unittest | |
28 from gslib.util import UTF8 | |
29 | |
30 MAX_BUCKET_LENGTH = 63 | |
31 | |
32 | |
33 def NotParallelizable(func): | |
34 """Wrapper function for cases that are not parallelizable.""" | |
35 @wraps(func) | |
36 def ParallelAnnotatedFunc(*args, **kwargs): | |
37 return func(*args, **kwargs) | |
38 ParallelAnnotatedFunc.is_parallelizable = False | |
39 return ParallelAnnotatedFunc | |
40 | |
41 | |
42 def RequiresIsolation(func): | |
43 """Wrapper function for cases that require running in a separate process.""" | |
44 @wraps(func) | |
45 def RequiresIsolationFunc(*args, **kwargs): | |
46 return func(*args, **kwargs) | |
47 RequiresIsolationFunc.requires_isolation = True | |
48 return RequiresIsolationFunc | |
49 | |
50 | |
51 class GsUtilTestCase(unittest.TestCase): | |
52 """Base test case class for unit and integration tests.""" | |
53 | |
54 def setUp(self): | |
55 if util.RUN_S3_TESTS: | |
56 self.test_api = 'XML' | |
57 self.default_provider = 's3' | |
58 self.provider_custom_meta = 'amz' | |
59 else: | |
60 self.test_api = boto.config.get('GSUtil', 'prefer_api', 'JSON').upper() | |
61 self.default_provider = 'gs' | |
62 self.provider_custom_meta = 'goog' | |
63 self.tempdirs = [] | |
64 | |
65 def tearDown(self): | |
66 while self.tempdirs: | |
67 tmpdir = self.tempdirs.pop() | |
68 shutil.rmtree(tmpdir, ignore_errors=True) | |
69 | |
70 def assertNumLines(self, text, numlines): | |
71 self.assertEqual(text.count('\n'), numlines) | |
72 | |
73 def GetTestMethodName(self): | |
74 if isinstance(self._testMethodName, unicode): | |
75 return self._testMethodName.encode(UTF8) | |
76 return self._testMethodName | |
77 | |
78 def MakeRandomTestString(self): | |
79 """Creates a random string of hex characters 8 characters long.""" | |
80 return '%08x' % random.randrange(256**4) | |
81 | |
82 def MakeTempName(self, kind, prefix=''): | |
83 """Creates a temporary name that is most-likely unique. | |
84 | |
85 Args: | |
86 kind: A string indicating what kind of test name this is. | |
87 prefix: Prefix string to be used in the temporary name. | |
88 | |
89 Returns: | |
90 The temporary name. | |
91 """ | |
92 name = '%sgsutil-test-%s-%s' % (prefix, self.GetTestMethodName(), kind) | |
93 name = name[:MAX_BUCKET_LENGTH-9] | |
94 name = '%s-%s' % (name, self.MakeRandomTestString()) | |
95 return name | |
96 | |
97 def CreateTempDir(self, test_files=0): | |
98 """Creates a temporary directory on disk. | |
99 | |
100 The directory and all of its contents will be deleted after the test. | |
101 | |
102 Args: | |
103 test_files: The number of test files to place in the directory or a list | |
104 of test file names. | |
105 | |
106 Returns: | |
107 The path to the new temporary directory. | |
108 """ | |
109 tmpdir = tempfile.mkdtemp(prefix=self.MakeTempName('directory')) | |
110 self.tempdirs.append(tmpdir) | |
111 try: | |
112 iter(test_files) | |
113 except TypeError: | |
114 test_files = [self.MakeTempName('file') for _ in range(test_files)] | |
115 for i, name in enumerate(test_files): | |
116 self.CreateTempFile(tmpdir=tmpdir, file_name=name, contents='test %d' % i) | |
117 return tmpdir | |
118 | |
119 def CreateTempFile(self, tmpdir=None, contents=None, file_name=None): | |
120 """Creates a temporary file on disk. | |
121 | |
122 Args: | |
123 tmpdir: The temporary directory to place the file in. If not specified, a | |
124 new temporary directory is created. | |
125 contents: The contents to write to the file. If not specified, a test | |
126 string is constructed and written to the file. | |
127 file_name: The name to use for the file. If not specified, a temporary | |
128 test file name is constructed. This can also be a tuple, where | |
129 ('dir', 'foo') means to create a file named 'foo' inside a | |
130 subdirectory named 'dir'. | |
131 | |
132 Returns: | |
133 The path to the new temporary file. | |
134 """ | |
135 tmpdir = tmpdir or self.CreateTempDir() | |
136 file_name = file_name or self.MakeTempName('file') | |
137 if isinstance(file_name, basestring): | |
138 fpath = os.path.join(tmpdir, file_name) | |
139 else: | |
140 fpath = os.path.join(tmpdir, *file_name) | |
141 if not os.path.isdir(os.path.dirname(fpath)): | |
142 os.makedirs(os.path.dirname(fpath)) | |
143 | |
144 with open(fpath, 'wb') as f: | |
145 contents = (contents if contents is not None | |
146 else self.MakeTempName('contents')) | |
147 f.write(contents) | |
148 return fpath | |
OLD | NEW |