| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 | |
| 6 """ | |
| 7 Test cases for dirdbm module. | |
| 8 """ | |
| 9 | |
| 10 from twisted.trial import unittest | |
| 11 from twisted.persisted import dirdbm | |
| 12 import os, shutil, glob | |
| 13 | |
| 14 class DirDbmTestCase(unittest.TestCase): | |
| 15 | |
| 16 def setUp(self): | |
| 17 self.path = self.mktemp() | |
| 18 self.dbm = dirdbm.open(self.path) | |
| 19 self.items = (('abc', 'foo'), ('/lalal', '\000\001'), ('\000\012', 'baz'
)) | |
| 20 | |
| 21 def tearDown(self): | |
| 22 shutil.rmtree(self.path) | |
| 23 | |
| 24 def testAll(self): | |
| 25 k = "//==".decode("base64") | |
| 26 self.dbm[k] = "a" | |
| 27 self.dbm[k] = "a" | |
| 28 self.assertEquals(self.dbm[k], "a") | |
| 29 | |
| 30 def testRebuildInteraction(self): | |
| 31 from twisted.persisted import dirdbm | |
| 32 from twisted.python import rebuild | |
| 33 | |
| 34 s = dirdbm.Shelf('dirdbm.rebuild.test') | |
| 35 s['key'] = 'value' | |
| 36 rebuild.rebuild(dirdbm) | |
| 37 # print s['key'] | |
| 38 | |
| 39 def testDbm(self): | |
| 40 d = self.dbm | |
| 41 | |
| 42 # insert keys | |
| 43 keys = [] | |
| 44 values = [] | |
| 45 for k, v in self.items: | |
| 46 d[k] = v | |
| 47 keys.append(k) | |
| 48 values.append(v) | |
| 49 keys.sort() | |
| 50 values.sort() | |
| 51 | |
| 52 # check they exist | |
| 53 for k, v in self.items: | |
| 54 assert d.has_key(k), "has_key() failed" | |
| 55 assert d[k] == v, "database has wrong value" | |
| 56 | |
| 57 # check non existent key | |
| 58 try: | |
| 59 d["XXX"] | |
| 60 except KeyError: | |
| 61 pass | |
| 62 else: | |
| 63 assert 0, "didn't raise KeyError on non-existent key" | |
| 64 | |
| 65 # check keys(), values() and items() | |
| 66 dbkeys = list(d.keys()) | |
| 67 dbvalues = list(d.values()) | |
| 68 dbitems = list(d.items()) | |
| 69 dbkeys.sort() | |
| 70 dbvalues.sort() | |
| 71 dbitems.sort() | |
| 72 items = list(self.items) | |
| 73 items.sort() | |
| 74 assert keys == dbkeys, ".keys() output didn't match: %s != %s" % (repr(k
eys), repr(dbkeys)) | |
| 75 assert values == dbvalues, ".values() output didn't match: %s != %s" % (
repr(values), repr(dbvalues)) | |
| 76 assert items == dbitems, "items() didn't match: %s != %s" % (repr(items)
, repr(dbitems)) | |
| 77 | |
| 78 copyPath = self.mktemp() | |
| 79 d2 = d.copyTo(copyPath) | |
| 80 | |
| 81 copykeys = list(d.keys()) | |
| 82 copyvalues = list(d.values()) | |
| 83 copyitems = list(d.items()) | |
| 84 copykeys.sort() | |
| 85 copyvalues.sort() | |
| 86 copyitems.sort() | |
| 87 | |
| 88 assert dbkeys == copykeys, ".copyTo().keys() didn't match: %s != %s" % (
repr(dbkeys), repr(copykeys)) | |
| 89 assert dbvalues == copyvalues, ".copyTo().values() didn't match: %s != %
s" % (repr(dbvalues), repr(copyvalues)) | |
| 90 assert dbitems == copyitems, ".copyTo().items() didn't match: %s != %s"
% (repr(dbkeys), repr(copyitems)) | |
| 91 | |
| 92 d2.clear() | |
| 93 assert len(d2.keys()) == len(d2.values()) == len(d2.items()) == 0, ".cle
ar() failed" | |
| 94 shutil.rmtree(copyPath) | |
| 95 | |
| 96 # delete items | |
| 97 for k, v in self.items: | |
| 98 del d[k] | |
| 99 assert not d.has_key(k), "has_key() even though we deleted it" | |
| 100 assert len(d.keys()) == 0, "database has keys" | |
| 101 assert len(d.values()) == 0, "database has values" | |
| 102 assert len(d.items()) == 0, "database has items" | |
| 103 | |
| 104 | |
| 105 def testModificationTime(self): | |
| 106 import time | |
| 107 # the mtime value for files comes from a different place than the | |
| 108 # gettimeofday() system call. On linux, gettimeofday() can be | |
| 109 # slightly ahead (due to clock drift which gettimeofday() takes into | |
| 110 # account but which open()/write()/close() do not), and if we are | |
| 111 # close to the edge of the next second, time.time() can give a value | |
| 112 # which is larger than the mtime which results from a subsequent | |
| 113 # write(). I consider this a kernel bug, but it is beyond the scope | |
| 114 # of this test. Thus we keep the range of acceptability to 3 seconds tim
e. | |
| 115 # -warner | |
| 116 self.dbm["k"] = "v" | |
| 117 self.assert_(abs(time.time() - self.dbm.getModificationTime("k")) <= 3) | |
| 118 | |
| 119 def testRecovery(self): | |
| 120 """DirDBM: test recovery from directory after a faked crash""" | |
| 121 k = self.dbm._encode("key1") | |
| 122 f = open(os.path.join(self.path, k + ".rpl"), "wb") | |
| 123 f.write("value") | |
| 124 f.close() | |
| 125 | |
| 126 k2 = self.dbm._encode("key2") | |
| 127 f = open(os.path.join(self.path, k2), "wb") | |
| 128 f.write("correct") | |
| 129 f.close() | |
| 130 f = open(os.path.join(self.path, k2 + ".rpl"), "wb") | |
| 131 f.write("wrong") | |
| 132 f.close() | |
| 133 | |
| 134 f = open(os.path.join(self.path, "aa.new"), "wb") | |
| 135 f.write("deleted") | |
| 136 f.close() | |
| 137 | |
| 138 dbm = dirdbm.DirDBM(self.path) | |
| 139 assert dbm["key1"] == "value" | |
| 140 assert dbm["key2"] == "correct" | |
| 141 assert not glob.glob(os.path.join(self.path, "*.new")) | |
| 142 assert not glob.glob(os.path.join(self.path, "*.rpl")) | |
| 143 | |
| 144 | |
| 145 def test_nonStringKeys(self): | |
| 146 """ | |
| 147 L{dirdbm.DirDBM} operations only support string keys: other types | |
| 148 should raise a C{AssertionError}. This really ought to be a | |
| 149 C{TypeError}, but it'll stay like this for backward compatibility. | |
| 150 """ | |
| 151 self.assertRaises(AssertionError, self.dbm.__setitem__, 2, "3") | |
| 152 try: | |
| 153 self.assertRaises(AssertionError, self.dbm.__setitem__, "2", 3) | |
| 154 except unittest.FailTest: | |
| 155 # dirdbm.Shelf.__setitem__ supports non-string values | |
| 156 self.assertIsInstance(self.dbm, dirdbm.Shelf) | |
| 157 self.assertRaises(AssertionError, self.dbm.__getitem__, 2) | |
| 158 self.assertRaises(AssertionError, self.dbm.__delitem__, 2) | |
| 159 self.assertRaises(AssertionError, self.dbm.has_key, 2) | |
| 160 self.assertRaises(AssertionError, self.dbm.__contains__, 2) | |
| 161 self.assertRaises(AssertionError, self.dbm.getModificationTime, 2) | |
| 162 | |
| 163 | |
| 164 | |
| 165 class ShelfTestCase(DirDbmTestCase): | |
| 166 | |
| 167 def setUp(self): | |
| 168 self.path = self.mktemp() | |
| 169 self.dbm = dirdbm.Shelf(self.path) | |
| 170 self.items = (('abc', 'foo'), ('/lalal', '\000\001'), ('\000\012', 'baz'
), | |
| 171 ('int', 12), ('float', 12.0), ('tuple', (None, 12))) | |
| 172 | |
| 173 | |
| 174 testCases = [DirDbmTestCase, ShelfTestCase] | |
| OLD | NEW |