| Index: third_party/pyftpdlib/test/test_ftpd.py
|
| ===================================================================
|
| --- third_party/pyftpdlib/test/test_ftpd.py (revision 0)
|
| +++ third_party/pyftpdlib/test/test_ftpd.py (revision 0)
|
| @@ -0,0 +1,1623 @@
|
| +#!/usr/bin/env python
|
| +# test_ftpd.py
|
| +
|
| +# ======================================================================
|
| +# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>
|
| +#
|
| +# All Rights Reserved
|
| +#
|
| +# Permission to use, copy, modify, and distribute this software and
|
| +# its documentation for any purpose and without fee is hereby
|
| +# granted, provided that the above copyright notice appear in all
|
| +# copies and that both that copyright notice and this permission
|
| +# notice appear in supporting documentation, and that the name of
|
| +# Giampaolo Rodola' not be used in advertising or publicity pertaining to
|
| +# distribution of the software without specific, written prior
|
| +# permission.
|
| +#
|
| +# Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
| +# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
| +# NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
| +# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
| +# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
| +# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
| +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
| +# ======================================================================
|
| +
|
| +
|
| +# This test suite has been run successfully on the following systems:
|
| +#
|
| +# -----------------------------------------------------------
|
| +# System | Python version
|
| +# -----------------------------------------------------------
|
| +# Linux Ubuntu 2.6.20-15 | 2.4, 2.5
|
| +# Linux Kubuntu 8.04 32 & 64 bits | 2.5.2
|
| +# Linux Debian 2.4.27-2-386 | 2.3.5
|
| +# Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2
|
| +# Windows Vista Ultimate 64 bit | 2.5.1
|
| +# Windows Vista Business 32 bit | 2.5.1
|
| +# Windows Server 2008 64bit | 2.5.1
|
| +# Windows Mobile 6.1 | PythonCE 2.5
|
| +# OS X 10.4.10 | 2.3, 2.4, 2.5
|
| +# FreeBSD 7.0 | 2.4, 2.5
|
| +# -----------------------------------------------------------
|
| +
|
| +
|
| +import threading
|
| +import unittest
|
| +import socket
|
| +import os
|
| +import time
|
| +import re
|
| +import tempfile
|
| +import ftplib
|
| +import random
|
| +import warnings
|
| +import sys
|
| +
|
| +from pyftpdlib import ftpserver
|
| +
|
| +
|
| +__release__ = 'pyftpdlib 0.5.0'
|
| +
|
| +# Attempt to use IP rather than hostname (test suite will run a lot faster)
|
| +try:
|
| + HOST = socket.gethostbyname('localhost')
|
| +except socket.error:
|
| + HOST = 'localhost'
|
| +USER = 'user'
|
| +PASSWD = '12345'
|
| +HOME = os.getcwd()
|
| +try:
|
| + from test.test_support import TESTFN
|
| +except ImportError:
|
| + TESTFN = 'temp-fname'
|
| +TESTFN2 = TESTFN + '2'
|
| +TESTFN3 = TESTFN + '3'
|
| +
|
| +def try_address(host, port=0):
|
| + """Try to bind a daemon on the given host:port and return True
|
| + if that has been possible."""
|
| + try:
|
| + ftpserver.FTPServer((host, port), None)
|
| + except socket.error:
|
| + return False
|
| + else:
|
| + return True
|
| +
|
| +SUPPORTS_IPV4 = try_address('127.0.0.1')
|
| +SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1')
|
| +
|
| +
|
| +class TestAbstractedFS(unittest.TestCase):
|
| + """Test for conversion utility methods of AbstractedFS class."""
|
| +
|
| + def test_ftpnorm(self):
|
| + # Tests for ftpnorm method.
|
| + ae = self.assertEquals
|
| + fs = ftpserver.AbstractedFS()
|
| +
|
| + fs.cwd = '/'
|
| + ae(fs.ftpnorm(''), '/')
|
| + ae(fs.ftpnorm('/'), '/')
|
| + ae(fs.ftpnorm('.'), '/')
|
| + ae(fs.ftpnorm('..'), '/')
|
| + ae(fs.ftpnorm('a'), '/a')
|
| + ae(fs.ftpnorm('/a'), '/a')
|
| + ae(fs.ftpnorm('/a/'), '/a')
|
| + ae(fs.ftpnorm('a/..'), '/')
|
| + ae(fs.ftpnorm('a/b'), '/a/b')
|
| + ae(fs.ftpnorm('a/b/..'), '/a')
|
| + ae(fs.ftpnorm('a/b/../..'), '/')
|
| + fs.cwd = '/sub'
|
| + ae(fs.ftpnorm(''), '/sub')
|
| + ae(fs.ftpnorm('/'), '/')
|
| + ae(fs.ftpnorm('.'), '/sub')
|
| + ae(fs.ftpnorm('..'), '/')
|
| + ae(fs.ftpnorm('a'), '/sub/a')
|
| + ae(fs.ftpnorm('a/'), '/sub/a')
|
| + ae(fs.ftpnorm('a/..'), '/sub')
|
| + ae(fs.ftpnorm('a/b'), '/sub/a/b')
|
| + ae(fs.ftpnorm('a/b/'), '/sub/a/b')
|
| + ae(fs.ftpnorm('a/b/..'), '/sub/a')
|
| + ae(fs.ftpnorm('a/b/../..'), '/sub')
|
| + ae(fs.ftpnorm('a/b/../../..'), '/')
|
| + ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed
|
| +
|
| + def test_ftp2fs(self):
|
| + # Tests for ftp2fs method.
|
| + ae = self.assertEquals
|
| + fs = ftpserver.AbstractedFS()
|
| + join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
|
| +
|
| + def goforit(root):
|
| + fs.root = root
|
| + fs.cwd = '/'
|
| + ae(fs.ftp2fs(''), root)
|
| + ae(fs.ftp2fs('/'), root)
|
| + ae(fs.ftp2fs('.'), root)
|
| + ae(fs.ftp2fs('..'), root)
|
| + ae(fs.ftp2fs('a'), join(root, 'a'))
|
| + ae(fs.ftp2fs('/a'), join(root, 'a'))
|
| + ae(fs.ftp2fs('/a/'), join(root, 'a'))
|
| + ae(fs.ftp2fs('a/..'), root)
|
| + ae(fs.ftp2fs('a/b'), join(root, r'a/b'))
|
| + ae(fs.ftp2fs('/a/b'), join(root, r'a/b'))
|
| + ae(fs.ftp2fs('/a/b/..'), join(root, 'a'))
|
| + ae(fs.ftp2fs('/a/b/../..'), root)
|
| + fs.cwd = '/sub'
|
| + ae(fs.ftp2fs(''), join(root, 'sub'))
|
| + ae(fs.ftp2fs('/'), root)
|
| + ae(fs.ftp2fs('.'), join(root, 'sub'))
|
| + ae(fs.ftp2fs('..'), root)
|
| + ae(fs.ftp2fs('a'), join(root, 'sub/a'))
|
| + ae(fs.ftp2fs('a/'), join(root, 'sub/a'))
|
| + ae(fs.ftp2fs('a/..'), join(root, 'sub'))
|
| + ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b'))
|
| + ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a'))
|
| + ae(fs.ftp2fs('a/b/../..'), join(root, 'sub'))
|
| + ae(fs.ftp2fs('a/b/../../..'), root)
|
| + ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed
|
| +
|
| + if os.sep == '\\':
|
| + goforit(r'C:\dir')
|
| + goforit('C:\\')
|
| + # on DOS-derived filesystems (e.g. Windows) this is the same
|
| + # as specifying the current drive directory (e.g. 'C:\\')
|
| + goforit('\\')
|
| + elif os.sep == '/':
|
| + goforit('/home/user')
|
| + goforit('/')
|
| + else:
|
| + # os.sep == ':'? Don't know... let's try it anyway
|
| + goforit(os.getcwd())
|
| +
|
| + def test_fs2ftp(self):
|
| + # Tests for fs2ftp method.
|
| + ae = self.assertEquals
|
| + fs = ftpserver.AbstractedFS()
|
| + join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
|
| +
|
| + def goforit(root):
|
| + fs.root = root
|
| + ae(fs.fs2ftp(root), '/')
|
| + ae(fs.fs2ftp(join(root, '/')), '/')
|
| + ae(fs.fs2ftp(join(root, '.')), '/')
|
| + ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root
|
| + ae(fs.fs2ftp(join(root, 'a')), '/a')
|
| + ae(fs.fs2ftp(join(root, 'a/')), '/a')
|
| + ae(fs.fs2ftp(join(root, 'a/..')), '/')
|
| + ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
|
| + ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
|
| + ae(fs.fs2ftp(join(root, 'a/b/..')), '/a')
|
| + ae(fs.fs2ftp(join(root, '/a/b/../..')), '/')
|
| + fs.cwd = '/sub'
|
| + ae(fs.fs2ftp(join(root, 'a/')), '/a')
|
| +
|
| + if os.sep == '\\':
|
| + goforit(r'C:\dir')
|
| + goforit('C:\\')
|
| + # on DOS-derived filesystems (e.g. Windows) this is the same
|
| + # as specifying the current drive directory (e.g. 'C:\\')
|
| + goforit('\\')
|
| + fs.root = r'C:\dir'
|
| + ae(fs.fs2ftp('C:\\'), '/')
|
| + ae(fs.fs2ftp('D:\\'), '/')
|
| + ae(fs.fs2ftp('D:\\dir'), '/')
|
| + elif os.sep == '/':
|
| + goforit('/')
|
| + assert os.path.realpath('/__home/user') == '/__home/user', \
|
| + 'Test skipped (symlinks not allowed).'
|
| + goforit('/__home/user')
|
| + fs.root = '/__home/user'
|
| + ae(fs.fs2ftp('/__home'), '/')
|
| + ae(fs.fs2ftp('/'), '/')
|
| + ae(fs.fs2ftp('/__home/userx'), '/')
|
| + else:
|
| + # os.sep == ':'? Don't know... let's try it anyway
|
| + goforit(os.getcwd())
|
| +
|
| + def test_validpath(self):
|
| + # Tests for validpath method.
|
| + fs = ftpserver.AbstractedFS()
|
| + fs.root = HOME
|
| + self.failUnless(fs.validpath(HOME))
|
| + self.failUnless(fs.validpath(HOME + '/'))
|
| + self.failIf(fs.validpath(HOME + 'xxx'))
|
| +
|
| + if hasattr(os, 'symlink'):
|
| + # Tests for validpath on systems supporting symbolic links.
|
| +
|
| + def _safe_remove(self, path):
|
| + # convenience function for removing temporary files
|
| + try:
|
| + os.remove(path)
|
| + except os.error:
|
| + pass
|
| +
|
| + def test_validpath_validlink(self):
|
| + # Test validpath by issuing a symlink pointing to a path
|
| + # inside the root directory.
|
| + fs = ftpserver.AbstractedFS()
|
| + fs.root = HOME
|
| + try:
|
| + open(TESTFN, 'w')
|
| + os.symlink(TESTFN, TESTFN2)
|
| + self.failUnless(fs.validpath(TESTFN))
|
| + finally:
|
| + self._safe_remove(TESTFN)
|
| + self._safe_remove(TESTFN2)
|
| +
|
| + def test_validpath_external_symlink(self):
|
| + # Test validpath by issuing a symlink pointing to a path
|
| + # outside the root directory.
|
| + fs = ftpserver.AbstractedFS()
|
| + fs.root = HOME
|
| + try:
|
| + # tempfile should create our file in /tmp directory
|
| + # which should be outside the user root. If it is not
|
| + # we just skip the test.
|
| + file = tempfile.NamedTemporaryFile()
|
| + if HOME == os.path.dirname(file.name):
|
| + return
|
| + os.symlink(file.name, TESTFN)
|
| + self.failIf(fs.validpath(TESTFN))
|
| + finally:
|
| + self._safe_remove(TESTFN)
|
| + file.close()
|
| +
|
| +
|
| +class TestDummyAuthorizer(unittest.TestCase):
|
| + """Tests for DummyAuthorizer class."""
|
| +
|
| + # temporarily change warnings to exceptions for the purposes of testing
|
| + def setUp(self):
|
| + self.tempdir = tempfile.mkdtemp(dir=HOME)
|
| + self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir))
|
| + self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name
|
| + self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name
|
| + warnings.filterwarnings("error")
|
| +
|
| + def tearDown(self):
|
| + os.remove(self.tempfile)
|
| + os.remove(self.subtempfile)
|
| + os.rmdir(self.subtempdir)
|
| + os.rmdir(self.tempdir)
|
| + warnings.resetwarnings()
|
| +
|
| + def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
|
| + try:
|
| + callableObj(*args, **kwargs)
|
| + except excClass, why:
|
| + if str(why) == msg:
|
| + return
|
| + raise self.failureException("%s != %s" %(str(why), msg))
|
| + else:
|
| + if hasattr(excClass,'__name__'): excName = excClass.__name__
|
| + else: excName = str(excClass)
|
| + raise self.failureException, "%s not raised" % excName
|
| +
|
| + def test_common_methods(self):
|
| + auth = ftpserver.DummyAuthorizer()
|
| + # create user
|
| + auth.add_user(USER, PASSWD, HOME)
|
| + auth.add_anonymous(HOME)
|
| + # check credentials
|
| + self.failUnless(auth.validate_authentication(USER, PASSWD))
|
| + self.failIf(auth.validate_authentication(USER, 'wrongpwd'))
|
| + # remove them
|
| + auth.remove_user(USER)
|
| + auth.remove_user('anonymous')
|
| + # raise exc if user does not exists
|
| + self.assertRaises(KeyError, auth.remove_user, USER)
|
| + # raise exc if path does not exist
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such directory: "%s"' %'?:\\',
|
| + auth.add_user, USER, PASSWD, '?:\\')
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such directory: "%s"' %'?:\\',
|
| + auth.add_anonymous, '?:\\')
|
| + # raise exc if user already exists
|
| + auth.add_user(USER, PASSWD, HOME)
|
| + auth.add_anonymous(HOME)
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'User "%s" already exists' %USER,
|
| + auth.add_user, USER, PASSWD, HOME)
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'User "anonymous" already exists',
|
| + auth.add_anonymous, HOME)
|
| + auth.remove_user(USER)
|
| + auth.remove_user('anonymous')
|
| + # raise on wrong permission
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such permission "?"',
|
| + auth.add_user, USER, PASSWD, HOME, perm='?')
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such permission "?"',
|
| + auth.add_anonymous, HOME, perm='?')
|
| + # expect warning on write permissions assigned to anonymous user
|
| + for x in "adfmw":
|
| + self.assertRaisesWithMsg(RuntimeWarning,
|
| + "Write permissions assigned to anonymous user.",
|
| + auth.add_anonymous, HOME, perm=x)
|
| +
|
| + def test_override_perm_interface(self):
|
| + auth = ftpserver.DummyAuthorizer()
|
| + auth.add_user(USER, PASSWD, HOME, perm='elr')
|
| + # raise exc if user does not exists
|
| + self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr')
|
| + # raise exc if path does not exist or it's not a directory
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such directory: "%s"' %'?:\\',
|
| + auth.override_perm, USER, '?:\\', 'elr')
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such directory: "%s"' %self.tempfile,
|
| + auth.override_perm, USER, self.tempfile, 'elr')
|
| + # raise on wrong permission
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + 'No such permission "?"', auth.override_perm,
|
| + USER, HOME, perm='?')
|
| + # expect warning on write permissions assigned to anonymous user
|
| + auth.add_anonymous(HOME)
|
| + for p in "adfmw":
|
| + self.assertRaisesWithMsg(RuntimeWarning,
|
| + "Write permissions assigned to anonymous user.",
|
| + auth.override_perm, 'anonymous', HOME, p)
|
| + # raise on attempt to override home directory permissions
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + "Can't override home directory permissions",
|
| + auth.override_perm, USER, HOME, perm='w')
|
| + # raise on attempt to override a path escaping home directory
|
| + if os.path.dirname(HOME) != HOME:
|
| + self.assertRaisesWithMsg(ftpserver.AuthorizerError,
|
| + "Path escapes user home directory",
|
| + auth.override_perm, USER,
|
| + os.path.dirname(HOME), perm='w')
|
| + # try to re-set an overridden permission
|
| + auth.override_perm(USER, self.tempdir, perm='w')
|
| + auth.override_perm(USER, self.tempdir, perm='wr')
|
| +
|
| + def test_override_perm_recursive_paths(self):
|
| + auth = ftpserver.DummyAuthorizer()
|
| + auth.add_user(USER, PASSWD, HOME, perm='elr')
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
|
| + auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
|
| + self.assert_(auth.has_perm(USER, 'w', HOME) is False)
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
|
| + self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True)
|
| + self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True)
|
| +
|
| + self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
|
| + path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
|
| + self.assert_(auth.has_perm(USER, 'w', path) is False)
|
| + # test case-sensitiveness
|
| + if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
|
| +
|
| + def test_override_perm_not_recursive_paths(self):
|
| + auth = ftpserver.DummyAuthorizer()
|
| + auth.add_user(USER, PASSWD, HOME, perm='elr')
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
|
| + auth.override_perm(USER, self.tempdir, perm='w')
|
| + self.assert_(auth.has_perm(USER, 'w', HOME) is False)
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
|
| + self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False)
|
| + self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False)
|
| +
|
| + self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
|
| + path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
|
| + self.assert_(auth.has_perm(USER, 'w', path) is False)
|
| + # test case-sensitiveness
|
| + if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
|
| + self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
|
| +
|
| +
|
| +class TestCallLater(unittest.TestCase):
|
| + """Tests for CallLater class."""
|
| +
|
| + def setUp(self):
|
| + for task in ftpserver._tasks:
|
| + if not task.cancelled:
|
| + task.cancel()
|
| + del ftpserver._tasks[:]
|
| +
|
| + def scheduler(self, timeout=0.01, count=100):
|
| + while ftpserver._tasks and count > 0:
|
| + ftpserver._scheduler()
|
| + count -= 1
|
| + time.sleep(timeout)
|
| +
|
| + def test_interface(self):
|
| + fun = lambda: 0
|
| + self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun)
|
| + x = ftpserver.CallLater(3, fun)
|
| + self.assertRaises(AssertionError, x.delay, -1)
|
| + self.assert_(x.cancelled is False)
|
| + x.cancel()
|
| + self.assert_(x.cancelled is True)
|
| + self.assertRaises(AssertionError, x.call)
|
| + self.assertRaises(AssertionError, x.reset)
|
| + self.assertRaises(AssertionError, x.delay, 2)
|
| + self.assertRaises(AssertionError, x.cancel)
|
| +
|
| + def test_order(self):
|
| + l = []
|
| + fun = lambda x: l.append(x)
|
| + for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
|
| + ftpserver.CallLater(x, fun, x)
|
| + self.scheduler()
|
| + self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
|
| +
|
| + def test_delay(self):
|
| + l = []
|
| + fun = lambda x: l.append(x)
|
| + ftpserver.CallLater(0.01, fun, 0.01).delay(0.07)
|
| + ftpserver.CallLater(0.02, fun, 0.02).delay(0.08)
|
| + ftpserver.CallLater(0.03, fun, 0.03)
|
| + ftpserver.CallLater(0.04, fun, 0.04)
|
| + ftpserver.CallLater(0.05, fun, 0.05)
|
| + ftpserver.CallLater(0.06, fun, 0.06).delay(0.001)
|
| + self.scheduler()
|
| + self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02])
|
| +
|
| + def test_reset(self):
|
| + # will fail on such systems where time.time() does not provide
|
| + # time with a better precision than 1 second.
|
| + l = []
|
| + fun = lambda x: l.append(x)
|
| + ftpserver.CallLater(0.01, fun, 0.01)
|
| + ftpserver.CallLater(0.02, fun, 0.02)
|
| + ftpserver.CallLater(0.03, fun, 0.03)
|
| + x = ftpserver.CallLater(0.04, fun, 0.04)
|
| + ftpserver.CallLater(0.05, fun, 0.05)
|
| + time.sleep(0.1)
|
| + x.reset()
|
| + self.scheduler()
|
| + self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
|
| +
|
| + def test_cancel(self):
|
| + l = []
|
| + fun = lambda x: l.append(x)
|
| + ftpserver.CallLater(0.01, fun, 0.01).cancel()
|
| + ftpserver.CallLater(0.02, fun, 0.02)
|
| + ftpserver.CallLater(0.03, fun, 0.03)
|
| + ftpserver.CallLater(0.04, fun, 0.04)
|
| + ftpserver.CallLater(0.05, fun, 0.05).cancel()
|
| + self.scheduler()
|
| + self.assertEqual(l, [0.02, 0.03, 0.04])
|
| +
|
| +
|
| +class TestFtpAuthentication(unittest.TestCase):
|
| + "test: USER, PASS, REIN."
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.f1 = open(TESTFN, 'w+b')
|
| + self.f2 = open(TESTFN2, 'w+b')
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| + if not self.f1.closed:
|
| + self.f1.close()
|
| + if not self.f2.closed:
|
| + self.f2.close()
|
| + os.remove(TESTFN)
|
| + os.remove(TESTFN2)
|
| +
|
| + def test_auth_ok(self):
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| +
|
| + def test_anon_auth(self):
|
| + self.client.login(user='anonymous', passwd='anon@')
|
| + self.client.login(user='AnonYmoUs', passwd='anon@')
|
| + self.client.login(user='anonymous', passwd='')
|
| +
|
| + # Commented after delayed response on wrong credentials has been
|
| + # introduced because tests take too much to complete.
|
| +
|
| +## def test_auth_failed(self):
|
| +## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
|
| +## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD)
|
| +## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
|
| +
|
| +## def test_max_auth(self):
|
| +## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
|
| +## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
|
| +## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
|
| +## # If authentication fails for 3 times ftpd disconnects the
|
| +## # client. We can check if that happens by using self.client.sendcmd()
|
| +## # on the 'dead' socket object. If socket object is really
|
| +## # closed it should be raised a socket.error exception (Windows)
|
| +## # or a EOFError exception (Linux).
|
| +## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '')
|
| +
|
| + def test_rein(self):
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| + self.client.sendcmd('rein')
|
| + # user not authenticated, error response expected
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
|
| + # by logging-in again we should be able to execute a
|
| + # file-system command
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| + self.client.sendcmd('pwd')
|
| +
|
| + def test_rein_during_transfer(self):
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.close()
|
| +
|
| + self.client.voidcmd('TYPE I')
|
| + conn = self.client.transfercmd('retr ' + TESTFN)
|
| + rein_sent = 0
|
| + while 1:
|
| + chunk = conn.recv(8192)
|
| + if not chunk:
|
| + break
|
| + self.f2.write(chunk)
|
| + if not rein_sent:
|
| + rein_sent = 1
|
| + # flush account, error response expected
|
| + self.client.sendcmd('rein')
|
| + self.assertRaises(ftplib.error_perm, self.client.dir)
|
| +
|
| + # a 226 response is expected once tranfer finishes
|
| + self.assertEqual(self.client.voidresp()[:3], '226')
|
| + # account is still flushed, error response is still expected
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd,
|
| + 'size ' + TESTFN)
|
| + # by logging-in again we should be able to execute a
|
| + # filesystem command
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| + self.client.sendcmd('pwd')
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data), hash (self.f2.read()))
|
| +
|
| + def test_user(self):
|
| + # Test USER while already authenticated and no transfer
|
| + # is in progress.
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| + self.client.sendcmd('user ' + USER) # authentication flushed
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
|
| + self.client.sendcmd('pass ' + PASSWD)
|
| + self.client.sendcmd('pwd')
|
| +
|
| + def test_user_on_transfer(self):
|
| + # Test USER while already authenticated and a transfer is
|
| + # in progress.
|
| + self.client.login(user=USER, passwd=PASSWD)
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.close()
|
| +
|
| + self.client.voidcmd('TYPE I')
|
| + conn = self.client.transfercmd('retr ' + TESTFN)
|
| + rein_sent = 0
|
| + while 1:
|
| + chunk = conn.recv(8192)
|
| + if not chunk:
|
| + break
|
| + self.f2.write(chunk)
|
| + # stop transfer while it isn't finished yet
|
| + if not rein_sent:
|
| + rein_sent = 1
|
| + # flush account, expect an error response
|
| + self.client.sendcmd('user ' + USER)
|
| + self.assertRaises(ftplib.error_perm, self.client.dir)
|
| +
|
| + # a 226 response is expected once tranfer finishes
|
| + self.assertEqual(self.client.voidresp()[:3], '226')
|
| + # account is still flushed, error response is still expected
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
|
| + # by logging-in again we should be able to execute a
|
| + # filesystem command
|
| + self.client.sendcmd('pass ' + PASSWD)
|
| + self.client.sendcmd('pwd')
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data), hash (self.f2.read()))
|
| +
|
| +
|
| +class TestFtpDummyCmds(unittest.TestCase):
|
| + "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP"
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| +
|
| + def test_type(self):
|
| + self.client.sendcmd('type a')
|
| + self.client.sendcmd('type i')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?')
|
| +
|
| + def test_stru(self):
|
| + self.client.sendcmd('stru f')
|
| + self.client.sendcmd('stru F')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?')
|
| +
|
| + def test_mode(self):
|
| + self.client.sendcmd('mode s')
|
| + self.client.sendcmd('mode S')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?')
|
| +
|
| + def test_noop(self):
|
| + self.client.sendcmd('noop')
|
| +
|
| + def test_syst(self):
|
| + self.client.sendcmd('syst')
|
| +
|
| + def test_allo(self):
|
| + self.client.sendcmd('allo x')
|
| +
|
| + def test_quit(self):
|
| + self.client.sendcmd('quit')
|
| +
|
| + def test_help(self):
|
| + self.client.sendcmd('help')
|
| + cmd = random.choice(ftpserver.proto_cmds.keys())
|
| + self.client.sendcmd('help %s' %cmd)
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?')
|
| +
|
| + def test_rest(self):
|
| + # test error conditions only;
|
| + # restored data-transfer is tested later
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1')
|
| +
|
| + def test_opts_feat(self):
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst')
|
| + # utility function which used for extracting the MLST "facts"
|
| + # string from the FEAT response
|
| + def mlst():
|
| + resp = self.client.sendcmd('feat')
|
| + return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1)
|
| + # we rely on "type", "perm", "size", and "modify" facts which
|
| + # are those available on all platforms
|
| + self.failUnless('type*;perm*;size*;modify*;' in mlst())
|
| + self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;')
|
| + self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;')
|
| + self.failUnless('type*;perm;size;modify;' in mlst())
|
| +
|
| + self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ')
|
| + self.failUnless(not '*' in mlst())
|
| +
|
| + self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
|
| + self.failUnless(not '*' in mlst())
|
| + self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \
|
| + '200 MLST OPTS type;')
|
| + self.failUnless('type*;perm;size;modify;' in mlst())
|
| +
|
| +
|
| +class TestFtpCmdsSemantic(unittest.TestCase):
|
| +
|
| + arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
|
| + 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type',
|
| + 'user','xmkd','xrmd')
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| +
|
| + def test_arg_cmds(self):
|
| + # test commands requiring an argument
|
| + expected = "501 Syntax error: command needs an argument."
|
| + for cmd in self.arg_cmds:
|
| + self.client.putcmd(cmd)
|
| + resp = self.client.getmultiline()
|
| + self.assertEqual(resp, expected)
|
| +
|
| + def test_no_arg_cmds(self):
|
| + # test commands accepting no arguments
|
| + expected = "501 Syntax error: command does not accept arguments."
|
| + for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
|
| + 'syst','xcup','xpwd'):
|
| + self.client.putcmd(cmd + ' arg')
|
| + resp = self.client.getmultiline()
|
| + self.assertEqual(resp, expected)
|
| +
|
| + def test_auth_cmds(self):
|
| + # test those commands requiring client to be authenticated
|
| + expected = "530 Log in with USER and PASS first."
|
| + self.client.sendcmd('rein')
|
| + for cmd in ftpserver.proto_cmds:
|
| + cmd = cmd.lower()
|
| + if cmd in ('feat','help','noop','user','pass','stat','syst','quit'):
|
| + continue
|
| + if cmd in self.arg_cmds:
|
| + cmd = cmd + ' arg'
|
| + self.client.putcmd(cmd)
|
| + resp = self.client.getmultiline()
|
| + self.assertEqual(resp, expected)
|
| +
|
| + def test_no_auth_cmds(self):
|
| + # test those commands that do not require client to be authenticated
|
| + self.client.sendcmd('rein')
|
| + for cmd in ('feat','help','noop','stat','syst'):
|
| + self.client.sendcmd(cmd)
|
| + # STAT provided with an argument is equal to LIST hence not allowed
|
| + # if not authenticated
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /')
|
| + self.client.sendcmd('quit')
|
| +
|
| +
|
| +class TestFtpFsOperations(unittest.TestCase):
|
| + "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| + self.tempfile = os.path.basename(open(TESTFN, 'w+b').name)
|
| + self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + os.mkdir(self.tempdir)
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| + if os.path.exists(self.tempfile):
|
| + os.remove(self.tempfile)
|
| + if os.path.exists(self.tempdir):
|
| + os.rmdir(self.tempdir)
|
| +
|
| + def test_cwd(self):
|
| + self.client.cwd(self.tempdir)
|
| + self.assertEqual(self.client.pwd(), '/' + self.tempdir)
|
| + self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir')
|
| + # cwd provided with no arguments is supposed to move us to the
|
| + # root directory
|
| + self.client.sendcmd('cwd')
|
| + self.assertEqual(self.client.pwd(), '/')
|
| +
|
| + def test_pwd(self):
|
| + self.assertEqual(self.client.pwd(), '/')
|
| + self.client.cwd(self.tempdir)
|
| + self.assertEqual(self.client.pwd(), '/' + self.tempdir)
|
| +
|
| + def test_cdup(self):
|
| + self.client.cwd(self.tempdir)
|
| + self.assertEqual(self.client.pwd(), '/' + self.tempdir)
|
| + self.client.sendcmd('cdup')
|
| + self.assertEqual(self.client.pwd(), '/')
|
| + # make sure we can't escape from root directory
|
| + self.client.sendcmd('cdup')
|
| + self.assertEqual(self.client.pwd(), '/')
|
| +
|
| + def test_mkd(self):
|
| + tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.client.mkd(tempdir)
|
| + # make sure we can't create directories which already exist
|
| + # (probably not really necessary);
|
| + # let's use a try/except statement to avoid leaving behind
|
| + # orphaned temporary directory in the event of a test failure.
|
| + try:
|
| + self.client.mkd(tempdir)
|
| + except ftplib.error_perm:
|
| + os.rmdir(tempdir) # ok
|
| + else:
|
| + self.fail('ftplib.error_perm not raised.')
|
| +
|
| + def test_rmd(self):
|
| + self.client.rmd(self.tempdir)
|
| + self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile)
|
| + # make sure we can't remove the root directory
|
| + self.assertRaises(ftplib.error_perm, self.client.rmd, '/')
|
| +
|
| + def test_dele(self):
|
| + self.client.delete(self.tempfile)
|
| + self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir)
|
| +
|
| + def test_rnfr_rnto(self):
|
| + # rename file
|
| + tempname = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.client.rename(self.tempfile, tempname)
|
| + self.client.rename(tempname, self.tempfile)
|
| + # rename dir
|
| + tempname = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.client.rename(self.tempdir, tempname)
|
| + self.client.rename(tempname, self.tempdir)
|
| + # rnfr/rnto over non-existing paths
|
| + bogus = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
|
| + self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/')
|
| + # make sure we can't rename root directory
|
| + self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x')
|
| +
|
| + def test_mdtm(self):
|
| + self.client.sendcmd('mdtm ' + self.tempfile)
|
| + # make sure we can't use mdtm against directories
|
| + try:
|
| + self.client.sendcmd('mdtm ' + self.tempdir)
|
| + except ftplib.error_perm, err:
|
| + self.failUnless("not retrievable" in str(err))
|
| + else:
|
| + self.fail('Exception not raised')
|
| +
|
| + def test_size(self):
|
| + self.client.size(self.tempfile)
|
| + # make sure we can't use size against directories
|
| + try:
|
| + self.client.sendcmd('size ' + self.tempdir)
|
| + except ftplib.error_perm, err:
|
| + self.failUnless("not retrievable" in str(err))
|
| + else:
|
| + self.fail('Exception not raised')
|
| +
|
| +
|
| +class TestFtpRetrieveData(unittest.TestCase):
|
| + "test: RETR, REST, LIST, NLST, argumented STAT"
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| + self.f1 = open(TESTFN, 'w+b')
|
| + self.f2 = open(TESTFN2, 'w+b')
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| + if not self.f1.closed:
|
| + self.f1.close()
|
| + if not self.f2.closed:
|
| + self.f2.close()
|
| + os.remove(TESTFN)
|
| + os.remove(TESTFN2)
|
| +
|
| + def test_retr(self):
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.close()
|
| + self.client.retrbinary("retr " + TESTFN, self.f2.write)
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data), hash(self.f2.read()))
|
| +
|
| + def test_restore_on_retr(self):
|
| + data = 'abcde12345' * 100000
|
| + fname_1 = os.path.basename(self.f1.name)
|
| + self.f1.write(data)
|
| + self.f1.close()
|
| +
|
| + # look at ftplib.FTP.retrbinary method to understand this mess
|
| + self.client.voidcmd('TYPE I')
|
| + conn = self.client.transfercmd('retr ' + fname_1)
|
| + chunk = conn.recv(len(data) / 2)
|
| + self.f2.write(chunk)
|
| + conn.close()
|
| + # transfer wasn't finished yet so we expect a 426 response
|
| + self.assertRaises(ftplib.error_temp, self.client.voidresp)
|
| +
|
| + # resuming transfer by using a marker value greater than the
|
| + # file size stored on the server should result in an error
|
| + # on retr (RFC-1123)
|
| + file_size = self.client.size(fname_1)
|
| + self.client.sendcmd('rest %s' %((file_size + 1)))
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fname_1)
|
| +
|
| + # test resume
|
| + self.client.sendcmd('rest %s' %len(chunk))
|
| + self.client.retrbinary("retr " + fname_1, self.f2.write)
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data), hash (self.f2.read()))
|
| +
|
| + def _test_listing_cmds(self, cmd):
|
| + """Tests common to LIST NLST and MLSD commands."""
|
| + # assume that no argument has the same meaning of "/"
|
| + l1 = l2 = []
|
| + self.client.retrlines(cmd, l1.append)
|
| + self.client.retrlines(cmd + ' /', l2.append)
|
| + self.assertEqual(l1, l2)
|
| + if cmd.lower() != 'mlsd':
|
| + # if pathname is a file one line is expected
|
| + x = []
|
| + self.client.retrlines('%s ' %cmd + TESTFN, x.append)
|
| + self.assertEqual(len(x), 1)
|
| + self.failUnless(''.join(x).endswith(TESTFN))
|
| + # non-existent path, 550 response is expected
|
| + bogus = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.assertRaises(ftplib.error_perm, self.client.retrlines,
|
| + '%s ' %cmd + bogus, lambda x: x)
|
| + # for an empty directory we excpect that the data channel is
|
| + # opened anyway and that no data is received
|
| + x = []
|
| + tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
|
| + try:
|
| + self.client.retrlines('%s %s' %(cmd, tempdir), x.append)
|
| + self.assertEqual(x, [])
|
| + finally:
|
| + os.rmdir(tempdir)
|
| +
|
| + def test_nlst(self):
|
| + # common tests
|
| + self._test_listing_cmds('nlst')
|
| +
|
| + def test_list(self):
|
| + # common tests
|
| + self._test_listing_cmds('list')
|
| + # known incorrect pathname arguments (e.g. old clients) are
|
| + # expected to be treated as if pathname would be == '/'
|
| + l1 = l2 = l3 = l4 = l5 = []
|
| + self.client.retrlines('list /', l1.append)
|
| + self.client.retrlines('list -a', l2.append)
|
| + self.client.retrlines('list -l', l3.append)
|
| + self.client.retrlines('list -al', l4.append)
|
| + self.client.retrlines('list -la', l5.append)
|
| + tot = (l1, l2, l3, l4, l5)
|
| + for x in range(len(tot) - 1):
|
| + self.assertEqual(tot[x], tot[x+1])
|
| +
|
| + def test_mlst(self):
|
| + # utility function for extracting the line of interest
|
| + mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
|
| +
|
| + # the fact set must be preceded by a space
|
| + self.failUnless(mlstline('mlst').startswith(' '))
|
| + # where TVFS is supported, a fully qualified pathname is expected
|
| + self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN))
|
| + self.failUnless(mlstline('mlst').endswith('/'))
|
| + # assume that no argument has the same meaning of "/"
|
| + self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
|
| + # non-existent path
|
| + bogus = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.assertRaises(ftplib.error_perm, mlstline, bogus)
|
| + # test file/dir notations
|
| + self.failUnless('type=dir' in mlstline('mlst'))
|
| + self.failUnless('type=file' in mlstline('mlst ' + TESTFN))
|
| + # let's add some tests for OPTS command
|
| + self.client.sendcmd('opts mlst type;')
|
| + self.assertEqual(mlstline('mlst'), ' type=dir; /')
|
| + # where no facts are present, two leading spaces before the
|
| + # pathname are required (RFC-3659)
|
| + self.client.sendcmd('opts mlst')
|
| + self.assertEqual(mlstline('mlst'), ' /')
|
| +
|
| + def test_mlsd(self):
|
| + # common tests
|
| + self._test_listing_cmds('mlsd')
|
| + dir = os.path.basename(tempfile.mkdtemp(dir=HOME))
|
| + try:
|
| + try:
|
| + self.client.retrlines('mlsd ' + TESTFN, lambda x: x)
|
| + except ftplib.error_perm, resp:
|
| + # if path is a file a 501 response code is expected
|
| + self.assertEqual(str(resp)[0:3], "501")
|
| + else:
|
| + self.fail("Exception not raised")
|
| + finally:
|
| + os.rmdir(dir)
|
| +
|
| + def test_stat(self):
|
| + # test STAT provided with argument which is equal to LIST
|
| + self.client.sendcmd('stat /')
|
| + self.client.sendcmd('stat ' + TESTFN)
|
| + self.client.putcmd('stat *')
|
| + resp = self.client.getmultiline()
|
| + self.assertEqual(resp, '550 Globbing not supported.')
|
| + bogus = os.path.basename(tempfile.mktemp(dir=HOME))
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus)
|
| +
|
| +
|
| +class TestFtpAbort(unittest.TestCase):
|
| + "test: ABOR"
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| + self.f1 = open(TESTFN, 'w+b')
|
| + self.f2 = open(TESTFN2, 'w+b')
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| + if not self.f1.closed:
|
| + self.f1.close()
|
| + if not self.f2.closed:
|
| + self.f2.close()
|
| + os.remove(self.f1.name)
|
| + os.remove(self.f2.name)
|
| +
|
| + def test_abor_no_data(self):
|
| + # Case 1: ABOR while no data channel is opened: respond with 225.
|
| + resp = self.client.sendcmd('ABOR')
|
| + self.failUnlessEqual('225 No transfer to abort.', resp)
|
| +
|
| + def test_abor_pasv(self):
|
| + # Case 2: user sends a PASV, a data-channel socket is listening
|
| + # but not connected, and ABOR is sent: close listening data
|
| + # socket, respond with 225.
|
| + self.client.makepasv()
|
| + respcode = self.client.sendcmd('ABOR')[:3]
|
| + self.failUnlessEqual('225', respcode)
|
| +
|
| + def test_abor_port(self):
|
| + # Case 3: data channel opened with PASV or PORT, but ABOR sent
|
| + # before a data transfer has been started: close data channel,
|
| + # respond with 225
|
| + self.client.makeport()
|
| + respcode = self.client.sendcmd('ABOR')[:3]
|
| + self.failUnlessEqual('225', respcode)
|
| +
|
| + def test_abor(self):
|
| + # Case 4: ABOR while a data transfer on DTP channel is in
|
| + # progress: close data channel, respond with 426, respond
|
| + # with 226.
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.close()
|
| +
|
| + # this ugly loop construct is to simulate an interrupted
|
| + # transfer since ftplib doesn't like running storbinary()
|
| + # in a separate thread
|
| + self.client.voidcmd('TYPE I')
|
| + conn = self.client.transfercmd('retr ' + TESTFN)
|
| + chunk = conn.recv(len(data) / 2)
|
| + # stop transfer while it isn't finished yet
|
| + self.client.putcmd('ABOR')
|
| +
|
| + # transfer isn't finished yet so ftpd should respond with 426
|
| + self.assertRaises(ftplib.error_temp, self.client.voidresp)
|
| +
|
| + # transfer successfully aborted, so should now respond with a 226
|
| + self.failUnlessEqual('226', self.client.voidresp()[:3])
|
| +
|
| + if hasattr(socket, 'MSG_OOB'):
|
| + def test_oob_abor(self):
|
| + # Send ABOR by following the RFC-959 directives of sending
|
| + # Telnet IP/Synch sequence as OOB data.
|
| + # On some systems like FreeBSD this happened to be a problem
|
| + # due to a different SO_OOBINLINE behavior.
|
| + # On some platforms (e.g. Python CE) the test may fail
|
| + # although the MSG_OOB constant is defined.
|
| + self.client.sock.sendall(chr(244), socket.MSG_OOB)
|
| + self.client.sock.sendall(chr(242), socket.MSG_OOB)
|
| + self.client.sock.sendall('abor\r\n')
|
| + self.client.sock.settimeout(1)
|
| + self.assertEqual(self.client.getresp()[:3], '225')
|
| +
|
| +
|
| +class TestFtpStoreData(unittest.TestCase):
|
| + "test: STOR, STOU, APPE, REST"
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| + self.f1 = open(TESTFN, 'w+b')
|
| + self.f2 = open(TESTFN2, 'w+b')
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| + if not self.f1.closed:
|
| + self.f1.close()
|
| + if not self.f2.closed:
|
| + self.f2.close()
|
| + os.remove(TESTFN)
|
| + os.remove(TESTFN2)
|
| +
|
| + def test_stor(self):
|
| + # TESTFN3 is the remote file name
|
| + try:
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.seek(0)
|
| + self.client.storbinary('stor ' + TESTFN3, self.f1)
|
| + self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data), hash (self.f2.read()))
|
| + finally:
|
| + # we do not use os.remove because file could be still
|
| + # locked by ftpd thread
|
| + if os.path.exists(TESTFN3):
|
| + self.client.delete(TESTFN3)
|
| +
|
| + def test_stou(self):
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.seek(0)
|
| +
|
| + self.client.voidcmd('TYPE I')
|
| + # filename comes in as "1xx FILE: <filename>"
|
| + filename = self.client.sendcmd('stou').split('FILE: ')[1]
|
| + try:
|
| + sock = self.client.makeport()
|
| + conn, sockaddr = sock.accept()
|
| + while 1:
|
| + buf = self.f1.read(8192)
|
| + if not buf:
|
| + break
|
| + conn.sendall(buf)
|
| + conn.close()
|
| + # transfer finished, a 226 response is expected
|
| + self.client.voidresp()
|
| + self.client.retrbinary('retr ' + filename, self.f2.write)
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data), hash (self.f2.read()))
|
| + finally:
|
| + # we do not use os.remove because file could be
|
| + # still locked by ftpd thread
|
| + if os.path.exists(filename):
|
| + self.client.delete(filename)
|
| +
|
| + def test_stou_rest(self):
|
| + # watch for STOU preceded by REST, which makes no sense.
|
| + self.client.sendcmd('rest 10')
|
| + self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
|
| +
|
| + def test_appe(self):
|
| + # TESTFN3 is the remote file name
|
| + try:
|
| + data1 = 'abcde12345' * 100000
|
| + self.f1.write(data1)
|
| + self.f1.seek(0)
|
| + self.client.storbinary('stor ' + TESTFN3, self.f1)
|
| +
|
| + data2 = 'fghil67890' * 100000
|
| + self.f1.write(data2)
|
| + self.f1.seek(self.client.size(TESTFN3))
|
| + self.client.storbinary('appe ' + TESTFN3, self.f1)
|
| +
|
| + self.client.retrbinary("retr " + TESTFN3, self.f2.write)
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(data1 + data2), hash (self.f2.read()))
|
| + finally:
|
| + # we do not use os.remove because file could be still
|
| + # locked by ftpd thread
|
| + if os.path.exists(TESTFN3):
|
| + self.client.delete(TESTFN3)
|
| +
|
| + def test_appe_rest(self):
|
| + # watch for APPE preceded by REST, which makes no sense.
|
| + self.client.sendcmd('rest 10')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x')
|
| +
|
| + def test_rest_on_stor(self):
|
| + # TESTFN3 is the remote file name
|
| + data = 'abcde12345' * 100000
|
| + self.f1.write(data)
|
| + self.f1.seek(0)
|
| +
|
| + self.client.voidcmd('TYPE I')
|
| + conn = self.client.transfercmd('stor ' + TESTFN3)
|
| + bytes_sent = 0
|
| + while 1:
|
| + chunk = self.f1.read(8192)
|
| + conn.sendall(chunk)
|
| + bytes_sent += len(chunk)
|
| + # stop transfer while it isn't finished yet
|
| + if bytes_sent >= 524288: # 2^19
|
| + break
|
| + elif not chunk:
|
| + break
|
| + conn.close()
|
| + # transfer wasn't finished yet so we expect a 426 response
|
| + self.client.voidresp()
|
| +
|
| + # resuming transfer by using a marker value greater than the
|
| + # file size stored on the server should result in an error
|
| + # on stor
|
| + file_size = self.client.size(TESTFN3)
|
| + self.assertEqual(file_size, bytes_sent)
|
| + self.client.sendcmd('rest %s' %((file_size + 1)))
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN3)
|
| +
|
| + self.client.sendcmd('rest %s' %bytes_sent)
|
| + self.client.storbinary('stor ' + TESTFN3, self.f1)
|
| +
|
| + self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
|
| + self.f1.seek(0)
|
| + self.f2.seek(0)
|
| + self.assertEqual(hash(self.f1.read()), hash(self.f2.read()))
|
| + self.client.delete(TESTFN3)
|
| +
|
| +
|
| +class TestTimeouts(unittest.TestCase):
|
| + """Test idle-timeout capabilities of control and data channels.
|
| + Some tests may fail on slow machines.
|
| + """
|
| +
|
| + def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30):
|
| + self.server = FTPd()
|
| + self.server.handler.timeout = idle_timeout
|
| + self.server.handler.dtp_handler.timeout = data_timeout
|
| + self.server.handler.passive_dtp.timeout = pasv_timeout
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.handler.timeout = 300
|
| + self.server.handler.dtp_handler.timeout = 300
|
| + self.server.handler.passive_dtp.timeout = 30
|
| + self.server.stop()
|
| +
|
| + def test_idle_timeout(self):
|
| + # Test control channel timeout. The client which does not send
|
| + # any command within the time specified in FTPHandler.timeout is
|
| + # supposed to be kicked off.
|
| + self._setUp(idle_timeout=0.1)
|
| + # fail if no msg is received within 1 second
|
| + self.client.sock.settimeout(1)
|
| + data = self.client.sock.recv(1024)
|
| + self.assertEqual(data, "421 Control connection timed out.\r\n")
|
| + # ensure client has been kicked off
|
| + self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
|
| +
|
| + def test_data_timeout(self):
|
| + # Test data channel timeout. The client which does not send
|
| + # or receive any data within the time specified in
|
| + # DTPHandler.timeout is supposed to be kicked off.
|
| + self._setUp(data_timeout=0.1)
|
| + addr = self.client.makepasv()
|
| + s = socket.socket()
|
| + s.connect(addr)
|
| + # fail if no msg is received within 1 second
|
| + self.client.sock.settimeout(1)
|
| + data = self.client.sock.recv(1024)
|
| + self.assertEqual(data, "421 Data connection timed out.\r\n")
|
| + # ensure client has been kicked off
|
| + self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
|
| +
|
| + def test_idle_data_timeout1(self):
|
| + # Tests that the control connection timeout is suspended while
|
| + # the data channel is opened
|
| + self._setUp(idle_timeout=0.1, data_timeout=0.2)
|
| + addr = self.client.makepasv()
|
| + s = socket.socket()
|
| + s.connect(addr)
|
| + # fail if no msg is received within 1 second
|
| + self.client.sock.settimeout(1)
|
| + data = self.client.sock.recv(1024)
|
| + self.assertEqual(data, "421 Data connection timed out.\r\n")
|
| + # ensure client has been kicked off
|
| + self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
|
| +
|
| + def test_idle_data_timeout2(self):
|
| + # Tests that the control connection timeout is restarted after
|
| + # data channel has been closed
|
| + self._setUp(idle_timeout=0.1, data_timeout=0.2)
|
| + addr = self.client.makepasv()
|
| + s = socket.socket()
|
| + s.connect(addr)
|
| + # close data channel
|
| + self.client.sendcmd('abor')
|
| + self.client.sock.settimeout(1)
|
| + data = self.client.sock.recv(1024)
|
| + self.assertEqual(data, "421 Control connection timed out.\r\n")
|
| + # ensure client has been kicked off
|
| + self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
|
| +
|
| + def test_pasv_timeout(self):
|
| + # Test pasv data channel timeout. The client which does not connect
|
| + # to the listening data socket within the time specified in
|
| + # PassiveDTP.timeout is supposed to receive a 421 response.
|
| + self._setUp(pasv_timeout=0.1)
|
| + self.client.makepasv()
|
| + # fail if no msg is received within 1 second
|
| + self.client.sock.settimeout(1)
|
| + data = self.client.sock.recv(1024)
|
| + self.assertEqual(data, "421 Passive data channel timed out.\r\n")
|
| + # client is not expected to be kicked off
|
| + self.client.sendcmd('noop')
|
| +
|
| +
|
| +class TestMaxConnections(unittest.TestCase):
|
| + """Test maximum connections (FTPServer.max_cons)."""
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd()
|
| + self.server.server.max_cons = 3
|
| + self.server.start()
|
| +
|
| + def tearDown(self):
|
| + self.server.server.max_cons = 0
|
| + self.server.stop()
|
| +
|
| + def test_max_connections(self):
|
| + c1 = ftplib.FTP()
|
| + c2 = ftplib.FTP()
|
| + c3 = ftplib.FTP()
|
| + try:
|
| + c1.connect(self.server.host, self.server.port)
|
| + c2.connect(self.server.host, self.server.port)
|
| + self.assertRaises(ftplib.error_temp, c3.connect, self.server.host,
|
| + self.server.port)
|
| + # with passive data channel established
|
| + c2.close()
|
| + c1.login(USER, PASSWD)
|
| + c1.makepasv()
|
| + self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
|
| + self.server.port)
|
| + # with passive data socket waiting for connection
|
| + c1.login(USER, PASSWD)
|
| + c1.sendcmd('pasv')
|
| + self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
|
| + self.server.port)
|
| + # with active data channel established
|
| + c1.login(USER, PASSWD)
|
| + c1.makeport()
|
| + self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
|
| + self.server.port)
|
| + finally:
|
| + c1.close()
|
| + c2.close()
|
| + c3.close()
|
| +
|
| +
|
| +class _TestNetworkProtocols(unittest.TestCase):
|
| + """Test PASV, EPSV, PORT and EPRT commands.
|
| +
|
| + Do not use this class directly. Let TestIPv4Environment and
|
| + TestIPv6Environment classes use it instead.
|
| + """
|
| + HOST = HOST
|
| +
|
| + def setUp(self):
|
| + self.server = FTPd(self.HOST)
|
| + self.server.start()
|
| + self.client = ftplib.FTP()
|
| + self.client.connect(self.server.host, self.server.port)
|
| + self.client.login(USER, PASSWD)
|
| + if self.client.af == socket.AF_INET:
|
| + self.proto = "1"
|
| + self.other_proto = "2"
|
| + else:
|
| + self.proto = "2"
|
| + self.other_proto = "1"
|
| +
|
| + def tearDown(self):
|
| + self.client.close()
|
| + self.server.stop()
|
| +
|
| + def cmdresp(self, cmd):
|
| + """Send a command and return response, also if the command failed."""
|
| + try:
|
| + return self.client.sendcmd(cmd)
|
| + except ftplib.Error, err:
|
| + return str(err)
|
| +
|
| + def test_eprt(self):
|
| + # test wrong proto
|
| + try:
|
| + self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto,
|
| + self.server.host, self.server.port))
|
| + except ftplib.error_perm, err:
|
| + self.assertEqual(str(err)[0:3], "522")
|
| + else:
|
| + self.fail("Exception not raised")
|
| +
|
| + # test bad args
|
| + msg = "501 Invalid EPRT format."
|
| + # len('|') > 3
|
| + self.assertEqual(self.cmdresp('eprt ||||'), msg)
|
| + # len('|') < 3
|
| + self.assertEqual(self.cmdresp('eprt ||'), msg)
|
| + # port > 65535
|
| + self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto,
|
| + self.HOST)), msg)
|
| + # port < 0
|
| + self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto,
|
| + self.HOST)), msg)
|
| + # port < 1024
|
| + self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto,
|
| + self.HOST)), "501 Can't connect over a privileged port.")
|
| +
|
| + # test connection
|
| + sock = socket.socket(self.client.af, socket.SOCK_STREAM)
|
| + sock.bind((self.client.sock.getsockname()[0], 0))
|
| + sock.listen(5)
|
| + sock.settimeout(2)
|
| + ip, port = sock.getsockname()[:2]
|
| + self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port))
|
| + try:
|
| + try:
|
| + sock.accept()
|
| + except socket.timeout:
|
| + self.fail("Server didn't connect to passive socket")
|
| + finally:
|
| + sock.close()
|
| +
|
| + def test_epsv(self):
|
| + # test wrong proto
|
| + try:
|
| + self.client.sendcmd('epsv ' + self.other_proto)
|
| + except ftplib.error_perm, err:
|
| + self.assertEqual(str(err)[0:3], "522")
|
| + else:
|
| + self.fail("Exception not raised")
|
| +
|
| + # test connection
|
| + for cmd in ('EPSV', 'EPSV ' + self.proto):
|
| + host, port = ftplib.parse229(self.client.sendcmd(cmd),
|
| + self.client.sock.getpeername())
|
| + s = socket.socket(self.client.af, socket.SOCK_STREAM)
|
| + s.settimeout(2)
|
| + try:
|
| + s.connect((host, port))
|
| + self.client.sendcmd('abor')
|
| + finally:
|
| + s.close()
|
| +
|
| + def test_epsv_all(self):
|
| + self.client.sendcmd('epsv all')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv')
|
| + self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000)
|
| + self.assertRaises(ftplib.error_perm, self.client.sendcmd,
|
| + 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000))
|
| +
|
| +
|
| +class TestIPv4Environment(_TestNetworkProtocols):
|
| + """Test PASV, EPSV, PORT and EPRT commands.
|
| +
|
| + Runs tests contained in _TestNetworkProtocols class by using IPv4
|
| + plus some additional specific tests.
|
| + """
|
| + HOST = '127.0.0.1'
|
| +
|
| + def test_port_v4(self):
|
| + # test connection
|
| + self.client.makeport()
|
| + self.client.sendcmd('abor')
|
| + # test bad arguments
|
| + ae = self.assertEqual
|
| + msg = "501 Invalid PORT format."
|
| + ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ','
|
| + ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int
|
| + ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6
|
| + ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6
|
| + ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255
|
| + ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535
|
| + ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0
|
| + msg = "501 Can't connect over a privileged port."
|
| + ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024
|
| + if "1.2.3.4" != self.HOST:
|
| + msg = "501 Can't connect to a foreign address."
|
| + ae(self.cmdresp('port 1,2,3,4,4,4'), msg)
|
| +
|
| + def test_eprt_v4(self):
|
| + self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'),
|
| + "501 Can't connect to a foreign address.")
|
| +
|
| + def test_pasv_v4(self):
|
| + host, port = ftplib.parse227(self.client.sendcmd('pasv'))
|
| + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| + s.settimeout(2)
|
| + try:
|
| + s.connect((host, port))
|
| + finally:
|
| + s.close()
|
| +
|
| +
|
| +class TestIPv6Environment(_TestNetworkProtocols):
|
| + """Test PASV, EPSV, PORT and EPRT commands.
|
| +
|
| + Runs tests contained in _TestNetworkProtocols class by using IPv6
|
| + plus some additional specific tests.
|
| + """
|
| + HOST = '::1'
|
| +
|
| + def test_port_v6(self):
|
| + # 425 expected
|
| + self.assertRaises(ftplib.error_temp, self.client.sendport,
|
| + self.server.host, self.server.port)
|
| +
|
| + def test_pasv_v6(self):
|
| + # 425 expected
|
| + self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv')
|
| +
|
| + def test_eprt_v6(self):
|
| + self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'),
|
| + "501 Can't connect to a foreign address.")
|
| +
|
| +
|
| +class FTPd(threading.Thread):
|
| + """A threaded FTP server used for running tests."""
|
| +
|
| + def __init__(self, host=HOST, port=0, verbose=False):
|
| + threading.Thread.__init__(self)
|
| + self.active = False
|
| + if not verbose:
|
| + ftpserver.log = ftpserver.logline = lambda x: x
|
| + self.authorizer = ftpserver.DummyAuthorizer()
|
| + self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full perms
|
| + self.authorizer.add_anonymous(HOME)
|
| + self.handler = ftpserver.FTPHandler
|
| + self.handler.authorizer = self.authorizer
|
| + self.server = ftpserver.FTPServer((host, port), self.handler)
|
| + self.host, self.port = self.server.socket.getsockname()[:2]
|
| + self.active_lock = threading.Lock()
|
| +
|
| + def start(self):
|
| + assert not self.active
|
| + self.__flag = threading.Event()
|
| + threading.Thread.start(self)
|
| + self.__flag.wait()
|
| +
|
| + def run(self):
|
| + self.active = True
|
| + self.__flag.set()
|
| + while self.active:
|
| + self.active_lock.acquire()
|
| + self.server.serve_forever(timeout=0.001, count=1)
|
| + self.active_lock.release()
|
| + self.server.close_all(ignore_all=True)
|
| +
|
| + def stop(self):
|
| + assert self.active
|
| + self.active = False
|
| + self.join()
|
| +
|
| +
|
| +def remove_test_files():
|
| + "Convenience function for removing temporary test files"
|
| + for file in [TESTFN, TESTFN2, TESTFN3]:
|
| + try:
|
| + os.remove(file)
|
| + except os.error:
|
| + pass
|
| +
|
| +def test_main(tests=None):
|
| + test_suite = unittest.TestSuite()
|
| + if tests is None:
|
| + tests = [
|
| + TestAbstractedFS,
|
| + TestDummyAuthorizer,
|
| + TestCallLater,
|
| + TestFtpAuthentication,
|
| + TestFtpDummyCmds,
|
| + TestFtpCmdsSemantic,
|
| + TestFtpFsOperations,
|
| + TestFtpRetrieveData,
|
| + TestFtpAbort,
|
| + TestFtpStoreData,
|
| + TestTimeouts,
|
| + TestMaxConnections
|
| + ]
|
| + if SUPPORTS_IPV4:
|
| + tests.append(TestIPv4Environment)
|
| + if SUPPORTS_IPV6:
|
| + tests.append(TestIPv6Environment)
|
| +
|
| + for test in tests:
|
| + test_suite.addTest(unittest.makeSuite(test))
|
| + remove_test_files()
|
| + unittest.TextTestRunner(verbosity=2).run(test_suite)
|
| + remove_test_files()
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + test_main()
|
|
|