OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 """Unit tests for git_common.py""" |
| 7 |
| 8 import binascii |
| 9 import collections |
| 10 import os |
| 11 import signal |
| 12 import sys |
| 13 import tempfile |
| 14 import time |
| 15 import unittest |
| 16 |
| 17 DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 18 sys.path.insert(0, DEPOT_TOOLS_ROOT) |
| 19 |
| 20 from testing_support import git_test_utils |
| 21 from testing_support import coverage_utils |
| 22 |
| 23 |
| 24 class GitCommonTestBase(unittest.TestCase): |
| 25 @classmethod |
| 26 def setUpClass(cls): |
| 27 super(GitCommonTestBase, cls).setUpClass() |
| 28 import git_common |
| 29 cls.gc = git_common |
| 30 |
| 31 |
| 32 class Support(GitCommonTestBase): |
| 33 def _testMemoizeOneBody(self, threadsafe): |
| 34 calls = collections.defaultdict(int) |
| 35 def double_if_even(val): |
| 36 calls[val] += 1 |
| 37 if val % 2 == 0: |
| 38 return val * 2 |
| 39 else: |
| 40 return None |
| 41 # Use this explicitly as a wrapper fn instead of a decorator. Otherwise |
| 42 # pylint crashes (!!) |
| 43 double_if_even = self.gc.memoize_one(threadsafe=threadsafe)(double_if_even) |
| 44 |
| 45 self.assertEqual(4, double_if_even(2)) |
| 46 self.assertEqual(4, double_if_even(2)) |
| 47 self.assertEqual(None, double_if_even(1)) |
| 48 self.assertEqual(None, double_if_even(1)) |
| 49 self.assertDictEqual({1: 2, 2: 1}, calls) |
| 50 |
| 51 double_if_even.set(10, 20) |
| 52 self.assertEqual(20, double_if_even(10)) |
| 53 self.assertDictEqual({1: 2, 2: 1}, calls) |
| 54 |
| 55 double_if_even.clear() |
| 56 self.assertEqual(4, double_if_even(2)) |
| 57 self.assertEqual(4, double_if_even(2)) |
| 58 self.assertEqual(None, double_if_even(1)) |
| 59 self.assertEqual(None, double_if_even(1)) |
| 60 self.assertEqual(20, double_if_even(10)) |
| 61 self.assertDictEqual({1: 4, 2: 2, 10: 1}, calls) |
| 62 |
| 63 def testMemoizeOne(self): |
| 64 self._testMemoizeOneBody(threadsafe=False) |
| 65 |
| 66 def testMemoizeOneThreadsafe(self): |
| 67 self._testMemoizeOneBody(threadsafe=True) |
| 68 |
| 69 |
| 70 def slow_square(i): |
| 71 """Helper for ScopedPoolTest. |
| 72 |
| 73 Must be global because non top-level functions aren't pickleable. |
| 74 """ |
| 75 time.sleep(0.2) |
| 76 return i ** 2 |
| 77 |
| 78 |
| 79 class ScopedPoolTest(GitCommonTestBase): |
| 80 if sys.platform.startswith('win'): |
| 81 CTRL_C = signal.CTRL_C_EVENT |
| 82 else: |
| 83 CTRL_C = signal.SIGINT |
| 84 |
| 85 def testThreads(self): |
| 86 result = [] |
| 87 with self.gc.ScopedPool(kind='threads') as pool: |
| 88 for i in pool.imap(slow_square, xrange(10)): |
| 89 result.append(i) |
| 90 self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result) |
| 91 |
| 92 def testThreadsCtrlC(self): |
| 93 result = [] |
| 94 with self.assertRaises(KeyboardInterrupt): |
| 95 with self.gc.ScopedPool(kind='threads') as pool: |
| 96 # Make sure this pool is interrupted in mid-swing |
| 97 for i in pool.imap(slow_square, xrange(1000000)): |
| 98 if i > 32: |
| 99 os.kill(os.getpid(), self.CTRL_C) |
| 100 result.append(i) |
| 101 self.assertEqual([0, 1, 4, 9, 16, 25], result) |
| 102 |
| 103 def testProcs(self): |
| 104 result = [] |
| 105 with self.gc.ScopedPool() as pool: |
| 106 for i in pool.imap(slow_square, xrange(10)): |
| 107 result.append(i) |
| 108 self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result) |
| 109 |
| 110 def testProcsCtrlC(self): |
| 111 result = [] |
| 112 with self.assertRaises(KeyboardInterrupt): |
| 113 with self.gc.ScopedPool() as pool: |
| 114 # Make sure this pool is interrupted in mid-swing |
| 115 for i in pool.imap(slow_square, xrange(1000000)): |
| 116 if i > 32: |
| 117 os.kill(os.getpid(), self.CTRL_C) |
| 118 result.append(i) |
| 119 self.assertEqual([0, 1, 4, 9, 16, 25], result) |
| 120 |
| 121 |
| 122 class ProgressPrinterTest(GitCommonTestBase): |
| 123 class FakeStream(object): |
| 124 def __init__(self): |
| 125 self.data = set() |
| 126 self.count = 0 |
| 127 |
| 128 def write(self, line): |
| 129 self.data.add(line) |
| 130 |
| 131 def flush(self): |
| 132 self.count += 1 |
| 133 |
| 134 # This test is probably racy, but I don't have a better alternative. |
| 135 @unittest.expectedFailure |
| 136 def testBasic(self): |
| 137 fmt = '%(count)d/10' |
| 138 stream = self.FakeStream() |
| 139 |
| 140 pp = self.gc.ProgressPrinter(fmt, enabled=True, stream=stream, period=0.01) |
| 141 with pp as inc: |
| 142 for _ in xrange(10): |
| 143 time.sleep(0.02) |
| 144 inc() |
| 145 |
| 146 filtered = set(x.strip() for x in stream.data) |
| 147 rslt = set(fmt % {'count': i} for i in xrange(11)) |
| 148 self.assertSetEqual(filtered, rslt) |
| 149 self.assertGreaterEqual(stream.count, 10) |
| 150 |
| 151 |
| 152 class GitReadOnlyFunctionsTest(git_test_utils.GitRepoReadOnlyTestBase, |
| 153 GitCommonTestBase): |
| 154 REPO = """ |
| 155 A B C D |
| 156 B E D |
| 157 """ |
| 158 |
| 159 COMMIT_A = { |
| 160 'some/files/file1': {'data': 'file1'}, |
| 161 'some/files/file2': {'data': 'file2'}, |
| 162 'some/files/file3': {'data': 'file3'}, |
| 163 'some/other/file': {'data': 'otherfile'}, |
| 164 } |
| 165 |
| 166 COMMIT_C = { |
| 167 'some/files/file2': { |
| 168 'mode': 0755, |
| 169 'data': 'file2 - vanilla'}, |
| 170 } |
| 171 |
| 172 COMMIT_E = { |
| 173 'some/files/file2': {'data': 'file2 - merged'}, |
| 174 } |
| 175 |
| 176 COMMIT_D = { |
| 177 'some/files/file2': {'data': 'file2 - vanilla\nfile2 - merged'}, |
| 178 } |
| 179 |
| 180 def testHashes(self): |
| 181 ret = self.repo.run( |
| 182 self.gc.hashes, *[ |
| 183 'master', |
| 184 'master~3', |
| 185 self.repo['E']+'~', |
| 186 self.repo['D']+'^2', |
| 187 'tag_C^{}', |
| 188 ] |
| 189 ) |
| 190 self.assertEqual([ |
| 191 self.repo['D'], |
| 192 self.repo['A'], |
| 193 self.repo['B'], |
| 194 self.repo['E'], |
| 195 self.repo['C'], |
| 196 ], ret) |
| 197 |
| 198 def testParseCommitrefs(self): |
| 199 ret = self.repo.run( |
| 200 self.gc.parse_commitrefs, *[ |
| 201 'master', |
| 202 'master~3', |
| 203 self.repo['E']+'~', |
| 204 self.repo['D']+'^2', |
| 205 'tag_C^{}', |
| 206 ] |
| 207 ) |
| 208 self.assertEqual(ret, map(binascii.unhexlify, [ |
| 209 self.repo['D'], |
| 210 self.repo['A'], |
| 211 self.repo['B'], |
| 212 self.repo['E'], |
| 213 self.repo['C'], |
| 214 ])) |
| 215 |
| 216 with self.assertRaisesRegexp(Exception, r"one of \('master', 'bananas'\)"): |
| 217 self.repo.run(self.gc.parse_commitrefs, 'master', 'bananas') |
| 218 |
| 219 def testTree(self): |
| 220 tree = self.repo.run(self.gc.tree, 'master:some/files') |
| 221 file1 = self.COMMIT_A['some/files/file1']['data'] |
| 222 file2 = self.COMMIT_D['some/files/file2']['data'] |
| 223 file3 = self.COMMIT_A['some/files/file3']['data'] |
| 224 self.assertEquals(tree['file1'], |
| 225 ('100644', 'blob', git_test_utils.git_hash_data(file1))) |
| 226 self.assertEquals(tree['file2'], |
| 227 ('100755', 'blob', git_test_utils.git_hash_data(file2))) |
| 228 self.assertEquals(tree['file3'], |
| 229 ('100644', 'blob', git_test_utils.git_hash_data(file3))) |
| 230 |
| 231 tree = self.repo.run(self.gc.tree, 'master:some') |
| 232 self.assertEquals(len(tree), 2) |
| 233 # Don't check the tree hash because we're lazy :) |
| 234 self.assertEquals(tree['files'][:2], ('040000', 'tree')) |
| 235 |
| 236 tree = self.repo.run(self.gc.tree, 'master:wat') |
| 237 self.assertEqual(tree, None) |
| 238 |
| 239 def testTreeRecursive(self): |
| 240 tree = self.repo.run(self.gc.tree, 'master:some', recurse=True) |
| 241 file1 = self.COMMIT_A['some/files/file1']['data'] |
| 242 file2 = self.COMMIT_D['some/files/file2']['data'] |
| 243 file3 = self.COMMIT_A['some/files/file3']['data'] |
| 244 other = self.COMMIT_A['some/other/file']['data'] |
| 245 self.assertEquals(tree['files/file1'], |
| 246 ('100644', 'blob', git_test_utils.git_hash_data(file1))) |
| 247 self.assertEquals(tree['files/file2'], |
| 248 ('100755', 'blob', git_test_utils.git_hash_data(file2))) |
| 249 self.assertEquals(tree['files/file3'], |
| 250 ('100644', 'blob', git_test_utils.git_hash_data(file3))) |
| 251 self.assertEquals(tree['other/file'], |
| 252 ('100644', 'blob', git_test_utils.git_hash_data(other))) |
| 253 |
| 254 def testError(self): |
| 255 with self.assertRaisesRegexp(self.gc.CalledProcessError, 'status 1'): |
| 256 self.gc.run('rev-parse', 'bananas') |
| 257 |
| 258 |
| 259 class GitMutableFunctionsTest(git_test_utils.GitRepoReadWriteTestBase, |
| 260 GitCommonTestBase): |
| 261 REPO = '' |
| 262 |
| 263 def _intern_data(self, data): |
| 264 with tempfile.TemporaryFile() as f: |
| 265 f.write(data) |
| 266 f.seek(0) |
| 267 return self.repo.run(self.gc.intern_f, f) |
| 268 |
| 269 def testIndata(self): |
| 270 data = 'WayCOOL SuperTest!' |
| 271 data_hash = self.repo.run( |
| 272 self.gc.run, 'hash-object', '-t', 'blob', '--stdin', indata=data) |
| 273 self.assertEquals(data_hash, git_test_utils.git_hash_data(data)) |
| 274 |
| 275 def testInternF(self): |
| 276 data = "CoolBobcatsBro" |
| 277 data_hash = self._intern_data(data) |
| 278 self.assertEquals(git_test_utils.git_hash_data(data), data_hash) |
| 279 self.assertEquals(data, self.repo.git('cat-file', 'blob', data_hash).stdout) |
| 280 |
| 281 def testMkTree(self): |
| 282 tree = {} |
| 283 for i in 1, 2, 3: |
| 284 name = 'file%d' % i |
| 285 tree[name] = ('100644', 'blob', self._intern_data(name)) |
| 286 tree_hash = self.repo.run(self.gc.mktree, tree) |
| 287 self.assertEquals('37b61866d6e061c4ba478e7eb525be7b5752737d', tree_hash) |
| 288 |
| 289 |
| 290 if __name__ == '__main__': |
| 291 sys.exit(coverage_utils.covered_main( |
| 292 os.path.join(DEPOT_TOOLS_ROOT, 'git_common.py') |
| 293 )) |
OLD | NEW |