| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 """ | |
| 5 Tests for L{twisted.python.zipstream} | |
| 6 """ | |
| 7 import sys | |
| 8 import random | |
| 9 import md5 | |
| 10 import zipfile | |
| 11 | |
| 12 from twisted.python import zipstream, filepath | |
| 13 from twisted.trial import unittest | |
| 14 | |
| 15 class FileEntryMixin: | |
| 16 """ | |
| 17 File entry classes should behave as file-like objects | |
| 18 """ | |
| 19 def getFileEntry(self, contents): | |
| 20 """ | |
| 21 Return an appropriate zip file entry | |
| 22 """ | |
| 23 filename = self.mktemp() | |
| 24 z = zipfile.ZipFile(filename, 'w', self.compression) | |
| 25 z.writestr('content', contents) | |
| 26 z.close() | |
| 27 z = zipstream.ChunkingZipFile(filename, 'r') | |
| 28 return z.readfile('content') | |
| 29 | |
| 30 | |
| 31 def test_isatty(self): | |
| 32 """ | |
| 33 zip files should not be ttys, so isatty() should be false | |
| 34 """ | |
| 35 self.assertEquals(self.getFileEntry('').isatty(), False) | |
| 36 | |
| 37 | |
| 38 def test_closed(self): | |
| 39 """ | |
| 40 The C{closed} attribute should reflect whether C{close()} has been | |
| 41 called. | |
| 42 """ | |
| 43 fileEntry = self.getFileEntry('') | |
| 44 self.assertEquals(fileEntry.closed, False) | |
| 45 fileEntry.close() | |
| 46 self.assertEquals(fileEntry.closed, True) | |
| 47 | |
| 48 | |
| 49 def test_readline(self): | |
| 50 """ | |
| 51 C{readline()} should mirror L{file.readline} and return up to a single | |
| 52 deliminter. | |
| 53 """ | |
| 54 fileEntry = self.getFileEntry('hoho\nho') | |
| 55 self.assertEquals(fileEntry.readline(), 'hoho\n') | |
| 56 self.assertEquals(fileEntry.readline(), 'ho') | |
| 57 self.assertEquals(fileEntry.readline(), '') | |
| 58 | |
| 59 | |
| 60 def test_next(self): | |
| 61 """ | |
| 62 Zip file entries should implement the iterator protocol as files do. | |
| 63 """ | |
| 64 fileEntry = self.getFileEntry('ho\nhoho') | |
| 65 self.assertEquals(fileEntry.next(), 'ho\n') | |
| 66 self.assertEquals(fileEntry.next(), 'hoho') | |
| 67 self.assertRaises(StopIteration, fileEntry.next) | |
| 68 | |
| 69 | |
| 70 def test_readlines(self): | |
| 71 """ | |
| 72 C{readlines()} should return a list of all the lines. | |
| 73 """ | |
| 74 fileEntry = self.getFileEntry('ho\nho\nho') | |
| 75 self.assertEquals(fileEntry.readlines(), ['ho\n', 'ho\n', 'ho']) | |
| 76 | |
| 77 | |
| 78 def test_iteration(self): | |
| 79 """ | |
| 80 C{__iter__()} and C{xreadlines()} should return C{self}. | |
| 81 """ | |
| 82 fileEntry = self.getFileEntry('') | |
| 83 self.assertIdentical(iter(fileEntry), fileEntry) | |
| 84 self.assertIdentical(fileEntry.xreadlines(), fileEntry) | |
| 85 | |
| 86 | |
| 87 def test_readWhole(self): | |
| 88 """ | |
| 89 C{.read()} should read the entire file. | |
| 90 """ | |
| 91 contents = "Hello, world!" | |
| 92 entry = self.getFileEntry(contents) | |
| 93 self.assertEquals(entry.read(), contents) | |
| 94 | |
| 95 | |
| 96 def test_readPartial(self): | |
| 97 """ | |
| 98 C{.read(num)} should read num bytes from the file. | |
| 99 """ | |
| 100 contents = "0123456789" | |
| 101 entry = self.getFileEntry(contents) | |
| 102 one = entry.read(4) | |
| 103 two = entry.read(200) | |
| 104 self.assertEquals(one, "0123") | |
| 105 self.assertEquals(two, "456789") | |
| 106 | |
| 107 | |
| 108 def test_tell(self): | |
| 109 """ | |
| 110 C{.tell()} should return the number of bytes that have been read so | |
| 111 far. | |
| 112 """ | |
| 113 contents = "x" * 100 | |
| 114 entry = self.getFileEntry(contents) | |
| 115 entry.read(2) | |
| 116 self.assertEquals(entry.tell(), 2) | |
| 117 entry.read(4) | |
| 118 self.assertEquals(entry.tell(), 6) | |
| 119 | |
| 120 | |
| 121 | |
| 122 class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase): | |
| 123 """ | |
| 124 DeflatedZipFileEntry should be file-like | |
| 125 """ | |
| 126 compression = zipfile.ZIP_DEFLATED | |
| 127 | |
| 128 | |
| 129 | |
| 130 class ZipFileEntryTest(FileEntryMixin, unittest.TestCase): | |
| 131 """ | |
| 132 ZipFileEntry should be file-like | |
| 133 """ | |
| 134 compression = zipfile.ZIP_STORED | |
| 135 | |
| 136 | |
| 137 | |
| 138 class ZipstreamTest(unittest.TestCase): | |
| 139 """ | |
| 140 Tests for twisted.python.zipstream | |
| 141 """ | |
| 142 def setUp(self): | |
| 143 """ | |
| 144 Creates junk data that can be compressed and a test directory for any | |
| 145 files that will be created | |
| 146 """ | |
| 147 self.testdir = filepath.FilePath(self.mktemp()) | |
| 148 self.testdir.makedirs() | |
| 149 self.unzipdir = self.testdir.child('unzipped') | |
| 150 self.unzipdir.makedirs() | |
| 151 | |
| 152 | |
| 153 def makeZipFile(self, contents, directory=''): | |
| 154 """ | |
| 155 Makes a zip file archive containing len(contents) files. Contents | |
| 156 should be a list of strings, each string being the content of one file. | |
| 157 """ | |
| 158 zpfilename = self.testdir.child('zipfile.zip').path | |
| 159 zpfile = zipfile.ZipFile(zpfilename, 'w') | |
| 160 for i, content in enumerate(contents): | |
| 161 filename = str(i) | |
| 162 if directory: | |
| 163 filename = directory + "/" + filename | |
| 164 zpfile.writestr(filename, content) | |
| 165 zpfile.close() | |
| 166 return zpfilename | |
| 167 | |
| 168 | |
| 169 def test_countEntries(self): | |
| 170 """ | |
| 171 Make sure the deprecated L{countZipFileEntries} returns the correct | |
| 172 number of entries for a zip file. | |
| 173 """ | |
| 174 name = self.makeZipFile(["one", "two", "three", "four", "five"]) | |
| 175 result = self.assertWarns(DeprecationWarning, | |
| 176 "countZipFileEntries is deprecated.", | |
| 177 __file__, lambda : | |
| 178 zipstream.countZipFileEntries(name)) | |
| 179 self.assertEquals(result, 5) | |
| 180 | |
| 181 | |
| 182 def test_invalidMode(self): | |
| 183 """ | |
| 184 A ChunkingZipFile opened in write-mode should not allow .readfile(), | |
| 185 and raise a RuntimeError instead. | |
| 186 """ | |
| 187 czf = zipstream.ChunkingZipFile(self.mktemp(), "w") | |
| 188 self.assertRaises(RuntimeError, czf.readfile, "something") | |
| 189 | |
| 190 | |
| 191 def test_closedArchive(self): | |
| 192 """ | |
| 193 A closed ChunkingZipFile should raise a L{RuntimeError} when | |
| 194 .readfile() is invoked. | |
| 195 """ | |
| 196 czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r") | |
| 197 czf.close() | |
| 198 self.assertRaises(RuntimeError, czf.readfile, "something") | |
| 199 | |
| 200 | |
| 201 def test_invalidHeader(self): | |
| 202 """ | |
| 203 A zipfile entry with the wrong magic number should raise BadZipfile for | |
| 204 readfile(), but that should not affect other files in the archive. | |
| 205 """ | |
| 206 fn = self.makeZipFile(["test contents", | |
| 207 "more contents"]) | |
| 208 zf = zipfile.ZipFile(fn, "r") | |
| 209 zeroOffset = zf.getinfo("0").header_offset | |
| 210 zf.close() | |
| 211 # Zero out just the one header. | |
| 212 scribble = file(fn, "r+b") | |
| 213 scribble.seek(zeroOffset, 0) | |
| 214 scribble.write(chr(0) * 4) | |
| 215 scribble.close() | |
| 216 czf = zipstream.ChunkingZipFile(fn) | |
| 217 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") | |
| 218 self.assertEquals(czf.readfile("1").read(), "more contents") | |
| 219 | |
| 220 | |
| 221 def test_filenameMismatch(self): | |
| 222 """ | |
| 223 A zipfile entry with a different filename than is found in the central | |
| 224 directory should raise BadZipfile. | |
| 225 """ | |
| 226 fn = self.makeZipFile(["test contents", | |
| 227 "more contents"]) | |
| 228 zf = zipfile.ZipFile(fn, "r") | |
| 229 info = zf.getinfo("0") | |
| 230 info.filename = "not zero" | |
| 231 zf.close() | |
| 232 scribble = file(fn, "r+b") | |
| 233 scribble.seek(info.header_offset, 0) | |
| 234 scribble.write(info.FileHeader()) | |
| 235 scribble.close() | |
| 236 | |
| 237 czf = zipstream.ChunkingZipFile(fn) | |
| 238 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") | |
| 239 self.assertEquals(czf.readfile("1").read(), "more contents") | |
| 240 | |
| 241 | |
| 242 if sys.version_info < (2, 5): | |
| 243 # In python 2.4 and earlier, consistency between the directory and the | |
| 244 # file header are verified at archive-opening time. In python 2.5 | |
| 245 # (and, presumably, later) it is readzipfile's responsibility. | |
| 246 message = "Consistency-checking only necessary in 2.5." | |
| 247 test_invalidHeader.skip = message | |
| 248 test_filenameMismatch.skip = message | |
| 249 | |
| 250 | |
| 251 | |
| 252 def test_unsupportedCompression(self): | |
| 253 """ | |
| 254 A zipfile which describes an unsupported compression mechanism should | |
| 255 raise BadZipfile. | |
| 256 """ | |
| 257 fn = self.mktemp() | |
| 258 zf = zipfile.ZipFile(fn, "w") | |
| 259 zi = zipfile.ZipInfo("0") | |
| 260 zf.writestr(zi, "some data") | |
| 261 # Mangle its compression type in the central directory; can't do this | |
| 262 # before the writestr call or zipfile will (correctly) tell us not to | |
| 263 # pass bad compression types :) | |
| 264 zi.compress_type = 1234 | |
| 265 zf.close() | |
| 266 | |
| 267 czf = zipstream.ChunkingZipFile(fn) | |
| 268 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") | |
| 269 | |
| 270 | |
| 271 def test_extraData(self): | |
| 272 """ | |
| 273 readfile() should skip over 'extra' data present in the zip metadata. | |
| 274 """ | |
| 275 fn = self.mktemp() | |
| 276 zf = zipfile.ZipFile(fn, 'w') | |
| 277 zi = zipfile.ZipInfo("0") | |
| 278 zi.extra = "hello, extra" | |
| 279 zf.writestr(zi, "the real data") | |
| 280 zf.close() | |
| 281 czf = zipstream.ChunkingZipFile(fn) | |
| 282 self.assertEquals(czf.readfile("0").read(), "the real data") | |
| 283 | |
| 284 | |
| 285 def test_unzipIter(self): | |
| 286 """ | |
| 287 L{twisted.python.zipstream.unzipIter} should unzip a file for each | |
| 288 iteration and yield the number of files left to unzip after that | |
| 289 iteration | |
| 290 """ | |
| 291 numfiles = 10 | |
| 292 contents = ['This is test file %d!' % i for i in range(numfiles)] | |
| 293 zpfilename = self.makeZipFile(contents) | |
| 294 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path) | |
| 295 for i in range(numfiles): | |
| 296 self.assertEquals(len(list(self.unzipdir.children())), i) | |
| 297 self.assertEquals(uziter.next(), numfiles - i - 1) | |
| 298 self.assertEquals(len(list(self.unzipdir.children())), numfiles) | |
| 299 | |
| 300 for child in self.unzipdir.children(): | |
| 301 num = int(child.basename()) | |
| 302 self.assertEquals(child.open().read(), contents[num]) | |
| 303 | |
| 304 | |
| 305 def test_unzip(self): | |
| 306 """ | |
| 307 L{twisted.python.zipstream.unzip} should extract all files from a zip | |
| 308 archive | |
| 309 """ | |
| 310 numfiles = 3 | |
| 311 zpfilename = self.makeZipFile([''] * numfiles) | |
| 312 zipstream.unzip(zpfilename, self.unzipdir.path) | |
| 313 self.assertEquals(len(list(self.unzipdir.children())), numfiles) | |
| 314 | |
| 315 | |
| 316 def test_overwrite(self): | |
| 317 """ | |
| 318 L{twisted.python.zipstream.unzip} and | |
| 319 L{twisted.python.zipstream.unzipIter} shouldn't overwrite files unless | |
| 320 the 'overwrite' flag is passed | |
| 321 """ | |
| 322 testfile = self.unzipdir.child('0') | |
| 323 zpfilename = self.makeZipFile(['OVERWRITTEN']) | |
| 324 | |
| 325 testfile.setContent('NOT OVERWRITTEN') | |
| 326 zipstream.unzip(zpfilename, self.unzipdir.path) | |
| 327 self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN') | |
| 328 zipstream.unzip(zpfilename, self.unzipdir.path, overwrite=True) | |
| 329 self.assertEquals(testfile.open().read(), 'OVERWRITTEN') | |
| 330 | |
| 331 testfile.setContent('NOT OVERWRITTEN') | |
| 332 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path) | |
| 333 uziter.next() | |
| 334 self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN') | |
| 335 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path, | |
| 336 overwrite=True) | |
| 337 uziter.next() | |
| 338 self.assertEquals(testfile.open().read(), 'OVERWRITTEN') | |
| 339 | |
| 340 | |
| 341 # XXX these tests are kind of gross and old, but I think unzipIterChunky is | |
| 342 # kind of a gross function anyway. We should really write an abstract | |
| 343 # copyTo/moveTo that operates on FilePath and make sure ZipPath can support | |
| 344 # it, then just deprecate / remove this stuff. | |
| 345 def _unzipIterChunkyTest(self, compression, chunksize, lower, upper): | |
| 346 """ | |
| 347 unzipIterChunky should unzip the given number of bytes per iteration. | |
| 348 """ | |
| 349 junk = ' '.join([str(random.random()) for n in xrange(1000)]) | |
| 350 junkmd5 = md5.new(junk).hexdigest() | |
| 351 | |
| 352 tempdir = filepath.FilePath(self.mktemp()) | |
| 353 tempdir.makedirs() | |
| 354 zfpath = tempdir.child('bigfile.zip').path | |
| 355 self._makebigfile(zfpath, compression, junk) | |
| 356 uziter = zipstream.unzipIterChunky(zfpath, tempdir.path, | |
| 357 chunksize=chunksize) | |
| 358 r = uziter.next() | |
| 359 # test that the number of chunks is in the right ballpark; | |
| 360 # this could theoretically be any number but statistically it | |
| 361 # should always be in this range | |
| 362 approx = lower < r < upper | |
| 363 self.failUnless(approx) | |
| 364 for r in uziter: | |
| 365 pass | |
| 366 self.assertEqual(r, 0) | |
| 367 newmd5 = md5.new( | |
| 368 tempdir.child("zipstreamjunk").open().read()).hexdigest() | |
| 369 self.assertEqual(newmd5, junkmd5) | |
| 370 | |
| 371 def test_unzipIterChunkyStored(self): | |
| 372 """ | |
| 373 unzipIterChunky should unzip the given number of bytes per iteration on | |
| 374 a stored archive. | |
| 375 """ | |
| 376 self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45) | |
| 377 | |
| 378 | |
| 379 def test_chunkyDeflated(self): | |
| 380 """ | |
| 381 unzipIterChunky should unzip the given number of bytes per iteration on | |
| 382 a deflated archive. | |
| 383 """ | |
| 384 self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27) | |
| 385 | |
| 386 | |
| 387 def _makebigfile(self, filename, compression, junk): | |
| 388 """ | |
| 389 Create a zip file with the given file name and compression scheme. | |
| 390 """ | |
| 391 zf = zipfile.ZipFile(filename, 'w', compression) | |
| 392 for i in range(10): | |
| 393 fn = 'zipstream%d' % i | |
| 394 zf.writestr(fn, "") | |
| 395 zf.writestr('zipstreamjunk', junk) | |
| 396 zf.close() | |
| OLD | NEW |