OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. |
| 4 |
| 5 import sys |
| 6 import os |
| 7 import subprocess |
| 8 import unittest |
| 9 import shutil |
| 10 import tempfile |
| 11 |
| 12 from cStringIO import StringIO |
| 13 from StringIO import StringIO as SlowStringIO |
| 14 |
| 15 import arfile |
| 16 import cli |
| 17 |
| 18 if not hasattr(subprocess, 'DEVNULL'): |
| 19 subprocess.DEVNULL = file(os.devnull, 'wb') |
| 20 |
| 21 |
| 22 class CloseSaveStringIO(SlowStringIO): |
| 23 def close(self): |
| 24 _value = self.getvalue() |
| 25 self.getvalue = lambda: _value |
| 26 SlowStringIO.close(self) |
| 27 |
| 28 |
| 29 AR_TEST_SIMPLE1 = ( |
| 30 # ar file header |
| 31 '!<arch>\n' + |
| 32 # File 1 |
| 33 # ---------------------- |
| 34 # (16 bytes) simple file |
| 35 'filename1 ' + |
| 36 # (12 bytes) modification time |
| 37 '123 ' + |
| 38 # (6 bytes) user id |
| 39 '1000 ' + |
| 40 # (6 bytes) group id |
| 41 '1000 ' + |
| 42 # (8 bytes) file mode |
| 43 '100640 ' + |
| 44 # (10 bytes) data size |
| 45 '6 ' + |
| 46 # (2 bytes) file magic |
| 47 '\x60\n' + |
| 48 # File data |
| 49 'abc123' + |
| 50 # Finished |
| 51 '') |
| 52 |
| 53 AR_TEST_BSD1 = ( |
| 54 # ar file header |
| 55 '!<arch>\n' + |
| 56 # File 1 |
| 57 # ---------------------- |
| 58 # (16 bytes) BSD style filename length |
| 59 '#1/9 ' + |
| 60 # (12 bytes) modification time |
| 61 '1234 ' + |
| 62 # (6 bytes) user id |
| 63 '1001 ' + |
| 64 # (6 bytes) group id |
| 65 '1001 ' + |
| 66 # (8 bytes) file mode |
| 67 '100644 ' + |
| 68 # (10 bytes) data size |
| 69 '15 ' + |
| 70 # (2 bytes) file magic |
| 71 '\x60\n' + |
| 72 # BSD style filename |
| 73 'filename1' + |
| 74 # File data |
| 75 'abc123' + |
| 76 # Padding |
| 77 '\n' + |
| 78 # Finished |
| 79 '') |
| 80 |
| 81 AR_TEST_BSD2 = ( |
| 82 # ar file header |
| 83 '!<arch>\n' + |
| 84 |
| 85 # File 1 |
| 86 # ---------------------- |
| 87 # (16 bytes) filename len |
| 88 '#1/5 ' + |
| 89 # (12 bytes) mtime |
| 90 '1447140471 ' + |
| 91 # (6 bytes) owner id |
| 92 '1000 ' + |
| 93 # (6 bytes) group id |
| 94 '1000 ' + |
| 95 # (8 bytes) file mode |
| 96 '100640 ' + |
| 97 # (10 bytes) Data size |
| 98 '13 ' + |
| 99 # (2 bytes) File magic |
| 100 '\x60\n' + |
| 101 # (9 bytes) File name |
| 102 'file1' + |
| 103 # (6 bytes) File data |
| 104 'contents' + |
| 105 # (1 byte) Padding |
| 106 '\n' + |
| 107 |
| 108 # File 2 |
| 109 # ---------------------- |
| 110 # (16 bytes) filename len |
| 111 '#1/7 ' + |
| 112 # (12 bytes) mtime |
| 113 '1447140471 ' + |
| 114 # (6 bytes) owner id |
| 115 '1000 ' + |
| 116 # (6 bytes) group id |
| 117 '1000 ' + |
| 118 # (8 bytes) file mode |
| 119 '100640 ' + |
| 120 # (10 bytes) Data size |
| 121 '10 ' + |
| 122 # (2 bytes) File magic |
| 123 '\x60\n' + |
| 124 # (9 bytes) File name |
| 125 'fileabc' + |
| 126 # (6 bytes) File data |
| 127 '123' + |
| 128 # (0 byte) No padding |
| 129 '' + |
| 130 |
| 131 # File 3 |
| 132 # ---------------------- |
| 133 # (16 bytes) filename len |
| 134 '#1/10 ' + |
| 135 # (12 bytes) mtime |
| 136 '1447140471 ' + |
| 137 # (6 bytes) owner id |
| 138 '1000 ' + |
| 139 # (6 bytes) group id |
| 140 '1000 ' + |
| 141 # (8 bytes) file mode |
| 142 '100640 ' + |
| 143 # (10 bytes) Data size |
| 144 '16 ' + |
| 145 # (2 bytes) File magic |
| 146 '\x60\n' + |
| 147 # (9 bytes) File name |
| 148 'dir1/file1' + |
| 149 # (6 bytes) File data |
| 150 '123abc' + |
| 151 # (0 byte) No padding |
| 152 '' + |
| 153 |
| 154 # Finished |
| 155 '') |
| 156 |
| 157 |
| 158 class TestArFileReader(unittest.TestCase): |
| 159 |
| 160 def testSimple1(self): |
| 161 fileobj = StringIO(AR_TEST_SIMPLE1) |
| 162 |
| 163 afri = iter(arfile.ArFileReader(fileobj)) |
| 164 ai, af = afri.next() |
| 165 self.assertIs(arfile.AR_FORMAT_SIMPLE, ai.format) |
| 166 self.assertEqual('filename1', ai.name) |
| 167 self.assertEqual(6, ai.size) |
| 168 self.assertEqual(123, ai.mtime) |
| 169 self.assertEqual(1000, ai.uid) |
| 170 self.assertEqual(1000, ai.gid) |
| 171 self.assertEqual('0100640', oct(ai.mode)) |
| 172 self.assertEqual('abc123', af.read(ai.size)) |
| 173 |
| 174 def testBSD1(self): |
| 175 fileobj = StringIO(AR_TEST_BSD1) |
| 176 |
| 177 afri = iter(arfile.ArFileReader(fileobj)) |
| 178 ai, af = afri.next() |
| 179 self.assertIs(arfile.AR_FORMAT_BSD, ai.format) |
| 180 self.assertEqual('filename1', ai.name) |
| 181 self.assertEqual(6, ai.size) |
| 182 self.assertEqual(1234, ai.mtime) |
| 183 self.assertEqual(1001, ai.uid) |
| 184 self.assertEqual(1001, ai.gid) |
| 185 self.assertEqual('0100644', oct(ai.mode)) |
| 186 self.assertEqual('abc123', af.read(ai.size)) |
| 187 |
| 188 def testBSD2(self): |
| 189 fileobj = StringIO(AR_TEST_BSD2) |
| 190 |
| 191 afri = iter(arfile.ArFileReader(fileobj)) |
| 192 ai, af = afri.next() |
| 193 self.assertIs(arfile.AR_FORMAT_BSD, ai.format) |
| 194 self.assertEqual('file1', ai.name) |
| 195 self.assertEqual(8, ai.size) |
| 196 self.assertEqual(1447140471, ai.mtime) |
| 197 self.assertEqual(1000, ai.uid) |
| 198 self.assertEqual(1000, ai.gid) |
| 199 self.assertEqual('0100640', oct(ai.mode)) |
| 200 self.assertEqual('contents', af.read(ai.size)) |
| 201 |
| 202 ai, af = afri.next() |
| 203 self.assertIs(arfile.AR_FORMAT_BSD, ai.format) |
| 204 self.assertEqual('fileabc', ai.name) |
| 205 self.assertEqual(3, ai.size) |
| 206 self.assertEqual(1447140471, ai.mtime) |
| 207 self.assertEqual(1000, ai.uid) |
| 208 self.assertEqual(1000, ai.gid) |
| 209 self.assertEqual('0100640', oct(ai.mode)) |
| 210 self.assertEqual('123', af.read(ai.size)) |
| 211 |
| 212 ai, af = afri.next() |
| 213 self.assertIs(arfile.AR_FORMAT_BSD, ai.format) |
| 214 self.assertEqual('dir1/file1', ai.name) |
| 215 self.assertEqual(6, ai.size) |
| 216 self.assertEqual(1447140471, ai.mtime) |
| 217 self.assertEqual(1000, ai.uid) |
| 218 self.assertEqual(1000, ai.gid) |
| 219 self.assertEqual('0100640', oct(ai.mode)) |
| 220 self.assertEqual('123abc', af.read(ai.size)) |
| 221 |
| 222 |
| 223 class TestArFileWriter(unittest.TestCase): |
| 224 |
| 225 def testSimple1(self): |
| 226 fileobj = CloseSaveStringIO() |
| 227 |
| 228 afw = arfile.ArFileWriter(fileobj) |
| 229 ai = arfile.ArInfo( |
| 230 arfile.AR_FORMAT_SIMPLE, 'filename1', 6, 123, 1000, 1000, 0100640) |
| 231 afw.addfile(ai, StringIO('abc123')) |
| 232 afw.close() |
| 233 |
| 234 self.assertMultiLineEqual(AR_TEST_SIMPLE1, fileobj.getvalue()) |
| 235 |
| 236 def testBSD1(self): |
| 237 fileobj = CloseSaveStringIO() |
| 238 |
| 239 afw = arfile.ArFileWriter(fileobj) |
| 240 ai = arfile.ArInfo( |
| 241 arfile.AR_FORMAT_BSD, 'filename1', 6, 1234, 1001, 1001, 0100644) |
| 242 afw.addfile(ai, StringIO('abc123')) |
| 243 afw.close() |
| 244 |
| 245 self.assertMultiLineEqual(AR_TEST_BSD1, fileobj.getvalue()) |
| 246 |
| 247 def testBSD2(self): |
| 248 fileobj = CloseSaveStringIO() |
| 249 |
| 250 afw = arfile.ArFileWriter(fileobj) |
| 251 afw.addfile( |
| 252 arfile.ArInfo.fromdefault( |
| 253 'file1', 8, arformat=arfile.AR_FORMAT_BSD), |
| 254 StringIO('contents')) |
| 255 afw.addfile( |
| 256 arfile.ArInfo.fromdefault( |
| 257 'fileabc', 3, arformat=arfile.AR_FORMAT_BSD), |
| 258 StringIO('123')) |
| 259 afw.addfile( |
| 260 arfile.ArInfo.fromdefault( |
| 261 'dir1/file1', 6, arformat=arfile.AR_FORMAT_BSD), |
| 262 StringIO('123abc')) |
| 263 afw.close() |
| 264 |
| 265 self.assertMultiLineEqual(AR_TEST_BSD2, fileobj.getvalue()) |
| 266 |
| 267 |
| 268 |
| 269 class BaseTestSuite(object): |
| 270 |
| 271 def testSimple1(self): |
| 272 self.assertWorking( |
| 273 (arfile.ArInfo( |
| 274 arfile.AR_FORMAT_SIMPLE, 'filename1', 6, 123, 1000, 1000, 0100640), |
| 275 'abc123'), |
| 276 ) |
| 277 |
| 278 def testBSD1(self): |
| 279 self.assertWorking( |
| 280 (arfile.ArInfo( |
| 281 arfile.AR_FORMAT_BSD, 'filename1', 6, 123, 1000, 1000, 0100640), |
| 282 'abc123'), |
| 283 ) |
| 284 |
| 285 def testBSD2(self): |
| 286 self.assertWorking( |
| 287 (arfile.ArInfo.fromdefault( |
| 288 'file1', 8, arformat=arfile.AR_FORMAT_BSD), |
| 289 'contents'), |
| 290 (arfile.ArInfo.fromdefault( |
| 291 'fileabc', 3, arformat=arfile.AR_FORMAT_BSD), |
| 292 '123'), |
| 293 (arfile.ArInfo.fromdefault( |
| 294 'dir1/file1', 6, arformat=arfile.AR_FORMAT_BSD), |
| 295 '123abc'), |
| 296 ) |
| 297 |
| 298 def testMixed(self): |
| 299 self.assertWorking( |
| 300 (arfile.ArInfo.fromdefault('file1', 0), ''), |
| 301 (arfile.ArInfo.fromdefault('f f', 1), 'a'), |
| 302 (arfile.ArInfo.fromdefault('123456789abcedefa', 1), 'a'), |
| 303 ) |
| 304 |
| 305 |
| 306 class TestArRoundTrip(BaseTestSuite, unittest.TestCase): |
| 307 |
| 308 def assertWorking(self, *initems): |
| 309 outfile = CloseSaveStringIO() |
| 310 |
| 311 afw = arfile.ArFileWriter(outfile) |
| 312 for ai, data in initems: |
| 313 assert ai.size == len(data) |
| 314 afw.addfile(ai, StringIO(data)) |
| 315 afw.close() |
| 316 |
| 317 infile = StringIO(outfile.getvalue()) |
| 318 afr = arfile.ArFileReader(infile) |
| 319 |
| 320 outitems = [] |
| 321 for ai, fd in afr: |
| 322 data = fd.read(ai.size) |
| 323 outitems.append((ai, data)) |
| 324 |
| 325 self.assertSequenceEqual(initems, outitems) |
| 326 |
| 327 |
| 328 def system_has_ar(): |
| 329 retcode = subprocess.call( |
| 330 'ar', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
| 331 return retcode == 1 |
| 332 |
| 333 |
| 334 @unittest.skipIf(not system_has_ar(), 'no ar binary found.') |
| 335 class TestArExternal(BaseTestSuite, unittest.TestCase): |
| 336 |
| 337 def assertWorking(self, *initems): |
| 338 tf = tempfile.NamedTemporaryFile(mode='wb') |
| 339 afw = arfile.ArFileWriter(tf) |
| 340 |
| 341 files = [] |
| 342 for ai, data in initems: |
| 343 files.append(ai.name) |
| 344 assert ai.size == len(data) |
| 345 afw.addfile(ai, StringIO(data)) |
| 346 afw.flush() |
| 347 |
| 348 output = subprocess.check_output(['ar', 't', tf.name]) |
| 349 self.assertMultiLineEqual('\n'.join(files), output.strip()) |
| 350 tf.close() |
| 351 |
| 352 |
| 353 class TestCLI(unittest.TestCase): |
| 354 |
| 355 def runCLI(self, args): |
| 356 orig_stdout = sys.stdout |
| 357 orig_stderr = sys.stderr |
| 358 try: |
| 359 sys.stdout = StringIO() |
| 360 sys.stderr = StringIO() |
| 361 cli.main('artool', args) |
| 362 return sys.stdout.getvalue(), sys.stderr.getvalue() |
| 363 finally: |
| 364 sys.stdout = orig_stdout |
| 365 sys.stderr = orig_stderr |
| 366 |
| 367 def assertCLI(self, *initems): |
| 368 indir = None |
| 369 ardir = None |
| 370 outdir = None |
| 371 try: |
| 372 indir = tempfile.mkdtemp() |
| 373 ardir = tempfile.mkdtemp() |
| 374 outdir = tempfile.mkdtemp() |
| 375 |
| 376 arp = os.path.join(ardir, 'out.ar') |
| 377 assert not os.path.exists(arp) |
| 378 |
| 379 # Write out a directory tree |
| 380 files = [] |
| 381 for fp, contents in initems: |
| 382 fn = os.path.join(indir, fp) |
| 383 dn = os.path.dirname(fn) |
| 384 if not os.path.exists(dn): |
| 385 os.makedirs(dn) |
| 386 |
| 387 with file(fn, 'wb') as f: |
| 388 f.write(contents) |
| 389 |
| 390 files.append(fp) |
| 391 |
| 392 files.sort() |
| 393 fileslist = '\n'.join(files) |
| 394 |
| 395 # Create an archive from a directory |
| 396 self.runCLI(['create', '--filename', arp, indir]) |
| 397 self.assertTrue( |
| 398 os.path.exists(arp), '%s file should exists' % arp) |
| 399 |
| 400 # List the archive contents |
| 401 output, _ = self.runCLI(['list', '--filename', arp]) |
| 402 filesoutput = '\n'.join(sorted(output[:-1].split('\n'))) |
| 403 self.assertMultiLineEqual(fileslist, filesoutput) |
| 404 |
| 405 # Extract the archive |
| 406 os.chdir(outdir) |
| 407 self.runCLI(['extract', '--filename', arp]) |
| 408 |
| 409 # Walk the directory tree and collect the extracted output |
| 410 outitems = [] |
| 411 for root, _, files in os.walk(outdir): |
| 412 for fn in files: |
| 413 fp = os.path.join(root, fn) |
| 414 outitems.append([fp[len(outdir)+1:], file(fp, 'rb').read()]) |
| 415 |
| 416 # Check the two are equal |
| 417 self.assertSequenceEqual(sorted(initems), sorted(outitems)) |
| 418 |
| 419 finally: |
| 420 if indir: |
| 421 shutil.rmtree(indir, ignore_errors=True) |
| 422 if ardir: |
| 423 shutil.rmtree(ardir, ignore_errors=True) |
| 424 if outdir: |
| 425 shutil.rmtree(outdir, ignore_errors=True) |
| 426 |
| 427 |
| 428 def testSimple1(self): |
| 429 self.assertCLI( |
| 430 ['file1', 'contents1'], |
| 431 ) |
| 432 |
| 433 def testMultiple(self): |
| 434 self.assertCLI( |
| 435 ['file1', 'contents1'], |
| 436 ['dir1/file2', 'contents2'], |
| 437 ['dir2/dir3/file3', 'contents3'], |
| 438 ['file4', 'contents4'], |
| 439 ) |
| 440 |
| 441 def testUnicodeContents(self): |
| 442 self.assertCLI( |
| 443 ['file1', u'\u2603'.encode('utf-8')], |
| 444 ) |
| 445 |
| 446 def testFilenameSpaces(self): |
| 447 self.assertCLI( |
| 448 ['f f1', 'contents1'], |
| 449 ['d d1/file2', 'contents2'], |
| 450 ['d d1/f f3', 'contents3'], |
| 451 ['file4', 'contents4'], |
| 452 ) |
| 453 |
| 454 def testBigFile(self): |
| 455 self.assertCLI( |
| 456 ['bigfile', 'data'*1024*1024*10], |
| 457 ) |
| 458 |
| 459 if __name__ == '__main__': |
| 460 import subprocess |
| 461 subprocess.call('python arfile.py', shell=True) |
| 462 unittest.main() |
OLD | NEW |