Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(190)

Unified Diff: third_party/pyftpdlib/test/test_ftpd.py

Issue 16429: python based ftp server (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: '' Created 12 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/pyftpdlib/setup.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/pyftpdlib/test/test_ftpd.py
===================================================================
--- third_party/pyftpdlib/test/test_ftpd.py (revision 0)
+++ third_party/pyftpdlib/test/test_ftpd.py (revision 0)
@@ -0,0 +1,1623 @@
+#!/usr/bin/env python
+# test_ftpd.py
+
+# ======================================================================
+# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>
+#
+# All Rights Reserved
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose and without fee is hereby
+# granted, provided that the above copyright notice appear in all
+# copies and that both that copyright notice and this permission
+# notice appear in supporting documentation, and that the name of
+# Giampaolo Rodola' not be used in advertising or publicity pertaining to
+# distribution of the software without specific, written prior
+# permission.
+#
+# Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
+# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
+# NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR
+# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
+# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+# ======================================================================
+
+
+# This test suite has been run successfully on the following systems:
+#
+# -----------------------------------------------------------
+# System | Python version
+# -----------------------------------------------------------
+# Linux Ubuntu 2.6.20-15 | 2.4, 2.5
+# Linux Kubuntu 8.04 32 & 64 bits | 2.5.2
+# Linux Debian 2.4.27-2-386 | 2.3.5
+# Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2
+# Windows Vista Ultimate 64 bit | 2.5.1
+# Windows Vista Business 32 bit | 2.5.1
+# Windows Server 2008 64bit | 2.5.1
+# Windows Mobile 6.1 | PythonCE 2.5
+# OS X 10.4.10 | 2.3, 2.4, 2.5
+# FreeBSD 7.0 | 2.4, 2.5
+# -----------------------------------------------------------
+
+
+import threading
+import unittest
+import socket
+import os
+import time
+import re
+import tempfile
+import ftplib
+import random
+import warnings
+import sys
+
+from pyftpdlib import ftpserver
+
+
+__release__ = 'pyftpdlib 0.5.0'
+
+# Attempt to use IP rather than hostname (test suite will run a lot faster)
+try:
+ HOST = socket.gethostbyname('localhost')
+except socket.error:
+ HOST = 'localhost'
+USER = 'user'
+PASSWD = '12345'
+HOME = os.getcwd()
+try:
+ from test.test_support import TESTFN
+except ImportError:
+ TESTFN = 'temp-fname'
+TESTFN2 = TESTFN + '2'
+TESTFN3 = TESTFN + '3'
+
+def try_address(host, port=0):
+ """Try to bind a daemon on the given host:port and return True
+ if that has been possible."""
+ try:
+ ftpserver.FTPServer((host, port), None)
+ except socket.error:
+ return False
+ else:
+ return True
+
+SUPPORTS_IPV4 = try_address('127.0.0.1')
+SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1')
+
+
+class TestAbstractedFS(unittest.TestCase):
+ """Test for conversion utility methods of AbstractedFS class."""
+
+ def test_ftpnorm(self):
+ # Tests for ftpnorm method.
+ ae = self.assertEquals
+ fs = ftpserver.AbstractedFS()
+
+ fs.cwd = '/'
+ ae(fs.ftpnorm(''), '/')
+ ae(fs.ftpnorm('/'), '/')
+ ae(fs.ftpnorm('.'), '/')
+ ae(fs.ftpnorm('..'), '/')
+ ae(fs.ftpnorm('a'), '/a')
+ ae(fs.ftpnorm('/a'), '/a')
+ ae(fs.ftpnorm('/a/'), '/a')
+ ae(fs.ftpnorm('a/..'), '/')
+ ae(fs.ftpnorm('a/b'), '/a/b')
+ ae(fs.ftpnorm('a/b/..'), '/a')
+ ae(fs.ftpnorm('a/b/../..'), '/')
+ fs.cwd = '/sub'
+ ae(fs.ftpnorm(''), '/sub')
+ ae(fs.ftpnorm('/'), '/')
+ ae(fs.ftpnorm('.'), '/sub')
+ ae(fs.ftpnorm('..'), '/')
+ ae(fs.ftpnorm('a'), '/sub/a')
+ ae(fs.ftpnorm('a/'), '/sub/a')
+ ae(fs.ftpnorm('a/..'), '/sub')
+ ae(fs.ftpnorm('a/b'), '/sub/a/b')
+ ae(fs.ftpnorm('a/b/'), '/sub/a/b')
+ ae(fs.ftpnorm('a/b/..'), '/sub/a')
+ ae(fs.ftpnorm('a/b/../..'), '/sub')
+ ae(fs.ftpnorm('a/b/../../..'), '/')
+ ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed
+
+ def test_ftp2fs(self):
+ # Tests for ftp2fs method.
+ ae = self.assertEquals
+ fs = ftpserver.AbstractedFS()
+ join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
+
+ def goforit(root):
+ fs.root = root
+ fs.cwd = '/'
+ ae(fs.ftp2fs(''), root)
+ ae(fs.ftp2fs('/'), root)
+ ae(fs.ftp2fs('.'), root)
+ ae(fs.ftp2fs('..'), root)
+ ae(fs.ftp2fs('a'), join(root, 'a'))
+ ae(fs.ftp2fs('/a'), join(root, 'a'))
+ ae(fs.ftp2fs('/a/'), join(root, 'a'))
+ ae(fs.ftp2fs('a/..'), root)
+ ae(fs.ftp2fs('a/b'), join(root, r'a/b'))
+ ae(fs.ftp2fs('/a/b'), join(root, r'a/b'))
+ ae(fs.ftp2fs('/a/b/..'), join(root, 'a'))
+ ae(fs.ftp2fs('/a/b/../..'), root)
+ fs.cwd = '/sub'
+ ae(fs.ftp2fs(''), join(root, 'sub'))
+ ae(fs.ftp2fs('/'), root)
+ ae(fs.ftp2fs('.'), join(root, 'sub'))
+ ae(fs.ftp2fs('..'), root)
+ ae(fs.ftp2fs('a'), join(root, 'sub/a'))
+ ae(fs.ftp2fs('a/'), join(root, 'sub/a'))
+ ae(fs.ftp2fs('a/..'), join(root, 'sub'))
+ ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b'))
+ ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a'))
+ ae(fs.ftp2fs('a/b/../..'), join(root, 'sub'))
+ ae(fs.ftp2fs('a/b/../../..'), root)
+ ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed
+
+ if os.sep == '\\':
+ goforit(r'C:\dir')
+ goforit('C:\\')
+ # on DOS-derived filesystems (e.g. Windows) this is the same
+ # as specifying the current drive directory (e.g. 'C:\\')
+ goforit('\\')
+ elif os.sep == '/':
+ goforit('/home/user')
+ goforit('/')
+ else:
+ # os.sep == ':'? Don't know... let's try it anyway
+ goforit(os.getcwd())
+
+ def test_fs2ftp(self):
+ # Tests for fs2ftp method.
+ ae = self.assertEquals
+ fs = ftpserver.AbstractedFS()
+ join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
+
+ def goforit(root):
+ fs.root = root
+ ae(fs.fs2ftp(root), '/')
+ ae(fs.fs2ftp(join(root, '/')), '/')
+ ae(fs.fs2ftp(join(root, '.')), '/')
+ ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root
+ ae(fs.fs2ftp(join(root, 'a')), '/a')
+ ae(fs.fs2ftp(join(root, 'a/')), '/a')
+ ae(fs.fs2ftp(join(root, 'a/..')), '/')
+ ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
+ ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
+ ae(fs.fs2ftp(join(root, 'a/b/..')), '/a')
+ ae(fs.fs2ftp(join(root, '/a/b/../..')), '/')
+ fs.cwd = '/sub'
+ ae(fs.fs2ftp(join(root, 'a/')), '/a')
+
+ if os.sep == '\\':
+ goforit(r'C:\dir')
+ goforit('C:\\')
+ # on DOS-derived filesystems (e.g. Windows) this is the same
+ # as specifying the current drive directory (e.g. 'C:\\')
+ goforit('\\')
+ fs.root = r'C:\dir'
+ ae(fs.fs2ftp('C:\\'), '/')
+ ae(fs.fs2ftp('D:\\'), '/')
+ ae(fs.fs2ftp('D:\\dir'), '/')
+ elif os.sep == '/':
+ goforit('/')
+ assert os.path.realpath('/__home/user') == '/__home/user', \
+ 'Test skipped (symlinks not allowed).'
+ goforit('/__home/user')
+ fs.root = '/__home/user'
+ ae(fs.fs2ftp('/__home'), '/')
+ ae(fs.fs2ftp('/'), '/')
+ ae(fs.fs2ftp('/__home/userx'), '/')
+ else:
+ # os.sep == ':'? Don't know... let's try it anyway
+ goforit(os.getcwd())
+
+ def test_validpath(self):
+ # Tests for validpath method.
+ fs = ftpserver.AbstractedFS()
+ fs.root = HOME
+ self.failUnless(fs.validpath(HOME))
+ self.failUnless(fs.validpath(HOME + '/'))
+ self.failIf(fs.validpath(HOME + 'xxx'))
+
+ if hasattr(os, 'symlink'):
+ # Tests for validpath on systems supporting symbolic links.
+
+ def _safe_remove(self, path):
+ # convenience function for removing temporary files
+ try:
+ os.remove(path)
+ except os.error:
+ pass
+
+ def test_validpath_validlink(self):
+ # Test validpath by issuing a symlink pointing to a path
+ # inside the root directory.
+ fs = ftpserver.AbstractedFS()
+ fs.root = HOME
+ try:
+ open(TESTFN, 'w')
+ os.symlink(TESTFN, TESTFN2)
+ self.failUnless(fs.validpath(TESTFN))
+ finally:
+ self._safe_remove(TESTFN)
+ self._safe_remove(TESTFN2)
+
+ def test_validpath_external_symlink(self):
+ # Test validpath by issuing a symlink pointing to a path
+ # outside the root directory.
+ fs = ftpserver.AbstractedFS()
+ fs.root = HOME
+ try:
+ # tempfile should create our file in /tmp directory
+ # which should be outside the user root. If it is not
+ # we just skip the test.
+ file = tempfile.NamedTemporaryFile()
+ if HOME == os.path.dirname(file.name):
+ return
+ os.symlink(file.name, TESTFN)
+ self.failIf(fs.validpath(TESTFN))
+ finally:
+ self._safe_remove(TESTFN)
+ file.close()
+
+
+class TestDummyAuthorizer(unittest.TestCase):
+ """Tests for DummyAuthorizer class."""
+
+ # temporarily change warnings to exceptions for the purposes of testing
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp(dir=HOME)
+ self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir))
+ self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name
+ self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name
+ warnings.filterwarnings("error")
+
+ def tearDown(self):
+ os.remove(self.tempfile)
+ os.remove(self.subtempfile)
+ os.rmdir(self.subtempdir)
+ os.rmdir(self.tempdir)
+ warnings.resetwarnings()
+
+ def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
+ try:
+ callableObj(*args, **kwargs)
+ except excClass, why:
+ if str(why) == msg:
+ return
+ raise self.failureException("%s != %s" %(str(why), msg))
+ else:
+ if hasattr(excClass,'__name__'): excName = excClass.__name__
+ else: excName = str(excClass)
+ raise self.failureException, "%s not raised" % excName
+
+ def test_common_methods(self):
+ auth = ftpserver.DummyAuthorizer()
+ # create user
+ auth.add_user(USER, PASSWD, HOME)
+ auth.add_anonymous(HOME)
+ # check credentials
+ self.failUnless(auth.validate_authentication(USER, PASSWD))
+ self.failIf(auth.validate_authentication(USER, 'wrongpwd'))
+ # remove them
+ auth.remove_user(USER)
+ auth.remove_user('anonymous')
+ # raise exc if user does not exists
+ self.assertRaises(KeyError, auth.remove_user, USER)
+ # raise exc if path does not exist
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such directory: "%s"' %'?:\\',
+ auth.add_user, USER, PASSWD, '?:\\')
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such directory: "%s"' %'?:\\',
+ auth.add_anonymous, '?:\\')
+ # raise exc if user already exists
+ auth.add_user(USER, PASSWD, HOME)
+ auth.add_anonymous(HOME)
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'User "%s" already exists' %USER,
+ auth.add_user, USER, PASSWD, HOME)
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'User "anonymous" already exists',
+ auth.add_anonymous, HOME)
+ auth.remove_user(USER)
+ auth.remove_user('anonymous')
+ # raise on wrong permission
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such permission "?"',
+ auth.add_user, USER, PASSWD, HOME, perm='?')
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such permission "?"',
+ auth.add_anonymous, HOME, perm='?')
+ # expect warning on write permissions assigned to anonymous user
+ for x in "adfmw":
+ self.assertRaisesWithMsg(RuntimeWarning,
+ "Write permissions assigned to anonymous user.",
+ auth.add_anonymous, HOME, perm=x)
+
+ def test_override_perm_interface(self):
+ auth = ftpserver.DummyAuthorizer()
+ auth.add_user(USER, PASSWD, HOME, perm='elr')
+ # raise exc if user does not exists
+ self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr')
+ # raise exc if path does not exist or it's not a directory
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such directory: "%s"' %'?:\\',
+ auth.override_perm, USER, '?:\\', 'elr')
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such directory: "%s"' %self.tempfile,
+ auth.override_perm, USER, self.tempfile, 'elr')
+ # raise on wrong permission
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ 'No such permission "?"', auth.override_perm,
+ USER, HOME, perm='?')
+ # expect warning on write permissions assigned to anonymous user
+ auth.add_anonymous(HOME)
+ for p in "adfmw":
+ self.assertRaisesWithMsg(RuntimeWarning,
+ "Write permissions assigned to anonymous user.",
+ auth.override_perm, 'anonymous', HOME, p)
+ # raise on attempt to override home directory permissions
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ "Can't override home directory permissions",
+ auth.override_perm, USER, HOME, perm='w')
+ # raise on attempt to override a path escaping home directory
+ if os.path.dirname(HOME) != HOME:
+ self.assertRaisesWithMsg(ftpserver.AuthorizerError,
+ "Path escapes user home directory",
+ auth.override_perm, USER,
+ os.path.dirname(HOME), perm='w')
+ # try to re-set an overridden permission
+ auth.override_perm(USER, self.tempdir, perm='w')
+ auth.override_perm(USER, self.tempdir, perm='wr')
+
+ def test_override_perm_recursive_paths(self):
+ auth = ftpserver.DummyAuthorizer()
+ auth.add_user(USER, PASSWD, HOME, perm='elr')
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
+ auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
+ self.assert_(auth.has_perm(USER, 'w', HOME) is False)
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
+ self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
+ self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True)
+ self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True)
+
+ self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
+ path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
+ self.assert_(auth.has_perm(USER, 'w', path) is False)
+ # test case-sensitiveness
+ if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
+
+ def test_override_perm_not_recursive_paths(self):
+ auth = ftpserver.DummyAuthorizer()
+ auth.add_user(USER, PASSWD, HOME, perm='elr')
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
+ auth.override_perm(USER, self.tempdir, perm='w')
+ self.assert_(auth.has_perm(USER, 'w', HOME) is False)
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
+ self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
+ self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False)
+ self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False)
+
+ self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
+ path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
+ self.assert_(auth.has_perm(USER, 'w', path) is False)
+ # test case-sensitiveness
+ if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
+ self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
+
+
+class TestCallLater(unittest.TestCase):
+ """Tests for CallLater class."""
+
+ def setUp(self):
+ for task in ftpserver._tasks:
+ if not task.cancelled:
+ task.cancel()
+ del ftpserver._tasks[:]
+
+ def scheduler(self, timeout=0.01, count=100):
+ while ftpserver._tasks and count > 0:
+ ftpserver._scheduler()
+ count -= 1
+ time.sleep(timeout)
+
+ def test_interface(self):
+ fun = lambda: 0
+ self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun)
+ x = ftpserver.CallLater(3, fun)
+ self.assertRaises(AssertionError, x.delay, -1)
+ self.assert_(x.cancelled is False)
+ x.cancel()
+ self.assert_(x.cancelled is True)
+ self.assertRaises(AssertionError, x.call)
+ self.assertRaises(AssertionError, x.reset)
+ self.assertRaises(AssertionError, x.delay, 2)
+ self.assertRaises(AssertionError, x.cancel)
+
+ def test_order(self):
+ l = []
+ fun = lambda x: l.append(x)
+ for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
+ ftpserver.CallLater(x, fun, x)
+ self.scheduler()
+ self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+
+ def test_delay(self):
+ l = []
+ fun = lambda x: l.append(x)
+ ftpserver.CallLater(0.01, fun, 0.01).delay(0.07)
+ ftpserver.CallLater(0.02, fun, 0.02).delay(0.08)
+ ftpserver.CallLater(0.03, fun, 0.03)
+ ftpserver.CallLater(0.04, fun, 0.04)
+ ftpserver.CallLater(0.05, fun, 0.05)
+ ftpserver.CallLater(0.06, fun, 0.06).delay(0.001)
+ self.scheduler()
+ self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02])
+
+ def test_reset(self):
+ # will fail on such systems where time.time() does not provide
+ # time with a better precision than 1 second.
+ l = []
+ fun = lambda x: l.append(x)
+ ftpserver.CallLater(0.01, fun, 0.01)
+ ftpserver.CallLater(0.02, fun, 0.02)
+ ftpserver.CallLater(0.03, fun, 0.03)
+ x = ftpserver.CallLater(0.04, fun, 0.04)
+ ftpserver.CallLater(0.05, fun, 0.05)
+ time.sleep(0.1)
+ x.reset()
+ self.scheduler()
+ self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
+
+ def test_cancel(self):
+ l = []
+ fun = lambda x: l.append(x)
+ ftpserver.CallLater(0.01, fun, 0.01).cancel()
+ ftpserver.CallLater(0.02, fun, 0.02)
+ ftpserver.CallLater(0.03, fun, 0.03)
+ ftpserver.CallLater(0.04, fun, 0.04)
+ ftpserver.CallLater(0.05, fun, 0.05).cancel()
+ self.scheduler()
+ self.assertEqual(l, [0.02, 0.03, 0.04])
+
+
+class TestFtpAuthentication(unittest.TestCase):
+ "test: USER, PASS, REIN."
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.f1 = open(TESTFN, 'w+b')
+ self.f2 = open(TESTFN2, 'w+b')
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+ if not self.f1.closed:
+ self.f1.close()
+ if not self.f2.closed:
+ self.f2.close()
+ os.remove(TESTFN)
+ os.remove(TESTFN2)
+
+ def test_auth_ok(self):
+ self.client.login(user=USER, passwd=PASSWD)
+
+ def test_anon_auth(self):
+ self.client.login(user='anonymous', passwd='anon@')
+ self.client.login(user='AnonYmoUs', passwd='anon@')
+ self.client.login(user='anonymous', passwd='')
+
+ # Commented after delayed response on wrong credentials has been
+ # introduced because tests take too much to complete.
+
+## def test_auth_failed(self):
+## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
+## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD)
+## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
+
+## def test_max_auth(self):
+## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
+## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
+## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
+## # If authentication fails for 3 times ftpd disconnects the
+## # client. We can check if that happens by using self.client.sendcmd()
+## # on the 'dead' socket object. If socket object is really
+## # closed it should be raised a socket.error exception (Windows)
+## # or a EOFError exception (Linux).
+## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '')
+
+ def test_rein(self):
+ self.client.login(user=USER, passwd=PASSWD)
+ self.client.sendcmd('rein')
+ # user not authenticated, error response expected
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
+ # by logging-in again we should be able to execute a
+ # file-system command
+ self.client.login(user=USER, passwd=PASSWD)
+ self.client.sendcmd('pwd')
+
+ def test_rein_during_transfer(self):
+ self.client.login(user=USER, passwd=PASSWD)
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.close()
+
+ self.client.voidcmd('TYPE I')
+ conn = self.client.transfercmd('retr ' + TESTFN)
+ rein_sent = 0
+ while 1:
+ chunk = conn.recv(8192)
+ if not chunk:
+ break
+ self.f2.write(chunk)
+ if not rein_sent:
+ rein_sent = 1
+ # flush account, error response expected
+ self.client.sendcmd('rein')
+ self.assertRaises(ftplib.error_perm, self.client.dir)
+
+ # a 226 response is expected once tranfer finishes
+ self.assertEqual(self.client.voidresp()[:3], '226')
+ # account is still flushed, error response is still expected
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd,
+ 'size ' + TESTFN)
+ # by logging-in again we should be able to execute a
+ # filesystem command
+ self.client.login(user=USER, passwd=PASSWD)
+ self.client.sendcmd('pwd')
+ self.f2.seek(0)
+ self.assertEqual(hash(data), hash (self.f2.read()))
+
+ def test_user(self):
+ # Test USER while already authenticated and no transfer
+ # is in progress.
+ self.client.login(user=USER, passwd=PASSWD)
+ self.client.sendcmd('user ' + USER) # authentication flushed
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
+ self.client.sendcmd('pass ' + PASSWD)
+ self.client.sendcmd('pwd')
+
+ def test_user_on_transfer(self):
+ # Test USER while already authenticated and a transfer is
+ # in progress.
+ self.client.login(user=USER, passwd=PASSWD)
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.close()
+
+ self.client.voidcmd('TYPE I')
+ conn = self.client.transfercmd('retr ' + TESTFN)
+ rein_sent = 0
+ while 1:
+ chunk = conn.recv(8192)
+ if not chunk:
+ break
+ self.f2.write(chunk)
+ # stop transfer while it isn't finished yet
+ if not rein_sent:
+ rein_sent = 1
+ # flush account, expect an error response
+ self.client.sendcmd('user ' + USER)
+ self.assertRaises(ftplib.error_perm, self.client.dir)
+
+ # a 226 response is expected once tranfer finishes
+ self.assertEqual(self.client.voidresp()[:3], '226')
+ # account is still flushed, error response is still expected
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
+ # by logging-in again we should be able to execute a
+ # filesystem command
+ self.client.sendcmd('pass ' + PASSWD)
+ self.client.sendcmd('pwd')
+ self.f2.seek(0)
+ self.assertEqual(hash(data), hash (self.f2.read()))
+
+
+class TestFtpDummyCmds(unittest.TestCase):
+ "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP"
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+
+ def test_type(self):
+ self.client.sendcmd('type a')
+ self.client.sendcmd('type i')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?')
+
+ def test_stru(self):
+ self.client.sendcmd('stru f')
+ self.client.sendcmd('stru F')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?')
+
+ def test_mode(self):
+ self.client.sendcmd('mode s')
+ self.client.sendcmd('mode S')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?')
+
+ def test_noop(self):
+ self.client.sendcmd('noop')
+
+ def test_syst(self):
+ self.client.sendcmd('syst')
+
+ def test_allo(self):
+ self.client.sendcmd('allo x')
+
+ def test_quit(self):
+ self.client.sendcmd('quit')
+
+ def test_help(self):
+ self.client.sendcmd('help')
+ cmd = random.choice(ftpserver.proto_cmds.keys())
+ self.client.sendcmd('help %s' %cmd)
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?')
+
+ def test_rest(self):
+ # test error conditions only;
+ # restored data-transfer is tested later
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1')
+
+ def test_opts_feat(self):
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst')
+ # utility function which used for extracting the MLST "facts"
+ # string from the FEAT response
+ def mlst():
+ resp = self.client.sendcmd('feat')
+ return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1)
+ # we rely on "type", "perm", "size", and "modify" facts which
+ # are those available on all platforms
+ self.failUnless('type*;perm*;size*;modify*;' in mlst())
+ self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;')
+ self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;')
+ self.failUnless('type*;perm;size;modify;' in mlst())
+
+ self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ')
+ self.failUnless(not '*' in mlst())
+
+ self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
+ self.failUnless(not '*' in mlst())
+ self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \
+ '200 MLST OPTS type;')
+ self.failUnless('type*;perm;size;modify;' in mlst())
+
+
+class TestFtpCmdsSemantic(unittest.TestCase):
+
+ arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
+ 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type',
+ 'user','xmkd','xrmd')
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+
+ def test_arg_cmds(self):
+ # test commands requiring an argument
+ expected = "501 Syntax error: command needs an argument."
+ for cmd in self.arg_cmds:
+ self.client.putcmd(cmd)
+ resp = self.client.getmultiline()
+ self.assertEqual(resp, expected)
+
+ def test_no_arg_cmds(self):
+ # test commands accepting no arguments
+ expected = "501 Syntax error: command does not accept arguments."
+ for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
+ 'syst','xcup','xpwd'):
+ self.client.putcmd(cmd + ' arg')
+ resp = self.client.getmultiline()
+ self.assertEqual(resp, expected)
+
+ def test_auth_cmds(self):
+ # test those commands requiring client to be authenticated
+ expected = "530 Log in with USER and PASS first."
+ self.client.sendcmd('rein')
+ for cmd in ftpserver.proto_cmds:
+ cmd = cmd.lower()
+ if cmd in ('feat','help','noop','user','pass','stat','syst','quit'):
+ continue
+ if cmd in self.arg_cmds:
+ cmd = cmd + ' arg'
+ self.client.putcmd(cmd)
+ resp = self.client.getmultiline()
+ self.assertEqual(resp, expected)
+
+ def test_no_auth_cmds(self):
+ # test those commands that do not require client to be authenticated
+ self.client.sendcmd('rein')
+ for cmd in ('feat','help','noop','stat','syst'):
+ self.client.sendcmd(cmd)
+ # STAT provided with an argument is equal to LIST hence not allowed
+ # if not authenticated
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /')
+ self.client.sendcmd('quit')
+
+
+class TestFtpFsOperations(unittest.TestCase):
+ "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+ self.tempfile = os.path.basename(open(TESTFN, 'w+b').name)
+ self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
+ os.mkdir(self.tempdir)
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+ if os.path.exists(self.tempfile):
+ os.remove(self.tempfile)
+ if os.path.exists(self.tempdir):
+ os.rmdir(self.tempdir)
+
+ def test_cwd(self):
+ self.client.cwd(self.tempdir)
+ self.assertEqual(self.client.pwd(), '/' + self.tempdir)
+ self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir')
+ # cwd provided with no arguments is supposed to move us to the
+ # root directory
+ self.client.sendcmd('cwd')
+ self.assertEqual(self.client.pwd(), '/')
+
+ def test_pwd(self):
+ self.assertEqual(self.client.pwd(), '/')
+ self.client.cwd(self.tempdir)
+ self.assertEqual(self.client.pwd(), '/' + self.tempdir)
+
+ def test_cdup(self):
+ self.client.cwd(self.tempdir)
+ self.assertEqual(self.client.pwd(), '/' + self.tempdir)
+ self.client.sendcmd('cdup')
+ self.assertEqual(self.client.pwd(), '/')
+ # make sure we can't escape from root directory
+ self.client.sendcmd('cdup')
+ self.assertEqual(self.client.pwd(), '/')
+
+ def test_mkd(self):
+ tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.client.mkd(tempdir)
+ # make sure we can't create directories which already exist
+ # (probably not really necessary);
+ # let's use a try/except statement to avoid leaving behind
+ # orphaned temporary directory in the event of a test failure.
+ try:
+ self.client.mkd(tempdir)
+ except ftplib.error_perm:
+ os.rmdir(tempdir) # ok
+ else:
+ self.fail('ftplib.error_perm not raised.')
+
+ def test_rmd(self):
+ self.client.rmd(self.tempdir)
+ self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile)
+ # make sure we can't remove the root directory
+ self.assertRaises(ftplib.error_perm, self.client.rmd, '/')
+
+ def test_dele(self):
+ self.client.delete(self.tempfile)
+ self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir)
+
+ def test_rnfr_rnto(self):
+ # rename file
+ tempname = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.client.rename(self.tempfile, tempname)
+ self.client.rename(tempname, self.tempfile)
+ # rename dir
+ tempname = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.client.rename(self.tempdir, tempname)
+ self.client.rename(tempname, self.tempdir)
+ # rnfr/rnto over non-existing paths
+ bogus = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
+ self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/')
+ # make sure we can't rename root directory
+ self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x')
+
+ def test_mdtm(self):
+ self.client.sendcmd('mdtm ' + self.tempfile)
+ # make sure we can't use mdtm against directories
+ try:
+ self.client.sendcmd('mdtm ' + self.tempdir)
+ except ftplib.error_perm, err:
+ self.failUnless("not retrievable" in str(err))
+ else:
+ self.fail('Exception not raised')
+
+ def test_size(self):
+ self.client.size(self.tempfile)
+ # make sure we can't use size against directories
+ try:
+ self.client.sendcmd('size ' + self.tempdir)
+ except ftplib.error_perm, err:
+ self.failUnless("not retrievable" in str(err))
+ else:
+ self.fail('Exception not raised')
+
+
+class TestFtpRetrieveData(unittest.TestCase):
+ "test: RETR, REST, LIST, NLST, argumented STAT"
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+ self.f1 = open(TESTFN, 'w+b')
+ self.f2 = open(TESTFN2, 'w+b')
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+ if not self.f1.closed:
+ self.f1.close()
+ if not self.f2.closed:
+ self.f2.close()
+ os.remove(TESTFN)
+ os.remove(TESTFN2)
+
+ def test_retr(self):
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.close()
+ self.client.retrbinary("retr " + TESTFN, self.f2.write)
+ self.f2.seek(0)
+ self.assertEqual(hash(data), hash(self.f2.read()))
+
+ def test_restore_on_retr(self):
+ data = 'abcde12345' * 100000
+ fname_1 = os.path.basename(self.f1.name)
+ self.f1.write(data)
+ self.f1.close()
+
+ # look at ftplib.FTP.retrbinary method to understand this mess
+ self.client.voidcmd('TYPE I')
+ conn = self.client.transfercmd('retr ' + fname_1)
+ chunk = conn.recv(len(data) / 2)
+ self.f2.write(chunk)
+ conn.close()
+ # transfer wasn't finished yet so we expect a 426 response
+ self.assertRaises(ftplib.error_temp, self.client.voidresp)
+
+ # resuming transfer by using a marker value greater than the
+ # file size stored on the server should result in an error
+ # on retr (RFC-1123)
+ file_size = self.client.size(fname_1)
+ self.client.sendcmd('rest %s' %((file_size + 1)))
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fname_1)
+
+ # test resume
+ self.client.sendcmd('rest %s' %len(chunk))
+ self.client.retrbinary("retr " + fname_1, self.f2.write)
+ self.f2.seek(0)
+ self.assertEqual(hash(data), hash (self.f2.read()))
+
+ def _test_listing_cmds(self, cmd):
+ """Tests common to LIST NLST and MLSD commands."""
+ # assume that no argument has the same meaning of "/"
+ l1 = l2 = []
+ self.client.retrlines(cmd, l1.append)
+ self.client.retrlines(cmd + ' /', l2.append)
+ self.assertEqual(l1, l2)
+ if cmd.lower() != 'mlsd':
+ # if pathname is a file one line is expected
+ x = []
+ self.client.retrlines('%s ' %cmd + TESTFN, x.append)
+ self.assertEqual(len(x), 1)
+ self.failUnless(''.join(x).endswith(TESTFN))
+ # non-existent path, 550 response is expected
+ bogus = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.assertRaises(ftplib.error_perm, self.client.retrlines,
+ '%s ' %cmd + bogus, lambda x: x)
+ # for an empty directory we excpect that the data channel is
+ # opened anyway and that no data is received
+ x = []
+ tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
+ try:
+ self.client.retrlines('%s %s' %(cmd, tempdir), x.append)
+ self.assertEqual(x, [])
+ finally:
+ os.rmdir(tempdir)
+
+ def test_nlst(self):
+ # common tests
+ self._test_listing_cmds('nlst')
+
+ def test_list(self):
+ # common tests
+ self._test_listing_cmds('list')
+ # known incorrect pathname arguments (e.g. old clients) are
+ # expected to be treated as if pathname would be == '/'
+ l1 = l2 = l3 = l4 = l5 = []
+ self.client.retrlines('list /', l1.append)
+ self.client.retrlines('list -a', l2.append)
+ self.client.retrlines('list -l', l3.append)
+ self.client.retrlines('list -al', l4.append)
+ self.client.retrlines('list -la', l5.append)
+ tot = (l1, l2, l3, l4, l5)
+ for x in range(len(tot) - 1):
+ self.assertEqual(tot[x], tot[x+1])
+
+ def test_mlst(self):
+ # utility function for extracting the line of interest
+ mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
+
+ # the fact set must be preceded by a space
+ self.failUnless(mlstline('mlst').startswith(' '))
+ # where TVFS is supported, a fully qualified pathname is expected
+ self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN))
+ self.failUnless(mlstline('mlst').endswith('/'))
+ # assume that no argument has the same meaning of "/"
+ self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
+ # non-existent path
+ bogus = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.assertRaises(ftplib.error_perm, mlstline, bogus)
+ # test file/dir notations
+ self.failUnless('type=dir' in mlstline('mlst'))
+ self.failUnless('type=file' in mlstline('mlst ' + TESTFN))
+ # let's add some tests for OPTS command
+ self.client.sendcmd('opts mlst type;')
+ self.assertEqual(mlstline('mlst'), ' type=dir; /')
+ # where no facts are present, two leading spaces before the
+ # pathname are required (RFC-3659)
+ self.client.sendcmd('opts mlst')
+ self.assertEqual(mlstline('mlst'), ' /')
+
+ def test_mlsd(self):
+ # common tests
+ self._test_listing_cmds('mlsd')
+ dir = os.path.basename(tempfile.mkdtemp(dir=HOME))
+ try:
+ try:
+ self.client.retrlines('mlsd ' + TESTFN, lambda x: x)
+ except ftplib.error_perm, resp:
+ # if path is a file a 501 response code is expected
+ self.assertEqual(str(resp)[0:3], "501")
+ else:
+ self.fail("Exception not raised")
+ finally:
+ os.rmdir(dir)
+
+ def test_stat(self):
+ # test STAT provided with argument which is equal to LIST
+ self.client.sendcmd('stat /')
+ self.client.sendcmd('stat ' + TESTFN)
+ self.client.putcmd('stat *')
+ resp = self.client.getmultiline()
+ self.assertEqual(resp, '550 Globbing not supported.')
+ bogus = os.path.basename(tempfile.mktemp(dir=HOME))
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus)
+
+
+class TestFtpAbort(unittest.TestCase):
+ "test: ABOR"
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+ self.f1 = open(TESTFN, 'w+b')
+ self.f2 = open(TESTFN2, 'w+b')
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+ if not self.f1.closed:
+ self.f1.close()
+ if not self.f2.closed:
+ self.f2.close()
+ os.remove(self.f1.name)
+ os.remove(self.f2.name)
+
+ def test_abor_no_data(self):
+ # Case 1: ABOR while no data channel is opened: respond with 225.
+ resp = self.client.sendcmd('ABOR')
+ self.failUnlessEqual('225 No transfer to abort.', resp)
+
+ def test_abor_pasv(self):
+ # Case 2: user sends a PASV, a data-channel socket is listening
+ # but not connected, and ABOR is sent: close listening data
+ # socket, respond with 225.
+ self.client.makepasv()
+ respcode = self.client.sendcmd('ABOR')[:3]
+ self.failUnlessEqual('225', respcode)
+
+ def test_abor_port(self):
+ # Case 3: data channel opened with PASV or PORT, but ABOR sent
+ # before a data transfer has been started: close data channel,
+ # respond with 225
+ self.client.makeport()
+ respcode = self.client.sendcmd('ABOR')[:3]
+ self.failUnlessEqual('225', respcode)
+
+ def test_abor(self):
+ # Case 4: ABOR while a data transfer on DTP channel is in
+ # progress: close data channel, respond with 426, respond
+ # with 226.
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.close()
+
+ # this ugly loop construct is to simulate an interrupted
+ # transfer since ftplib doesn't like running storbinary()
+ # in a separate thread
+ self.client.voidcmd('TYPE I')
+ conn = self.client.transfercmd('retr ' + TESTFN)
+ chunk = conn.recv(len(data) / 2)
+ # stop transfer while it isn't finished yet
+ self.client.putcmd('ABOR')
+
+ # transfer isn't finished yet so ftpd should respond with 426
+ self.assertRaises(ftplib.error_temp, self.client.voidresp)
+
+ # transfer successfully aborted, so should now respond with a 226
+ self.failUnlessEqual('226', self.client.voidresp()[:3])
+
+ if hasattr(socket, 'MSG_OOB'):
+ def test_oob_abor(self):
+ # Send ABOR by following the RFC-959 directives of sending
+ # Telnet IP/Synch sequence as OOB data.
+ # On some systems like FreeBSD this happened to be a problem
+ # due to a different SO_OOBINLINE behavior.
+ # On some platforms (e.g. Python CE) the test may fail
+ # although the MSG_OOB constant is defined.
+ self.client.sock.sendall(chr(244), socket.MSG_OOB)
+ self.client.sock.sendall(chr(242), socket.MSG_OOB)
+ self.client.sock.sendall('abor\r\n')
+ self.client.sock.settimeout(1)
+ self.assertEqual(self.client.getresp()[:3], '225')
+
+
+class TestFtpStoreData(unittest.TestCase):
+ "test: STOR, STOU, APPE, REST"
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+ self.f1 = open(TESTFN, 'w+b')
+ self.f2 = open(TESTFN2, 'w+b')
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+ if not self.f1.closed:
+ self.f1.close()
+ if not self.f2.closed:
+ self.f2.close()
+ os.remove(TESTFN)
+ os.remove(TESTFN2)
+
+ def test_stor(self):
+ # TESTFN3 is the remote file name
+ try:
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.seek(0)
+ self.client.storbinary('stor ' + TESTFN3, self.f1)
+ self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
+ self.f2.seek(0)
+ self.assertEqual(hash(data), hash (self.f2.read()))
+ finally:
+ # we do not use os.remove because file could be still
+ # locked by ftpd thread
+ if os.path.exists(TESTFN3):
+ self.client.delete(TESTFN3)
+
+ def test_stou(self):
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.seek(0)
+
+ self.client.voidcmd('TYPE I')
+ # filename comes in as "1xx FILE: <filename>"
+ filename = self.client.sendcmd('stou').split('FILE: ')[1]
+ try:
+ sock = self.client.makeport()
+ conn, sockaddr = sock.accept()
+ while 1:
+ buf = self.f1.read(8192)
+ if not buf:
+ break
+ conn.sendall(buf)
+ conn.close()
+ # transfer finished, a 226 response is expected
+ self.client.voidresp()
+ self.client.retrbinary('retr ' + filename, self.f2.write)
+ self.f2.seek(0)
+ self.assertEqual(hash(data), hash (self.f2.read()))
+ finally:
+ # we do not use os.remove because file could be
+ # still locked by ftpd thread
+ if os.path.exists(filename):
+ self.client.delete(filename)
+
+ def test_stou_rest(self):
+ # watch for STOU preceded by REST, which makes no sense.
+ self.client.sendcmd('rest 10')
+ self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
+
+ def test_appe(self):
+ # TESTFN3 is the remote file name
+ try:
+ data1 = 'abcde12345' * 100000
+ self.f1.write(data1)
+ self.f1.seek(0)
+ self.client.storbinary('stor ' + TESTFN3, self.f1)
+
+ data2 = 'fghil67890' * 100000
+ self.f1.write(data2)
+ self.f1.seek(self.client.size(TESTFN3))
+ self.client.storbinary('appe ' + TESTFN3, self.f1)
+
+ self.client.retrbinary("retr " + TESTFN3, self.f2.write)
+ self.f2.seek(0)
+ self.assertEqual(hash(data1 + data2), hash (self.f2.read()))
+ finally:
+ # we do not use os.remove because file could be still
+ # locked by ftpd thread
+ if os.path.exists(TESTFN3):
+ self.client.delete(TESTFN3)
+
+ def test_appe_rest(self):
+ # watch for APPE preceded by REST, which makes no sense.
+ self.client.sendcmd('rest 10')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x')
+
+ def test_rest_on_stor(self):
+ # TESTFN3 is the remote file name
+ data = 'abcde12345' * 100000
+ self.f1.write(data)
+ self.f1.seek(0)
+
+ self.client.voidcmd('TYPE I')
+ conn = self.client.transfercmd('stor ' + TESTFN3)
+ bytes_sent = 0
+ while 1:
+ chunk = self.f1.read(8192)
+ conn.sendall(chunk)
+ bytes_sent += len(chunk)
+ # stop transfer while it isn't finished yet
+ if bytes_sent >= 524288: # 2^19
+ break
+ elif not chunk:
+ break
+ conn.close()
+ # transfer wasn't finished yet so we expect a 426 response
+ self.client.voidresp()
+
+ # resuming transfer by using a marker value greater than the
+ # file size stored on the server should result in an error
+ # on stor
+ file_size = self.client.size(TESTFN3)
+ self.assertEqual(file_size, bytes_sent)
+ self.client.sendcmd('rest %s' %((file_size + 1)))
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN3)
+
+ self.client.sendcmd('rest %s' %bytes_sent)
+ self.client.storbinary('stor ' + TESTFN3, self.f1)
+
+ self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
+ self.f1.seek(0)
+ self.f2.seek(0)
+ self.assertEqual(hash(self.f1.read()), hash(self.f2.read()))
+ self.client.delete(TESTFN3)
+
+
+class TestTimeouts(unittest.TestCase):
+ """Test idle-timeout capabilities of control and data channels.
+ Some tests may fail on slow machines.
+ """
+
+ def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30):
+ self.server = FTPd()
+ self.server.handler.timeout = idle_timeout
+ self.server.handler.dtp_handler.timeout = data_timeout
+ self.server.handler.passive_dtp.timeout = pasv_timeout
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+
+ def tearDown(self):
+ self.client.close()
+ self.server.handler.timeout = 300
+ self.server.handler.dtp_handler.timeout = 300
+ self.server.handler.passive_dtp.timeout = 30
+ self.server.stop()
+
+ def test_idle_timeout(self):
+ # Test control channel timeout. The client which does not send
+ # any command within the time specified in FTPHandler.timeout is
+ # supposed to be kicked off.
+ self._setUp(idle_timeout=0.1)
+ # fail if no msg is received within 1 second
+ self.client.sock.settimeout(1)
+ data = self.client.sock.recv(1024)
+ self.assertEqual(data, "421 Control connection timed out.\r\n")
+ # ensure client has been kicked off
+ self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
+
+ def test_data_timeout(self):
+ # Test data channel timeout. The client which does not send
+ # or receive any data within the time specified in
+ # DTPHandler.timeout is supposed to be kicked off.
+ self._setUp(data_timeout=0.1)
+ addr = self.client.makepasv()
+ s = socket.socket()
+ s.connect(addr)
+ # fail if no msg is received within 1 second
+ self.client.sock.settimeout(1)
+ data = self.client.sock.recv(1024)
+ self.assertEqual(data, "421 Data connection timed out.\r\n")
+ # ensure client has been kicked off
+ self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
+
+ def test_idle_data_timeout1(self):
+ # Tests that the control connection timeout is suspended while
+ # the data channel is opened
+ self._setUp(idle_timeout=0.1, data_timeout=0.2)
+ addr = self.client.makepasv()
+ s = socket.socket()
+ s.connect(addr)
+ # fail if no msg is received within 1 second
+ self.client.sock.settimeout(1)
+ data = self.client.sock.recv(1024)
+ self.assertEqual(data, "421 Data connection timed out.\r\n")
+ # ensure client has been kicked off
+ self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
+
+ def test_idle_data_timeout2(self):
+ # Tests that the control connection timeout is restarted after
+ # data channel has been closed
+ self._setUp(idle_timeout=0.1, data_timeout=0.2)
+ addr = self.client.makepasv()
+ s = socket.socket()
+ s.connect(addr)
+ # close data channel
+ self.client.sendcmd('abor')
+ self.client.sock.settimeout(1)
+ data = self.client.sock.recv(1024)
+ self.assertEqual(data, "421 Control connection timed out.\r\n")
+ # ensure client has been kicked off
+ self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
+
+ def test_pasv_timeout(self):
+ # Test pasv data channel timeout. The client which does not connect
+ # to the listening data socket within the time specified in
+ # PassiveDTP.timeout is supposed to receive a 421 response.
+ self._setUp(pasv_timeout=0.1)
+ self.client.makepasv()
+ # fail if no msg is received within 1 second
+ self.client.sock.settimeout(1)
+ data = self.client.sock.recv(1024)
+ self.assertEqual(data, "421 Passive data channel timed out.\r\n")
+ # client is not expected to be kicked off
+ self.client.sendcmd('noop')
+
+
+class TestMaxConnections(unittest.TestCase):
+ """Test maximum connections (FTPServer.max_cons)."""
+
+ def setUp(self):
+ self.server = FTPd()
+ self.server.server.max_cons = 3
+ self.server.start()
+
+ def tearDown(self):
+ self.server.server.max_cons = 0
+ self.server.stop()
+
+ def test_max_connections(self):
+ c1 = ftplib.FTP()
+ c2 = ftplib.FTP()
+ c3 = ftplib.FTP()
+ try:
+ c1.connect(self.server.host, self.server.port)
+ c2.connect(self.server.host, self.server.port)
+ self.assertRaises(ftplib.error_temp, c3.connect, self.server.host,
+ self.server.port)
+ # with passive data channel established
+ c2.close()
+ c1.login(USER, PASSWD)
+ c1.makepasv()
+ self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
+ self.server.port)
+ # with passive data socket waiting for connection
+ c1.login(USER, PASSWD)
+ c1.sendcmd('pasv')
+ self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
+ self.server.port)
+ # with active data channel established
+ c1.login(USER, PASSWD)
+ c1.makeport()
+ self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
+ self.server.port)
+ finally:
+ c1.close()
+ c2.close()
+ c3.close()
+
+
+class _TestNetworkProtocols(unittest.TestCase):
+ """Test PASV, EPSV, PORT and EPRT commands.
+
+ Do not use this class directly. Let TestIPv4Environment and
+ TestIPv6Environment classes use it instead.
+ """
+ HOST = HOST
+
+ def setUp(self):
+ self.server = FTPd(self.HOST)
+ self.server.start()
+ self.client = ftplib.FTP()
+ self.client.connect(self.server.host, self.server.port)
+ self.client.login(USER, PASSWD)
+ if self.client.af == socket.AF_INET:
+ self.proto = "1"
+ self.other_proto = "2"
+ else:
+ self.proto = "2"
+ self.other_proto = "1"
+
+ def tearDown(self):
+ self.client.close()
+ self.server.stop()
+
+ def cmdresp(self, cmd):
+ """Send a command and return response, also if the command failed."""
+ try:
+ return self.client.sendcmd(cmd)
+ except ftplib.Error, err:
+ return str(err)
+
+ def test_eprt(self):
+ # test wrong proto
+ try:
+ self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto,
+ self.server.host, self.server.port))
+ except ftplib.error_perm, err:
+ self.assertEqual(str(err)[0:3], "522")
+ else:
+ self.fail("Exception not raised")
+
+ # test bad args
+ msg = "501 Invalid EPRT format."
+ # len('|') > 3
+ self.assertEqual(self.cmdresp('eprt ||||'), msg)
+ # len('|') < 3
+ self.assertEqual(self.cmdresp('eprt ||'), msg)
+ # port > 65535
+ self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto,
+ self.HOST)), msg)
+ # port < 0
+ self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto,
+ self.HOST)), msg)
+ # port < 1024
+ self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto,
+ self.HOST)), "501 Can't connect over a privileged port.")
+
+ # test connection
+ sock = socket.socket(self.client.af, socket.SOCK_STREAM)
+ sock.bind((self.client.sock.getsockname()[0], 0))
+ sock.listen(5)
+ sock.settimeout(2)
+ ip, port = sock.getsockname()[:2]
+ self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port))
+ try:
+ try:
+ sock.accept()
+ except socket.timeout:
+ self.fail("Server didn't connect to passive socket")
+ finally:
+ sock.close()
+
+ def test_epsv(self):
+ # test wrong proto
+ try:
+ self.client.sendcmd('epsv ' + self.other_proto)
+ except ftplib.error_perm, err:
+ self.assertEqual(str(err)[0:3], "522")
+ else:
+ self.fail("Exception not raised")
+
+ # test connection
+ for cmd in ('EPSV', 'EPSV ' + self.proto):
+ host, port = ftplib.parse229(self.client.sendcmd(cmd),
+ self.client.sock.getpeername())
+ s = socket.socket(self.client.af, socket.SOCK_STREAM)
+ s.settimeout(2)
+ try:
+ s.connect((host, port))
+ self.client.sendcmd('abor')
+ finally:
+ s.close()
+
+ def test_epsv_all(self):
+ self.client.sendcmd('epsv all')
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv')
+ self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000)
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd,
+ 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000))
+
+
+class TestIPv4Environment(_TestNetworkProtocols):
+ """Test PASV, EPSV, PORT and EPRT commands.
+
+ Runs tests contained in _TestNetworkProtocols class by using IPv4
+ plus some additional specific tests.
+ """
+ HOST = '127.0.0.1'
+
+ def test_port_v4(self):
+ # test connection
+ self.client.makeport()
+ self.client.sendcmd('abor')
+ # test bad arguments
+ ae = self.assertEqual
+ msg = "501 Invalid PORT format."
+ ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ','
+ ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int
+ ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6
+ ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6
+ ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255
+ ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535
+ ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0
+ msg = "501 Can't connect over a privileged port."
+ ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024
+ if "1.2.3.4" != self.HOST:
+ msg = "501 Can't connect to a foreign address."
+ ae(self.cmdresp('port 1,2,3,4,4,4'), msg)
+
+ def test_eprt_v4(self):
+ self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'),
+ "501 Can't connect to a foreign address.")
+
+ def test_pasv_v4(self):
+ host, port = ftplib.parse227(self.client.sendcmd('pasv'))
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(2)
+ try:
+ s.connect((host, port))
+ finally:
+ s.close()
+
+
+class TestIPv6Environment(_TestNetworkProtocols):
+ """Test PASV, EPSV, PORT and EPRT commands.
+
+ Runs tests contained in _TestNetworkProtocols class by using IPv6
+ plus some additional specific tests.
+ """
+ HOST = '::1'
+
+ def test_port_v6(self):
+ # 425 expected
+ self.assertRaises(ftplib.error_temp, self.client.sendport,
+ self.server.host, self.server.port)
+
+ def test_pasv_v6(self):
+ # 425 expected
+ self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv')
+
+ def test_eprt_v6(self):
+ self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'),
+ "501 Can't connect to a foreign address.")
+
+
+class FTPd(threading.Thread):
+ """A threaded FTP server used for running tests."""
+
+ def __init__(self, host=HOST, port=0, verbose=False):
+ threading.Thread.__init__(self)
+ self.active = False
+ if not verbose:
+ ftpserver.log = ftpserver.logline = lambda x: x
+ self.authorizer = ftpserver.DummyAuthorizer()
+ self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full perms
+ self.authorizer.add_anonymous(HOME)
+ self.handler = ftpserver.FTPHandler
+ self.handler.authorizer = self.authorizer
+ self.server = ftpserver.FTPServer((host, port), self.handler)
+ self.host, self.port = self.server.socket.getsockname()[:2]
+ self.active_lock = threading.Lock()
+
+ def start(self):
+ assert not self.active
+ self.__flag = threading.Event()
+ threading.Thread.start(self)
+ self.__flag.wait()
+
+ def run(self):
+ self.active = True
+ self.__flag.set()
+ while self.active:
+ self.active_lock.acquire()
+ self.server.serve_forever(timeout=0.001, count=1)
+ self.active_lock.release()
+ self.server.close_all(ignore_all=True)
+
+ def stop(self):
+ assert self.active
+ self.active = False
+ self.join()
+
+
+def remove_test_files():
+ "Convenience function for removing temporary test files"
+ for file in [TESTFN, TESTFN2, TESTFN3]:
+ try:
+ os.remove(file)
+ except os.error:
+ pass
+
+def test_main(tests=None):
+ test_suite = unittest.TestSuite()
+ if tests is None:
+ tests = [
+ TestAbstractedFS,
+ TestDummyAuthorizer,
+ TestCallLater,
+ TestFtpAuthentication,
+ TestFtpDummyCmds,
+ TestFtpCmdsSemantic,
+ TestFtpFsOperations,
+ TestFtpRetrieveData,
+ TestFtpAbort,
+ TestFtpStoreData,
+ TestTimeouts,
+ TestMaxConnections
+ ]
+ if SUPPORTS_IPV4:
+ tests.append(TestIPv4Environment)
+ if SUPPORTS_IPV6:
+ tests.append(TestIPv6Environment)
+
+ for test in tests:
+ test_suite.addTest(unittest.makeSuite(test))
+ remove_test_files()
+ unittest.TextTestRunner(verbosity=2).run(test_suite)
+ remove_test_files()
+
+
+if __name__ == '__main__':
+ test_main()
« no previous file with comments | « third_party/pyftpdlib/setup.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698