Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: third_party/pyftpdlib/test/test_ftpd.py

Issue 16429: python based ftp server (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: '' Created 11 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/pyftpdlib/setup.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # test_ftpd.py
3
4 # ======================================================================
5 # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>
6 #
7 # All Rights Reserved
8 #
9 # Permission to use, copy, modify, and distribute this software and
10 # its documentation for any purpose and without fee is hereby
11 # granted, provided that the above copyright notice appear in all
12 # copies and that both that copyright notice and this permission
13 # notice appear in supporting documentation, and that the name of
14 # Giampaolo Rodola' not be used in advertising or publicity pertaining to
15 # distribution of the software without specific, written prior
16 # permission.
17 #
18 # Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
19 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
20 # NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR
21 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
22 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
23 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
24 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
25 # ======================================================================
26
27
28 # This test suite has been run successfully on the following systems:
29 #
30 # -----------------------------------------------------------
31 # System | Python version
32 # -----------------------------------------------------------
33 # Linux Ubuntu 2.6.20-15 | 2.4, 2.5
34 # Linux Kubuntu 8.04 32 & 64 bits | 2.5.2
35 # Linux Debian 2.4.27-2-386 | 2.3.5
36 # Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2
37 # Windows Vista Ultimate 64 bit | 2.5.1
38 # Windows Vista Business 32 bit | 2.5.1
39 # Windows Server 2008 64bit | 2.5.1
40 # Windows Mobile 6.1 | PythonCE 2.5
41 # OS X 10.4.10 | 2.3, 2.4, 2.5
42 # FreeBSD 7.0 | 2.4, 2.5
43 # -----------------------------------------------------------
44
45
46 import threading
47 import unittest
48 import socket
49 import os
50 import time
51 import re
52 import tempfile
53 import ftplib
54 import random
55 import warnings
56 import sys
57
58 from pyftpdlib import ftpserver
59
60
61 __release__ = 'pyftpdlib 0.5.0'
62
63 # Attempt to use IP rather than hostname (test suite will run a lot faster)
64 try:
65 HOST = socket.gethostbyname('localhost')
66 except socket.error:
67 HOST = 'localhost'
68 USER = 'user'
69 PASSWD = '12345'
70 HOME = os.getcwd()
71 try:
72 from test.test_support import TESTFN
73 except ImportError:
74 TESTFN = 'temp-fname'
75 TESTFN2 = TESTFN + '2'
76 TESTFN3 = TESTFN + '3'
77
78 def try_address(host, port=0):
79 """Try to bind a daemon on the given host:port and return True
80 if that has been possible."""
81 try:
82 ftpserver.FTPServer((host, port), None)
83 except socket.error:
84 return False
85 else:
86 return True
87
88 SUPPORTS_IPV4 = try_address('127.0.0.1')
89 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1')
90
91
92 class TestAbstractedFS(unittest.TestCase):
93 """Test for conversion utility methods of AbstractedFS class."""
94
95 def test_ftpnorm(self):
96 # Tests for ftpnorm method.
97 ae = self.assertEquals
98 fs = ftpserver.AbstractedFS()
99
100 fs.cwd = '/'
101 ae(fs.ftpnorm(''), '/')
102 ae(fs.ftpnorm('/'), '/')
103 ae(fs.ftpnorm('.'), '/')
104 ae(fs.ftpnorm('..'), '/')
105 ae(fs.ftpnorm('a'), '/a')
106 ae(fs.ftpnorm('/a'), '/a')
107 ae(fs.ftpnorm('/a/'), '/a')
108 ae(fs.ftpnorm('a/..'), '/')
109 ae(fs.ftpnorm('a/b'), '/a/b')
110 ae(fs.ftpnorm('a/b/..'), '/a')
111 ae(fs.ftpnorm('a/b/../..'), '/')
112 fs.cwd = '/sub'
113 ae(fs.ftpnorm(''), '/sub')
114 ae(fs.ftpnorm('/'), '/')
115 ae(fs.ftpnorm('.'), '/sub')
116 ae(fs.ftpnorm('..'), '/')
117 ae(fs.ftpnorm('a'), '/sub/a')
118 ae(fs.ftpnorm('a/'), '/sub/a')
119 ae(fs.ftpnorm('a/..'), '/sub')
120 ae(fs.ftpnorm('a/b'), '/sub/a/b')
121 ae(fs.ftpnorm('a/b/'), '/sub/a/b')
122 ae(fs.ftpnorm('a/b/..'), '/sub/a')
123 ae(fs.ftpnorm('a/b/../..'), '/sub')
124 ae(fs.ftpnorm('a/b/../../..'), '/')
125 ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed
126
127 def test_ftp2fs(self):
128 # Tests for ftp2fs method.
129 ae = self.assertEquals
130 fs = ftpserver.AbstractedFS()
131 join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
132
133 def goforit(root):
134 fs.root = root
135 fs.cwd = '/'
136 ae(fs.ftp2fs(''), root)
137 ae(fs.ftp2fs('/'), root)
138 ae(fs.ftp2fs('.'), root)
139 ae(fs.ftp2fs('..'), root)
140 ae(fs.ftp2fs('a'), join(root, 'a'))
141 ae(fs.ftp2fs('/a'), join(root, 'a'))
142 ae(fs.ftp2fs('/a/'), join(root, 'a'))
143 ae(fs.ftp2fs('a/..'), root)
144 ae(fs.ftp2fs('a/b'), join(root, r'a/b'))
145 ae(fs.ftp2fs('/a/b'), join(root, r'a/b'))
146 ae(fs.ftp2fs('/a/b/..'), join(root, 'a'))
147 ae(fs.ftp2fs('/a/b/../..'), root)
148 fs.cwd = '/sub'
149 ae(fs.ftp2fs(''), join(root, 'sub'))
150 ae(fs.ftp2fs('/'), root)
151 ae(fs.ftp2fs('.'), join(root, 'sub'))
152 ae(fs.ftp2fs('..'), root)
153 ae(fs.ftp2fs('a'), join(root, 'sub/a'))
154 ae(fs.ftp2fs('a/'), join(root, 'sub/a'))
155 ae(fs.ftp2fs('a/..'), join(root, 'sub'))
156 ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b'))
157 ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a'))
158 ae(fs.ftp2fs('a/b/../..'), join(root, 'sub'))
159 ae(fs.ftp2fs('a/b/../../..'), root)
160 ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed
161
162 if os.sep == '\\':
163 goforit(r'C:\dir')
164 goforit('C:\\')
165 # on DOS-derived filesystems (e.g. Windows) this is the same
166 # as specifying the current drive directory (e.g. 'C:\\')
167 goforit('\\')
168 elif os.sep == '/':
169 goforit('/home/user')
170 goforit('/')
171 else:
172 # os.sep == ':'? Don't know... let's try it anyway
173 goforit(os.getcwd())
174
175 def test_fs2ftp(self):
176 # Tests for fs2ftp method.
177 ae = self.assertEquals
178 fs = ftpserver.AbstractedFS()
179 join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
180
181 def goforit(root):
182 fs.root = root
183 ae(fs.fs2ftp(root), '/')
184 ae(fs.fs2ftp(join(root, '/')), '/')
185 ae(fs.fs2ftp(join(root, '.')), '/')
186 ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root
187 ae(fs.fs2ftp(join(root, 'a')), '/a')
188 ae(fs.fs2ftp(join(root, 'a/')), '/a')
189 ae(fs.fs2ftp(join(root, 'a/..')), '/')
190 ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
191 ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
192 ae(fs.fs2ftp(join(root, 'a/b/..')), '/a')
193 ae(fs.fs2ftp(join(root, '/a/b/../..')), '/')
194 fs.cwd = '/sub'
195 ae(fs.fs2ftp(join(root, 'a/')), '/a')
196
197 if os.sep == '\\':
198 goforit(r'C:\dir')
199 goforit('C:\\')
200 # on DOS-derived filesystems (e.g. Windows) this is the same
201 # as specifying the current drive directory (e.g. 'C:\\')
202 goforit('\\')
203 fs.root = r'C:\dir'
204 ae(fs.fs2ftp('C:\\'), '/')
205 ae(fs.fs2ftp('D:\\'), '/')
206 ae(fs.fs2ftp('D:\\dir'), '/')
207 elif os.sep == '/':
208 goforit('/')
209 assert os.path.realpath('/__home/user') == '/__home/user', \
210 'Test skipped (symlinks not allowed).'
211 goforit('/__home/user')
212 fs.root = '/__home/user'
213 ae(fs.fs2ftp('/__home'), '/')
214 ae(fs.fs2ftp('/'), '/')
215 ae(fs.fs2ftp('/__home/userx'), '/')
216 else:
217 # os.sep == ':'? Don't know... let's try it anyway
218 goforit(os.getcwd())
219
220 def test_validpath(self):
221 # Tests for validpath method.
222 fs = ftpserver.AbstractedFS()
223 fs.root = HOME
224 self.failUnless(fs.validpath(HOME))
225 self.failUnless(fs.validpath(HOME + '/'))
226 self.failIf(fs.validpath(HOME + 'xxx'))
227
228 if hasattr(os, 'symlink'):
229 # Tests for validpath on systems supporting symbolic links.
230
231 def _safe_remove(self, path):
232 # convenience function for removing temporary files
233 try:
234 os.remove(path)
235 except os.error:
236 pass
237
238 def test_validpath_validlink(self):
239 # Test validpath by issuing a symlink pointing to a path
240 # inside the root directory.
241 fs = ftpserver.AbstractedFS()
242 fs.root = HOME
243 try:
244 open(TESTFN, 'w')
245 os.symlink(TESTFN, TESTFN2)
246 self.failUnless(fs.validpath(TESTFN))
247 finally:
248 self._safe_remove(TESTFN)
249 self._safe_remove(TESTFN2)
250
251 def test_validpath_external_symlink(self):
252 # Test validpath by issuing a symlink pointing to a path
253 # outside the root directory.
254 fs = ftpserver.AbstractedFS()
255 fs.root = HOME
256 try:
257 # tempfile should create our file in /tmp directory
258 # which should be outside the user root. If it is not
259 # we just skip the test.
260 file = tempfile.NamedTemporaryFile()
261 if HOME == os.path.dirname(file.name):
262 return
263 os.symlink(file.name, TESTFN)
264 self.failIf(fs.validpath(TESTFN))
265 finally:
266 self._safe_remove(TESTFN)
267 file.close()
268
269
270 class TestDummyAuthorizer(unittest.TestCase):
271 """Tests for DummyAuthorizer class."""
272
273 # temporarily change warnings to exceptions for the purposes of testing
274 def setUp(self):
275 self.tempdir = tempfile.mkdtemp(dir=HOME)
276 self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir))
277 self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name
278 self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name
279 warnings.filterwarnings("error")
280
281 def tearDown(self):
282 os.remove(self.tempfile)
283 os.remove(self.subtempfile)
284 os.rmdir(self.subtempdir)
285 os.rmdir(self.tempdir)
286 warnings.resetwarnings()
287
288 def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
289 try:
290 callableObj(*args, **kwargs)
291 except excClass, why:
292 if str(why) == msg:
293 return
294 raise self.failureException("%s != %s" %(str(why), msg))
295 else:
296 if hasattr(excClass,'__name__'): excName = excClass.__name__
297 else: excName = str(excClass)
298 raise self.failureException, "%s not raised" % excName
299
300 def test_common_methods(self):
301 auth = ftpserver.DummyAuthorizer()
302 # create user
303 auth.add_user(USER, PASSWD, HOME)
304 auth.add_anonymous(HOME)
305 # check credentials
306 self.failUnless(auth.validate_authentication(USER, PASSWD))
307 self.failIf(auth.validate_authentication(USER, 'wrongpwd'))
308 # remove them
309 auth.remove_user(USER)
310 auth.remove_user('anonymous')
311 # raise exc if user does not exists
312 self.assertRaises(KeyError, auth.remove_user, USER)
313 # raise exc if path does not exist
314 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
315 'No such directory: "%s"' %'?:\\',
316 auth.add_user, USER, PASSWD, '?:\\')
317 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
318 'No such directory: "%s"' %'?:\\',
319 auth.add_anonymous, '?:\\')
320 # raise exc if user already exists
321 auth.add_user(USER, PASSWD, HOME)
322 auth.add_anonymous(HOME)
323 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
324 'User "%s" already exists' %USER,
325 auth.add_user, USER, PASSWD, HOME)
326 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
327 'User "anonymous" already exists',
328 auth.add_anonymous, HOME)
329 auth.remove_user(USER)
330 auth.remove_user('anonymous')
331 # raise on wrong permission
332 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
333 'No such permission "?"',
334 auth.add_user, USER, PASSWD, HOME, perm='?')
335 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
336 'No such permission "?"',
337 auth.add_anonymous, HOME, perm='?')
338 # expect warning on write permissions assigned to anonymous user
339 for x in "adfmw":
340 self.assertRaisesWithMsg(RuntimeWarning,
341 "Write permissions assigned to anonymous user.",
342 auth.add_anonymous, HOME, perm=x)
343
344 def test_override_perm_interface(self):
345 auth = ftpserver.DummyAuthorizer()
346 auth.add_user(USER, PASSWD, HOME, perm='elr')
347 # raise exc if user does not exists
348 self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr')
349 # raise exc if path does not exist or it's not a directory
350 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
351 'No such directory: "%s"' %'?:\\',
352 auth.override_perm, USER, '?:\\', 'elr')
353 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
354 'No such directory: "%s"' %self.tempfile,
355 auth.override_perm, USER, self.tempfile, 'elr')
356 # raise on wrong permission
357 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
358 'No such permission "?"', auth.override_perm,
359 USER, HOME, perm='?')
360 # expect warning on write permissions assigned to anonymous user
361 auth.add_anonymous(HOME)
362 for p in "adfmw":
363 self.assertRaisesWithMsg(RuntimeWarning,
364 "Write permissions assigned to anonymous user.",
365 auth.override_perm, 'anonymous', HOME, p)
366 # raise on attempt to override home directory permissions
367 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
368 "Can't override home directory permissions",
369 auth.override_perm, USER, HOME, perm='w')
370 # raise on attempt to override a path escaping home directory
371 if os.path.dirname(HOME) != HOME:
372 self.assertRaisesWithMsg(ftpserver.AuthorizerError,
373 "Path escapes user home directory",
374 auth.override_perm, USER,
375 os.path.dirname(HOME), perm='w')
376 # try to re-set an overridden permission
377 auth.override_perm(USER, self.tempdir, perm='w')
378 auth.override_perm(USER, self.tempdir, perm='wr')
379
380 def test_override_perm_recursive_paths(self):
381 auth = ftpserver.DummyAuthorizer()
382 auth.add_user(USER, PASSWD, HOME, perm='elr')
383 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
384 auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
385 self.assert_(auth.has_perm(USER, 'w', HOME) is False)
386 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
387 self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
388 self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True)
389 self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True)
390
391 self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
392 self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
393 path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
394 self.assert_(auth.has_perm(USER, 'w', path) is False)
395 # test case-sensitiveness
396 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
397 self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
398
399 def test_override_perm_not_recursive_paths(self):
400 auth = ftpserver.DummyAuthorizer()
401 auth.add_user(USER, PASSWD, HOME, perm='elr')
402 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
403 auth.override_perm(USER, self.tempdir, perm='w')
404 self.assert_(auth.has_perm(USER, 'w', HOME) is False)
405 self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
406 self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
407 self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False)
408 self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False)
409
410 self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
411 self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
412 path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
413 self.assert_(auth.has_perm(USER, 'w', path) is False)
414 # test case-sensitiveness
415 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
416 self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
417
418
419 class TestCallLater(unittest.TestCase):
420 """Tests for CallLater class."""
421
422 def setUp(self):
423 for task in ftpserver._tasks:
424 if not task.cancelled:
425 task.cancel()
426 del ftpserver._tasks[:]
427
428 def scheduler(self, timeout=0.01, count=100):
429 while ftpserver._tasks and count > 0:
430 ftpserver._scheduler()
431 count -= 1
432 time.sleep(timeout)
433
434 def test_interface(self):
435 fun = lambda: 0
436 self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun)
437 x = ftpserver.CallLater(3, fun)
438 self.assertRaises(AssertionError, x.delay, -1)
439 self.assert_(x.cancelled is False)
440 x.cancel()
441 self.assert_(x.cancelled is True)
442 self.assertRaises(AssertionError, x.call)
443 self.assertRaises(AssertionError, x.reset)
444 self.assertRaises(AssertionError, x.delay, 2)
445 self.assertRaises(AssertionError, x.cancel)
446
447 def test_order(self):
448 l = []
449 fun = lambda x: l.append(x)
450 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
451 ftpserver.CallLater(x, fun, x)
452 self.scheduler()
453 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
454
455 def test_delay(self):
456 l = []
457 fun = lambda x: l.append(x)
458 ftpserver.CallLater(0.01, fun, 0.01).delay(0.07)
459 ftpserver.CallLater(0.02, fun, 0.02).delay(0.08)
460 ftpserver.CallLater(0.03, fun, 0.03)
461 ftpserver.CallLater(0.04, fun, 0.04)
462 ftpserver.CallLater(0.05, fun, 0.05)
463 ftpserver.CallLater(0.06, fun, 0.06).delay(0.001)
464 self.scheduler()
465 self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02])
466
467 def test_reset(self):
468 # will fail on such systems where time.time() does not provide
469 # time with a better precision than 1 second.
470 l = []
471 fun = lambda x: l.append(x)
472 ftpserver.CallLater(0.01, fun, 0.01)
473 ftpserver.CallLater(0.02, fun, 0.02)
474 ftpserver.CallLater(0.03, fun, 0.03)
475 x = ftpserver.CallLater(0.04, fun, 0.04)
476 ftpserver.CallLater(0.05, fun, 0.05)
477 time.sleep(0.1)
478 x.reset()
479 self.scheduler()
480 self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
481
482 def test_cancel(self):
483 l = []
484 fun = lambda x: l.append(x)
485 ftpserver.CallLater(0.01, fun, 0.01).cancel()
486 ftpserver.CallLater(0.02, fun, 0.02)
487 ftpserver.CallLater(0.03, fun, 0.03)
488 ftpserver.CallLater(0.04, fun, 0.04)
489 ftpserver.CallLater(0.05, fun, 0.05).cancel()
490 self.scheduler()
491 self.assertEqual(l, [0.02, 0.03, 0.04])
492
493
494 class TestFtpAuthentication(unittest.TestCase):
495 "test: USER, PASS, REIN."
496
497 def setUp(self):
498 self.server = FTPd()
499 self.server.start()
500 self.client = ftplib.FTP()
501 self.client.connect(self.server.host, self.server.port)
502 self.f1 = open(TESTFN, 'w+b')
503 self.f2 = open(TESTFN2, 'w+b')
504
505 def tearDown(self):
506 self.client.close()
507 self.server.stop()
508 if not self.f1.closed:
509 self.f1.close()
510 if not self.f2.closed:
511 self.f2.close()
512 os.remove(TESTFN)
513 os.remove(TESTFN2)
514
515 def test_auth_ok(self):
516 self.client.login(user=USER, passwd=PASSWD)
517
518 def test_anon_auth(self):
519 self.client.login(user='anonymous', passwd='anon@')
520 self.client.login(user='AnonYmoUs', passwd='anon@')
521 self.client.login(user='anonymous', passwd='')
522
523 # Commented after delayed response on wrong credentials has been
524 # introduced because tests take too much to complete.
525
526 ## def test_auth_failed(self):
527 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
528 ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSW D)
529 ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wron g')
530
531 ## def test_max_auth(self):
532 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
533 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
534 ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
535 ## # If authentication fails for 3 times ftpd disconnects the
536 ## # client. We can check if that happens by using self.client.sendcmd()
537 ## # on the 'dead' socket object. If socket object is really
538 ## # closed it should be raised a socket.error exception (Windows)
539 ## # or a EOFError exception (Linux).
540 ## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '')
541
542 def test_rein(self):
543 self.client.login(user=USER, passwd=PASSWD)
544 self.client.sendcmd('rein')
545 # user not authenticated, error response expected
546 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
547 # by logging-in again we should be able to execute a
548 # file-system command
549 self.client.login(user=USER, passwd=PASSWD)
550 self.client.sendcmd('pwd')
551
552 def test_rein_during_transfer(self):
553 self.client.login(user=USER, passwd=PASSWD)
554 data = 'abcde12345' * 100000
555 self.f1.write(data)
556 self.f1.close()
557
558 self.client.voidcmd('TYPE I')
559 conn = self.client.transfercmd('retr ' + TESTFN)
560 rein_sent = 0
561 while 1:
562 chunk = conn.recv(8192)
563 if not chunk:
564 break
565 self.f2.write(chunk)
566 if not rein_sent:
567 rein_sent = 1
568 # flush account, error response expected
569 self.client.sendcmd('rein')
570 self.assertRaises(ftplib.error_perm, self.client.dir)
571
572 # a 226 response is expected once tranfer finishes
573 self.assertEqual(self.client.voidresp()[:3], '226')
574 # account is still flushed, error response is still expected
575 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
576 'size ' + TESTFN)
577 # by logging-in again we should be able to execute a
578 # filesystem command
579 self.client.login(user=USER, passwd=PASSWD)
580 self.client.sendcmd('pwd')
581 self.f2.seek(0)
582 self.assertEqual(hash(data), hash (self.f2.read()))
583
584 def test_user(self):
585 # Test USER while already authenticated and no transfer
586 # is in progress.
587 self.client.login(user=USER, passwd=PASSWD)
588 self.client.sendcmd('user ' + USER) # authentication flushed
589 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
590 self.client.sendcmd('pass ' + PASSWD)
591 self.client.sendcmd('pwd')
592
593 def test_user_on_transfer(self):
594 # Test USER while already authenticated and a transfer is
595 # in progress.
596 self.client.login(user=USER, passwd=PASSWD)
597 data = 'abcde12345' * 100000
598 self.f1.write(data)
599 self.f1.close()
600
601 self.client.voidcmd('TYPE I')
602 conn = self.client.transfercmd('retr ' + TESTFN)
603 rein_sent = 0
604 while 1:
605 chunk = conn.recv(8192)
606 if not chunk:
607 break
608 self.f2.write(chunk)
609 # stop transfer while it isn't finished yet
610 if not rein_sent:
611 rein_sent = 1
612 # flush account, expect an error response
613 self.client.sendcmd('user ' + USER)
614 self.assertRaises(ftplib.error_perm, self.client.dir)
615
616 # a 226 response is expected once tranfer finishes
617 self.assertEqual(self.client.voidresp()[:3], '226')
618 # account is still flushed, error response is still expected
619 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
620 # by logging-in again we should be able to execute a
621 # filesystem command
622 self.client.sendcmd('pass ' + PASSWD)
623 self.client.sendcmd('pwd')
624 self.f2.seek(0)
625 self.assertEqual(hash(data), hash (self.f2.read()))
626
627
628 class TestFtpDummyCmds(unittest.TestCase):
629 "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP"
630
631 def setUp(self):
632 self.server = FTPd()
633 self.server.start()
634 self.client = ftplib.FTP()
635 self.client.connect(self.server.host, self.server.port)
636 self.client.login(USER, PASSWD)
637
638 def tearDown(self):
639 self.client.close()
640 self.server.stop()
641
642 def test_type(self):
643 self.client.sendcmd('type a')
644 self.client.sendcmd('type i')
645 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?')
646
647 def test_stru(self):
648 self.client.sendcmd('stru f')
649 self.client.sendcmd('stru F')
650 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?')
651
652 def test_mode(self):
653 self.client.sendcmd('mode s')
654 self.client.sendcmd('mode S')
655 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?')
656
657 def test_noop(self):
658 self.client.sendcmd('noop')
659
660 def test_syst(self):
661 self.client.sendcmd('syst')
662
663 def test_allo(self):
664 self.client.sendcmd('allo x')
665
666 def test_quit(self):
667 self.client.sendcmd('quit')
668
669 def test_help(self):
670 self.client.sendcmd('help')
671 cmd = random.choice(ftpserver.proto_cmds.keys())
672 self.client.sendcmd('help %s' %cmd)
673 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?')
674
675 def test_rest(self):
676 # test error conditions only;
677 # restored data-transfer is tested later
678 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest')
679 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str')
680 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1')
681
682 def test_opts_feat(self):
683 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad _fact')
684 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst typ e ;')
685 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst ')
686 # utility function which used for extracting the MLST "facts"
687 # string from the FEAT response
688 def mlst():
689 resp = self.client.sendcmd('feat')
690 return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1)
691 # we rely on "type", "perm", "size", and "modify" facts which
692 # are those available on all platforms
693 self.failUnless('type*;perm*;size*;modify*;' in mlst())
694 self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;')
695 self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;')
696 self.failUnless('type*;perm;size;modify;' in mlst())
697
698 self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ')
699 self.failUnless(not '*' in mlst())
700
701 self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
702 self.failUnless(not '*' in mlst())
703 self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \
704 '200 MLST OPTS type;')
705 self.failUnless('type*;perm;size;modify;' in mlst())
706
707
708 class TestFtpCmdsSemantic(unittest.TestCase):
709
710 arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
711 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type',
712 'user','xmkd','xrmd')
713
714 def setUp(self):
715 self.server = FTPd()
716 self.server.start()
717 self.client = ftplib.FTP()
718 self.client.connect(self.server.host, self.server.port)
719 self.client.login(USER, PASSWD)
720
721 def tearDown(self):
722 self.client.close()
723 self.server.stop()
724
725 def test_arg_cmds(self):
726 # test commands requiring an argument
727 expected = "501 Syntax error: command needs an argument."
728 for cmd in self.arg_cmds:
729 self.client.putcmd(cmd)
730 resp = self.client.getmultiline()
731 self.assertEqual(resp, expected)
732
733 def test_no_arg_cmds(self):
734 # test commands accepting no arguments
735 expected = "501 Syntax error: command does not accept arguments."
736 for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
737 'syst','xcup','xpwd'):
738 self.client.putcmd(cmd + ' arg')
739 resp = self.client.getmultiline()
740 self.assertEqual(resp, expected)
741
742 def test_auth_cmds(self):
743 # test those commands requiring client to be authenticated
744 expected = "530 Log in with USER and PASS first."
745 self.client.sendcmd('rein')
746 for cmd in ftpserver.proto_cmds:
747 cmd = cmd.lower()
748 if cmd in ('feat','help','noop','user','pass','stat','syst','quit'):
749 continue
750 if cmd in self.arg_cmds:
751 cmd = cmd + ' arg'
752 self.client.putcmd(cmd)
753 resp = self.client.getmultiline()
754 self.assertEqual(resp, expected)
755
756 def test_no_auth_cmds(self):
757 # test those commands that do not require client to be authenticated
758 self.client.sendcmd('rein')
759 for cmd in ('feat','help','noop','stat','syst'):
760 self.client.sendcmd(cmd)
761 # STAT provided with an argument is equal to LIST hence not allowed
762 # if not authenticated
763 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /')
764 self.client.sendcmd('quit')
765
766
767 class TestFtpFsOperations(unittest.TestCase):
768 "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
769
770 def setUp(self):
771 self.server = FTPd()
772 self.server.start()
773 self.client = ftplib.FTP()
774 self.client.connect(self.server.host, self.server.port)
775 self.client.login(USER, PASSWD)
776 self.tempfile = os.path.basename(open(TESTFN, 'w+b').name)
777 self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
778 os.mkdir(self.tempdir)
779
780 def tearDown(self):
781 self.client.close()
782 self.server.stop()
783 if os.path.exists(self.tempfile):
784 os.remove(self.tempfile)
785 if os.path.exists(self.tempdir):
786 os.rmdir(self.tempdir)
787
788 def test_cwd(self):
789 self.client.cwd(self.tempdir)
790 self.assertEqual(self.client.pwd(), '/' + self.tempdir)
791 self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir')
792 # cwd provided with no arguments is supposed to move us to the
793 # root directory
794 self.client.sendcmd('cwd')
795 self.assertEqual(self.client.pwd(), '/')
796
797 def test_pwd(self):
798 self.assertEqual(self.client.pwd(), '/')
799 self.client.cwd(self.tempdir)
800 self.assertEqual(self.client.pwd(), '/' + self.tempdir)
801
802 def test_cdup(self):
803 self.client.cwd(self.tempdir)
804 self.assertEqual(self.client.pwd(), '/' + self.tempdir)
805 self.client.sendcmd('cdup')
806 self.assertEqual(self.client.pwd(), '/')
807 # make sure we can't escape from root directory
808 self.client.sendcmd('cdup')
809 self.assertEqual(self.client.pwd(), '/')
810
811 def test_mkd(self):
812 tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
813 self.client.mkd(tempdir)
814 # make sure we can't create directories which already exist
815 # (probably not really necessary);
816 # let's use a try/except statement to avoid leaving behind
817 # orphaned temporary directory in the event of a test failure.
818 try:
819 self.client.mkd(tempdir)
820 except ftplib.error_perm:
821 os.rmdir(tempdir) # ok
822 else:
823 self.fail('ftplib.error_perm not raised.')
824
825 def test_rmd(self):
826 self.client.rmd(self.tempdir)
827 self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile)
828 # make sure we can't remove the root directory
829 self.assertRaises(ftplib.error_perm, self.client.rmd, '/')
830
831 def test_dele(self):
832 self.client.delete(self.tempfile)
833 self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir)
834
835 def test_rnfr_rnto(self):
836 # rename file
837 tempname = os.path.basename(tempfile.mktemp(dir=HOME))
838 self.client.rename(self.tempfile, tempname)
839 self.client.rename(tempname, self.tempfile)
840 # rename dir
841 tempname = os.path.basename(tempfile.mktemp(dir=HOME))
842 self.client.rename(self.tempdir, tempname)
843 self.client.rename(tempname, self.tempdir)
844 # rnfr/rnto over non-existing paths
845 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
846 self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
847 self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/')
848 # make sure we can't rename root directory
849 self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x')
850
851 def test_mdtm(self):
852 self.client.sendcmd('mdtm ' + self.tempfile)
853 # make sure we can't use mdtm against directories
854 try:
855 self.client.sendcmd('mdtm ' + self.tempdir)
856 except ftplib.error_perm, err:
857 self.failUnless("not retrievable" in str(err))
858 else:
859 self.fail('Exception not raised')
860
861 def test_size(self):
862 self.client.size(self.tempfile)
863 # make sure we can't use size against directories
864 try:
865 self.client.sendcmd('size ' + self.tempdir)
866 except ftplib.error_perm, err:
867 self.failUnless("not retrievable" in str(err))
868 else:
869 self.fail('Exception not raised')
870
871
872 class TestFtpRetrieveData(unittest.TestCase):
873 "test: RETR, REST, LIST, NLST, argumented STAT"
874
875 def setUp(self):
876 self.server = FTPd()
877 self.server.start()
878 self.client = ftplib.FTP()
879 self.client.connect(self.server.host, self.server.port)
880 self.client.login(USER, PASSWD)
881 self.f1 = open(TESTFN, 'w+b')
882 self.f2 = open(TESTFN2, 'w+b')
883
884 def tearDown(self):
885 self.client.close()
886 self.server.stop()
887 if not self.f1.closed:
888 self.f1.close()
889 if not self.f2.closed:
890 self.f2.close()
891 os.remove(TESTFN)
892 os.remove(TESTFN2)
893
894 def test_retr(self):
895 data = 'abcde12345' * 100000
896 self.f1.write(data)
897 self.f1.close()
898 self.client.retrbinary("retr " + TESTFN, self.f2.write)
899 self.f2.seek(0)
900 self.assertEqual(hash(data), hash(self.f2.read()))
901
902 def test_restore_on_retr(self):
903 data = 'abcde12345' * 100000
904 fname_1 = os.path.basename(self.f1.name)
905 self.f1.write(data)
906 self.f1.close()
907
908 # look at ftplib.FTP.retrbinary method to understand this mess
909 self.client.voidcmd('TYPE I')
910 conn = self.client.transfercmd('retr ' + fname_1)
911 chunk = conn.recv(len(data) / 2)
912 self.f2.write(chunk)
913 conn.close()
914 # transfer wasn't finished yet so we expect a 426 response
915 self.assertRaises(ftplib.error_temp, self.client.voidresp)
916
917 # resuming transfer by using a marker value greater than the
918 # file size stored on the server should result in an error
919 # on retr (RFC-1123)
920 file_size = self.client.size(fname_1)
921 self.client.sendcmd('rest %s' %((file_size + 1)))
922 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fnam e_1)
923
924 # test resume
925 self.client.sendcmd('rest %s' %len(chunk))
926 self.client.retrbinary("retr " + fname_1, self.f2.write)
927 self.f2.seek(0)
928 self.assertEqual(hash(data), hash (self.f2.read()))
929
930 def _test_listing_cmds(self, cmd):
931 """Tests common to LIST NLST and MLSD commands."""
932 # assume that no argument has the same meaning of "/"
933 l1 = l2 = []
934 self.client.retrlines(cmd, l1.append)
935 self.client.retrlines(cmd + ' /', l2.append)
936 self.assertEqual(l1, l2)
937 if cmd.lower() != 'mlsd':
938 # if pathname is a file one line is expected
939 x = []
940 self.client.retrlines('%s ' %cmd + TESTFN, x.append)
941 self.assertEqual(len(x), 1)
942 self.failUnless(''.join(x).endswith(TESTFN))
943 # non-existent path, 550 response is expected
944 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
945 self.assertRaises(ftplib.error_perm, self.client.retrlines,
946 '%s ' %cmd + bogus, lambda x: x)
947 # for an empty directory we excpect that the data channel is
948 # opened anyway and that no data is received
949 x = []
950 tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
951 try:
952 self.client.retrlines('%s %s' %(cmd, tempdir), x.append)
953 self.assertEqual(x, [])
954 finally:
955 os.rmdir(tempdir)
956
957 def test_nlst(self):
958 # common tests
959 self._test_listing_cmds('nlst')
960
961 def test_list(self):
962 # common tests
963 self._test_listing_cmds('list')
964 # known incorrect pathname arguments (e.g. old clients) are
965 # expected to be treated as if pathname would be == '/'
966 l1 = l2 = l3 = l4 = l5 = []
967 self.client.retrlines('list /', l1.append)
968 self.client.retrlines('list -a', l2.append)
969 self.client.retrlines('list -l', l3.append)
970 self.client.retrlines('list -al', l4.append)
971 self.client.retrlines('list -la', l5.append)
972 tot = (l1, l2, l3, l4, l5)
973 for x in range(len(tot) - 1):
974 self.assertEqual(tot[x], tot[x+1])
975
976 def test_mlst(self):
977 # utility function for extracting the line of interest
978 mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
979
980 # the fact set must be preceded by a space
981 self.failUnless(mlstline('mlst').startswith(' '))
982 # where TVFS is supported, a fully qualified pathname is expected
983 self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN))
984 self.failUnless(mlstline('mlst').endswith('/'))
985 # assume that no argument has the same meaning of "/"
986 self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
987 # non-existent path
988 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
989 self.assertRaises(ftplib.error_perm, mlstline, bogus)
990 # test file/dir notations
991 self.failUnless('type=dir' in mlstline('mlst'))
992 self.failUnless('type=file' in mlstline('mlst ' + TESTFN))
993 # let's add some tests for OPTS command
994 self.client.sendcmd('opts mlst type;')
995 self.assertEqual(mlstline('mlst'), ' type=dir; /')
996 # where no facts are present, two leading spaces before the
997 # pathname are required (RFC-3659)
998 self.client.sendcmd('opts mlst')
999 self.assertEqual(mlstline('mlst'), ' /')
1000
1001 def test_mlsd(self):
1002 # common tests
1003 self._test_listing_cmds('mlsd')
1004 dir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1005 try:
1006 try:
1007 self.client.retrlines('mlsd ' + TESTFN, lambda x: x)
1008 except ftplib.error_perm, resp:
1009 # if path is a file a 501 response code is expected
1010 self.assertEqual(str(resp)[0:3], "501")
1011 else:
1012 self.fail("Exception not raised")
1013 finally:
1014 os.rmdir(dir)
1015
1016 def test_stat(self):
1017 # test STAT provided with argument which is equal to LIST
1018 self.client.sendcmd('stat /')
1019 self.client.sendcmd('stat ' + TESTFN)
1020 self.client.putcmd('stat *')
1021 resp = self.client.getmultiline()
1022 self.assertEqual(resp, '550 Globbing not supported.')
1023 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1024 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogu s)
1025
1026
1027 class TestFtpAbort(unittest.TestCase):
1028 "test: ABOR"
1029
1030 def setUp(self):
1031 self.server = FTPd()
1032 self.server.start()
1033 self.client = ftplib.FTP()
1034 self.client.connect(self.server.host, self.server.port)
1035 self.client.login(USER, PASSWD)
1036 self.f1 = open(TESTFN, 'w+b')
1037 self.f2 = open(TESTFN2, 'w+b')
1038
1039 def tearDown(self):
1040 self.client.close()
1041 self.server.stop()
1042 if not self.f1.closed:
1043 self.f1.close()
1044 if not self.f2.closed:
1045 self.f2.close()
1046 os.remove(self.f1.name)
1047 os.remove(self.f2.name)
1048
1049 def test_abor_no_data(self):
1050 # Case 1: ABOR while no data channel is opened: respond with 225.
1051 resp = self.client.sendcmd('ABOR')
1052 self.failUnlessEqual('225 No transfer to abort.', resp)
1053
1054 def test_abor_pasv(self):
1055 # Case 2: user sends a PASV, a data-channel socket is listening
1056 # but not connected, and ABOR is sent: close listening data
1057 # socket, respond with 225.
1058 self.client.makepasv()
1059 respcode = self.client.sendcmd('ABOR')[:3]
1060 self.failUnlessEqual('225', respcode)
1061
1062 def test_abor_port(self):
1063 # Case 3: data channel opened with PASV or PORT, but ABOR sent
1064 # before a data transfer has been started: close data channel,
1065 # respond with 225
1066 self.client.makeport()
1067 respcode = self.client.sendcmd('ABOR')[:3]
1068 self.failUnlessEqual('225', respcode)
1069
1070 def test_abor(self):
1071 # Case 4: ABOR while a data transfer on DTP channel is in
1072 # progress: close data channel, respond with 426, respond
1073 # with 226.
1074 data = 'abcde12345' * 100000
1075 self.f1.write(data)
1076 self.f1.close()
1077
1078 # this ugly loop construct is to simulate an interrupted
1079 # transfer since ftplib doesn't like running storbinary()
1080 # in a separate thread
1081 self.client.voidcmd('TYPE I')
1082 conn = self.client.transfercmd('retr ' + TESTFN)
1083 chunk = conn.recv(len(data) / 2)
1084 # stop transfer while it isn't finished yet
1085 self.client.putcmd('ABOR')
1086
1087 # transfer isn't finished yet so ftpd should respond with 426
1088 self.assertRaises(ftplib.error_temp, self.client.voidresp)
1089
1090 # transfer successfully aborted, so should now respond with a 226
1091 self.failUnlessEqual('226', self.client.voidresp()[:3])
1092
1093 if hasattr(socket, 'MSG_OOB'):
1094 def test_oob_abor(self):
1095 # Send ABOR by following the RFC-959 directives of sending
1096 # Telnet IP/Synch sequence as OOB data.
1097 # On some systems like FreeBSD this happened to be a problem
1098 # due to a different SO_OOBINLINE behavior.
1099 # On some platforms (e.g. Python CE) the test may fail
1100 # although the MSG_OOB constant is defined.
1101 self.client.sock.sendall(chr(244), socket.MSG_OOB)
1102 self.client.sock.sendall(chr(242), socket.MSG_OOB)
1103 self.client.sock.sendall('abor\r\n')
1104 self.client.sock.settimeout(1)
1105 self.assertEqual(self.client.getresp()[:3], '225')
1106
1107
1108 class TestFtpStoreData(unittest.TestCase):
1109 "test: STOR, STOU, APPE, REST"
1110
1111 def setUp(self):
1112 self.server = FTPd()
1113 self.server.start()
1114 self.client = ftplib.FTP()
1115 self.client.connect(self.server.host, self.server.port)
1116 self.client.login(USER, PASSWD)
1117 self.f1 = open(TESTFN, 'w+b')
1118 self.f2 = open(TESTFN2, 'w+b')
1119
1120 def tearDown(self):
1121 self.client.close()
1122 self.server.stop()
1123 if not self.f1.closed:
1124 self.f1.close()
1125 if not self.f2.closed:
1126 self.f2.close()
1127 os.remove(TESTFN)
1128 os.remove(TESTFN2)
1129
1130 def test_stor(self):
1131 # TESTFN3 is the remote file name
1132 try:
1133 data = 'abcde12345' * 100000
1134 self.f1.write(data)
1135 self.f1.seek(0)
1136 self.client.storbinary('stor ' + TESTFN3, self.f1)
1137 self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
1138 self.f2.seek(0)
1139 self.assertEqual(hash(data), hash (self.f2.read()))
1140 finally:
1141 # we do not use os.remove because file could be still
1142 # locked by ftpd thread
1143 if os.path.exists(TESTFN3):
1144 self.client.delete(TESTFN3)
1145
1146 def test_stou(self):
1147 data = 'abcde12345' * 100000
1148 self.f1.write(data)
1149 self.f1.seek(0)
1150
1151 self.client.voidcmd('TYPE I')
1152 # filename comes in as "1xx FILE: <filename>"
1153 filename = self.client.sendcmd('stou').split('FILE: ')[1]
1154 try:
1155 sock = self.client.makeport()
1156 conn, sockaddr = sock.accept()
1157 while 1:
1158 buf = self.f1.read(8192)
1159 if not buf:
1160 break
1161 conn.sendall(buf)
1162 conn.close()
1163 # transfer finished, a 226 response is expected
1164 self.client.voidresp()
1165 self.client.retrbinary('retr ' + filename, self.f2.write)
1166 self.f2.seek(0)
1167 self.assertEqual(hash(data), hash (self.f2.read()))
1168 finally:
1169 # we do not use os.remove because file could be
1170 # still locked by ftpd thread
1171 if os.path.exists(filename):
1172 self.client.delete(filename)
1173
1174 def test_stou_rest(self):
1175 # watch for STOU preceded by REST, which makes no sense.
1176 self.client.sendcmd('rest 10')
1177 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
1178
1179 def test_appe(self):
1180 # TESTFN3 is the remote file name
1181 try:
1182 data1 = 'abcde12345' * 100000
1183 self.f1.write(data1)
1184 self.f1.seek(0)
1185 self.client.storbinary('stor ' + TESTFN3, self.f1)
1186
1187 data2 = 'fghil67890' * 100000
1188 self.f1.write(data2)
1189 self.f1.seek(self.client.size(TESTFN3))
1190 self.client.storbinary('appe ' + TESTFN3, self.f1)
1191
1192 self.client.retrbinary("retr " + TESTFN3, self.f2.write)
1193 self.f2.seek(0)
1194 self.assertEqual(hash(data1 + data2), hash (self.f2.read()))
1195 finally:
1196 # we do not use os.remove because file could be still
1197 # locked by ftpd thread
1198 if os.path.exists(TESTFN3):
1199 self.client.delete(TESTFN3)
1200
1201 def test_appe_rest(self):
1202 # watch for APPE preceded by REST, which makes no sense.
1203 self.client.sendcmd('rest 10')
1204 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x')
1205
1206 def test_rest_on_stor(self):
1207 # TESTFN3 is the remote file name
1208 data = 'abcde12345' * 100000
1209 self.f1.write(data)
1210 self.f1.seek(0)
1211
1212 self.client.voidcmd('TYPE I')
1213 conn = self.client.transfercmd('stor ' + TESTFN3)
1214 bytes_sent = 0
1215 while 1:
1216 chunk = self.f1.read(8192)
1217 conn.sendall(chunk)
1218 bytes_sent += len(chunk)
1219 # stop transfer while it isn't finished yet
1220 if bytes_sent >= 524288: # 2^19
1221 break
1222 elif not chunk:
1223 break
1224 conn.close()
1225 # transfer wasn't finished yet so we expect a 426 response
1226 self.client.voidresp()
1227
1228 # resuming transfer by using a marker value greater than the
1229 # file size stored on the server should result in an error
1230 # on stor
1231 file_size = self.client.size(TESTFN3)
1232 self.assertEqual(file_size, bytes_sent)
1233 self.client.sendcmd('rest %s' %((file_size + 1)))
1234 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TEST FN3)
1235
1236 self.client.sendcmd('rest %s' %bytes_sent)
1237 self.client.storbinary('stor ' + TESTFN3, self.f1)
1238
1239 self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
1240 self.f1.seek(0)
1241 self.f2.seek(0)
1242 self.assertEqual(hash(self.f1.read()), hash(self.f2.read()))
1243 self.client.delete(TESTFN3)
1244
1245
1246 class TestTimeouts(unittest.TestCase):
1247 """Test idle-timeout capabilities of control and data channels.
1248 Some tests may fail on slow machines.
1249 """
1250
1251 def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30):
1252 self.server = FTPd()
1253 self.server.handler.timeout = idle_timeout
1254 self.server.handler.dtp_handler.timeout = data_timeout
1255 self.server.handler.passive_dtp.timeout = pasv_timeout
1256 self.server.start()
1257 self.client = ftplib.FTP()
1258 self.client.connect(self.server.host, self.server.port)
1259 self.client.login(USER, PASSWD)
1260
1261 def tearDown(self):
1262 self.client.close()
1263 self.server.handler.timeout = 300
1264 self.server.handler.dtp_handler.timeout = 300
1265 self.server.handler.passive_dtp.timeout = 30
1266 self.server.stop()
1267
1268 def test_idle_timeout(self):
1269 # Test control channel timeout. The client which does not send
1270 # any command within the time specified in FTPHandler.timeout is
1271 # supposed to be kicked off.
1272 self._setUp(idle_timeout=0.1)
1273 # fail if no msg is received within 1 second
1274 self.client.sock.settimeout(1)
1275 data = self.client.sock.recv(1024)
1276 self.assertEqual(data, "421 Control connection timed out.\r\n")
1277 # ensure client has been kicked off
1278 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1279
1280 def test_data_timeout(self):
1281 # Test data channel timeout. The client which does not send
1282 # or receive any data within the time specified in
1283 # DTPHandler.timeout is supposed to be kicked off.
1284 self._setUp(data_timeout=0.1)
1285 addr = self.client.makepasv()
1286 s = socket.socket()
1287 s.connect(addr)
1288 # fail if no msg is received within 1 second
1289 self.client.sock.settimeout(1)
1290 data = self.client.sock.recv(1024)
1291 self.assertEqual(data, "421 Data connection timed out.\r\n")
1292 # ensure client has been kicked off
1293 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1294
1295 def test_idle_data_timeout1(self):
1296 # Tests that the control connection timeout is suspended while
1297 # the data channel is opened
1298 self._setUp(idle_timeout=0.1, data_timeout=0.2)
1299 addr = self.client.makepasv()
1300 s = socket.socket()
1301 s.connect(addr)
1302 # fail if no msg is received within 1 second
1303 self.client.sock.settimeout(1)
1304 data = self.client.sock.recv(1024)
1305 self.assertEqual(data, "421 Data connection timed out.\r\n")
1306 # ensure client has been kicked off
1307 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1308
1309 def test_idle_data_timeout2(self):
1310 # Tests that the control connection timeout is restarted after
1311 # data channel has been closed
1312 self._setUp(idle_timeout=0.1, data_timeout=0.2)
1313 addr = self.client.makepasv()
1314 s = socket.socket()
1315 s.connect(addr)
1316 # close data channel
1317 self.client.sendcmd('abor')
1318 self.client.sock.settimeout(1)
1319 data = self.client.sock.recv(1024)
1320 self.assertEqual(data, "421 Control connection timed out.\r\n")
1321 # ensure client has been kicked off
1322 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1323
1324 def test_pasv_timeout(self):
1325 # Test pasv data channel timeout. The client which does not connect
1326 # to the listening data socket within the time specified in
1327 # PassiveDTP.timeout is supposed to receive a 421 response.
1328 self._setUp(pasv_timeout=0.1)
1329 self.client.makepasv()
1330 # fail if no msg is received within 1 second
1331 self.client.sock.settimeout(1)
1332 data = self.client.sock.recv(1024)
1333 self.assertEqual(data, "421 Passive data channel timed out.\r\n")
1334 # client is not expected to be kicked off
1335 self.client.sendcmd('noop')
1336
1337
1338 class TestMaxConnections(unittest.TestCase):
1339 """Test maximum connections (FTPServer.max_cons)."""
1340
1341 def setUp(self):
1342 self.server = FTPd()
1343 self.server.server.max_cons = 3
1344 self.server.start()
1345
1346 def tearDown(self):
1347 self.server.server.max_cons = 0
1348 self.server.stop()
1349
1350 def test_max_connections(self):
1351 c1 = ftplib.FTP()
1352 c2 = ftplib.FTP()
1353 c3 = ftplib.FTP()
1354 try:
1355 c1.connect(self.server.host, self.server.port)
1356 c2.connect(self.server.host, self.server.port)
1357 self.assertRaises(ftplib.error_temp, c3.connect, self.server.host,
1358 self.server.port)
1359 # with passive data channel established
1360 c2.close()
1361 c1.login(USER, PASSWD)
1362 c1.makepasv()
1363 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
1364 self.server.port)
1365 # with passive data socket waiting for connection
1366 c1.login(USER, PASSWD)
1367 c1.sendcmd('pasv')
1368 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
1369 self.server.port)
1370 # with active data channel established
1371 c1.login(USER, PASSWD)
1372 c1.makeport()
1373 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
1374 self.server.port)
1375 finally:
1376 c1.close()
1377 c2.close()
1378 c3.close()
1379
1380
1381 class _TestNetworkProtocols(unittest.TestCase):
1382 """Test PASV, EPSV, PORT and EPRT commands.
1383
1384 Do not use this class directly. Let TestIPv4Environment and
1385 TestIPv6Environment classes use it instead.
1386 """
1387 HOST = HOST
1388
1389 def setUp(self):
1390 self.server = FTPd(self.HOST)
1391 self.server.start()
1392 self.client = ftplib.FTP()
1393 self.client.connect(self.server.host, self.server.port)
1394 self.client.login(USER, PASSWD)
1395 if self.client.af == socket.AF_INET:
1396 self.proto = "1"
1397 self.other_proto = "2"
1398 else:
1399 self.proto = "2"
1400 self.other_proto = "1"
1401
1402 def tearDown(self):
1403 self.client.close()
1404 self.server.stop()
1405
1406 def cmdresp(self, cmd):
1407 """Send a command and return response, also if the command failed."""
1408 try:
1409 return self.client.sendcmd(cmd)
1410 except ftplib.Error, err:
1411 return str(err)
1412
1413 def test_eprt(self):
1414 # test wrong proto
1415 try:
1416 self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto,
1417 self.server.host, self.server.port))
1418 except ftplib.error_perm, err:
1419 self.assertEqual(str(err)[0:3], "522")
1420 else:
1421 self.fail("Exception not raised")
1422
1423 # test bad args
1424 msg = "501 Invalid EPRT format."
1425 # len('|') > 3
1426 self.assertEqual(self.cmdresp('eprt ||||'), msg)
1427 # len('|') < 3
1428 self.assertEqual(self.cmdresp('eprt ||'), msg)
1429 # port > 65535
1430 self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto,
1431 self.HOST)), msg)
1432 # port < 0
1433 self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto,
1434 self.HOST)), msg)
1435 # port < 1024
1436 self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto,
1437 self.HOST)), "501 Can't connect over a privileged port.")
1438
1439 # test connection
1440 sock = socket.socket(self.client.af, socket.SOCK_STREAM)
1441 sock.bind((self.client.sock.getsockname()[0], 0))
1442 sock.listen(5)
1443 sock.settimeout(2)
1444 ip, port = sock.getsockname()[:2]
1445 self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port))
1446 try:
1447 try:
1448 sock.accept()
1449 except socket.timeout:
1450 self.fail("Server didn't connect to passive socket")
1451 finally:
1452 sock.close()
1453
1454 def test_epsv(self):
1455 # test wrong proto
1456 try:
1457 self.client.sendcmd('epsv ' + self.other_proto)
1458 except ftplib.error_perm, err:
1459 self.assertEqual(str(err)[0:3], "522")
1460 else:
1461 self.fail("Exception not raised")
1462
1463 # test connection
1464 for cmd in ('EPSV', 'EPSV ' + self.proto):
1465 host, port = ftplib.parse229(self.client.sendcmd(cmd),
1466 self.client.sock.getpeername())
1467 s = socket.socket(self.client.af, socket.SOCK_STREAM)
1468 s.settimeout(2)
1469 try:
1470 s.connect((host, port))
1471 self.client.sendcmd('abor')
1472 finally:
1473 s.close()
1474
1475 def test_epsv_all(self):
1476 self.client.sendcmd('epsv all')
1477 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv')
1478 self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 20 00)
1479 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1480 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000))
1481
1482
1483 class TestIPv4Environment(_TestNetworkProtocols):
1484 """Test PASV, EPSV, PORT and EPRT commands.
1485
1486 Runs tests contained in _TestNetworkProtocols class by using IPv4
1487 plus some additional specific tests.
1488 """
1489 HOST = '127.0.0.1'
1490
1491 def test_port_v4(self):
1492 # test connection
1493 self.client.makeport()
1494 self.client.sendcmd('abor')
1495 # test bad arguments
1496 ae = self.assertEqual
1497 msg = "501 Invalid PORT format."
1498 ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ','
1499 ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int
1500 ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6
1501 ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6
1502 ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255
1503 ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535
1504 ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0
1505 msg = "501 Can't connect over a privileged port."
1506 ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024
1507 if "1.2.3.4" != self.HOST:
1508 msg = "501 Can't connect to a foreign address."
1509 ae(self.cmdresp('port 1,2,3,4,4,4'), msg)
1510
1511 def test_eprt_v4(self):
1512 self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'),
1513 "501 Can't connect to a foreign address.")
1514
1515 def test_pasv_v4(self):
1516 host, port = ftplib.parse227(self.client.sendcmd('pasv'))
1517 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1518 s.settimeout(2)
1519 try:
1520 s.connect((host, port))
1521 finally:
1522 s.close()
1523
1524
1525 class TestIPv6Environment(_TestNetworkProtocols):
1526 """Test PASV, EPSV, PORT and EPRT commands.
1527
1528 Runs tests contained in _TestNetworkProtocols class by using IPv6
1529 plus some additional specific tests.
1530 """
1531 HOST = '::1'
1532
1533 def test_port_v6(self):
1534 # 425 expected
1535 self.assertRaises(ftplib.error_temp, self.client.sendport,
1536 self.server.host, self.server.port)
1537
1538 def test_pasv_v6(self):
1539 # 425 expected
1540 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv')
1541
1542 def test_eprt_v6(self):
1543 self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'),
1544 "501 Can't connect to a foreign address.")
1545
1546
1547 class FTPd(threading.Thread):
1548 """A threaded FTP server used for running tests."""
1549
1550 def __init__(self, host=HOST, port=0, verbose=False):
1551 threading.Thread.__init__(self)
1552 self.active = False
1553 if not verbose:
1554 ftpserver.log = ftpserver.logline = lambda x: x
1555 self.authorizer = ftpserver.DummyAuthorizer()
1556 self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full pe rms
1557 self.authorizer.add_anonymous(HOME)
1558 self.handler = ftpserver.FTPHandler
1559 self.handler.authorizer = self.authorizer
1560 self.server = ftpserver.FTPServer((host, port), self.handler)
1561 self.host, self.port = self.server.socket.getsockname()[:2]
1562 self.active_lock = threading.Lock()
1563
1564 def start(self):
1565 assert not self.active
1566 self.__flag = threading.Event()
1567 threading.Thread.start(self)
1568 self.__flag.wait()
1569
1570 def run(self):
1571 self.active = True
1572 self.__flag.set()
1573 while self.active:
1574 self.active_lock.acquire()
1575 self.server.serve_forever(timeout=0.001, count=1)
1576 self.active_lock.release()
1577 self.server.close_all(ignore_all=True)
1578
1579 def stop(self):
1580 assert self.active
1581 self.active = False
1582 self.join()
1583
1584
1585 def remove_test_files():
1586 "Convenience function for removing temporary test files"
1587 for file in [TESTFN, TESTFN2, TESTFN3]:
1588 try:
1589 os.remove(file)
1590 except os.error:
1591 pass
1592
1593 def test_main(tests=None):
1594 test_suite = unittest.TestSuite()
1595 if tests is None:
1596 tests = [
1597 TestAbstractedFS,
1598 TestDummyAuthorizer,
1599 TestCallLater,
1600 TestFtpAuthentication,
1601 TestFtpDummyCmds,
1602 TestFtpCmdsSemantic,
1603 TestFtpFsOperations,
1604 TestFtpRetrieveData,
1605 TestFtpAbort,
1606 TestFtpStoreData,
1607 TestTimeouts,
1608 TestMaxConnections
1609 ]
1610 if SUPPORTS_IPV4:
1611 tests.append(TestIPv4Environment)
1612 if SUPPORTS_IPV6:
1613 tests.append(TestIPv6Environment)
1614
1615 for test in tests:
1616 test_suite.addTest(unittest.makeSuite(test))
1617 remove_test_files()
1618 unittest.TextTestRunner(verbosity=2).run(test_suite)
1619 remove_test_files()
1620
1621
1622 if __name__ == '__main__':
1623 test_main()
OLDNEW
« no previous file with comments | « third_party/pyftpdlib/setup.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698