| Index: tests/git_common_test.py
|
| diff --git a/tests/git_common_test.py b/tests/git_common_test.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..bec09b2f6264e237717c670269d33ecb41ab993e
|
| --- /dev/null
|
| +++ b/tests/git_common_test.py
|
| @@ -0,0 +1,293 @@
|
| +#!/usr/bin/env python
|
| +# Copyright (c) 2013 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +"""Unit tests for git_common.py"""
|
| +
|
| +import binascii
|
| +import collections
|
| +import os
|
| +import signal
|
| +import sys
|
| +import tempfile
|
| +import time
|
| +import unittest
|
| +
|
| +DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
| +sys.path.insert(0, DEPOT_TOOLS_ROOT)
|
| +
|
| +from testing_support import git_test_utils
|
| +from testing_support import coverage_utils
|
| +
|
| +
|
| +class GitCommonTestBase(unittest.TestCase):
|
| + @classmethod
|
| + def setUpClass(cls):
|
| + super(GitCommonTestBase, cls).setUpClass()
|
| + import git_common
|
| + cls.gc = git_common
|
| +
|
| +
|
| +class Support(GitCommonTestBase):
|
| + def _testMemoizeOneBody(self, threadsafe):
|
| + calls = collections.defaultdict(int)
|
| + def double_if_even(val):
|
| + calls[val] += 1
|
| + if val % 2 == 0:
|
| + return val * 2
|
| + else:
|
| + return None
|
| + # Use this explicitly as a wrapper fn instead of a decorator. Otherwise
|
| + # pylint crashes (!!)
|
| + double_if_even = self.gc.memoize_one(threadsafe=threadsafe)(double_if_even)
|
| +
|
| + self.assertEqual(4, double_if_even(2))
|
| + self.assertEqual(4, double_if_even(2))
|
| + self.assertEqual(None, double_if_even(1))
|
| + self.assertEqual(None, double_if_even(1))
|
| + self.assertDictEqual({1: 2, 2: 1}, calls)
|
| +
|
| + double_if_even.set(10, 20)
|
| + self.assertEqual(20, double_if_even(10))
|
| + self.assertDictEqual({1: 2, 2: 1}, calls)
|
| +
|
| + double_if_even.clear()
|
| + self.assertEqual(4, double_if_even(2))
|
| + self.assertEqual(4, double_if_even(2))
|
| + self.assertEqual(None, double_if_even(1))
|
| + self.assertEqual(None, double_if_even(1))
|
| + self.assertEqual(20, double_if_even(10))
|
| + self.assertDictEqual({1: 4, 2: 2, 10: 1}, calls)
|
| +
|
| + def testMemoizeOne(self):
|
| + self._testMemoizeOneBody(threadsafe=False)
|
| +
|
| + def testMemoizeOneThreadsafe(self):
|
| + self._testMemoizeOneBody(threadsafe=True)
|
| +
|
| +
|
| +def slow_square(i):
|
| + """Helper for ScopedPoolTest.
|
| +
|
| + Must be global because non top-level functions aren't pickleable.
|
| + """
|
| + time.sleep(0.2)
|
| + return i ** 2
|
| +
|
| +
|
| +class ScopedPoolTest(GitCommonTestBase):
|
| + if sys.platform.startswith('win'):
|
| + CTRL_C = signal.CTRL_C_EVENT
|
| + else:
|
| + CTRL_C = signal.SIGINT
|
| +
|
| + def testThreads(self):
|
| + result = []
|
| + with self.gc.ScopedPool(kind='threads') as pool:
|
| + for i in pool.imap(slow_square, xrange(10)):
|
| + result.append(i)
|
| + self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result)
|
| +
|
| + def testThreadsCtrlC(self):
|
| + result = []
|
| + with self.assertRaises(KeyboardInterrupt):
|
| + with self.gc.ScopedPool(kind='threads') as pool:
|
| + # Make sure this pool is interrupted in mid-swing
|
| + for i in pool.imap(slow_square, xrange(1000000)):
|
| + if i > 32:
|
| + os.kill(os.getpid(), self.CTRL_C)
|
| + result.append(i)
|
| + self.assertEqual([0, 1, 4, 9, 16, 25], result)
|
| +
|
| + def testProcs(self):
|
| + result = []
|
| + with self.gc.ScopedPool() as pool:
|
| + for i in pool.imap(slow_square, xrange(10)):
|
| + result.append(i)
|
| + self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result)
|
| +
|
| + def testProcsCtrlC(self):
|
| + result = []
|
| + with self.assertRaises(KeyboardInterrupt):
|
| + with self.gc.ScopedPool() as pool:
|
| + # Make sure this pool is interrupted in mid-swing
|
| + for i in pool.imap(slow_square, xrange(1000000)):
|
| + if i > 32:
|
| + os.kill(os.getpid(), self.CTRL_C)
|
| + result.append(i)
|
| + self.assertEqual([0, 1, 4, 9, 16, 25], result)
|
| +
|
| +
|
| +class ProgressPrinterTest(GitCommonTestBase):
|
| + class FakeStream(object):
|
| + def __init__(self):
|
| + self.data = set()
|
| + self.count = 0
|
| +
|
| + def write(self, line):
|
| + self.data.add(line)
|
| +
|
| + def flush(self):
|
| + self.count += 1
|
| +
|
| + # This test is probably racy, but I don't have a better alternative.
|
| + @unittest.expectedFailure
|
| + def testBasic(self):
|
| + fmt = '%(count)d/10'
|
| + stream = self.FakeStream()
|
| +
|
| + pp = self.gc.ProgressPrinter(fmt, enabled=True, stream=stream, period=0.01)
|
| + with pp as inc:
|
| + for _ in xrange(10):
|
| + time.sleep(0.02)
|
| + inc()
|
| +
|
| + filtered = set(x.strip() for x in stream.data)
|
| + rslt = set(fmt % {'count': i} for i in xrange(11))
|
| + self.assertSetEqual(filtered, rslt)
|
| + self.assertGreaterEqual(stream.count, 10)
|
| +
|
| +
|
| +class GitReadOnlyFunctionsTest(git_test_utils.GitRepoReadOnlyTestBase,
|
| + GitCommonTestBase):
|
| + REPO = """
|
| + A B C D
|
| + B E D
|
| + """
|
| +
|
| + COMMIT_A = {
|
| + 'some/files/file1': {'data': 'file1'},
|
| + 'some/files/file2': {'data': 'file2'},
|
| + 'some/files/file3': {'data': 'file3'},
|
| + 'some/other/file': {'data': 'otherfile'},
|
| + }
|
| +
|
| + COMMIT_C = {
|
| + 'some/files/file2': {
|
| + 'mode': 0755,
|
| + 'data': 'file2 - vanilla'},
|
| + }
|
| +
|
| + COMMIT_E = {
|
| + 'some/files/file2': {'data': 'file2 - merged'},
|
| + }
|
| +
|
| + COMMIT_D = {
|
| + 'some/files/file2': {'data': 'file2 - vanilla\nfile2 - merged'},
|
| + }
|
| +
|
| + def testHashes(self):
|
| + ret = self.repo.run(
|
| + self.gc.hashes, *[
|
| + 'master',
|
| + 'master~3',
|
| + self.repo['E']+'~',
|
| + self.repo['D']+'^2',
|
| + 'tag_C^{}',
|
| + ]
|
| + )
|
| + self.assertEqual([
|
| + self.repo['D'],
|
| + self.repo['A'],
|
| + self.repo['B'],
|
| + self.repo['E'],
|
| + self.repo['C'],
|
| + ], ret)
|
| +
|
| + def testParseCommitrefs(self):
|
| + ret = self.repo.run(
|
| + self.gc.parse_commitrefs, *[
|
| + 'master',
|
| + 'master~3',
|
| + self.repo['E']+'~',
|
| + self.repo['D']+'^2',
|
| + 'tag_C^{}',
|
| + ]
|
| + )
|
| + self.assertEqual(ret, map(binascii.unhexlify, [
|
| + self.repo['D'],
|
| + self.repo['A'],
|
| + self.repo['B'],
|
| + self.repo['E'],
|
| + self.repo['C'],
|
| + ]))
|
| +
|
| + with self.assertRaisesRegexp(Exception, r"one of \('master', 'bananas'\)"):
|
| + self.repo.run(self.gc.parse_commitrefs, 'master', 'bananas')
|
| +
|
| + def testTree(self):
|
| + tree = self.repo.run(self.gc.tree, 'master:some/files')
|
| + file1 = self.COMMIT_A['some/files/file1']['data']
|
| + file2 = self.COMMIT_D['some/files/file2']['data']
|
| + file3 = self.COMMIT_A['some/files/file3']['data']
|
| + self.assertEquals(tree['file1'],
|
| + ('100644', 'blob', git_test_utils.git_hash_data(file1)))
|
| + self.assertEquals(tree['file2'],
|
| + ('100755', 'blob', git_test_utils.git_hash_data(file2)))
|
| + self.assertEquals(tree['file3'],
|
| + ('100644', 'blob', git_test_utils.git_hash_data(file3)))
|
| +
|
| + tree = self.repo.run(self.gc.tree, 'master:some')
|
| + self.assertEquals(len(tree), 2)
|
| + # Don't check the tree hash because we're lazy :)
|
| + self.assertEquals(tree['files'][:2], ('040000', 'tree'))
|
| +
|
| + tree = self.repo.run(self.gc.tree, 'master:wat')
|
| + self.assertEqual(tree, None)
|
| +
|
| + def testTreeRecursive(self):
|
| + tree = self.repo.run(self.gc.tree, 'master:some', recurse=True)
|
| + file1 = self.COMMIT_A['some/files/file1']['data']
|
| + file2 = self.COMMIT_D['some/files/file2']['data']
|
| + file3 = self.COMMIT_A['some/files/file3']['data']
|
| + other = self.COMMIT_A['some/other/file']['data']
|
| + self.assertEquals(tree['files/file1'],
|
| + ('100644', 'blob', git_test_utils.git_hash_data(file1)))
|
| + self.assertEquals(tree['files/file2'],
|
| + ('100755', 'blob', git_test_utils.git_hash_data(file2)))
|
| + self.assertEquals(tree['files/file3'],
|
| + ('100644', 'blob', git_test_utils.git_hash_data(file3)))
|
| + self.assertEquals(tree['other/file'],
|
| + ('100644', 'blob', git_test_utils.git_hash_data(other)))
|
| +
|
| + def testError(self):
|
| + with self.assertRaisesRegexp(self.gc.CalledProcessError, 'status 1'):
|
| + self.gc.run('rev-parse', 'bananas')
|
| +
|
| +
|
| +class GitMutableFunctionsTest(git_test_utils.GitRepoReadWriteTestBase,
|
| + GitCommonTestBase):
|
| + REPO = ''
|
| +
|
| + def _intern_data(self, data):
|
| + with tempfile.TemporaryFile() as f:
|
| + f.write(data)
|
| + f.seek(0)
|
| + return self.repo.run(self.gc.intern_f, f)
|
| +
|
| + def testIndata(self):
|
| + data = 'WayCOOL SuperTest!'
|
| + data_hash = self.repo.run(
|
| + self.gc.run, 'hash-object', '-t', 'blob', '--stdin', indata=data)
|
| + self.assertEquals(data_hash, git_test_utils.git_hash_data(data))
|
| +
|
| + def testInternF(self):
|
| + data = "CoolBobcatsBro"
|
| + data_hash = self._intern_data(data)
|
| + self.assertEquals(git_test_utils.git_hash_data(data), data_hash)
|
| + self.assertEquals(data, self.repo.git('cat-file', 'blob', data_hash).stdout)
|
| +
|
| + def testMkTree(self):
|
| + tree = {}
|
| + for i in 1, 2, 3:
|
| + name = 'file%d' % i
|
| + tree[name] = ('100644', 'blob', self._intern_data(name))
|
| + tree_hash = self.repo.run(self.gc.mktree, tree)
|
| + self.assertEquals('37b61866d6e061c4ba478e7eb525be7b5752737d', tree_hash)
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + sys.exit(coverage_utils.covered_main(
|
| + os.path.join(DEPOT_TOOLS_ROOT, 'git_common.py')
|
| + ))
|
|
|