OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # test_ftpd.py |
| 3 |
| 4 # ====================================================================== |
| 5 # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com> |
| 6 # |
| 7 # All Rights Reserved |
| 8 # |
| 9 # Permission to use, copy, modify, and distribute this software and |
| 10 # its documentation for any purpose and without fee is hereby |
| 11 # granted, provided that the above copyright notice appear in all |
| 12 # copies and that both that copyright notice and this permission |
| 13 # notice appear in supporting documentation, and that the name of |
| 14 # Giampaolo Rodola' not be used in advertising or publicity pertaining to |
| 15 # distribution of the software without specific, written prior |
| 16 # permission. |
| 17 # |
| 18 # Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, |
| 19 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN |
| 20 # NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR |
| 21 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS |
| 22 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, |
| 23 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN |
| 24 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| 25 # ====================================================================== |
| 26 |
| 27 |
| 28 # This test suite has been run successfully on the following systems: |
| 29 # |
| 30 # ----------------------------------------------------------- |
| 31 # System | Python version |
| 32 # ----------------------------------------------------------- |
| 33 # Linux Ubuntu 2.6.20-15 | 2.4, 2.5 |
| 34 # Linux Kubuntu 8.04 32 & 64 bits | 2.5.2 |
| 35 # Linux Debian 2.4.27-2-386 | 2.3.5 |
| 36 # Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2 |
| 37 # Windows Vista Ultimate 64 bit | 2.5.1 |
| 38 # Windows Vista Business 32 bit | 2.5.1 |
| 39 # Windows Server 2008 64bit | 2.5.1 |
| 40 # Windows Mobile 6.1 | PythonCE 2.5 |
| 41 # OS X 10.4.10 | 2.3, 2.4, 2.5 |
| 42 # FreeBSD 7.0 | 2.4, 2.5 |
| 43 # ----------------------------------------------------------- |
| 44 |
| 45 |
| 46 import threading |
| 47 import unittest |
| 48 import socket |
| 49 import os |
| 50 import time |
| 51 import re |
| 52 import tempfile |
| 53 import ftplib |
| 54 import random |
| 55 import warnings |
| 56 import sys |
| 57 |
| 58 from pyftpdlib import ftpserver |
| 59 |
| 60 |
| 61 __release__ = 'pyftpdlib 0.5.0' |
| 62 |
| 63 # Attempt to use IP rather than hostname (test suite will run a lot faster) |
| 64 try: |
| 65 HOST = socket.gethostbyname('localhost') |
| 66 except socket.error: |
| 67 HOST = 'localhost' |
| 68 USER = 'user' |
| 69 PASSWD = '12345' |
| 70 HOME = os.getcwd() |
| 71 try: |
| 72 from test.test_support import TESTFN |
| 73 except ImportError: |
| 74 TESTFN = 'temp-fname' |
| 75 TESTFN2 = TESTFN + '2' |
| 76 TESTFN3 = TESTFN + '3' |
| 77 |
| 78 def try_address(host, port=0): |
| 79 """Try to bind a daemon on the given host:port and return True |
| 80 if that has been possible.""" |
| 81 try: |
| 82 ftpserver.FTPServer((host, port), None) |
| 83 except socket.error: |
| 84 return False |
| 85 else: |
| 86 return True |
| 87 |
| 88 SUPPORTS_IPV4 = try_address('127.0.0.1') |
| 89 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1') |
| 90 |
| 91 |
| 92 class TestAbstractedFS(unittest.TestCase): |
| 93 """Test for conversion utility methods of AbstractedFS class.""" |
| 94 |
| 95 def test_ftpnorm(self): |
| 96 # Tests for ftpnorm method. |
| 97 ae = self.assertEquals |
| 98 fs = ftpserver.AbstractedFS() |
| 99 |
| 100 fs.cwd = '/' |
| 101 ae(fs.ftpnorm(''), '/') |
| 102 ae(fs.ftpnorm('/'), '/') |
| 103 ae(fs.ftpnorm('.'), '/') |
| 104 ae(fs.ftpnorm('..'), '/') |
| 105 ae(fs.ftpnorm('a'), '/a') |
| 106 ae(fs.ftpnorm('/a'), '/a') |
| 107 ae(fs.ftpnorm('/a/'), '/a') |
| 108 ae(fs.ftpnorm('a/..'), '/') |
| 109 ae(fs.ftpnorm('a/b'), '/a/b') |
| 110 ae(fs.ftpnorm('a/b/..'), '/a') |
| 111 ae(fs.ftpnorm('a/b/../..'), '/') |
| 112 fs.cwd = '/sub' |
| 113 ae(fs.ftpnorm(''), '/sub') |
| 114 ae(fs.ftpnorm('/'), '/') |
| 115 ae(fs.ftpnorm('.'), '/sub') |
| 116 ae(fs.ftpnorm('..'), '/') |
| 117 ae(fs.ftpnorm('a'), '/sub/a') |
| 118 ae(fs.ftpnorm('a/'), '/sub/a') |
| 119 ae(fs.ftpnorm('a/..'), '/sub') |
| 120 ae(fs.ftpnorm('a/b'), '/sub/a/b') |
| 121 ae(fs.ftpnorm('a/b/'), '/sub/a/b') |
| 122 ae(fs.ftpnorm('a/b/..'), '/sub/a') |
| 123 ae(fs.ftpnorm('a/b/../..'), '/sub') |
| 124 ae(fs.ftpnorm('a/b/../../..'), '/') |
| 125 ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed |
| 126 |
| 127 def test_ftp2fs(self): |
| 128 # Tests for ftp2fs method. |
| 129 ae = self.assertEquals |
| 130 fs = ftpserver.AbstractedFS() |
| 131 join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) |
| 132 |
| 133 def goforit(root): |
| 134 fs.root = root |
| 135 fs.cwd = '/' |
| 136 ae(fs.ftp2fs(''), root) |
| 137 ae(fs.ftp2fs('/'), root) |
| 138 ae(fs.ftp2fs('.'), root) |
| 139 ae(fs.ftp2fs('..'), root) |
| 140 ae(fs.ftp2fs('a'), join(root, 'a')) |
| 141 ae(fs.ftp2fs('/a'), join(root, 'a')) |
| 142 ae(fs.ftp2fs('/a/'), join(root, 'a')) |
| 143 ae(fs.ftp2fs('a/..'), root) |
| 144 ae(fs.ftp2fs('a/b'), join(root, r'a/b')) |
| 145 ae(fs.ftp2fs('/a/b'), join(root, r'a/b')) |
| 146 ae(fs.ftp2fs('/a/b/..'), join(root, 'a')) |
| 147 ae(fs.ftp2fs('/a/b/../..'), root) |
| 148 fs.cwd = '/sub' |
| 149 ae(fs.ftp2fs(''), join(root, 'sub')) |
| 150 ae(fs.ftp2fs('/'), root) |
| 151 ae(fs.ftp2fs('.'), join(root, 'sub')) |
| 152 ae(fs.ftp2fs('..'), root) |
| 153 ae(fs.ftp2fs('a'), join(root, 'sub/a')) |
| 154 ae(fs.ftp2fs('a/'), join(root, 'sub/a')) |
| 155 ae(fs.ftp2fs('a/..'), join(root, 'sub')) |
| 156 ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b')) |
| 157 ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a')) |
| 158 ae(fs.ftp2fs('a/b/../..'), join(root, 'sub')) |
| 159 ae(fs.ftp2fs('a/b/../../..'), root) |
| 160 ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed |
| 161 |
| 162 if os.sep == '\\': |
| 163 goforit(r'C:\dir') |
| 164 goforit('C:\\') |
| 165 # on DOS-derived filesystems (e.g. Windows) this is the same |
| 166 # as specifying the current drive directory (e.g. 'C:\\') |
| 167 goforit('\\') |
| 168 elif os.sep == '/': |
| 169 goforit('/home/user') |
| 170 goforit('/') |
| 171 else: |
| 172 # os.sep == ':'? Don't know... let's try it anyway |
| 173 goforit(os.getcwd()) |
| 174 |
| 175 def test_fs2ftp(self): |
| 176 # Tests for fs2ftp method. |
| 177 ae = self.assertEquals |
| 178 fs = ftpserver.AbstractedFS() |
| 179 join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) |
| 180 |
| 181 def goforit(root): |
| 182 fs.root = root |
| 183 ae(fs.fs2ftp(root), '/') |
| 184 ae(fs.fs2ftp(join(root, '/')), '/') |
| 185 ae(fs.fs2ftp(join(root, '.')), '/') |
| 186 ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root |
| 187 ae(fs.fs2ftp(join(root, 'a')), '/a') |
| 188 ae(fs.fs2ftp(join(root, 'a/')), '/a') |
| 189 ae(fs.fs2ftp(join(root, 'a/..')), '/') |
| 190 ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') |
| 191 ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') |
| 192 ae(fs.fs2ftp(join(root, 'a/b/..')), '/a') |
| 193 ae(fs.fs2ftp(join(root, '/a/b/../..')), '/') |
| 194 fs.cwd = '/sub' |
| 195 ae(fs.fs2ftp(join(root, 'a/')), '/a') |
| 196 |
| 197 if os.sep == '\\': |
| 198 goforit(r'C:\dir') |
| 199 goforit('C:\\') |
| 200 # on DOS-derived filesystems (e.g. Windows) this is the same |
| 201 # as specifying the current drive directory (e.g. 'C:\\') |
| 202 goforit('\\') |
| 203 fs.root = r'C:\dir' |
| 204 ae(fs.fs2ftp('C:\\'), '/') |
| 205 ae(fs.fs2ftp('D:\\'), '/') |
| 206 ae(fs.fs2ftp('D:\\dir'), '/') |
| 207 elif os.sep == '/': |
| 208 goforit('/') |
| 209 assert os.path.realpath('/__home/user') == '/__home/user', \ |
| 210 'Test skipped (symlinks not allowed).' |
| 211 goforit('/__home/user') |
| 212 fs.root = '/__home/user' |
| 213 ae(fs.fs2ftp('/__home'), '/') |
| 214 ae(fs.fs2ftp('/'), '/') |
| 215 ae(fs.fs2ftp('/__home/userx'), '/') |
| 216 else: |
| 217 # os.sep == ':'? Don't know... let's try it anyway |
| 218 goforit(os.getcwd()) |
| 219 |
| 220 def test_validpath(self): |
| 221 # Tests for validpath method. |
| 222 fs = ftpserver.AbstractedFS() |
| 223 fs.root = HOME |
| 224 self.failUnless(fs.validpath(HOME)) |
| 225 self.failUnless(fs.validpath(HOME + '/')) |
| 226 self.failIf(fs.validpath(HOME + 'xxx')) |
| 227 |
| 228 if hasattr(os, 'symlink'): |
| 229 # Tests for validpath on systems supporting symbolic links. |
| 230 |
| 231 def _safe_remove(self, path): |
| 232 # convenience function for removing temporary files |
| 233 try: |
| 234 os.remove(path) |
| 235 except os.error: |
| 236 pass |
| 237 |
| 238 def test_validpath_validlink(self): |
| 239 # Test validpath by issuing a symlink pointing to a path |
| 240 # inside the root directory. |
| 241 fs = ftpserver.AbstractedFS() |
| 242 fs.root = HOME |
| 243 try: |
| 244 open(TESTFN, 'w') |
| 245 os.symlink(TESTFN, TESTFN2) |
| 246 self.failUnless(fs.validpath(TESTFN)) |
| 247 finally: |
| 248 self._safe_remove(TESTFN) |
| 249 self._safe_remove(TESTFN2) |
| 250 |
| 251 def test_validpath_external_symlink(self): |
| 252 # Test validpath by issuing a symlink pointing to a path |
| 253 # outside the root directory. |
| 254 fs = ftpserver.AbstractedFS() |
| 255 fs.root = HOME |
| 256 try: |
| 257 # tempfile should create our file in /tmp directory |
| 258 # which should be outside the user root. If it is not |
| 259 # we just skip the test. |
| 260 file = tempfile.NamedTemporaryFile() |
| 261 if HOME == os.path.dirname(file.name): |
| 262 return |
| 263 os.symlink(file.name, TESTFN) |
| 264 self.failIf(fs.validpath(TESTFN)) |
| 265 finally: |
| 266 self._safe_remove(TESTFN) |
| 267 file.close() |
| 268 |
| 269 |
| 270 class TestDummyAuthorizer(unittest.TestCase): |
| 271 """Tests for DummyAuthorizer class.""" |
| 272 |
| 273 # temporarily change warnings to exceptions for the purposes of testing |
| 274 def setUp(self): |
| 275 self.tempdir = tempfile.mkdtemp(dir=HOME) |
| 276 self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir)) |
| 277 self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name |
| 278 self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name |
| 279 warnings.filterwarnings("error") |
| 280 |
| 281 def tearDown(self): |
| 282 os.remove(self.tempfile) |
| 283 os.remove(self.subtempfile) |
| 284 os.rmdir(self.subtempdir) |
| 285 os.rmdir(self.tempdir) |
| 286 warnings.resetwarnings() |
| 287 |
| 288 def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs): |
| 289 try: |
| 290 callableObj(*args, **kwargs) |
| 291 except excClass, why: |
| 292 if str(why) == msg: |
| 293 return |
| 294 raise self.failureException("%s != %s" %(str(why), msg)) |
| 295 else: |
| 296 if hasattr(excClass,'__name__'): excName = excClass.__name__ |
| 297 else: excName = str(excClass) |
| 298 raise self.failureException, "%s not raised" % excName |
| 299 |
| 300 def test_common_methods(self): |
| 301 auth = ftpserver.DummyAuthorizer() |
| 302 # create user |
| 303 auth.add_user(USER, PASSWD, HOME) |
| 304 auth.add_anonymous(HOME) |
| 305 # check credentials |
| 306 self.failUnless(auth.validate_authentication(USER, PASSWD)) |
| 307 self.failIf(auth.validate_authentication(USER, 'wrongpwd')) |
| 308 # remove them |
| 309 auth.remove_user(USER) |
| 310 auth.remove_user('anonymous') |
| 311 # raise exc if user does not exists |
| 312 self.assertRaises(KeyError, auth.remove_user, USER) |
| 313 # raise exc if path does not exist |
| 314 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 315 'No such directory: "%s"' %'?:\\', |
| 316 auth.add_user, USER, PASSWD, '?:\\') |
| 317 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 318 'No such directory: "%s"' %'?:\\', |
| 319 auth.add_anonymous, '?:\\') |
| 320 # raise exc if user already exists |
| 321 auth.add_user(USER, PASSWD, HOME) |
| 322 auth.add_anonymous(HOME) |
| 323 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 324 'User "%s" already exists' %USER, |
| 325 auth.add_user, USER, PASSWD, HOME) |
| 326 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 327 'User "anonymous" already exists', |
| 328 auth.add_anonymous, HOME) |
| 329 auth.remove_user(USER) |
| 330 auth.remove_user('anonymous') |
| 331 # raise on wrong permission |
| 332 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 333 'No such permission "?"', |
| 334 auth.add_user, USER, PASSWD, HOME, perm='?') |
| 335 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 336 'No such permission "?"', |
| 337 auth.add_anonymous, HOME, perm='?') |
| 338 # expect warning on write permissions assigned to anonymous user |
| 339 for x in "adfmw": |
| 340 self.assertRaisesWithMsg(RuntimeWarning, |
| 341 "Write permissions assigned to anonymous user.", |
| 342 auth.add_anonymous, HOME, perm=x) |
| 343 |
| 344 def test_override_perm_interface(self): |
| 345 auth = ftpserver.DummyAuthorizer() |
| 346 auth.add_user(USER, PASSWD, HOME, perm='elr') |
| 347 # raise exc if user does not exists |
| 348 self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr') |
| 349 # raise exc if path does not exist or it's not a directory |
| 350 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 351 'No such directory: "%s"' %'?:\\', |
| 352 auth.override_perm, USER, '?:\\', 'elr') |
| 353 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 354 'No such directory: "%s"' %self.tempfile, |
| 355 auth.override_perm, USER, self.tempfile, 'elr') |
| 356 # raise on wrong permission |
| 357 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 358 'No such permission "?"', auth.override_perm, |
| 359 USER, HOME, perm='?') |
| 360 # expect warning on write permissions assigned to anonymous user |
| 361 auth.add_anonymous(HOME) |
| 362 for p in "adfmw": |
| 363 self.assertRaisesWithMsg(RuntimeWarning, |
| 364 "Write permissions assigned to anonymous user.", |
| 365 auth.override_perm, 'anonymous', HOME, p) |
| 366 # raise on attempt to override home directory permissions |
| 367 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 368 "Can't override home directory permissions", |
| 369 auth.override_perm, USER, HOME, perm='w') |
| 370 # raise on attempt to override a path escaping home directory |
| 371 if os.path.dirname(HOME) != HOME: |
| 372 self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 373 "Path escapes user home directory", |
| 374 auth.override_perm, USER, |
| 375 os.path.dirname(HOME), perm='w') |
| 376 # try to re-set an overridden permission |
| 377 auth.override_perm(USER, self.tempdir, perm='w') |
| 378 auth.override_perm(USER, self.tempdir, perm='wr') |
| 379 |
| 380 def test_override_perm_recursive_paths(self): |
| 381 auth = ftpserver.DummyAuthorizer() |
| 382 auth.add_user(USER, PASSWD, HOME, perm='elr') |
| 383 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) |
| 384 auth.override_perm(USER, self.tempdir, perm='w', recursive=True) |
| 385 self.assert_(auth.has_perm(USER, 'w', HOME) is False) |
| 386 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) |
| 387 self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) |
| 388 self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True) |
| 389 self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True) |
| 390 |
| 391 self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) |
| 392 self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) |
| 393 path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) |
| 394 self.assert_(auth.has_perm(USER, 'w', path) is False) |
| 395 # test case-sensitiveness |
| 396 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): |
| 397 self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) |
| 398 |
| 399 def test_override_perm_not_recursive_paths(self): |
| 400 auth = ftpserver.DummyAuthorizer() |
| 401 auth.add_user(USER, PASSWD, HOME, perm='elr') |
| 402 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) |
| 403 auth.override_perm(USER, self.tempdir, perm='w') |
| 404 self.assert_(auth.has_perm(USER, 'w', HOME) is False) |
| 405 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) |
| 406 self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) |
| 407 self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False) |
| 408 self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False) |
| 409 |
| 410 self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) |
| 411 self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) |
| 412 path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) |
| 413 self.assert_(auth.has_perm(USER, 'w', path) is False) |
| 414 # test case-sensitiveness |
| 415 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): |
| 416 self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) |
| 417 |
| 418 |
| 419 class TestCallLater(unittest.TestCase): |
| 420 """Tests for CallLater class.""" |
| 421 |
| 422 def setUp(self): |
| 423 for task in ftpserver._tasks: |
| 424 if not task.cancelled: |
| 425 task.cancel() |
| 426 del ftpserver._tasks[:] |
| 427 |
| 428 def scheduler(self, timeout=0.01, count=100): |
| 429 while ftpserver._tasks and count > 0: |
| 430 ftpserver._scheduler() |
| 431 count -= 1 |
| 432 time.sleep(timeout) |
| 433 |
| 434 def test_interface(self): |
| 435 fun = lambda: 0 |
| 436 self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun) |
| 437 x = ftpserver.CallLater(3, fun) |
| 438 self.assertRaises(AssertionError, x.delay, -1) |
| 439 self.assert_(x.cancelled is False) |
| 440 x.cancel() |
| 441 self.assert_(x.cancelled is True) |
| 442 self.assertRaises(AssertionError, x.call) |
| 443 self.assertRaises(AssertionError, x.reset) |
| 444 self.assertRaises(AssertionError, x.delay, 2) |
| 445 self.assertRaises(AssertionError, x.cancel) |
| 446 |
| 447 def test_order(self): |
| 448 l = [] |
| 449 fun = lambda x: l.append(x) |
| 450 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: |
| 451 ftpserver.CallLater(x, fun, x) |
| 452 self.scheduler() |
| 453 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) |
| 454 |
| 455 def test_delay(self): |
| 456 l = [] |
| 457 fun = lambda x: l.append(x) |
| 458 ftpserver.CallLater(0.01, fun, 0.01).delay(0.07) |
| 459 ftpserver.CallLater(0.02, fun, 0.02).delay(0.08) |
| 460 ftpserver.CallLater(0.03, fun, 0.03) |
| 461 ftpserver.CallLater(0.04, fun, 0.04) |
| 462 ftpserver.CallLater(0.05, fun, 0.05) |
| 463 ftpserver.CallLater(0.06, fun, 0.06).delay(0.001) |
| 464 self.scheduler() |
| 465 self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02]) |
| 466 |
| 467 def test_reset(self): |
| 468 # will fail on such systems where time.time() does not provide |
| 469 # time with a better precision than 1 second. |
| 470 l = [] |
| 471 fun = lambda x: l.append(x) |
| 472 ftpserver.CallLater(0.01, fun, 0.01) |
| 473 ftpserver.CallLater(0.02, fun, 0.02) |
| 474 ftpserver.CallLater(0.03, fun, 0.03) |
| 475 x = ftpserver.CallLater(0.04, fun, 0.04) |
| 476 ftpserver.CallLater(0.05, fun, 0.05) |
| 477 time.sleep(0.1) |
| 478 x.reset() |
| 479 self.scheduler() |
| 480 self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04]) |
| 481 |
| 482 def test_cancel(self): |
| 483 l = [] |
| 484 fun = lambda x: l.append(x) |
| 485 ftpserver.CallLater(0.01, fun, 0.01).cancel() |
| 486 ftpserver.CallLater(0.02, fun, 0.02) |
| 487 ftpserver.CallLater(0.03, fun, 0.03) |
| 488 ftpserver.CallLater(0.04, fun, 0.04) |
| 489 ftpserver.CallLater(0.05, fun, 0.05).cancel() |
| 490 self.scheduler() |
| 491 self.assertEqual(l, [0.02, 0.03, 0.04]) |
| 492 |
| 493 |
| 494 class TestFtpAuthentication(unittest.TestCase): |
| 495 "test: USER, PASS, REIN." |
| 496 |
| 497 def setUp(self): |
| 498 self.server = FTPd() |
| 499 self.server.start() |
| 500 self.client = ftplib.FTP() |
| 501 self.client.connect(self.server.host, self.server.port) |
| 502 self.f1 = open(TESTFN, 'w+b') |
| 503 self.f2 = open(TESTFN2, 'w+b') |
| 504 |
| 505 def tearDown(self): |
| 506 self.client.close() |
| 507 self.server.stop() |
| 508 if not self.f1.closed: |
| 509 self.f1.close() |
| 510 if not self.f2.closed: |
| 511 self.f2.close() |
| 512 os.remove(TESTFN) |
| 513 os.remove(TESTFN2) |
| 514 |
| 515 def test_auth_ok(self): |
| 516 self.client.login(user=USER, passwd=PASSWD) |
| 517 |
| 518 def test_anon_auth(self): |
| 519 self.client.login(user='anonymous', passwd='anon@') |
| 520 self.client.login(user='AnonYmoUs', passwd='anon@') |
| 521 self.client.login(user='anonymous', passwd='') |
| 522 |
| 523 # Commented after delayed response on wrong credentials has been |
| 524 # introduced because tests take too much to complete. |
| 525 |
| 526 ## def test_auth_failed(self): |
| 527 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| 528 ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSW
D) |
| 529 ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wron
g') |
| 530 |
| 531 ## def test_max_auth(self): |
| 532 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| 533 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| 534 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| 535 ## # If authentication fails for 3 times ftpd disconnects the |
| 536 ## # client. We can check if that happens by using self.client.sendcmd() |
| 537 ## # on the 'dead' socket object. If socket object is really |
| 538 ## # closed it should be raised a socket.error exception (Windows) |
| 539 ## # or a EOFError exception (Linux). |
| 540 ## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '') |
| 541 |
| 542 def test_rein(self): |
| 543 self.client.login(user=USER, passwd=PASSWD) |
| 544 self.client.sendcmd('rein') |
| 545 # user not authenticated, error response expected |
| 546 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') |
| 547 # by logging-in again we should be able to execute a |
| 548 # file-system command |
| 549 self.client.login(user=USER, passwd=PASSWD) |
| 550 self.client.sendcmd('pwd') |
| 551 |
| 552 def test_rein_during_transfer(self): |
| 553 self.client.login(user=USER, passwd=PASSWD) |
| 554 data = 'abcde12345' * 100000 |
| 555 self.f1.write(data) |
| 556 self.f1.close() |
| 557 |
| 558 self.client.voidcmd('TYPE I') |
| 559 conn = self.client.transfercmd('retr ' + TESTFN) |
| 560 rein_sent = 0 |
| 561 while 1: |
| 562 chunk = conn.recv(8192) |
| 563 if not chunk: |
| 564 break |
| 565 self.f2.write(chunk) |
| 566 if not rein_sent: |
| 567 rein_sent = 1 |
| 568 # flush account, error response expected |
| 569 self.client.sendcmd('rein') |
| 570 self.assertRaises(ftplib.error_perm, self.client.dir) |
| 571 |
| 572 # a 226 response is expected once tranfer finishes |
| 573 self.assertEqual(self.client.voidresp()[:3], '226') |
| 574 # account is still flushed, error response is still expected |
| 575 self.assertRaises(ftplib.error_perm, self.client.sendcmd, |
| 576 'size ' + TESTFN) |
| 577 # by logging-in again we should be able to execute a |
| 578 # filesystem command |
| 579 self.client.login(user=USER, passwd=PASSWD) |
| 580 self.client.sendcmd('pwd') |
| 581 self.f2.seek(0) |
| 582 self.assertEqual(hash(data), hash (self.f2.read())) |
| 583 |
| 584 def test_user(self): |
| 585 # Test USER while already authenticated and no transfer |
| 586 # is in progress. |
| 587 self.client.login(user=USER, passwd=PASSWD) |
| 588 self.client.sendcmd('user ' + USER) # authentication flushed |
| 589 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') |
| 590 self.client.sendcmd('pass ' + PASSWD) |
| 591 self.client.sendcmd('pwd') |
| 592 |
| 593 def test_user_on_transfer(self): |
| 594 # Test USER while already authenticated and a transfer is |
| 595 # in progress. |
| 596 self.client.login(user=USER, passwd=PASSWD) |
| 597 data = 'abcde12345' * 100000 |
| 598 self.f1.write(data) |
| 599 self.f1.close() |
| 600 |
| 601 self.client.voidcmd('TYPE I') |
| 602 conn = self.client.transfercmd('retr ' + TESTFN) |
| 603 rein_sent = 0 |
| 604 while 1: |
| 605 chunk = conn.recv(8192) |
| 606 if not chunk: |
| 607 break |
| 608 self.f2.write(chunk) |
| 609 # stop transfer while it isn't finished yet |
| 610 if not rein_sent: |
| 611 rein_sent = 1 |
| 612 # flush account, expect an error response |
| 613 self.client.sendcmd('user ' + USER) |
| 614 self.assertRaises(ftplib.error_perm, self.client.dir) |
| 615 |
| 616 # a 226 response is expected once tranfer finishes |
| 617 self.assertEqual(self.client.voidresp()[:3], '226') |
| 618 # account is still flushed, error response is still expected |
| 619 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') |
| 620 # by logging-in again we should be able to execute a |
| 621 # filesystem command |
| 622 self.client.sendcmd('pass ' + PASSWD) |
| 623 self.client.sendcmd('pwd') |
| 624 self.f2.seek(0) |
| 625 self.assertEqual(hash(data), hash (self.f2.read())) |
| 626 |
| 627 |
| 628 class TestFtpDummyCmds(unittest.TestCase): |
| 629 "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP" |
| 630 |
| 631 def setUp(self): |
| 632 self.server = FTPd() |
| 633 self.server.start() |
| 634 self.client = ftplib.FTP() |
| 635 self.client.connect(self.server.host, self.server.port) |
| 636 self.client.login(USER, PASSWD) |
| 637 |
| 638 def tearDown(self): |
| 639 self.client.close() |
| 640 self.server.stop() |
| 641 |
| 642 def test_type(self): |
| 643 self.client.sendcmd('type a') |
| 644 self.client.sendcmd('type i') |
| 645 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?') |
| 646 |
| 647 def test_stru(self): |
| 648 self.client.sendcmd('stru f') |
| 649 self.client.sendcmd('stru F') |
| 650 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?') |
| 651 |
| 652 def test_mode(self): |
| 653 self.client.sendcmd('mode s') |
| 654 self.client.sendcmd('mode S') |
| 655 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?') |
| 656 |
| 657 def test_noop(self): |
| 658 self.client.sendcmd('noop') |
| 659 |
| 660 def test_syst(self): |
| 661 self.client.sendcmd('syst') |
| 662 |
| 663 def test_allo(self): |
| 664 self.client.sendcmd('allo x') |
| 665 |
| 666 def test_quit(self): |
| 667 self.client.sendcmd('quit') |
| 668 |
| 669 def test_help(self): |
| 670 self.client.sendcmd('help') |
| 671 cmd = random.choice(ftpserver.proto_cmds.keys()) |
| 672 self.client.sendcmd('help %s' %cmd) |
| 673 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?') |
| 674 |
| 675 def test_rest(self): |
| 676 # test error conditions only; |
| 677 # restored data-transfer is tested later |
| 678 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest') |
| 679 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str') |
| 680 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1') |
| 681 |
| 682 def test_opts_feat(self): |
| 683 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad
_fact') |
| 684 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst typ
e ;') |
| 685 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst
') |
| 686 # utility function which used for extracting the MLST "facts" |
| 687 # string from the FEAT response |
| 688 def mlst(): |
| 689 resp = self.client.sendcmd('feat') |
| 690 return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1) |
| 691 # we rely on "type", "perm", "size", and "modify" facts which |
| 692 # are those available on all platforms |
| 693 self.failUnless('type*;perm*;size*;modify*;' in mlst()) |
| 694 self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS
type;') |
| 695 self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS
type;') |
| 696 self.failUnless('type*;perm;size;modify;' in mlst()) |
| 697 |
| 698 self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ') |
| 699 self.failUnless(not '*' in mlst()) |
| 700 |
| 701 self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST
OPTS ') |
| 702 self.failUnless(not '*' in mlst()) |
| 703 self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \ |
| 704 '200 MLST OPTS type;') |
| 705 self.failUnless('type*;perm;size;modify;' in mlst()) |
| 706 |
| 707 |
| 708 class TestFtpCmdsSemantic(unittest.TestCase): |
| 709 |
| 710 arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port', |
| 711 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type', |
| 712 'user','xmkd','xrmd') |
| 713 |
| 714 def setUp(self): |
| 715 self.server = FTPd() |
| 716 self.server.start() |
| 717 self.client = ftplib.FTP() |
| 718 self.client.connect(self.server.host, self.server.port) |
| 719 self.client.login(USER, PASSWD) |
| 720 |
| 721 def tearDown(self): |
| 722 self.client.close() |
| 723 self.server.stop() |
| 724 |
| 725 def test_arg_cmds(self): |
| 726 # test commands requiring an argument |
| 727 expected = "501 Syntax error: command needs an argument." |
| 728 for cmd in self.arg_cmds: |
| 729 self.client.putcmd(cmd) |
| 730 resp = self.client.getmultiline() |
| 731 self.assertEqual(resp, expected) |
| 732 |
| 733 def test_no_arg_cmds(self): |
| 734 # test commands accepting no arguments |
| 735 expected = "501 Syntax error: command does not accept arguments." |
| 736 for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein', |
| 737 'syst','xcup','xpwd'): |
| 738 self.client.putcmd(cmd + ' arg') |
| 739 resp = self.client.getmultiline() |
| 740 self.assertEqual(resp, expected) |
| 741 |
| 742 def test_auth_cmds(self): |
| 743 # test those commands requiring client to be authenticated |
| 744 expected = "530 Log in with USER and PASS first." |
| 745 self.client.sendcmd('rein') |
| 746 for cmd in ftpserver.proto_cmds: |
| 747 cmd = cmd.lower() |
| 748 if cmd in ('feat','help','noop','user','pass','stat','syst','quit'): |
| 749 continue |
| 750 if cmd in self.arg_cmds: |
| 751 cmd = cmd + ' arg' |
| 752 self.client.putcmd(cmd) |
| 753 resp = self.client.getmultiline() |
| 754 self.assertEqual(resp, expected) |
| 755 |
| 756 def test_no_auth_cmds(self): |
| 757 # test those commands that do not require client to be authenticated |
| 758 self.client.sendcmd('rein') |
| 759 for cmd in ('feat','help','noop','stat','syst'): |
| 760 self.client.sendcmd(cmd) |
| 761 # STAT provided with an argument is equal to LIST hence not allowed |
| 762 # if not authenticated |
| 763 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /') |
| 764 self.client.sendcmd('quit') |
| 765 |
| 766 |
| 767 class TestFtpFsOperations(unittest.TestCase): |
| 768 "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT" |
| 769 |
| 770 def setUp(self): |
| 771 self.server = FTPd() |
| 772 self.server.start() |
| 773 self.client = ftplib.FTP() |
| 774 self.client.connect(self.server.host, self.server.port) |
| 775 self.client.login(USER, PASSWD) |
| 776 self.tempfile = os.path.basename(open(TESTFN, 'w+b').name) |
| 777 self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 778 os.mkdir(self.tempdir) |
| 779 |
| 780 def tearDown(self): |
| 781 self.client.close() |
| 782 self.server.stop() |
| 783 if os.path.exists(self.tempfile): |
| 784 os.remove(self.tempfile) |
| 785 if os.path.exists(self.tempdir): |
| 786 os.rmdir(self.tempdir) |
| 787 |
| 788 def test_cwd(self): |
| 789 self.client.cwd(self.tempdir) |
| 790 self.assertEqual(self.client.pwd(), '/' + self.tempdir) |
| 791 self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir') |
| 792 # cwd provided with no arguments is supposed to move us to the |
| 793 # root directory |
| 794 self.client.sendcmd('cwd') |
| 795 self.assertEqual(self.client.pwd(), '/') |
| 796 |
| 797 def test_pwd(self): |
| 798 self.assertEqual(self.client.pwd(), '/') |
| 799 self.client.cwd(self.tempdir) |
| 800 self.assertEqual(self.client.pwd(), '/' + self.tempdir) |
| 801 |
| 802 def test_cdup(self): |
| 803 self.client.cwd(self.tempdir) |
| 804 self.assertEqual(self.client.pwd(), '/' + self.tempdir) |
| 805 self.client.sendcmd('cdup') |
| 806 self.assertEqual(self.client.pwd(), '/') |
| 807 # make sure we can't escape from root directory |
| 808 self.client.sendcmd('cdup') |
| 809 self.assertEqual(self.client.pwd(), '/') |
| 810 |
| 811 def test_mkd(self): |
| 812 tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 813 self.client.mkd(tempdir) |
| 814 # make sure we can't create directories which already exist |
| 815 # (probably not really necessary); |
| 816 # let's use a try/except statement to avoid leaving behind |
| 817 # orphaned temporary directory in the event of a test failure. |
| 818 try: |
| 819 self.client.mkd(tempdir) |
| 820 except ftplib.error_perm: |
| 821 os.rmdir(tempdir) # ok |
| 822 else: |
| 823 self.fail('ftplib.error_perm not raised.') |
| 824 |
| 825 def test_rmd(self): |
| 826 self.client.rmd(self.tempdir) |
| 827 self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile) |
| 828 # make sure we can't remove the root directory |
| 829 self.assertRaises(ftplib.error_perm, self.client.rmd, '/') |
| 830 |
| 831 def test_dele(self): |
| 832 self.client.delete(self.tempfile) |
| 833 self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir) |
| 834 |
| 835 def test_rnfr_rnto(self): |
| 836 # rename file |
| 837 tempname = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 838 self.client.rename(self.tempfile, tempname) |
| 839 self.client.rename(tempname, self.tempfile) |
| 840 # rename dir |
| 841 tempname = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 842 self.client.rename(self.tempdir, tempname) |
| 843 self.client.rename(tempname, self.tempdir) |
| 844 # rnfr/rnto over non-existing paths |
| 845 bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 846 self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x') |
| 847 self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile,
'/') |
| 848 # make sure we can't rename root directory |
| 849 self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x') |
| 850 |
| 851 def test_mdtm(self): |
| 852 self.client.sendcmd('mdtm ' + self.tempfile) |
| 853 # make sure we can't use mdtm against directories |
| 854 try: |
| 855 self.client.sendcmd('mdtm ' + self.tempdir) |
| 856 except ftplib.error_perm, err: |
| 857 self.failUnless("not retrievable" in str(err)) |
| 858 else: |
| 859 self.fail('Exception not raised') |
| 860 |
| 861 def test_size(self): |
| 862 self.client.size(self.tempfile) |
| 863 # make sure we can't use size against directories |
| 864 try: |
| 865 self.client.sendcmd('size ' + self.tempdir) |
| 866 except ftplib.error_perm, err: |
| 867 self.failUnless("not retrievable" in str(err)) |
| 868 else: |
| 869 self.fail('Exception not raised') |
| 870 |
| 871 |
| 872 class TestFtpRetrieveData(unittest.TestCase): |
| 873 "test: RETR, REST, LIST, NLST, argumented STAT" |
| 874 |
| 875 def setUp(self): |
| 876 self.server = FTPd() |
| 877 self.server.start() |
| 878 self.client = ftplib.FTP() |
| 879 self.client.connect(self.server.host, self.server.port) |
| 880 self.client.login(USER, PASSWD) |
| 881 self.f1 = open(TESTFN, 'w+b') |
| 882 self.f2 = open(TESTFN2, 'w+b') |
| 883 |
| 884 def tearDown(self): |
| 885 self.client.close() |
| 886 self.server.stop() |
| 887 if not self.f1.closed: |
| 888 self.f1.close() |
| 889 if not self.f2.closed: |
| 890 self.f2.close() |
| 891 os.remove(TESTFN) |
| 892 os.remove(TESTFN2) |
| 893 |
| 894 def test_retr(self): |
| 895 data = 'abcde12345' * 100000 |
| 896 self.f1.write(data) |
| 897 self.f1.close() |
| 898 self.client.retrbinary("retr " + TESTFN, self.f2.write) |
| 899 self.f2.seek(0) |
| 900 self.assertEqual(hash(data), hash(self.f2.read())) |
| 901 |
| 902 def test_restore_on_retr(self): |
| 903 data = 'abcde12345' * 100000 |
| 904 fname_1 = os.path.basename(self.f1.name) |
| 905 self.f1.write(data) |
| 906 self.f1.close() |
| 907 |
| 908 # look at ftplib.FTP.retrbinary method to understand this mess |
| 909 self.client.voidcmd('TYPE I') |
| 910 conn = self.client.transfercmd('retr ' + fname_1) |
| 911 chunk = conn.recv(len(data) / 2) |
| 912 self.f2.write(chunk) |
| 913 conn.close() |
| 914 # transfer wasn't finished yet so we expect a 426 response |
| 915 self.assertRaises(ftplib.error_temp, self.client.voidresp) |
| 916 |
| 917 # resuming transfer by using a marker value greater than the |
| 918 # file size stored on the server should result in an error |
| 919 # on retr (RFC-1123) |
| 920 file_size = self.client.size(fname_1) |
| 921 self.client.sendcmd('rest %s' %((file_size + 1))) |
| 922 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fnam
e_1) |
| 923 |
| 924 # test resume |
| 925 self.client.sendcmd('rest %s' %len(chunk)) |
| 926 self.client.retrbinary("retr " + fname_1, self.f2.write) |
| 927 self.f2.seek(0) |
| 928 self.assertEqual(hash(data), hash (self.f2.read())) |
| 929 |
| 930 def _test_listing_cmds(self, cmd): |
| 931 """Tests common to LIST NLST and MLSD commands.""" |
| 932 # assume that no argument has the same meaning of "/" |
| 933 l1 = l2 = [] |
| 934 self.client.retrlines(cmd, l1.append) |
| 935 self.client.retrlines(cmd + ' /', l2.append) |
| 936 self.assertEqual(l1, l2) |
| 937 if cmd.lower() != 'mlsd': |
| 938 # if pathname is a file one line is expected |
| 939 x = [] |
| 940 self.client.retrlines('%s ' %cmd + TESTFN, x.append) |
| 941 self.assertEqual(len(x), 1) |
| 942 self.failUnless(''.join(x).endswith(TESTFN)) |
| 943 # non-existent path, 550 response is expected |
| 944 bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 945 self.assertRaises(ftplib.error_perm, self.client.retrlines, |
| 946 '%s ' %cmd + bogus, lambda x: x) |
| 947 # for an empty directory we excpect that the data channel is |
| 948 # opened anyway and that no data is received |
| 949 x = [] |
| 950 tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME)) |
| 951 try: |
| 952 self.client.retrlines('%s %s' %(cmd, tempdir), x.append) |
| 953 self.assertEqual(x, []) |
| 954 finally: |
| 955 os.rmdir(tempdir) |
| 956 |
| 957 def test_nlst(self): |
| 958 # common tests |
| 959 self._test_listing_cmds('nlst') |
| 960 |
| 961 def test_list(self): |
| 962 # common tests |
| 963 self._test_listing_cmds('list') |
| 964 # known incorrect pathname arguments (e.g. old clients) are |
| 965 # expected to be treated as if pathname would be == '/' |
| 966 l1 = l2 = l3 = l4 = l5 = [] |
| 967 self.client.retrlines('list /', l1.append) |
| 968 self.client.retrlines('list -a', l2.append) |
| 969 self.client.retrlines('list -l', l3.append) |
| 970 self.client.retrlines('list -al', l4.append) |
| 971 self.client.retrlines('list -la', l5.append) |
| 972 tot = (l1, l2, l3, l4, l5) |
| 973 for x in range(len(tot) - 1): |
| 974 self.assertEqual(tot[x], tot[x+1]) |
| 975 |
| 976 def test_mlst(self): |
| 977 # utility function for extracting the line of interest |
| 978 mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1] |
| 979 |
| 980 # the fact set must be preceded by a space |
| 981 self.failUnless(mlstline('mlst').startswith(' ')) |
| 982 # where TVFS is supported, a fully qualified pathname is expected |
| 983 self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN)) |
| 984 self.failUnless(mlstline('mlst').endswith('/')) |
| 985 # assume that no argument has the same meaning of "/" |
| 986 self.assertEqual(mlstline('mlst'), mlstline('mlst /')) |
| 987 # non-existent path |
| 988 bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 989 self.assertRaises(ftplib.error_perm, mlstline, bogus) |
| 990 # test file/dir notations |
| 991 self.failUnless('type=dir' in mlstline('mlst')) |
| 992 self.failUnless('type=file' in mlstline('mlst ' + TESTFN)) |
| 993 # let's add some tests for OPTS command |
| 994 self.client.sendcmd('opts mlst type;') |
| 995 self.assertEqual(mlstline('mlst'), ' type=dir; /') |
| 996 # where no facts are present, two leading spaces before the |
| 997 # pathname are required (RFC-3659) |
| 998 self.client.sendcmd('opts mlst') |
| 999 self.assertEqual(mlstline('mlst'), ' /') |
| 1000 |
| 1001 def test_mlsd(self): |
| 1002 # common tests |
| 1003 self._test_listing_cmds('mlsd') |
| 1004 dir = os.path.basename(tempfile.mkdtemp(dir=HOME)) |
| 1005 try: |
| 1006 try: |
| 1007 self.client.retrlines('mlsd ' + TESTFN, lambda x: x) |
| 1008 except ftplib.error_perm, resp: |
| 1009 # if path is a file a 501 response code is expected |
| 1010 self.assertEqual(str(resp)[0:3], "501") |
| 1011 else: |
| 1012 self.fail("Exception not raised") |
| 1013 finally: |
| 1014 os.rmdir(dir) |
| 1015 |
| 1016 def test_stat(self): |
| 1017 # test STAT provided with argument which is equal to LIST |
| 1018 self.client.sendcmd('stat /') |
| 1019 self.client.sendcmd('stat ' + TESTFN) |
| 1020 self.client.putcmd('stat *') |
| 1021 resp = self.client.getmultiline() |
| 1022 self.assertEqual(resp, '550 Globbing not supported.') |
| 1023 bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| 1024 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogu
s) |
| 1025 |
| 1026 |
| 1027 class TestFtpAbort(unittest.TestCase): |
| 1028 "test: ABOR" |
| 1029 |
| 1030 def setUp(self): |
| 1031 self.server = FTPd() |
| 1032 self.server.start() |
| 1033 self.client = ftplib.FTP() |
| 1034 self.client.connect(self.server.host, self.server.port) |
| 1035 self.client.login(USER, PASSWD) |
| 1036 self.f1 = open(TESTFN, 'w+b') |
| 1037 self.f2 = open(TESTFN2, 'w+b') |
| 1038 |
| 1039 def tearDown(self): |
| 1040 self.client.close() |
| 1041 self.server.stop() |
| 1042 if not self.f1.closed: |
| 1043 self.f1.close() |
| 1044 if not self.f2.closed: |
| 1045 self.f2.close() |
| 1046 os.remove(self.f1.name) |
| 1047 os.remove(self.f2.name) |
| 1048 |
| 1049 def test_abor_no_data(self): |
| 1050 # Case 1: ABOR while no data channel is opened: respond with 225. |
| 1051 resp = self.client.sendcmd('ABOR') |
| 1052 self.failUnlessEqual('225 No transfer to abort.', resp) |
| 1053 |
| 1054 def test_abor_pasv(self): |
| 1055 # Case 2: user sends a PASV, a data-channel socket is listening |
| 1056 # but not connected, and ABOR is sent: close listening data |
| 1057 # socket, respond with 225. |
| 1058 self.client.makepasv() |
| 1059 respcode = self.client.sendcmd('ABOR')[:3] |
| 1060 self.failUnlessEqual('225', respcode) |
| 1061 |
| 1062 def test_abor_port(self): |
| 1063 # Case 3: data channel opened with PASV or PORT, but ABOR sent |
| 1064 # before a data transfer has been started: close data channel, |
| 1065 # respond with 225 |
| 1066 self.client.makeport() |
| 1067 respcode = self.client.sendcmd('ABOR')[:3] |
| 1068 self.failUnlessEqual('225', respcode) |
| 1069 |
| 1070 def test_abor(self): |
| 1071 # Case 4: ABOR while a data transfer on DTP channel is in |
| 1072 # progress: close data channel, respond with 426, respond |
| 1073 # with 226. |
| 1074 data = 'abcde12345' * 100000 |
| 1075 self.f1.write(data) |
| 1076 self.f1.close() |
| 1077 |
| 1078 # this ugly loop construct is to simulate an interrupted |
| 1079 # transfer since ftplib doesn't like running storbinary() |
| 1080 # in a separate thread |
| 1081 self.client.voidcmd('TYPE I') |
| 1082 conn = self.client.transfercmd('retr ' + TESTFN) |
| 1083 chunk = conn.recv(len(data) / 2) |
| 1084 # stop transfer while it isn't finished yet |
| 1085 self.client.putcmd('ABOR') |
| 1086 |
| 1087 # transfer isn't finished yet so ftpd should respond with 426 |
| 1088 self.assertRaises(ftplib.error_temp, self.client.voidresp) |
| 1089 |
| 1090 # transfer successfully aborted, so should now respond with a 226 |
| 1091 self.failUnlessEqual('226', self.client.voidresp()[:3]) |
| 1092 |
| 1093 if hasattr(socket, 'MSG_OOB'): |
| 1094 def test_oob_abor(self): |
| 1095 # Send ABOR by following the RFC-959 directives of sending |
| 1096 # Telnet IP/Synch sequence as OOB data. |
| 1097 # On some systems like FreeBSD this happened to be a problem |
| 1098 # due to a different SO_OOBINLINE behavior. |
| 1099 # On some platforms (e.g. Python CE) the test may fail |
| 1100 # although the MSG_OOB constant is defined. |
| 1101 self.client.sock.sendall(chr(244), socket.MSG_OOB) |
| 1102 self.client.sock.sendall(chr(242), socket.MSG_OOB) |
| 1103 self.client.sock.sendall('abor\r\n') |
| 1104 self.client.sock.settimeout(1) |
| 1105 self.assertEqual(self.client.getresp()[:3], '225') |
| 1106 |
| 1107 |
| 1108 class TestFtpStoreData(unittest.TestCase): |
| 1109 "test: STOR, STOU, APPE, REST" |
| 1110 |
| 1111 def setUp(self): |
| 1112 self.server = FTPd() |
| 1113 self.server.start() |
| 1114 self.client = ftplib.FTP() |
| 1115 self.client.connect(self.server.host, self.server.port) |
| 1116 self.client.login(USER, PASSWD) |
| 1117 self.f1 = open(TESTFN, 'w+b') |
| 1118 self.f2 = open(TESTFN2, 'w+b') |
| 1119 |
| 1120 def tearDown(self): |
| 1121 self.client.close() |
| 1122 self.server.stop() |
| 1123 if not self.f1.closed: |
| 1124 self.f1.close() |
| 1125 if not self.f2.closed: |
| 1126 self.f2.close() |
| 1127 os.remove(TESTFN) |
| 1128 os.remove(TESTFN2) |
| 1129 |
| 1130 def test_stor(self): |
| 1131 # TESTFN3 is the remote file name |
| 1132 try: |
| 1133 data = 'abcde12345' * 100000 |
| 1134 self.f1.write(data) |
| 1135 self.f1.seek(0) |
| 1136 self.client.storbinary('stor ' + TESTFN3, self.f1) |
| 1137 self.client.retrbinary('retr ' + TESTFN3, self.f2.write) |
| 1138 self.f2.seek(0) |
| 1139 self.assertEqual(hash(data), hash (self.f2.read())) |
| 1140 finally: |
| 1141 # we do not use os.remove because file could be still |
| 1142 # locked by ftpd thread |
| 1143 if os.path.exists(TESTFN3): |
| 1144 self.client.delete(TESTFN3) |
| 1145 |
| 1146 def test_stou(self): |
| 1147 data = 'abcde12345' * 100000 |
| 1148 self.f1.write(data) |
| 1149 self.f1.seek(0) |
| 1150 |
| 1151 self.client.voidcmd('TYPE I') |
| 1152 # filename comes in as "1xx FILE: <filename>" |
| 1153 filename = self.client.sendcmd('stou').split('FILE: ')[1] |
| 1154 try: |
| 1155 sock = self.client.makeport() |
| 1156 conn, sockaddr = sock.accept() |
| 1157 while 1: |
| 1158 buf = self.f1.read(8192) |
| 1159 if not buf: |
| 1160 break |
| 1161 conn.sendall(buf) |
| 1162 conn.close() |
| 1163 # transfer finished, a 226 response is expected |
| 1164 self.client.voidresp() |
| 1165 self.client.retrbinary('retr ' + filename, self.f2.write) |
| 1166 self.f2.seek(0) |
| 1167 self.assertEqual(hash(data), hash (self.f2.read())) |
| 1168 finally: |
| 1169 # we do not use os.remove because file could be |
| 1170 # still locked by ftpd thread |
| 1171 if os.path.exists(filename): |
| 1172 self.client.delete(filename) |
| 1173 |
| 1174 def test_stou_rest(self): |
| 1175 # watch for STOU preceded by REST, which makes no sense. |
| 1176 self.client.sendcmd('rest 10') |
| 1177 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou') |
| 1178 |
| 1179 def test_appe(self): |
| 1180 # TESTFN3 is the remote file name |
| 1181 try: |
| 1182 data1 = 'abcde12345' * 100000 |
| 1183 self.f1.write(data1) |
| 1184 self.f1.seek(0) |
| 1185 self.client.storbinary('stor ' + TESTFN3, self.f1) |
| 1186 |
| 1187 data2 = 'fghil67890' * 100000 |
| 1188 self.f1.write(data2) |
| 1189 self.f1.seek(self.client.size(TESTFN3)) |
| 1190 self.client.storbinary('appe ' + TESTFN3, self.f1) |
| 1191 |
| 1192 self.client.retrbinary("retr " + TESTFN3, self.f2.write) |
| 1193 self.f2.seek(0) |
| 1194 self.assertEqual(hash(data1 + data2), hash (self.f2.read())) |
| 1195 finally: |
| 1196 # we do not use os.remove because file could be still |
| 1197 # locked by ftpd thread |
| 1198 if os.path.exists(TESTFN3): |
| 1199 self.client.delete(TESTFN3) |
| 1200 |
| 1201 def test_appe_rest(self): |
| 1202 # watch for APPE preceded by REST, which makes no sense. |
| 1203 self.client.sendcmd('rest 10') |
| 1204 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x') |
| 1205 |
| 1206 def test_rest_on_stor(self): |
| 1207 # TESTFN3 is the remote file name |
| 1208 data = 'abcde12345' * 100000 |
| 1209 self.f1.write(data) |
| 1210 self.f1.seek(0) |
| 1211 |
| 1212 self.client.voidcmd('TYPE I') |
| 1213 conn = self.client.transfercmd('stor ' + TESTFN3) |
| 1214 bytes_sent = 0 |
| 1215 while 1: |
| 1216 chunk = self.f1.read(8192) |
| 1217 conn.sendall(chunk) |
| 1218 bytes_sent += len(chunk) |
| 1219 # stop transfer while it isn't finished yet |
| 1220 if bytes_sent >= 524288: # 2^19 |
| 1221 break |
| 1222 elif not chunk: |
| 1223 break |
| 1224 conn.close() |
| 1225 # transfer wasn't finished yet so we expect a 426 response |
| 1226 self.client.voidresp() |
| 1227 |
| 1228 # resuming transfer by using a marker value greater than the |
| 1229 # file size stored on the server should result in an error |
| 1230 # on stor |
| 1231 file_size = self.client.size(TESTFN3) |
| 1232 self.assertEqual(file_size, bytes_sent) |
| 1233 self.client.sendcmd('rest %s' %((file_size + 1))) |
| 1234 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TEST
FN3) |
| 1235 |
| 1236 self.client.sendcmd('rest %s' %bytes_sent) |
| 1237 self.client.storbinary('stor ' + TESTFN3, self.f1) |
| 1238 |
| 1239 self.client.retrbinary('retr ' + TESTFN3, self.f2.write) |
| 1240 self.f1.seek(0) |
| 1241 self.f2.seek(0) |
| 1242 self.assertEqual(hash(self.f1.read()), hash(self.f2.read())) |
| 1243 self.client.delete(TESTFN3) |
| 1244 |
| 1245 |
| 1246 class TestTimeouts(unittest.TestCase): |
| 1247 """Test idle-timeout capabilities of control and data channels. |
| 1248 Some tests may fail on slow machines. |
| 1249 """ |
| 1250 |
| 1251 def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30): |
| 1252 self.server = FTPd() |
| 1253 self.server.handler.timeout = idle_timeout |
| 1254 self.server.handler.dtp_handler.timeout = data_timeout |
| 1255 self.server.handler.passive_dtp.timeout = pasv_timeout |
| 1256 self.server.start() |
| 1257 self.client = ftplib.FTP() |
| 1258 self.client.connect(self.server.host, self.server.port) |
| 1259 self.client.login(USER, PASSWD) |
| 1260 |
| 1261 def tearDown(self): |
| 1262 self.client.close() |
| 1263 self.server.handler.timeout = 300 |
| 1264 self.server.handler.dtp_handler.timeout = 300 |
| 1265 self.server.handler.passive_dtp.timeout = 30 |
| 1266 self.server.stop() |
| 1267 |
| 1268 def test_idle_timeout(self): |
| 1269 # Test control channel timeout. The client which does not send |
| 1270 # any command within the time specified in FTPHandler.timeout is |
| 1271 # supposed to be kicked off. |
| 1272 self._setUp(idle_timeout=0.1) |
| 1273 # fail if no msg is received within 1 second |
| 1274 self.client.sock.settimeout(1) |
| 1275 data = self.client.sock.recv(1024) |
| 1276 self.assertEqual(data, "421 Control connection timed out.\r\n") |
| 1277 # ensure client has been kicked off |
| 1278 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| 1279 |
| 1280 def test_data_timeout(self): |
| 1281 # Test data channel timeout. The client which does not send |
| 1282 # or receive any data within the time specified in |
| 1283 # DTPHandler.timeout is supposed to be kicked off. |
| 1284 self._setUp(data_timeout=0.1) |
| 1285 addr = self.client.makepasv() |
| 1286 s = socket.socket() |
| 1287 s.connect(addr) |
| 1288 # fail if no msg is received within 1 second |
| 1289 self.client.sock.settimeout(1) |
| 1290 data = self.client.sock.recv(1024) |
| 1291 self.assertEqual(data, "421 Data connection timed out.\r\n") |
| 1292 # ensure client has been kicked off |
| 1293 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| 1294 |
| 1295 def test_idle_data_timeout1(self): |
| 1296 # Tests that the control connection timeout is suspended while |
| 1297 # the data channel is opened |
| 1298 self._setUp(idle_timeout=0.1, data_timeout=0.2) |
| 1299 addr = self.client.makepasv() |
| 1300 s = socket.socket() |
| 1301 s.connect(addr) |
| 1302 # fail if no msg is received within 1 second |
| 1303 self.client.sock.settimeout(1) |
| 1304 data = self.client.sock.recv(1024) |
| 1305 self.assertEqual(data, "421 Data connection timed out.\r\n") |
| 1306 # ensure client has been kicked off |
| 1307 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| 1308 |
| 1309 def test_idle_data_timeout2(self): |
| 1310 # Tests that the control connection timeout is restarted after |
| 1311 # data channel has been closed |
| 1312 self._setUp(idle_timeout=0.1, data_timeout=0.2) |
| 1313 addr = self.client.makepasv() |
| 1314 s = socket.socket() |
| 1315 s.connect(addr) |
| 1316 # close data channel |
| 1317 self.client.sendcmd('abor') |
| 1318 self.client.sock.settimeout(1) |
| 1319 data = self.client.sock.recv(1024) |
| 1320 self.assertEqual(data, "421 Control connection timed out.\r\n") |
| 1321 # ensure client has been kicked off |
| 1322 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| 1323 |
| 1324 def test_pasv_timeout(self): |
| 1325 # Test pasv data channel timeout. The client which does not connect |
| 1326 # to the listening data socket within the time specified in |
| 1327 # PassiveDTP.timeout is supposed to receive a 421 response. |
| 1328 self._setUp(pasv_timeout=0.1) |
| 1329 self.client.makepasv() |
| 1330 # fail if no msg is received within 1 second |
| 1331 self.client.sock.settimeout(1) |
| 1332 data = self.client.sock.recv(1024) |
| 1333 self.assertEqual(data, "421 Passive data channel timed out.\r\n") |
| 1334 # client is not expected to be kicked off |
| 1335 self.client.sendcmd('noop') |
| 1336 |
| 1337 |
| 1338 class TestMaxConnections(unittest.TestCase): |
| 1339 """Test maximum connections (FTPServer.max_cons).""" |
| 1340 |
| 1341 def setUp(self): |
| 1342 self.server = FTPd() |
| 1343 self.server.server.max_cons = 3 |
| 1344 self.server.start() |
| 1345 |
| 1346 def tearDown(self): |
| 1347 self.server.server.max_cons = 0 |
| 1348 self.server.stop() |
| 1349 |
| 1350 def test_max_connections(self): |
| 1351 c1 = ftplib.FTP() |
| 1352 c2 = ftplib.FTP() |
| 1353 c3 = ftplib.FTP() |
| 1354 try: |
| 1355 c1.connect(self.server.host, self.server.port) |
| 1356 c2.connect(self.server.host, self.server.port) |
| 1357 self.assertRaises(ftplib.error_temp, c3.connect, self.server.host, |
| 1358 self.server.port) |
| 1359 # with passive data channel established |
| 1360 c2.close() |
| 1361 c1.login(USER, PASSWD) |
| 1362 c1.makepasv() |
| 1363 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, |
| 1364 self.server.port) |
| 1365 # with passive data socket waiting for connection |
| 1366 c1.login(USER, PASSWD) |
| 1367 c1.sendcmd('pasv') |
| 1368 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, |
| 1369 self.server.port) |
| 1370 # with active data channel established |
| 1371 c1.login(USER, PASSWD) |
| 1372 c1.makeport() |
| 1373 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, |
| 1374 self.server.port) |
| 1375 finally: |
| 1376 c1.close() |
| 1377 c2.close() |
| 1378 c3.close() |
| 1379 |
| 1380 |
| 1381 class _TestNetworkProtocols(unittest.TestCase): |
| 1382 """Test PASV, EPSV, PORT and EPRT commands. |
| 1383 |
| 1384 Do not use this class directly. Let TestIPv4Environment and |
| 1385 TestIPv6Environment classes use it instead. |
| 1386 """ |
| 1387 HOST = HOST |
| 1388 |
| 1389 def setUp(self): |
| 1390 self.server = FTPd(self.HOST) |
| 1391 self.server.start() |
| 1392 self.client = ftplib.FTP() |
| 1393 self.client.connect(self.server.host, self.server.port) |
| 1394 self.client.login(USER, PASSWD) |
| 1395 if self.client.af == socket.AF_INET: |
| 1396 self.proto = "1" |
| 1397 self.other_proto = "2" |
| 1398 else: |
| 1399 self.proto = "2" |
| 1400 self.other_proto = "1" |
| 1401 |
| 1402 def tearDown(self): |
| 1403 self.client.close() |
| 1404 self.server.stop() |
| 1405 |
| 1406 def cmdresp(self, cmd): |
| 1407 """Send a command and return response, also if the command failed.""" |
| 1408 try: |
| 1409 return self.client.sendcmd(cmd) |
| 1410 except ftplib.Error, err: |
| 1411 return str(err) |
| 1412 |
| 1413 def test_eprt(self): |
| 1414 # test wrong proto |
| 1415 try: |
| 1416 self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto, |
| 1417 self.server.host, self.server.port)) |
| 1418 except ftplib.error_perm, err: |
| 1419 self.assertEqual(str(err)[0:3], "522") |
| 1420 else: |
| 1421 self.fail("Exception not raised") |
| 1422 |
| 1423 # test bad args |
| 1424 msg = "501 Invalid EPRT format." |
| 1425 # len('|') > 3 |
| 1426 self.assertEqual(self.cmdresp('eprt ||||'), msg) |
| 1427 # len('|') < 3 |
| 1428 self.assertEqual(self.cmdresp('eprt ||'), msg) |
| 1429 # port > 65535 |
| 1430 self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto, |
| 1431 self.HOST)), msg) |
| 1432 # port < 0 |
| 1433 self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto, |
| 1434 self.HOST)), msg) |
| 1435 # port < 1024 |
| 1436 self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto, |
| 1437 self.HOST)), "501 Can't connect over a privileged port.") |
| 1438 |
| 1439 # test connection |
| 1440 sock = socket.socket(self.client.af, socket.SOCK_STREAM) |
| 1441 sock.bind((self.client.sock.getsockname()[0], 0)) |
| 1442 sock.listen(5) |
| 1443 sock.settimeout(2) |
| 1444 ip, port = sock.getsockname()[:2] |
| 1445 self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port)) |
| 1446 try: |
| 1447 try: |
| 1448 sock.accept() |
| 1449 except socket.timeout: |
| 1450 self.fail("Server didn't connect to passive socket") |
| 1451 finally: |
| 1452 sock.close() |
| 1453 |
| 1454 def test_epsv(self): |
| 1455 # test wrong proto |
| 1456 try: |
| 1457 self.client.sendcmd('epsv ' + self.other_proto) |
| 1458 except ftplib.error_perm, err: |
| 1459 self.assertEqual(str(err)[0:3], "522") |
| 1460 else: |
| 1461 self.fail("Exception not raised") |
| 1462 |
| 1463 # test connection |
| 1464 for cmd in ('EPSV', 'EPSV ' + self.proto): |
| 1465 host, port = ftplib.parse229(self.client.sendcmd(cmd), |
| 1466 self.client.sock.getpeername()) |
| 1467 s = socket.socket(self.client.af, socket.SOCK_STREAM) |
| 1468 s.settimeout(2) |
| 1469 try: |
| 1470 s.connect((host, port)) |
| 1471 self.client.sendcmd('abor') |
| 1472 finally: |
| 1473 s.close() |
| 1474 |
| 1475 def test_epsv_all(self): |
| 1476 self.client.sendcmd('epsv all') |
| 1477 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv') |
| 1478 self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 20
00) |
| 1479 self.assertRaises(ftplib.error_perm, self.client.sendcmd, |
| 1480 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000)) |
| 1481 |
| 1482 |
| 1483 class TestIPv4Environment(_TestNetworkProtocols): |
| 1484 """Test PASV, EPSV, PORT and EPRT commands. |
| 1485 |
| 1486 Runs tests contained in _TestNetworkProtocols class by using IPv4 |
| 1487 plus some additional specific tests. |
| 1488 """ |
| 1489 HOST = '127.0.0.1' |
| 1490 |
| 1491 def test_port_v4(self): |
| 1492 # test connection |
| 1493 self.client.makeport() |
| 1494 self.client.sendcmd('abor') |
| 1495 # test bad arguments |
| 1496 ae = self.assertEqual |
| 1497 msg = "501 Invalid PORT format." |
| 1498 ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ',' |
| 1499 ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int |
| 1500 ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6 |
| 1501 ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6 |
| 1502 ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255 |
| 1503 ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535 |
| 1504 ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0 |
| 1505 msg = "501 Can't connect over a privileged port." |
| 1506 ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port <
1024 |
| 1507 if "1.2.3.4" != self.HOST: |
| 1508 msg = "501 Can't connect to a foreign address." |
| 1509 ae(self.cmdresp('port 1,2,3,4,4,4'), msg) |
| 1510 |
| 1511 def test_eprt_v4(self): |
| 1512 self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'), |
| 1513 "501 Can't connect to a foreign address.") |
| 1514 |
| 1515 def test_pasv_v4(self): |
| 1516 host, port = ftplib.parse227(self.client.sendcmd('pasv')) |
| 1517 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 1518 s.settimeout(2) |
| 1519 try: |
| 1520 s.connect((host, port)) |
| 1521 finally: |
| 1522 s.close() |
| 1523 |
| 1524 |
| 1525 class TestIPv6Environment(_TestNetworkProtocols): |
| 1526 """Test PASV, EPSV, PORT and EPRT commands. |
| 1527 |
| 1528 Runs tests contained in _TestNetworkProtocols class by using IPv6 |
| 1529 plus some additional specific tests. |
| 1530 """ |
| 1531 HOST = '::1' |
| 1532 |
| 1533 def test_port_v6(self): |
| 1534 # 425 expected |
| 1535 self.assertRaises(ftplib.error_temp, self.client.sendport, |
| 1536 self.server.host, self.server.port) |
| 1537 |
| 1538 def test_pasv_v6(self): |
| 1539 # 425 expected |
| 1540 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv') |
| 1541 |
| 1542 def test_eprt_v6(self): |
| 1543 self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'), |
| 1544 "501 Can't connect to a foreign address.") |
| 1545 |
| 1546 |
| 1547 class FTPd(threading.Thread): |
| 1548 """A threaded FTP server used for running tests.""" |
| 1549 |
| 1550 def __init__(self, host=HOST, port=0, verbose=False): |
| 1551 threading.Thread.__init__(self) |
| 1552 self.active = False |
| 1553 if not verbose: |
| 1554 ftpserver.log = ftpserver.logline = lambda x: x |
| 1555 self.authorizer = ftpserver.DummyAuthorizer() |
| 1556 self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full pe
rms |
| 1557 self.authorizer.add_anonymous(HOME) |
| 1558 self.handler = ftpserver.FTPHandler |
| 1559 self.handler.authorizer = self.authorizer |
| 1560 self.server = ftpserver.FTPServer((host, port), self.handler) |
| 1561 self.host, self.port = self.server.socket.getsockname()[:2] |
| 1562 self.active_lock = threading.Lock() |
| 1563 |
| 1564 def start(self): |
| 1565 assert not self.active |
| 1566 self.__flag = threading.Event() |
| 1567 threading.Thread.start(self) |
| 1568 self.__flag.wait() |
| 1569 |
| 1570 def run(self): |
| 1571 self.active = True |
| 1572 self.__flag.set() |
| 1573 while self.active: |
| 1574 self.active_lock.acquire() |
| 1575 self.server.serve_forever(timeout=0.001, count=1) |
| 1576 self.active_lock.release() |
| 1577 self.server.close_all(ignore_all=True) |
| 1578 |
| 1579 def stop(self): |
| 1580 assert self.active |
| 1581 self.active = False |
| 1582 self.join() |
| 1583 |
| 1584 |
| 1585 def remove_test_files(): |
| 1586 "Convenience function for removing temporary test files" |
| 1587 for file in [TESTFN, TESTFN2, TESTFN3]: |
| 1588 try: |
| 1589 os.remove(file) |
| 1590 except os.error: |
| 1591 pass |
| 1592 |
| 1593 def test_main(tests=None): |
| 1594 test_suite = unittest.TestSuite() |
| 1595 if tests is None: |
| 1596 tests = [ |
| 1597 TestAbstractedFS, |
| 1598 TestDummyAuthorizer, |
| 1599 TestCallLater, |
| 1600 TestFtpAuthentication, |
| 1601 TestFtpDummyCmds, |
| 1602 TestFtpCmdsSemantic, |
| 1603 TestFtpFsOperations, |
| 1604 TestFtpRetrieveData, |
| 1605 TestFtpAbort, |
| 1606 TestFtpStoreData, |
| 1607 TestTimeouts, |
| 1608 TestMaxConnections |
| 1609 ] |
| 1610 if SUPPORTS_IPV4: |
| 1611 tests.append(TestIPv4Environment) |
| 1612 if SUPPORTS_IPV6: |
| 1613 tests.append(TestIPv6Environment) |
| 1614 |
| 1615 for test in tests: |
| 1616 test_suite.addTest(unittest.makeSuite(test)) |
| 1617 remove_test_files() |
| 1618 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 1619 remove_test_files() |
| 1620 |
| 1621 |
| 1622 if __name__ == '__main__': |
| 1623 test_main() |
OLD | NEW |