Index: third_party/twisted_8_1/twisted/python/test/test_zipstream.py |
diff --git a/third_party/twisted_8_1/twisted/python/test/test_zipstream.py b/third_party/twisted_8_1/twisted/python/test/test_zipstream.py |
deleted file mode 100644 |
index 5ccb0330a064876131d8185d840766ec318a41b6..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/python/test/test_zipstream.py |
+++ /dev/null |
@@ -1,396 +0,0 @@ |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Tests for L{twisted.python.zipstream} |
-""" |
-import sys |
-import random |
-import md5 |
-import zipfile |
- |
-from twisted.python import zipstream, filepath |
-from twisted.trial import unittest |
- |
-class FileEntryMixin: |
- """ |
- File entry classes should behave as file-like objects |
- """ |
- def getFileEntry(self, contents): |
- """ |
- Return an appropriate zip file entry |
- """ |
- filename = self.mktemp() |
- z = zipfile.ZipFile(filename, 'w', self.compression) |
- z.writestr('content', contents) |
- z.close() |
- z = zipstream.ChunkingZipFile(filename, 'r') |
- return z.readfile('content') |
- |
- |
- def test_isatty(self): |
- """ |
- zip files should not be ttys, so isatty() should be false |
- """ |
- self.assertEquals(self.getFileEntry('').isatty(), False) |
- |
- |
- def test_closed(self): |
- """ |
- The C{closed} attribute should reflect whether C{close()} has been |
- called. |
- """ |
- fileEntry = self.getFileEntry('') |
- self.assertEquals(fileEntry.closed, False) |
- fileEntry.close() |
- self.assertEquals(fileEntry.closed, True) |
- |
- |
- def test_readline(self): |
- """ |
- C{readline()} should mirror L{file.readline} and return up to a single |
- deliminter. |
- """ |
- fileEntry = self.getFileEntry('hoho\nho') |
- self.assertEquals(fileEntry.readline(), 'hoho\n') |
- self.assertEquals(fileEntry.readline(), 'ho') |
- self.assertEquals(fileEntry.readline(), '') |
- |
- |
- def test_next(self): |
- """ |
- Zip file entries should implement the iterator protocol as files do. |
- """ |
- fileEntry = self.getFileEntry('ho\nhoho') |
- self.assertEquals(fileEntry.next(), 'ho\n') |
- self.assertEquals(fileEntry.next(), 'hoho') |
- self.assertRaises(StopIteration, fileEntry.next) |
- |
- |
- def test_readlines(self): |
- """ |
- C{readlines()} should return a list of all the lines. |
- """ |
- fileEntry = self.getFileEntry('ho\nho\nho') |
- self.assertEquals(fileEntry.readlines(), ['ho\n', 'ho\n', 'ho']) |
- |
- |
- def test_iteration(self): |
- """ |
- C{__iter__()} and C{xreadlines()} should return C{self}. |
- """ |
- fileEntry = self.getFileEntry('') |
- self.assertIdentical(iter(fileEntry), fileEntry) |
- self.assertIdentical(fileEntry.xreadlines(), fileEntry) |
- |
- |
- def test_readWhole(self): |
- """ |
- C{.read()} should read the entire file. |
- """ |
- contents = "Hello, world!" |
- entry = self.getFileEntry(contents) |
- self.assertEquals(entry.read(), contents) |
- |
- |
- def test_readPartial(self): |
- """ |
- C{.read(num)} should read num bytes from the file. |
- """ |
- contents = "0123456789" |
- entry = self.getFileEntry(contents) |
- one = entry.read(4) |
- two = entry.read(200) |
- self.assertEquals(one, "0123") |
- self.assertEquals(two, "456789") |
- |
- |
- def test_tell(self): |
- """ |
- C{.tell()} should return the number of bytes that have been read so |
- far. |
- """ |
- contents = "x" * 100 |
- entry = self.getFileEntry(contents) |
- entry.read(2) |
- self.assertEquals(entry.tell(), 2) |
- entry.read(4) |
- self.assertEquals(entry.tell(), 6) |
- |
- |
- |
-class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase): |
- """ |
- DeflatedZipFileEntry should be file-like |
- """ |
- compression = zipfile.ZIP_DEFLATED |
- |
- |
- |
-class ZipFileEntryTest(FileEntryMixin, unittest.TestCase): |
- """ |
- ZipFileEntry should be file-like |
- """ |
- compression = zipfile.ZIP_STORED |
- |
- |
- |
-class ZipstreamTest(unittest.TestCase): |
- """ |
- Tests for twisted.python.zipstream |
- """ |
- def setUp(self): |
- """ |
- Creates junk data that can be compressed and a test directory for any |
- files that will be created |
- """ |
- self.testdir = filepath.FilePath(self.mktemp()) |
- self.testdir.makedirs() |
- self.unzipdir = self.testdir.child('unzipped') |
- self.unzipdir.makedirs() |
- |
- |
- def makeZipFile(self, contents, directory=''): |
- """ |
- Makes a zip file archive containing len(contents) files. Contents |
- should be a list of strings, each string being the content of one file. |
- """ |
- zpfilename = self.testdir.child('zipfile.zip').path |
- zpfile = zipfile.ZipFile(zpfilename, 'w') |
- for i, content in enumerate(contents): |
- filename = str(i) |
- if directory: |
- filename = directory + "/" + filename |
- zpfile.writestr(filename, content) |
- zpfile.close() |
- return zpfilename |
- |
- |
- def test_countEntries(self): |
- """ |
- Make sure the deprecated L{countZipFileEntries} returns the correct |
- number of entries for a zip file. |
- """ |
- name = self.makeZipFile(["one", "two", "three", "four", "five"]) |
- result = self.assertWarns(DeprecationWarning, |
- "countZipFileEntries is deprecated.", |
- __file__, lambda : |
- zipstream.countZipFileEntries(name)) |
- self.assertEquals(result, 5) |
- |
- |
- def test_invalidMode(self): |
- """ |
- A ChunkingZipFile opened in write-mode should not allow .readfile(), |
- and raise a RuntimeError instead. |
- """ |
- czf = zipstream.ChunkingZipFile(self.mktemp(), "w") |
- self.assertRaises(RuntimeError, czf.readfile, "something") |
- |
- |
- def test_closedArchive(self): |
- """ |
- A closed ChunkingZipFile should raise a L{RuntimeError} when |
- .readfile() is invoked. |
- """ |
- czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r") |
- czf.close() |
- self.assertRaises(RuntimeError, czf.readfile, "something") |
- |
- |
- def test_invalidHeader(self): |
- """ |
- A zipfile entry with the wrong magic number should raise BadZipfile for |
- readfile(), but that should not affect other files in the archive. |
- """ |
- fn = self.makeZipFile(["test contents", |
- "more contents"]) |
- zf = zipfile.ZipFile(fn, "r") |
- zeroOffset = zf.getinfo("0").header_offset |
- zf.close() |
- # Zero out just the one header. |
- scribble = file(fn, "r+b") |
- scribble.seek(zeroOffset, 0) |
- scribble.write(chr(0) * 4) |
- scribble.close() |
- czf = zipstream.ChunkingZipFile(fn) |
- self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") |
- self.assertEquals(czf.readfile("1").read(), "more contents") |
- |
- |
- def test_filenameMismatch(self): |
- """ |
- A zipfile entry with a different filename than is found in the central |
- directory should raise BadZipfile. |
- """ |
- fn = self.makeZipFile(["test contents", |
- "more contents"]) |
- zf = zipfile.ZipFile(fn, "r") |
- info = zf.getinfo("0") |
- info.filename = "not zero" |
- zf.close() |
- scribble = file(fn, "r+b") |
- scribble.seek(info.header_offset, 0) |
- scribble.write(info.FileHeader()) |
- scribble.close() |
- |
- czf = zipstream.ChunkingZipFile(fn) |
- self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") |
- self.assertEquals(czf.readfile("1").read(), "more contents") |
- |
- |
- if sys.version_info < (2, 5): |
- # In python 2.4 and earlier, consistency between the directory and the |
- # file header are verified at archive-opening time. In python 2.5 |
- # (and, presumably, later) it is readzipfile's responsibility. |
- message = "Consistency-checking only necessary in 2.5." |
- test_invalidHeader.skip = message |
- test_filenameMismatch.skip = message |
- |
- |
- |
- def test_unsupportedCompression(self): |
- """ |
- A zipfile which describes an unsupported compression mechanism should |
- raise BadZipfile. |
- """ |
- fn = self.mktemp() |
- zf = zipfile.ZipFile(fn, "w") |
- zi = zipfile.ZipInfo("0") |
- zf.writestr(zi, "some data") |
- # Mangle its compression type in the central directory; can't do this |
- # before the writestr call or zipfile will (correctly) tell us not to |
- # pass bad compression types :) |
- zi.compress_type = 1234 |
- zf.close() |
- |
- czf = zipstream.ChunkingZipFile(fn) |
- self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") |
- |
- |
- def test_extraData(self): |
- """ |
- readfile() should skip over 'extra' data present in the zip metadata. |
- """ |
- fn = self.mktemp() |
- zf = zipfile.ZipFile(fn, 'w') |
- zi = zipfile.ZipInfo("0") |
- zi.extra = "hello, extra" |
- zf.writestr(zi, "the real data") |
- zf.close() |
- czf = zipstream.ChunkingZipFile(fn) |
- self.assertEquals(czf.readfile("0").read(), "the real data") |
- |
- |
- def test_unzipIter(self): |
- """ |
- L{twisted.python.zipstream.unzipIter} should unzip a file for each |
- iteration and yield the number of files left to unzip after that |
- iteration |
- """ |
- numfiles = 10 |
- contents = ['This is test file %d!' % i for i in range(numfiles)] |
- zpfilename = self.makeZipFile(contents) |
- uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path) |
- for i in range(numfiles): |
- self.assertEquals(len(list(self.unzipdir.children())), i) |
- self.assertEquals(uziter.next(), numfiles - i - 1) |
- self.assertEquals(len(list(self.unzipdir.children())), numfiles) |
- |
- for child in self.unzipdir.children(): |
- num = int(child.basename()) |
- self.assertEquals(child.open().read(), contents[num]) |
- |
- |
- def test_unzip(self): |
- """ |
- L{twisted.python.zipstream.unzip} should extract all files from a zip |
- archive |
- """ |
- numfiles = 3 |
- zpfilename = self.makeZipFile([''] * numfiles) |
- zipstream.unzip(zpfilename, self.unzipdir.path) |
- self.assertEquals(len(list(self.unzipdir.children())), numfiles) |
- |
- |
- def test_overwrite(self): |
- """ |
- L{twisted.python.zipstream.unzip} and |
- L{twisted.python.zipstream.unzipIter} shouldn't overwrite files unless |
- the 'overwrite' flag is passed |
- """ |
- testfile = self.unzipdir.child('0') |
- zpfilename = self.makeZipFile(['OVERWRITTEN']) |
- |
- testfile.setContent('NOT OVERWRITTEN') |
- zipstream.unzip(zpfilename, self.unzipdir.path) |
- self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN') |
- zipstream.unzip(zpfilename, self.unzipdir.path, overwrite=True) |
- self.assertEquals(testfile.open().read(), 'OVERWRITTEN') |
- |
- testfile.setContent('NOT OVERWRITTEN') |
- uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path) |
- uziter.next() |
- self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN') |
- uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path, |
- overwrite=True) |
- uziter.next() |
- self.assertEquals(testfile.open().read(), 'OVERWRITTEN') |
- |
- |
- # XXX these tests are kind of gross and old, but I think unzipIterChunky is |
- # kind of a gross function anyway. We should really write an abstract |
- # copyTo/moveTo that operates on FilePath and make sure ZipPath can support |
- # it, then just deprecate / remove this stuff. |
- def _unzipIterChunkyTest(self, compression, chunksize, lower, upper): |
- """ |
- unzipIterChunky should unzip the given number of bytes per iteration. |
- """ |
- junk = ' '.join([str(random.random()) for n in xrange(1000)]) |
- junkmd5 = md5.new(junk).hexdigest() |
- |
- tempdir = filepath.FilePath(self.mktemp()) |
- tempdir.makedirs() |
- zfpath = tempdir.child('bigfile.zip').path |
- self._makebigfile(zfpath, compression, junk) |
- uziter = zipstream.unzipIterChunky(zfpath, tempdir.path, |
- chunksize=chunksize) |
- r = uziter.next() |
- # test that the number of chunks is in the right ballpark; |
- # this could theoretically be any number but statistically it |
- # should always be in this range |
- approx = lower < r < upper |
- self.failUnless(approx) |
- for r in uziter: |
- pass |
- self.assertEqual(r, 0) |
- newmd5 = md5.new( |
- tempdir.child("zipstreamjunk").open().read()).hexdigest() |
- self.assertEqual(newmd5, junkmd5) |
- |
- def test_unzipIterChunkyStored(self): |
- """ |
- unzipIterChunky should unzip the given number of bytes per iteration on |
- a stored archive. |
- """ |
- self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45) |
- |
- |
- def test_chunkyDeflated(self): |
- """ |
- unzipIterChunky should unzip the given number of bytes per iteration on |
- a deflated archive. |
- """ |
- self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27) |
- |
- |
- def _makebigfile(self, filename, compression, junk): |
- """ |
- Create a zip file with the given file name and compression scheme. |
- """ |
- zf = zipfile.ZipFile(filename, 'w', compression) |
- for i in range(10): |
- fn = 'zipstream%d' % i |
- zf.writestr(fn, "") |
- zf.writestr('zipstreamjunk', junk) |
- zf.close() |