Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(377)

Side by Side Diff: third_party/twisted_8_1/twisted/python/test/test_zipstream.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.python.zipstream}
6 """
7 import sys
8 import random
9 import md5
10 import zipfile
11
12 from twisted.python import zipstream, filepath
13 from twisted.trial import unittest
14
15 class FileEntryMixin:
16 """
17 File entry classes should behave as file-like objects
18 """
19 def getFileEntry(self, contents):
20 """
21 Return an appropriate zip file entry
22 """
23 filename = self.mktemp()
24 z = zipfile.ZipFile(filename, 'w', self.compression)
25 z.writestr('content', contents)
26 z.close()
27 z = zipstream.ChunkingZipFile(filename, 'r')
28 return z.readfile('content')
29
30
31 def test_isatty(self):
32 """
33 zip files should not be ttys, so isatty() should be false
34 """
35 self.assertEquals(self.getFileEntry('').isatty(), False)
36
37
38 def test_closed(self):
39 """
40 The C{closed} attribute should reflect whether C{close()} has been
41 called.
42 """
43 fileEntry = self.getFileEntry('')
44 self.assertEquals(fileEntry.closed, False)
45 fileEntry.close()
46 self.assertEquals(fileEntry.closed, True)
47
48
49 def test_readline(self):
50 """
51 C{readline()} should mirror L{file.readline} and return up to a single
52 deliminter.
53 """
54 fileEntry = self.getFileEntry('hoho\nho')
55 self.assertEquals(fileEntry.readline(), 'hoho\n')
56 self.assertEquals(fileEntry.readline(), 'ho')
57 self.assertEquals(fileEntry.readline(), '')
58
59
60 def test_next(self):
61 """
62 Zip file entries should implement the iterator protocol as files do.
63 """
64 fileEntry = self.getFileEntry('ho\nhoho')
65 self.assertEquals(fileEntry.next(), 'ho\n')
66 self.assertEquals(fileEntry.next(), 'hoho')
67 self.assertRaises(StopIteration, fileEntry.next)
68
69
70 def test_readlines(self):
71 """
72 C{readlines()} should return a list of all the lines.
73 """
74 fileEntry = self.getFileEntry('ho\nho\nho')
75 self.assertEquals(fileEntry.readlines(), ['ho\n', 'ho\n', 'ho'])
76
77
78 def test_iteration(self):
79 """
80 C{__iter__()} and C{xreadlines()} should return C{self}.
81 """
82 fileEntry = self.getFileEntry('')
83 self.assertIdentical(iter(fileEntry), fileEntry)
84 self.assertIdentical(fileEntry.xreadlines(), fileEntry)
85
86
87 def test_readWhole(self):
88 """
89 C{.read()} should read the entire file.
90 """
91 contents = "Hello, world!"
92 entry = self.getFileEntry(contents)
93 self.assertEquals(entry.read(), contents)
94
95
96 def test_readPartial(self):
97 """
98 C{.read(num)} should read num bytes from the file.
99 """
100 contents = "0123456789"
101 entry = self.getFileEntry(contents)
102 one = entry.read(4)
103 two = entry.read(200)
104 self.assertEquals(one, "0123")
105 self.assertEquals(two, "456789")
106
107
108 def test_tell(self):
109 """
110 C{.tell()} should return the number of bytes that have been read so
111 far.
112 """
113 contents = "x" * 100
114 entry = self.getFileEntry(contents)
115 entry.read(2)
116 self.assertEquals(entry.tell(), 2)
117 entry.read(4)
118 self.assertEquals(entry.tell(), 6)
119
120
121
122 class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase):
123 """
124 DeflatedZipFileEntry should be file-like
125 """
126 compression = zipfile.ZIP_DEFLATED
127
128
129
130 class ZipFileEntryTest(FileEntryMixin, unittest.TestCase):
131 """
132 ZipFileEntry should be file-like
133 """
134 compression = zipfile.ZIP_STORED
135
136
137
138 class ZipstreamTest(unittest.TestCase):
139 """
140 Tests for twisted.python.zipstream
141 """
142 def setUp(self):
143 """
144 Creates junk data that can be compressed and a test directory for any
145 files that will be created
146 """
147 self.testdir = filepath.FilePath(self.mktemp())
148 self.testdir.makedirs()
149 self.unzipdir = self.testdir.child('unzipped')
150 self.unzipdir.makedirs()
151
152
153 def makeZipFile(self, contents, directory=''):
154 """
155 Makes a zip file archive containing len(contents) files. Contents
156 should be a list of strings, each string being the content of one file.
157 """
158 zpfilename = self.testdir.child('zipfile.zip').path
159 zpfile = zipfile.ZipFile(zpfilename, 'w')
160 for i, content in enumerate(contents):
161 filename = str(i)
162 if directory:
163 filename = directory + "/" + filename
164 zpfile.writestr(filename, content)
165 zpfile.close()
166 return zpfilename
167
168
169 def test_countEntries(self):
170 """
171 Make sure the deprecated L{countZipFileEntries} returns the correct
172 number of entries for a zip file.
173 """
174 name = self.makeZipFile(["one", "two", "three", "four", "five"])
175 result = self.assertWarns(DeprecationWarning,
176 "countZipFileEntries is deprecated.",
177 __file__, lambda :
178 zipstream.countZipFileEntries(name))
179 self.assertEquals(result, 5)
180
181
182 def test_invalidMode(self):
183 """
184 A ChunkingZipFile opened in write-mode should not allow .readfile(),
185 and raise a RuntimeError instead.
186 """
187 czf = zipstream.ChunkingZipFile(self.mktemp(), "w")
188 self.assertRaises(RuntimeError, czf.readfile, "something")
189
190
191 def test_closedArchive(self):
192 """
193 A closed ChunkingZipFile should raise a L{RuntimeError} when
194 .readfile() is invoked.
195 """
196 czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r")
197 czf.close()
198 self.assertRaises(RuntimeError, czf.readfile, "something")
199
200
201 def test_invalidHeader(self):
202 """
203 A zipfile entry with the wrong magic number should raise BadZipfile for
204 readfile(), but that should not affect other files in the archive.
205 """
206 fn = self.makeZipFile(["test contents",
207 "more contents"])
208 zf = zipfile.ZipFile(fn, "r")
209 zeroOffset = zf.getinfo("0").header_offset
210 zf.close()
211 # Zero out just the one header.
212 scribble = file(fn, "r+b")
213 scribble.seek(zeroOffset, 0)
214 scribble.write(chr(0) * 4)
215 scribble.close()
216 czf = zipstream.ChunkingZipFile(fn)
217 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
218 self.assertEquals(czf.readfile("1").read(), "more contents")
219
220
221 def test_filenameMismatch(self):
222 """
223 A zipfile entry with a different filename than is found in the central
224 directory should raise BadZipfile.
225 """
226 fn = self.makeZipFile(["test contents",
227 "more contents"])
228 zf = zipfile.ZipFile(fn, "r")
229 info = zf.getinfo("0")
230 info.filename = "not zero"
231 zf.close()
232 scribble = file(fn, "r+b")
233 scribble.seek(info.header_offset, 0)
234 scribble.write(info.FileHeader())
235 scribble.close()
236
237 czf = zipstream.ChunkingZipFile(fn)
238 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
239 self.assertEquals(czf.readfile("1").read(), "more contents")
240
241
242 if sys.version_info < (2, 5):
243 # In python 2.4 and earlier, consistency between the directory and the
244 # file header are verified at archive-opening time. In python 2.5
245 # (and, presumably, later) it is readzipfile's responsibility.
246 message = "Consistency-checking only necessary in 2.5."
247 test_invalidHeader.skip = message
248 test_filenameMismatch.skip = message
249
250
251
252 def test_unsupportedCompression(self):
253 """
254 A zipfile which describes an unsupported compression mechanism should
255 raise BadZipfile.
256 """
257 fn = self.mktemp()
258 zf = zipfile.ZipFile(fn, "w")
259 zi = zipfile.ZipInfo("0")
260 zf.writestr(zi, "some data")
261 # Mangle its compression type in the central directory; can't do this
262 # before the writestr call or zipfile will (correctly) tell us not to
263 # pass bad compression types :)
264 zi.compress_type = 1234
265 zf.close()
266
267 czf = zipstream.ChunkingZipFile(fn)
268 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
269
270
271 def test_extraData(self):
272 """
273 readfile() should skip over 'extra' data present in the zip metadata.
274 """
275 fn = self.mktemp()
276 zf = zipfile.ZipFile(fn, 'w')
277 zi = zipfile.ZipInfo("0")
278 zi.extra = "hello, extra"
279 zf.writestr(zi, "the real data")
280 zf.close()
281 czf = zipstream.ChunkingZipFile(fn)
282 self.assertEquals(czf.readfile("0").read(), "the real data")
283
284
285 def test_unzipIter(self):
286 """
287 L{twisted.python.zipstream.unzipIter} should unzip a file for each
288 iteration and yield the number of files left to unzip after that
289 iteration
290 """
291 numfiles = 10
292 contents = ['This is test file %d!' % i for i in range(numfiles)]
293 zpfilename = self.makeZipFile(contents)
294 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path)
295 for i in range(numfiles):
296 self.assertEquals(len(list(self.unzipdir.children())), i)
297 self.assertEquals(uziter.next(), numfiles - i - 1)
298 self.assertEquals(len(list(self.unzipdir.children())), numfiles)
299
300 for child in self.unzipdir.children():
301 num = int(child.basename())
302 self.assertEquals(child.open().read(), contents[num])
303
304
305 def test_unzip(self):
306 """
307 L{twisted.python.zipstream.unzip} should extract all files from a zip
308 archive
309 """
310 numfiles = 3
311 zpfilename = self.makeZipFile([''] * numfiles)
312 zipstream.unzip(zpfilename, self.unzipdir.path)
313 self.assertEquals(len(list(self.unzipdir.children())), numfiles)
314
315
316 def test_overwrite(self):
317 """
318 L{twisted.python.zipstream.unzip} and
319 L{twisted.python.zipstream.unzipIter} shouldn't overwrite files unless
320 the 'overwrite' flag is passed
321 """
322 testfile = self.unzipdir.child('0')
323 zpfilename = self.makeZipFile(['OVERWRITTEN'])
324
325 testfile.setContent('NOT OVERWRITTEN')
326 zipstream.unzip(zpfilename, self.unzipdir.path)
327 self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN')
328 zipstream.unzip(zpfilename, self.unzipdir.path, overwrite=True)
329 self.assertEquals(testfile.open().read(), 'OVERWRITTEN')
330
331 testfile.setContent('NOT OVERWRITTEN')
332 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path)
333 uziter.next()
334 self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN')
335 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path,
336 overwrite=True)
337 uziter.next()
338 self.assertEquals(testfile.open().read(), 'OVERWRITTEN')
339
340
341 # XXX these tests are kind of gross and old, but I think unzipIterChunky is
342 # kind of a gross function anyway. We should really write an abstract
343 # copyTo/moveTo that operates on FilePath and make sure ZipPath can support
344 # it, then just deprecate / remove this stuff.
345 def _unzipIterChunkyTest(self, compression, chunksize, lower, upper):
346 """
347 unzipIterChunky should unzip the given number of bytes per iteration.
348 """
349 junk = ' '.join([str(random.random()) for n in xrange(1000)])
350 junkmd5 = md5.new(junk).hexdigest()
351
352 tempdir = filepath.FilePath(self.mktemp())
353 tempdir.makedirs()
354 zfpath = tempdir.child('bigfile.zip').path
355 self._makebigfile(zfpath, compression, junk)
356 uziter = zipstream.unzipIterChunky(zfpath, tempdir.path,
357 chunksize=chunksize)
358 r = uziter.next()
359 # test that the number of chunks is in the right ballpark;
360 # this could theoretically be any number but statistically it
361 # should always be in this range
362 approx = lower < r < upper
363 self.failUnless(approx)
364 for r in uziter:
365 pass
366 self.assertEqual(r, 0)
367 newmd5 = md5.new(
368 tempdir.child("zipstreamjunk").open().read()).hexdigest()
369 self.assertEqual(newmd5, junkmd5)
370
371 def test_unzipIterChunkyStored(self):
372 """
373 unzipIterChunky should unzip the given number of bytes per iteration on
374 a stored archive.
375 """
376 self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45)
377
378
379 def test_chunkyDeflated(self):
380 """
381 unzipIterChunky should unzip the given number of bytes per iteration on
382 a deflated archive.
383 """
384 self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27)
385
386
387 def _makebigfile(self, filename, compression, junk):
388 """
389 Create a zip file with the given file name and compression scheme.
390 """
391 zf = zipfile.ZipFile(filename, 'w', compression)
392 for i in range(10):
393 fn = 'zipstream%d' % i
394 zf.writestr(fn, "")
395 zf.writestr('zipstreamjunk', junk)
396 zf.close()
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/python/test/test_versions.py ('k') | third_party/twisted_8_1/twisted/python/text.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698