Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(922)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_ftp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 FTP tests.
7
8 Maintainer: U{Andrew Bennetts<mailto:spiv@twistedmatrix.com>}
9 """
10
11 import os.path
12 from StringIO import StringIO
13 import shutil
14 import errno
15
16 from zope.interface import implements
17
18 from twisted.trial import unittest
19 from twisted.protocols import basic
20 from twisted.internet import reactor, protocol, defer, error
21 from twisted.internet.interfaces import IConsumer
22 from twisted.cred import portal, checkers, credentials
23 from twisted.python import failure, filepath
24 from twisted.test import proto_helpers
25
26 from twisted.protocols import ftp, loopback
27
28 class NonClosingStringIO(StringIO):
29 def close(self):
30 pass
31
32 StringIOWithoutClosing = NonClosingStringIO
33
34
35
36 class Dummy(basic.LineReceiver):
37 logname = None
38 def __init__(self):
39 self.lines = []
40 self.rawData = []
41 def connectionMade(self):
42 self.f = self.factory # to save typing in pdb :-)
43 def lineReceived(self,line):
44 self.lines.append(line)
45 def rawDataReceived(self, data):
46 self.rawData.append(data)
47 def lineLengthExceeded(self, line):
48 pass
49
50
51 class _BufferingProtocol(protocol.Protocol):
52 def connectionMade(self):
53 self.buffer = ''
54 self.d = defer.Deferred()
55 def dataReceived(self, data):
56 self.buffer += data
57 def connectionLost(self, reason):
58 self.d.callback(self)
59
60
61 class FTPServerTestCase(unittest.TestCase):
62 """
63 Simple tests for an FTP server with the default settings.
64
65 @ivar clientFactory: class used as ftp client.
66 """
67 clientFactory = ftp.FTPClientBasic
68
69 def setUp(self):
70 # Create a directory
71 self.directory = self.mktemp()
72 os.mkdir(self.directory)
73
74 # Start the server
75 p = portal.Portal(ftp.FTPRealm(self.directory))
76 p.registerChecker(checkers.AllowAnonymousAccess(),
77 credentials.IAnonymous)
78 self.factory = ftp.FTPFactory(portal=p)
79 self.port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
80
81 # Hook the server's buildProtocol to make the protocol instance
82 # accessible to tests.
83 buildProtocol = self.factory.buildProtocol
84 d1 = defer.Deferred()
85 def _rememberProtocolInstance(addr):
86 protocol = buildProtocol(addr)
87 self.serverProtocol = protocol.wrappedProtocol
88 d1.callback(None)
89 return protocol
90 self.factory.buildProtocol = _rememberProtocolInstance
91
92 # Connect a client to it
93 portNum = self.port.getHost().port
94 clientCreator = protocol.ClientCreator(reactor, self.clientFactory)
95 d2 = clientCreator.connectTCP("127.0.0.1", portNum)
96 def gotClient(client):
97 self.client = client
98 d2.addCallback(gotClient)
99 return defer.gatherResults([d1, d2])
100
101 def tearDown(self):
102 # Clean up sockets
103 self.client.transport.loseConnection()
104 d = defer.maybeDeferred(self.port.stopListening)
105 d.addCallback(self.ebTearDown)
106 return d
107
108 def ebTearDown(self, ignore):
109 del self.serverProtocol
110 # Clean up temporary directory
111 shutil.rmtree(self.directory)
112
113 def assertCommandResponse(self, command, expectedResponseLines,
114 chainDeferred=None):
115 """Asserts that a sending an FTP command receives the expected
116 response.
117
118 Returns a Deferred. Optionally accepts a deferred to chain its actions
119 to.
120 """
121 if chainDeferred is None:
122 chainDeferred = defer.succeed(None)
123
124 def queueCommand(ignored):
125 d = self.client.queueStringCommand(command)
126 def gotResponse(responseLines):
127 self.assertEquals(expectedResponseLines, responseLines)
128 return d.addCallback(gotResponse)
129 return chainDeferred.addCallback(queueCommand)
130
131 def assertCommandFailed(self, command, expectedResponse=None,
132 chainDeferred=None):
133 if chainDeferred is None:
134 chainDeferred = defer.succeed(None)
135
136 def queueCommand(ignored):
137 return self.client.queueStringCommand(command)
138 chainDeferred.addCallback(queueCommand)
139 self.assertFailure(chainDeferred, ftp.CommandFailed)
140 def failed(exception):
141 if expectedResponse is not None:
142 self.failUnlessEqual(
143 expectedResponse, exception.args[0])
144 return chainDeferred.addCallback(failed)
145
146 def _anonymousLogin(self):
147 d = self.assertCommandResponse(
148 'USER anonymous',
149 ['331 Guest login ok, type your email address as password.'])
150 return self.assertCommandResponse(
151 'PASS test@twistedmatrix.com',
152 ['230 Anonymous login ok, access restrictions apply.'],
153 chainDeferred=d)
154
155
156 class BasicFTPServerTestCase(FTPServerTestCase):
157 def testNotLoggedInReply(self):
158 """When not logged in, all commands other than USER and PASS should
159 get NOT_LOGGED_IN errors.
160 """
161 commandList = ['CDUP', 'CWD', 'LIST', 'MODE', 'PASV',
162 'PWD', 'RETR', 'STRU', 'SYST', 'TYPE']
163
164 # Issue commands, check responses
165 def checkResponse(exception):
166 failureResponseLines = exception.args[0]
167 self.failUnless(failureResponseLines[-1].startswith("530"),
168 "Response didn't start with 530: %r"
169 % (failureResponseLines[-1],))
170 deferreds = []
171 for command in commandList:
172 deferred = self.client.queueStringCommand(command)
173 self.assertFailure(deferred, ftp.CommandFailed)
174 deferred.addCallback(checkResponse)
175 deferreds.append(deferred)
176 return defer.DeferredList(deferreds, fireOnOneErrback=True)
177
178 def testPASSBeforeUSER(self):
179 """Issuing PASS before USER should give an error."""
180 return self.assertCommandFailed(
181 'PASS foo',
182 ["503 Incorrect sequence of commands: "
183 "USER required before PASS"])
184
185 def testNoParamsForUSER(self):
186 """Issuing USER without a username is a syntax error."""
187 return self.assertCommandFailed(
188 'USER',
189 ['500 Syntax error: USER requires an argument.'])
190
191 def testNoParamsForPASS(self):
192 """Issuing PASS without a password is a syntax error."""
193 d = self.client.queueStringCommand('USER foo')
194 return self.assertCommandFailed(
195 'PASS',
196 ['500 Syntax error: PASS requires an argument.'],
197 chainDeferred=d)
198
199 def testAnonymousLogin(self):
200 return self._anonymousLogin()
201
202 def testQuit(self):
203 """Issuing QUIT should return a 221 message."""
204 d = self._anonymousLogin()
205 return self.assertCommandResponse(
206 'QUIT',
207 ['221 Goodbye.'],
208 chainDeferred=d)
209
210 def testAnonymousLoginDenied(self):
211 # Reconfigure the server to disallow anonymous access, and to have an
212 # IUsernamePassword checker that always rejects.
213 self.factory.allowAnonymous = False
214 denyAlwaysChecker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
215 self.factory.portal.registerChecker(denyAlwaysChecker,
216 credentials.IUsernamePassword)
217
218 # Same response code as allowAnonymous=True, but different text.
219 d = self.assertCommandResponse(
220 'USER anonymous',
221 ['331 Password required for anonymous.'])
222
223 # It will be denied. No-one can login.
224 d = self.assertCommandFailed(
225 'PASS test@twistedmatrix.com',
226 ['530 Sorry, Authentication failed.'],
227 chainDeferred=d)
228
229 # It's not just saying that. You aren't logged in.
230 d = self.assertCommandFailed(
231 'PWD',
232 ['530 Please login with USER and PASS.'],
233 chainDeferred=d)
234 return d
235
236 def testUnknownCommand(self):
237 d = self._anonymousLogin()
238 return self.assertCommandFailed(
239 'GIBBERISH',
240 ["502 Command 'GIBBERISH' not implemented"],
241 chainDeferred=d)
242
243 def testRETRBeforePORT(self):
244 d = self._anonymousLogin()
245 return self.assertCommandFailed(
246 'RETR foo',
247 ["503 Incorrect sequence of commands: "
248 "PORT or PASV required before RETR"],
249 chainDeferred=d)
250
251 def testSTORBeforePORT(self):
252 d = self._anonymousLogin()
253 return self.assertCommandFailed(
254 'STOR foo',
255 ["503 Incorrect sequence of commands: "
256 "PORT or PASV required before STOR"],
257 chainDeferred=d)
258
259 def testBadCommandArgs(self):
260 d = self._anonymousLogin()
261 self.assertCommandFailed(
262 'MODE z',
263 ["504 Not implemented for parameter 'z'."],
264 chainDeferred=d)
265 self.assertCommandFailed(
266 'STRU I',
267 ["504 Not implemented for parameter 'I'."],
268 chainDeferred=d)
269 return d
270
271 def testDecodeHostPort(self):
272 self.assertEquals(ftp.decodeHostPort('25,234,129,22,100,23'),
273 ('25.234.129.22', 25623))
274 nums = range(6)
275 for i in range(6):
276 badValue = list(nums)
277 badValue[i] = 256
278 s = ','.join(map(str, badValue))
279 self.assertRaises(ValueError, ftp.decodeHostPort, s)
280
281 def testPASV(self):
282 # Login
283 wfd = defer.waitForDeferred(self._anonymousLogin())
284 yield wfd
285 wfd.getResult()
286
287 # Issue a PASV command, and extract the host and port from the response
288 pasvCmd = defer.waitForDeferred(self.client.queueStringCommand('PASV'))
289 yield pasvCmd
290 responseLines = pasvCmd.getResult()
291 host, port = ftp.decodeHostPort(responseLines[-1][4:])
292
293 # Make sure the server is listening on the port it claims to be
294 self.assertEqual(port, self.serverProtocol.dtpPort.getHost().port)
295
296 # Semi-reasonable way to force cleanup
297 self.serverProtocol.transport.loseConnection()
298 testPASV = defer.deferredGenerator(testPASV)
299
300 def testSYST(self):
301 d = self._anonymousLogin()
302 self.assertCommandResponse('SYST', ["215 UNIX Type: L8"],
303 chainDeferred=d)
304 return d
305
306
307 def test_portRangeForwardError(self):
308 """
309 Exceptions other than L{error.CannotListenError} which are raised by
310 C{listenFactory} should be raised to the caller of L{FTP.getDTPPort}.
311 """
312 def listenFactory(portNumber, factory):
313 raise RuntimeError()
314 self.serverProtocol.listenFactory = listenFactory
315
316 self.assertRaises(RuntimeError, self.serverProtocol.getDTPPort,
317 protocol.Factory())
318
319
320 def test_portRange(self):
321 """
322 L{FTP.passivePortRange} should determine the ports which
323 L{FTP.getDTPPort} attempts to bind. If no port from that iterator can
324 be bound, L{error.CannotListenError} should be raised, otherwise the
325 first successful result from L{FTP.listenFactory} should be returned.
326 """
327 def listenFactory(portNumber, factory):
328 if portNumber in (22032, 22033, 22034):
329 raise error.CannotListenError('localhost', portNumber, 'error')
330 return portNumber
331 self.serverProtocol.listenFactory = listenFactory
332
333 port = self.serverProtocol.getDTPPort(protocol.Factory())
334 self.assertEquals(port, 0)
335
336 self.serverProtocol.passivePortRange = xrange(22032, 65536)
337 port = self.serverProtocol.getDTPPort(protocol.Factory())
338 self.assertEquals(port, 22035)
339
340 self.serverProtocol.passivePortRange = xrange(22032, 22035)
341 self.assertRaises(error.CannotListenError,
342 self.serverProtocol.getDTPPort,
343 protocol.Factory())
344
345
346
347 class FTPServerTestCaseAdvancedClient(FTPServerTestCase):
348 """
349 Test FTP server with the L{ftp.FTPClient} class.
350 """
351 clientFactory = ftp.FTPClient
352
353 def test_anonymousSTOR(self):
354 """
355 Try to make an STOR as anonymous, and check that we got a permission
356 denied error.
357 """
358 def eb(res):
359 res.trap(ftp.CommandFailed)
360 self.assertEquals(res.value.args[0][0],
361 '550 foo: Permission denied.')
362 d1, d2 = self.client.storeFile('foo')
363 d2.addErrback(eb)
364 return defer.gatherResults([d1, d2])
365
366
367 class FTPServerPasvDataConnectionTestCase(FTPServerTestCase):
368 def _makeDataConnection(self, ignored=None):
369 # Establish a passive data connection (i.e. client connecting to
370 # server).
371 d = self.client.queueStringCommand('PASV')
372 def gotPASV(responseLines):
373 host, port = ftp.decodeHostPort(responseLines[-1][4:])
374 cc = protocol.ClientCreator(reactor, _BufferingProtocol)
375 return cc.connectTCP('127.0.0.1', port)
376 return d.addCallback(gotPASV)
377
378 def _download(self, command, chainDeferred=None):
379 if chainDeferred is None:
380 chainDeferred = defer.succeed(None)
381
382 chainDeferred.addCallback(self._makeDataConnection)
383 def queueCommand(downloader):
384 # wait for the command to return, and the download connection to be
385 # closed.
386 d1 = self.client.queueStringCommand(command)
387 d2 = downloader.d
388 return defer.gatherResults([d1, d2])
389 chainDeferred.addCallback(queueCommand)
390
391 def downloadDone((ignored, downloader)):
392 return downloader.buffer
393 return chainDeferred.addCallback(downloadDone)
394
395 def testEmptyLIST(self):
396 # Login
397 d = self._anonymousLogin()
398
399 # No files, so the file listing should be empty
400 self._download('LIST', chainDeferred=d)
401 def checkEmpty(result):
402 self.assertEqual('', result)
403 return d.addCallback(checkEmpty)
404
405 def testTwoDirLIST(self):
406 # Make some directories
407 os.mkdir(os.path.join(self.directory, 'foo'))
408 os.mkdir(os.path.join(self.directory, 'bar'))
409
410 # Login
411 d = self._anonymousLogin()
412
413 # We expect 2 lines because there are two files.
414 self._download('LIST', chainDeferred=d)
415 def checkDownload(download):
416 self.assertEqual(2, len(download[:-2].split('\r\n')))
417 d.addCallback(checkDownload)
418
419 # Download a names-only listing.
420 self._download('NLST ', chainDeferred=d)
421 def checkDownload(download):
422 filenames = download[:-2].split('\r\n')
423 filenames.sort()
424 self.assertEqual(['bar', 'foo'], filenames)
425 d.addCallback(checkDownload)
426
427 # Download a listing of the 'foo' subdirectory. 'foo' has no files, so
428 # the file listing should be empty.
429 self._download('LIST foo', chainDeferred=d)
430 def checkDownload(download):
431 self.assertEqual('', download)
432 d.addCallback(checkDownload)
433
434 # Change the current working directory to 'foo'.
435 def chdir(ignored):
436 return self.client.queueStringCommand('CWD foo')
437 d.addCallback(chdir)
438
439 # Download a listing from within 'foo', and again it should be empty,
440 # because LIST uses the working directory by default.
441 self._download('LIST', chainDeferred=d)
442 def checkDownload(download):
443 self.assertEqual('', download)
444 return d.addCallback(checkDownload)
445
446 def testManyLargeDownloads(self):
447 # Login
448 d = self._anonymousLogin()
449
450 # Download a range of different size files
451 for size in range(100000, 110000, 500):
452 fObj = file(os.path.join(self.directory, '%d.txt' % (size,)), 'wb')
453 fObj.write('x' * size)
454 fObj.close()
455
456 self._download('RETR %d.txt' % (size,), chainDeferred=d)
457 def checkDownload(download, size=size):
458 self.assertEqual('x' * size, download)
459 d.addCallback(checkDownload)
460 return d
461
462
463 class FTPServerPortDataConnectionTestCase(FTPServerPasvDataConnectionTestCase):
464 def setUp(self):
465 self.dataPorts = []
466 return FTPServerPasvDataConnectionTestCase.setUp(self)
467
468 def _makeDataConnection(self, ignored=None):
469 # Establish an active data connection (i.e. server connecting to
470 # client).
471 deferred = defer.Deferred()
472 class DataFactory(protocol.ServerFactory):
473 protocol = _BufferingProtocol
474 def buildProtocol(self, addr):
475 p = protocol.ServerFactory.buildProtocol(self, addr)
476 reactor.callLater(0, deferred.callback, p)
477 return p
478 dataPort = reactor.listenTCP(0, DataFactory(), interface='127.0.0.1')
479 self.dataPorts.append(dataPort)
480 cmd = 'PORT ' + ftp.encodeHostPort('127.0.0.1', dataPort.getHost().port)
481 self.client.queueStringCommand(cmd)
482 return deferred
483
484 def tearDown(self):
485 l = [defer.maybeDeferred(port.stopListening) for port in self.dataPorts]
486 d = defer.maybeDeferred(
487 FTPServerPasvDataConnectionTestCase.tearDown, self)
488 l.append(d)
489 return defer.DeferredList(l, fireOnOneErrback=True)
490
491 def testPORTCannotConnect(self):
492 # Login
493 d = self._anonymousLogin()
494
495 # Listen on a port, and immediately stop listening as a way to find a
496 # port number that is definitely closed.
497 def loggedIn(ignored):
498 port = reactor.listenTCP(0, protocol.Factory(),
499 interface='127.0.0.1')
500 portNum = port.getHost().port
501 d = port.stopListening()
502 d.addCallback(lambda _: portNum)
503 return d
504 d.addCallback(loggedIn)
505
506 # Tell the server to connect to that port with a PORT command, and
507 # verify that it fails with the right error.
508 def gotPortNum(portNum):
509 return self.assertCommandFailed(
510 'PORT ' + ftp.encodeHostPort('127.0.0.1', portNum),
511 ["425 Can't open data connection."])
512 return d.addCallback(gotPortNum)
513
514
515 # -- Client Tests -----------------------------------------------------------
516
517 class PrintLines(protocol.Protocol):
518 """Helper class used by FTPFileListingTests."""
519
520 def __init__(self, lines):
521 self._lines = lines
522
523 def connectionMade(self):
524 for line in self._lines:
525 self.transport.write(line + "\r\n")
526 self.transport.loseConnection()
527
528
529 class MyFTPFileListProtocol(ftp.FTPFileListProtocol):
530 def __init__(self):
531 self.other = []
532 ftp.FTPFileListProtocol.__init__(self)
533
534 def unknownLine(self, line):
535 self.other.append(line)
536
537
538 class FTPFileListingTests(unittest.TestCase):
539 def getFilesForLines(self, lines):
540 fileList = MyFTPFileListProtocol()
541 d = loopback.loopbackAsync(PrintLines(lines), fileList)
542 d.addCallback(lambda _: (fileList.files, fileList.other))
543 return d
544
545 def testOneLine(self):
546 # This example line taken from the docstring for FTPFileListProtocol
547 line = '-rw-r--r-- 1 root other 531 Jan 29 03:26 README'
548 def check(((file,), other)):
549 self.failIf(other, 'unexpect unparsable lines: %s' % repr(other))
550 self.failUnless(file['filetype'] == '-', 'misparsed fileitem')
551 self.failUnless(file['perms'] == 'rw-r--r--', 'misparsed perms')
552 self.failUnless(file['owner'] == 'root', 'misparsed fileitem')
553 self.failUnless(file['group'] == 'other', 'misparsed fileitem')
554 self.failUnless(file['size'] == 531, 'misparsed fileitem')
555 self.failUnless(file['date'] == 'Jan 29 03:26', 'misparsed fileitem' )
556 self.failUnless(file['filename'] == 'README', 'misparsed fileitem')
557 self.failUnless(file['nlinks'] == 1, 'misparsed nlinks')
558 self.failIf(file['linktarget'], 'misparsed linktarget')
559 return self.getFilesForLines([line]).addCallback(check)
560
561 def testVariantLines(self):
562 line1 = 'drw-r--r-- 2 root other 531 Jan 9 2003 A'
563 line2 = 'lrw-r--r-- 1 root other 1 Jan 29 03:26 B -> A'
564 line3 = 'woohoo! '
565 def check(((file1, file2), (other,))):
566 self.failUnless(other == 'woohoo! \r', 'incorrect other line')
567 # file 1
568 self.failUnless(file1['filetype'] == 'd', 'misparsed fileitem')
569 self.failUnless(file1['perms'] == 'rw-r--r--', 'misparsed perms')
570 self.failUnless(file1['owner'] == 'root', 'misparsed owner')
571 self.failUnless(file1['group'] == 'other', 'misparsed group')
572 self.failUnless(file1['size'] == 531, 'misparsed size')
573 self.failUnless(file1['date'] == 'Jan 9 2003', 'misparsed date')
574 self.failUnless(file1['filename'] == 'A', 'misparsed filename')
575 self.failUnless(file1['nlinks'] == 2, 'misparsed nlinks')
576 self.failIf(file1['linktarget'], 'misparsed linktarget')
577 # file 2
578 self.failUnless(file2['filetype'] == 'l', 'misparsed fileitem')
579 self.failUnless(file2['perms'] == 'rw-r--r--', 'misparsed perms')
580 self.failUnless(file2['owner'] == 'root', 'misparsed owner')
581 self.failUnless(file2['group'] == 'other', 'misparsed group')
582 self.failUnless(file2['size'] == 1, 'misparsed size')
583 self.failUnless(file2['date'] == 'Jan 29 03:26', 'misparsed date')
584 self.failUnless(file2['filename'] == 'B', 'misparsed filename')
585 self.failUnless(file2['nlinks'] == 1, 'misparsed nlinks')
586 self.failUnless(file2['linktarget'] == 'A', 'misparsed linktarget')
587 return self.getFilesForLines([line1, line2, line3]).addCallback(check)
588
589 def testUnknownLine(self):
590 def check((files, others)):
591 self.failIf(files, 'unexpected file entries')
592 self.failUnless(others == ['ABC\r', 'not a file\r'],
593 'incorrect unparsable lines: %s' % repr(others))
594 return self.getFilesForLines(['ABC', 'not a file']).addCallback(check)
595
596 def testYear(self):
597 # This example derived from bug description in issue 514.
598 fileList = ftp.FTPFileListProtocol()
599 exampleLine = (
600 '-rw-r--r-- 1 root other 531 Jan 29 2003 README\n')
601 class PrintLine(protocol.Protocol):
602 def connectionMade(self):
603 self.transport.write(exampleLine)
604 self.transport.loseConnection()
605
606 def check(ignored):
607 file = fileList.files[0]
608 self.failUnless(file['size'] == 531, 'misparsed fileitem')
609 self.failUnless(file['date'] == 'Jan 29 2003', 'misparsed fileitem')
610 self.failUnless(file['filename'] == 'README', 'misparsed fileitem')
611
612 d = loopback.loopbackAsync(PrintLine(), fileList)
613 return d.addCallback(check)
614
615
616 class FTPClientTests(unittest.TestCase):
617 def tearDown(self):
618 # Clean up self.port, if any.
619 port = getattr(self, 'port', None)
620 if port is not None:
621 return port.stopListening()
622
623 def testFailedRETR(self):
624 f = protocol.Factory()
625 f.noisy = 0
626 self.port = reactor.listenTCP(0, f, interface="127.0.0.1")
627 portNum = self.port.getHost().port
628 # This test data derived from a bug report by ranty on #twisted
629 responses = ['220 ready, dude (vsFTPd 1.0.0: beat me, break me)',
630 # USER anonymous
631 '331 Please specify the password.',
632 # PASS twisted@twistedmatrix.com
633 '230 Login successful. Have fun.',
634 # TYPE I
635 '200 Binary it is, then.',
636 # PASV
637 '227 Entering Passive Mode (127,0,0,1,%d,%d)' %
638 (portNum >> 8, portNum & 0xff),
639 # RETR /file/that/doesnt/exist
640 '550 Failed to open file.']
641 f.buildProtocol = lambda addr: PrintLines(responses)
642
643 client = ftp.FTPClient(passive=1)
644 cc = protocol.ClientCreator(reactor, ftp.FTPClient, passive=1)
645 d = cc.connectTCP('127.0.0.1', portNum)
646 def gotClient(client):
647 p = protocol.Protocol()
648 return client.retrieveFile('/file/that/doesnt/exist', p)
649 d.addCallback(gotClient)
650 return self.assertFailure(d, ftp.CommandFailed)
651
652 def test_errbacksUponDisconnect(self):
653 """
654 Test the ftp command errbacks when a connection lost happens during
655 the operation.
656 """
657 ftpClient = ftp.FTPClient()
658 tr = proto_helpers.StringTransportWithDisconnection()
659 ftpClient.makeConnection(tr)
660 tr.protocol = ftpClient
661 d = ftpClient.list('some path', Dummy())
662 m = []
663 def _eb(failure):
664 m.append(failure)
665 return None
666 d.addErrback(_eb)
667 from twisted.internet.main import CONNECTION_LOST
668 ftpClient.connectionLost(failure.Failure(CONNECTION_LOST))
669 self.failUnless(m, m)
670 return d
671
672
673
674 class FTPClientTestCase(unittest.TestCase):
675 """
676 Test advanced FTP client commands.
677 """
678 def setUp(self):
679 """
680 Create a FTP client and connect it to fake transport.
681 """
682 self.client = ftp.FTPClient()
683 self.transport = proto_helpers.StringTransportWithDisconnection()
684 self.client.makeConnection(self.transport)
685 self.transport.protocol = self.client
686
687
688 def tearDown(self):
689 """
690 Deliver disconnection notification to the client so that it can
691 perform any cleanup which may be required.
692 """
693 self.client.connectionLost(error.ConnectionLost())
694
695
696 def _testLogin(self):
697 """
698 Test the login part.
699 """
700 self.assertEquals(self.transport.value(), '')
701 self.client.lineReceived(
702 '331 Guest login ok, type your email address as password.')
703 self.assertEquals(self.transport.value(), 'USER anonymous\r\n')
704 self.transport.clear()
705 self.client.lineReceived(
706 '230 Anonymous login ok, access restrictions apply.')
707 self.assertEquals(self.transport.value(), 'TYPE I\r\n')
708 self.transport.clear()
709 self.client.lineReceived('200 Type set to I.')
710
711
712 def test_CDUP(self):
713 """
714 Test the CDUP command.
715
716 L{ftp.FTPClient.cdup} should return a Deferred which fires with a
717 sequence of one element which is the string the server sent
718 indicating that the command was executed successfully.
719
720 (XXX - This is a bad API)
721 """
722 def cbCdup(res):
723 self.assertEquals(res[0], '250 Requested File Action Completed OK')
724
725 self._testLogin()
726 d = self.client.cdup().addCallback(cbCdup)
727 self.assertEquals(self.transport.value(), 'CDUP\r\n')
728 self.transport.clear()
729 self.client.lineReceived('250 Requested File Action Completed OK')
730 return d
731
732
733 def test_failedCDUP(self):
734 """
735 Test L{ftp.FTPClient.cdup}'s handling of a failed CDUP command.
736
737 When the CDUP command fails, the returned Deferred should errback
738 with L{ftp.CommandFailed}.
739 """
740 self._testLogin()
741 d = self.client.cdup()
742 self.assertFailure(d, ftp.CommandFailed)
743 self.assertEquals(self.transport.value(), 'CDUP\r\n')
744 self.transport.clear()
745 self.client.lineReceived('550 ..: No such file or directory')
746 return d
747
748
749 def test_PWD(self):
750 """
751 Test the PWD command.
752
753 L{ftp.FTPClient.pwd} should return a Deferred which fires with a
754 sequence of one element which is a string representing the current
755 working directory on the server.
756
757 (XXX - This is a bad API)
758 """
759 def cbPwd(res):
760 self.assertEquals(ftp.parsePWDResponse(res[0]), "/bar/baz")
761
762 self._testLogin()
763 d = self.client.pwd().addCallback(cbPwd)
764 self.assertEquals(self.transport.value(), 'PWD\r\n')
765 self.client.lineReceived('257 "/bar/baz"')
766 return d
767
768
769 def test_failedPWD(self):
770 """
771 Test a failure in PWD command.
772
773 When the PWD command fails, the returned Deferred should errback
774 with L{ftp.CommandFailed}.
775 """
776 self._testLogin()
777 d = self.client.pwd()
778 self.assertFailure(d, ftp.CommandFailed)
779 self.assertEquals(self.transport.value(), 'PWD\r\n')
780 self.client.lineReceived('550 /bar/baz: No such file or directory')
781 return d
782
783
784 def test_CWD(self):
785 """
786 Test the CWD command.
787
788 L{ftp.FTPClient.cwd} should return a Deferred which fires with a
789 sequence of one element which is the string the server sent
790 indicating that the command was executed successfully.
791
792 (XXX - This is a bad API)
793 """
794 def cbCwd(res):
795 self.assertEquals(res[0], '250 Requested File Action Completed OK')
796
797 self._testLogin()
798 d = self.client.cwd("bar/foo").addCallback(cbCwd)
799 self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
800 self.client.lineReceived('250 Requested File Action Completed OK')
801 return d
802
803
804 def test_failedCWD(self):
805 """
806 Test a failure in CWD command.
807
808 When the PWD command fails, the returned Deferred should errback
809 with L{ftp.CommandFailed}.
810 """
811 self._testLogin()
812 d = self.client.cwd("bar/foo")
813 self.assertFailure(d, ftp.CommandFailed)
814 self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
815 self.client.lineReceived('550 bar/foo: No such file or directory')
816 return d
817
818
819 def test_passiveRETR(self):
820 """
821 Test the RETR command in passive mode: get a file and verify its
822 content.
823
824 L{ftp.FTPClient.retrieveFile} should return a Deferred which fires
825 with the protocol instance passed to it after the download has
826 completed.
827
828 (XXX - This API should be based on producers and consumers)
829 """
830 def cbRetr(res, proto):
831 self.assertEquals(proto.buffer, 'x' * 1000)
832
833 def cbConnect(host, port, factory):
834 self.assertEquals(host, '127.0.0.1')
835 self.assertEquals(port, 12345)
836 proto = factory.buildProtocol((host, port))
837 proto.makeConnection(proto_helpers.StringTransport())
838 self.client.lineReceived(
839 '150 File status okay; about to open data connection.')
840 proto.dataReceived("x" * 1000)
841 proto.connectionLost(failure.Failure(error.ConnectionDone("")))
842
843 self.client.connectFactory = cbConnect
844 self._testLogin()
845 proto = _BufferingProtocol()
846 d = self.client.retrieveFile("spam", proto)
847 d.addCallback(cbRetr, proto)
848 self.assertEquals(self.transport.value(), 'PASV\r\n')
849 self.transport.clear()
850 self.client.lineReceived('227 Entering Passive Mode (%s).' %
851 (ftp.encodeHostPort('127.0.0.1', 12345),))
852 self.assertEquals(self.transport.value(), 'RETR spam\r\n')
853 self.transport.clear()
854 self.client.lineReceived('226 Transfer Complete.')
855 return d
856
857
858 def test_RETR(self):
859 """
860 Test the RETR command in non-passive mode.
861
862 Like L{test_passiveRETR} but in the configuration where the server
863 establishes the data connection to the client, rather than the other
864 way around.
865 """
866 self.client.passive = False
867
868 def generatePort(portCmd):
869 portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
870 portCmd.protocol.makeConnection(proto_helpers.StringTransport())
871 portCmd.protocol.dataReceived("x" * 1000)
872 portCmd.protocol.connectionLost(
873 failure.Failure(error.ConnectionDone("")))
874
875 def cbRetr(res, proto):
876 self.assertEquals(proto.buffer, 'x' * 1000)
877
878 self.client.generatePortCommand = generatePort
879 self._testLogin()
880 proto = _BufferingProtocol()
881 d = self.client.retrieveFile("spam", proto)
882 d.addCallback(cbRetr, proto)
883 self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
884 (ftp.encodeHostPort('127.0.0.1', 9876),))
885 self.transport.clear()
886 self.client.lineReceived('200 PORT OK')
887 self.assertEquals(self.transport.value(), 'RETR spam\r\n')
888 self.transport.clear()
889 self.client.lineReceived('226 Transfer Complete.')
890 return d
891
892
893 def test_failedRETR(self):
894 """
895 Try to RETR an unexisting file.
896
897 L{ftp.FTPClient.retrieveFile} should return a Deferred which
898 errbacks with L{ftp.CommandFailed} if the server indicates the file
899 cannot be transferred for some reason.
900 """
901 def cbConnect(host, port, factory):
902 self.assertEquals(host, '127.0.0.1')
903 self.assertEquals(port, 12345)
904 proto = factory.buildProtocol((host, port))
905 proto.makeConnection(proto_helpers.StringTransport())
906 self.client.lineReceived(
907 '150 File status okay; about to open data connection.')
908 proto.connectionLost(failure.Failure(error.ConnectionDone("")))
909
910 self.client.connectFactory = cbConnect
911 self._testLogin()
912 proto = _BufferingProtocol()
913 d = self.client.retrieveFile("spam", proto)
914 self.assertFailure(d, ftp.CommandFailed)
915 self.assertEquals(self.transport.value(), 'PASV\r\n')
916 self.transport.clear()
917 self.client.lineReceived('227 Entering Passive Mode (%s).' %
918 (ftp.encodeHostPort('127.0.0.1', 12345),))
919 self.assertEquals(self.transport.value(), 'RETR spam\r\n')
920 self.transport.clear()
921 self.client.lineReceived('550 spam: No such file or directory')
922 return d
923
924
925 def test_lostRETR(self):
926 """
927 Try a RETR, but disconnect during the transfer.
928 L{ftp.FTPClient.retrieveFile} should return a Deferred which
929 errbacks with L{ftp.ConnectionLost)
930 """
931 self.client.passive = False
932
933 l = []
934 def generatePort(portCmd):
935 portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
936 tr = proto_helpers.StringTransportWithDisconnection()
937 portCmd.protocol.makeConnection(tr)
938 tr.protocol = portCmd.protocol
939 portCmd.protocol.dataReceived("x" * 500)
940 l.append(tr)
941
942 self.client.generatePortCommand = generatePort
943 self._testLogin()
944 proto = _BufferingProtocol()
945 d = self.client.retrieveFile("spam", proto)
946 self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
947 (ftp.encodeHostPort('127.0.0.1', 9876),))
948 self.transport.clear()
949 self.client.lineReceived('200 PORT OK')
950 self.assertEquals(self.transport.value(), 'RETR spam\r\n')
951
952 self.assert_(l)
953 l[0].loseConnection()
954 self.transport.loseConnection()
955 self.assertFailure(d, ftp.ConnectionLost)
956 return d
957
958
959 def test_passiveSTOR(self):
960 """
961 Test the STOR command: send a file and verify its content.
962
963 L{ftp.FTPClient.storeFile} should return a two-tuple of Deferreds.
964 The first of which should fire with a protocol instance when the
965 data connection has been established and is responsible for sending
966 the contents of the file. The second of which should fire when the
967 upload has completed, the data connection has been closed, and the
968 server has acknowledged receipt of the file.
969
970 (XXX - storeFile should take a producer as an argument, instead, and
971 only return a Deferred which fires when the upload has succeeded or
972 failed).
973 """
974 tr = proto_helpers.StringTransport()
975 def cbStore(sender):
976 self.client.lineReceived(
977 '150 File status okay; about to open data connection.')
978 sender.transport.write("x" * 1000)
979 sender.finish()
980 sender.connectionLost(failure.Failure(error.ConnectionDone("")))
981
982 def cbFinish(ign):
983 self.assertEquals(tr.value(), "x" * 1000)
984
985 def cbConnect(host, port, factory):
986 self.assertEquals(host, '127.0.0.1')
987 self.assertEquals(port, 12345)
988 proto = factory.buildProtocol((host, port))
989 proto.makeConnection(tr)
990
991 self.client.connectFactory = cbConnect
992 self._testLogin()
993 d1, d2 = self.client.storeFile("spam")
994 d1.addCallback(cbStore)
995 d2.addCallback(cbFinish)
996 self.assertEquals(self.transport.value(), 'PASV\r\n')
997 self.transport.clear()
998 self.client.lineReceived('227 Entering Passive Mode (%s).' %
999 (ftp.encodeHostPort('127.0.0.1', 12345),))
1000 self.assertEquals(self.transport.value(), 'STOR spam\r\n')
1001 self.transport.clear()
1002 self.client.lineReceived('226 Transfer Complete.')
1003 return defer.gatherResults([d1, d2])
1004
1005
1006 def test_failedSTOR(self):
1007 """
1008 Test a failure in the STOR command.
1009
1010 If the server does not acknowledge successful receipt of the
1011 uploaded file, the second Deferred returned by
1012 L{ftp.FTPClient.storeFile} should errback with L{ftp.CommandFailed}.
1013 """
1014 tr = proto_helpers.StringTransport()
1015 def cbStore(sender):
1016 self.client.lineReceived(
1017 '150 File status okay; about to open data connection.')
1018 sender.transport.write("x" * 1000)
1019 sender.finish()
1020 sender.connectionLost(failure.Failure(error.ConnectionDone("")))
1021
1022 def cbConnect(host, port, factory):
1023 self.assertEquals(host, '127.0.0.1')
1024 self.assertEquals(port, 12345)
1025 proto = factory.buildProtocol((host, port))
1026 proto.makeConnection(tr)
1027
1028 self.client.connectFactory = cbConnect
1029 self._testLogin()
1030 d1, d2 = self.client.storeFile("spam")
1031 d1.addCallback(cbStore)
1032 self.assertFailure(d2, ftp.CommandFailed)
1033 self.assertEquals(self.transport.value(), 'PASV\r\n')
1034 self.transport.clear()
1035 self.client.lineReceived('227 Entering Passive Mode (%s).' %
1036 (ftp.encodeHostPort('127.0.0.1', 12345),))
1037 self.assertEquals(self.transport.value(), 'STOR spam\r\n')
1038 self.transport.clear()
1039 self.client.lineReceived(
1040 '426 Transfer aborted. Data connection closed.')
1041 return defer.gatherResults([d1, d2])
1042
1043
1044 def test_STOR(self):
1045 """
1046 Test the STOR command in non-passive mode.
1047
1048 Like L{test_passiveSTOR} but in the configuration where the server
1049 establishes the data connection to the client, rather than the other
1050 way around.
1051 """
1052 tr = proto_helpers.StringTransport()
1053 self.client.passive = False
1054 def generatePort(portCmd):
1055 portCmd.text = 'PORT %s' % ftp.encodeHostPort('127.0.0.1', 9876)
1056 portCmd.protocol.makeConnection(tr)
1057
1058 def cbStore(sender):
1059 self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
1060 (ftp.encodeHostPort('127.0.0.1', 9876),))
1061 self.transport.clear()
1062 self.client.lineReceived('200 PORT OK')
1063 self.assertEquals(self.transport.value(), 'STOR spam\r\n')
1064 self.transport.clear()
1065 self.client.lineReceived(
1066 '150 File status okay; about to open data connection.')
1067 sender.transport.write("x" * 1000)
1068 sender.finish()
1069 sender.connectionLost(failure.Failure(error.ConnectionDone("")))
1070 self.client.lineReceived('226 Transfer Complete.')
1071
1072 def cbFinish(ign):
1073 self.assertEquals(tr.value(), "x" * 1000)
1074
1075 self.client.generatePortCommand = generatePort
1076 self._testLogin()
1077 d1, d2 = self.client.storeFile("spam")
1078 d1.addCallback(cbStore)
1079 d2.addCallback(cbFinish)
1080 return defer.gatherResults([d1, d2])
1081
1082
1083 def test_passiveLIST(self):
1084 """
1085 Test the LIST command.
1086
1087 L{ftp.FTPClient.list} should return a Deferred which fires with a
1088 protocol instance which was passed to list after the command has
1089 succeeded.
1090
1091 (XXX - This is a very unfortunate API; if my understanding is
1092 correct, the results are always at least line-oriented, so allowing
1093 a per-line parser function to be specified would make this simpler,
1094 but a default implementation should really be provided which knows
1095 how to deal with all the formats used in real servers, so
1096 application developers never have to care about this insanity. It
1097 would also be nice to either get back a Deferred of a list of
1098 filenames or to be able to consume the files as they are received
1099 (which the current API does allow, but in a somewhat inconvenient
1100 fashion) -exarkun)
1101 """
1102 def cbList(res, fileList):
1103 fls = [f["filename"] for f in fileList.files]
1104 expected = ["foo", "bar", "baz"]
1105 expected.sort()
1106 fls.sort()
1107 self.assertEquals(fls, expected)
1108
1109 def cbConnect(host, port, factory):
1110 self.assertEquals(host, '127.0.0.1')
1111 self.assertEquals(port, 12345)
1112 proto = factory.buildProtocol((host, port))
1113 proto.makeConnection(proto_helpers.StringTransport())
1114 self.client.lineReceived(
1115 '150 File status okay; about to open data connection.')
1116 sending = [
1117 '-rw-r--r-- 0 spam egg 100 Oct 10 2006 foo\r\n',
1118 '-rw-r--r-- 3 spam egg 100 Oct 10 2006 bar\r\n',
1119 '-rw-r--r-- 4 spam egg 100 Oct 10 2006 baz\r\n',
1120 ]
1121 for i in sending:
1122 proto.dataReceived(i)
1123 proto.connectionLost(failure.Failure(error.ConnectionDone("")))
1124
1125 self.client.connectFactory = cbConnect
1126 self._testLogin()
1127 fileList = ftp.FTPFileListProtocol()
1128 d = self.client.list('foo/bar', fileList).addCallback(cbList, fileList)
1129 self.assertEquals(self.transport.value(), 'PASV\r\n')
1130 self.transport.clear()
1131 self.client.lineReceived('227 Entering Passive Mode (%s).' %
1132 (ftp.encodeHostPort('127.0.0.1', 12345),))
1133 self.assertEquals(self.transport.value(), 'LIST foo/bar\r\n')
1134 self.client.lineReceived('226 Transfer Complete.')
1135 return d
1136
1137
1138 def test_LIST(self):
1139 """
1140 Test the LIST command in non-passive mode.
1141
1142 Like L{test_passiveLIST} but in the configuration where the server
1143 establishes the data connection to the client, rather than the other
1144 way around.
1145 """
1146 self.client.passive = False
1147 def generatePort(portCmd):
1148 portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
1149 portCmd.protocol.makeConnection(proto_helpers.StringTransport())
1150 self.client.lineReceived(
1151 '150 File status okay; about to open data connection.')
1152 sending = [
1153 '-rw-r--r-- 0 spam egg 100 Oct 10 2006 foo\r\n',
1154 '-rw-r--r-- 3 spam egg 100 Oct 10 2006 bar\r\n',
1155 '-rw-r--r-- 4 spam egg 100 Oct 10 2006 baz\r\n',
1156 ]
1157 for i in sending:
1158 portCmd.protocol.dataReceived(i)
1159 portCmd.protocol.connectionLost(
1160 failure.Failure(error.ConnectionDone("")))
1161
1162 def cbList(res, fileList):
1163 fls = [f["filename"] for f in fileList.files]
1164 expected = ["foo", "bar", "baz"]
1165 expected.sort()
1166 fls.sort()
1167 self.assertEquals(fls, expected)
1168
1169 self.client.generatePortCommand = generatePort
1170 self._testLogin()
1171 fileList = ftp.FTPFileListProtocol()
1172 d = self.client.list('foo/bar', fileList).addCallback(cbList, fileList)
1173 self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
1174 (ftp.encodeHostPort('127.0.0.1', 9876),))
1175 self.transport.clear()
1176 self.client.lineReceived('200 PORT OK')
1177 self.assertEquals(self.transport.value(), 'LIST foo/bar\r\n')
1178 self.transport.clear()
1179 self.client.lineReceived('226 Transfer Complete.')
1180 return d
1181
1182
1183 def test_failedLIST(self):
1184 """
1185 Test a failure in LIST command.
1186
1187 L{ftp.FTPClient.list} should return a Deferred which fails with
1188 L{ftp.CommandFailed} if the server indicates the indicated path is
1189 invalid for some reason.
1190 """
1191 def cbConnect(host, port, factory):
1192 self.assertEquals(host, '127.0.0.1')
1193 self.assertEquals(port, 12345)
1194 proto = factory.buildProtocol((host, port))
1195 proto.makeConnection(proto_helpers.StringTransport())
1196 self.client.lineReceived(
1197 '150 File status okay; about to open data connection.')
1198 proto.connectionLost(failure.Failure(error.ConnectionDone("")))
1199
1200 self.client.connectFactory = cbConnect
1201 self._testLogin()
1202 fileList = ftp.FTPFileListProtocol()
1203 d = self.client.list('foo/bar', fileList)
1204 self.assertFailure(d, ftp.CommandFailed)
1205 self.assertEquals(self.transport.value(), 'PASV\r\n')
1206 self.transport.clear()
1207 self.client.lineReceived('227 Entering Passive Mode (%s).' %
1208 (ftp.encodeHostPort('127.0.0.1', 12345),))
1209 self.assertEquals(self.transport.value(), 'LIST foo/bar\r\n')
1210 self.client.lineReceived('550 foo/bar: No such file or directory')
1211 return d
1212
1213
1214 def test_NLST(self):
1215 """
1216 Test the NLST command in non-passive mode.
1217
1218 L{ftp.FTPClient.nlst} should return a Deferred which fires with a
1219 list of filenames when the list command has completed.
1220 """
1221 self.client.passive = False
1222 def generatePort(portCmd):
1223 portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
1224 portCmd.protocol.makeConnection(proto_helpers.StringTransport())
1225 self.client.lineReceived(
1226 '150 File status okay; about to open data connection.')
1227 portCmd.protocol.dataReceived('foo\r\n')
1228 portCmd.protocol.dataReceived('bar\r\n')
1229 portCmd.protocol.dataReceived('baz\r\n')
1230 portCmd.protocol.connectionLost(
1231 failure.Failure(error.ConnectionDone("")))
1232
1233 def cbList(res, proto):
1234 fls = proto.buffer.splitlines()
1235 expected = ["foo", "bar", "baz"]
1236 expected.sort()
1237 fls.sort()
1238 self.assertEquals(fls, expected)
1239
1240 self.client.generatePortCommand = generatePort
1241 self._testLogin()
1242 lstproto = _BufferingProtocol()
1243 d = self.client.nlst('foo/bar', lstproto).addCallback(cbList, lstproto)
1244 self.assertEquals(self.transport.value(), 'PORT %s\r\n' %
1245 (ftp.encodeHostPort('127.0.0.1', 9876),))
1246 self.transport.clear()
1247 self.client.lineReceived('200 PORT OK')
1248 self.assertEquals(self.transport.value(), 'NLST foo/bar\r\n')
1249 self.client.lineReceived('226 Transfer Complete.')
1250 return d
1251
1252
1253 def test_passiveNLST(self):
1254 """
1255 Test the NLST command.
1256
1257 Like L{test_passiveNLST} but in the configuration where the server
1258 establishes the data connection to the client, rather than the other
1259 way around.
1260 """
1261 def cbList(res, proto):
1262 fls = proto.buffer.splitlines()
1263 expected = ["foo", "bar", "baz"]
1264 expected.sort()
1265 fls.sort()
1266 self.assertEquals(fls, expected)
1267
1268 def cbConnect(host, port, factory):
1269 self.assertEquals(host, '127.0.0.1')
1270 self.assertEquals(port, 12345)
1271 proto = factory.buildProtocol((host, port))
1272 proto.makeConnection(proto_helpers.StringTransport())
1273 self.client.lineReceived(
1274 '150 File status okay; about to open data connection.')
1275 proto.dataReceived('foo\r\n')
1276 proto.dataReceived('bar\r\n')
1277 proto.dataReceived('baz\r\n')
1278 proto.connectionLost(failure.Failure(error.ConnectionDone("")))
1279
1280 self.client.connectFactory = cbConnect
1281 self._testLogin()
1282 lstproto = _BufferingProtocol()
1283 d = self.client.nlst('foo/bar', lstproto).addCallback(cbList, lstproto)
1284 self.assertEquals(self.transport.value(), 'PASV\r\n')
1285 self.transport.clear()
1286 self.client.lineReceived('227 Entering Passive Mode (%s).' %
1287 (ftp.encodeHostPort('127.0.0.1', 12345),))
1288 self.assertEquals(self.transport.value(), 'NLST foo/bar\r\n')
1289 self.client.lineReceived('226 Transfer Complete.')
1290 return d
1291
1292
1293 def test_failedNLST(self):
1294 """
1295 Test a failure in NLST command.
1296
1297 L{ftp.FTPClient.nlst} should return a Deferred which fails with
1298 L{ftp.CommandFailed} if the server indicates the indicated path is
1299 invalid for some reason.
1300 """
1301 tr = proto_helpers.StringTransport()
1302 def cbConnect(host, port, factory):
1303 self.assertEquals(host, '127.0.0.1')
1304 self.assertEquals(port, 12345)
1305 proto = factory.buildProtocol((host, port))
1306 proto.makeConnection(tr)
1307 self.client.lineReceived(
1308 '150 File status okay; about to open data connection.')
1309 proto.connectionLost(failure.Failure(error.ConnectionDone("")))
1310
1311 self.client.connectFactory = cbConnect
1312 self._testLogin()
1313 lstproto = _BufferingProtocol()
1314 d = self.client.nlst('foo/bar', lstproto)
1315 self.assertFailure(d, ftp.CommandFailed)
1316 self.assertEquals(self.transport.value(), 'PASV\r\n')
1317 self.transport.clear()
1318 self.client.lineReceived('227 Entering Passive Mode (%s).' %
1319 (ftp.encodeHostPort('127.0.0.1', 12345),))
1320 self.assertEquals(self.transport.value(), 'NLST foo/bar\r\n')
1321 self.client.lineReceived('550 foo/bar: No such file or directory')
1322 return d
1323
1324
1325 def test_changeDirectory(self):
1326 """
1327 Test the changeDirectory method.
1328
1329 L{ftp.FTPClient.changeDirectory} should return a Deferred which fires
1330 with True if succeeded.
1331 """
1332 def cbCd(res):
1333 self.assertEquals(res, True)
1334
1335 self._testLogin()
1336 d = self.client.changeDirectory("bar/foo").addCallback(cbCd)
1337 self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
1338 self.client.lineReceived('250 Requested File Action Completed OK')
1339 return d
1340
1341
1342 def test_failedChangeDirectory(self):
1343 """
1344 Test a failure in the changeDirectory method.
1345
1346 The behaviour here is the same as a failed CWD.
1347 """
1348 self._testLogin()
1349 d = self.client.changeDirectory("bar/foo")
1350 self.assertFailure(d, ftp.CommandFailed)
1351 self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
1352 self.client.lineReceived('550 bar/foo: No such file or directory')
1353 return d
1354
1355
1356 def test_strangeFailedChangeDirectory(self):
1357 """
1358 Test a strange failure in changeDirectory method.
1359
1360 L{ftp.FTPClient.changeDirectory} is stricter than CWD as it checks
1361 code 250 for success.
1362 """
1363 self._testLogin()
1364 d = self.client.changeDirectory("bar/foo")
1365 self.assertFailure(d, ftp.CommandFailed)
1366 self.assertEquals(self.transport.value(), 'CWD bar/foo\r\n')
1367 self.client.lineReceived('252 I do what I want !')
1368 return d
1369
1370
1371 def test_getDirectory(self):
1372 """
1373 Test the getDirectory method.
1374
1375 L{ftp.FTPClient.getDirectory} should return a Deferred which fires with
1376 the current directory on the server. It wraps PWD command.
1377 """
1378 def cbGet(res):
1379 self.assertEquals(res, "/bar/baz")
1380
1381 self._testLogin()
1382 d = self.client.getDirectory().addCallback(cbGet)
1383 self.assertEquals(self.transport.value(), 'PWD\r\n')
1384 self.client.lineReceived('257 "/bar/baz"')
1385 return d
1386
1387
1388 def test_failedGetDirectory(self):
1389 """
1390 Test a failure in getDirectory method.
1391
1392 The behaviour should be the same as PWD.
1393 """
1394 self._testLogin()
1395 d = self.client.getDirectory()
1396 self.assertFailure(d, ftp.CommandFailed)
1397 self.assertEquals(self.transport.value(), 'PWD\r\n')
1398 self.client.lineReceived('550 /bar/baz: No such file or directory')
1399 return d
1400
1401
1402 def test_anotherFailedGetDirectory(self):
1403 """
1404 Test a different failure in getDirectory method.
1405
1406 The response should be quoted to be parsed, so it returns an error
1407 otherwise.
1408 """
1409 self._testLogin()
1410 d = self.client.getDirectory()
1411 self.assertFailure(d, ftp.CommandFailed)
1412 self.assertEquals(self.transport.value(), 'PWD\r\n')
1413 self.client.lineReceived('257 /bar/baz')
1414 return d
1415
1416
1417
1418 class DummyTransport:
1419 def write(self, bytes):
1420 pass
1421
1422 class BufferingTransport:
1423 buffer = ''
1424 def write(self, bytes):
1425 self.buffer += bytes
1426
1427
1428 class FTPClientBasicTests(unittest.TestCase):
1429
1430 def testGreeting(self):
1431 # The first response is captured as a greeting.
1432 ftpClient = ftp.FTPClientBasic()
1433 ftpClient.lineReceived('220 Imaginary FTP.')
1434 self.failUnlessEqual(['220 Imaginary FTP.'], ftpClient.greeting)
1435
1436 def testResponseWithNoMessage(self):
1437 # Responses with no message are still valid, i.e. three digits followed
1438 # by a space is complete response.
1439 ftpClient = ftp.FTPClientBasic()
1440 ftpClient.lineReceived('220 ')
1441 self.failUnlessEqual(['220 '], ftpClient.greeting)
1442
1443 def testMultilineResponse(self):
1444 ftpClient = ftp.FTPClientBasic()
1445 ftpClient.transport = DummyTransport()
1446 ftpClient.lineReceived('220 Imaginary FTP.')
1447
1448 # Queue (and send) a dummy command, and set up a callback to capture the
1449 # result
1450 deferred = ftpClient.queueStringCommand('BLAH')
1451 result = []
1452 deferred.addCallback(result.append)
1453 deferred.addErrback(self.fail)
1454
1455 # Send the first line of a multiline response.
1456 ftpClient.lineReceived('210-First line.')
1457 self.failUnlessEqual([], result)
1458
1459 # Send a second line, again prefixed with "nnn-".
1460 ftpClient.lineReceived('123-Second line.')
1461 self.failUnlessEqual([], result)
1462
1463 # Send a plain line of text, no prefix.
1464 ftpClient.lineReceived('Just some text.')
1465 self.failUnlessEqual([], result)
1466
1467 # Now send a short (less than 4 chars) line.
1468 ftpClient.lineReceived('Hi')
1469 self.failUnlessEqual([], result)
1470
1471 # Now send an empty line.
1472 ftpClient.lineReceived('')
1473 self.failUnlessEqual([], result)
1474
1475 # And a line with 3 digits in it, and nothing else.
1476 ftpClient.lineReceived('321')
1477 self.failUnlessEqual([], result)
1478
1479 # Now finish it.
1480 ftpClient.lineReceived('210 Done.')
1481 self.failUnlessEqual(
1482 ['210-First line.',
1483 '123-Second line.',
1484 'Just some text.',
1485 'Hi',
1486 '',
1487 '321',
1488 '210 Done.'], result[0])
1489
1490 def testNoPasswordGiven(self):
1491 """Passing None as the password avoids sending the PASS command."""
1492 # Create a client, and give it a greeting.
1493 ftpClient = ftp.FTPClientBasic()
1494 ftpClient.transport = BufferingTransport()
1495 ftpClient.lineReceived('220 Welcome to Imaginary FTP.')
1496
1497 # Queue a login with no password
1498 ftpClient.queueLogin('bob', None)
1499 self.failUnlessEqual('USER bob\r\n', ftpClient.transport.buffer)
1500
1501 # Clear the test buffer, acknowledge the USER command.
1502 ftpClient.transport.buffer = ''
1503 ftpClient.lineReceived('200 Hello bob.')
1504
1505 # The client shouldn't have sent anything more (i.e. it shouldn't have
1506 # sent a PASS command).
1507 self.failUnlessEqual('', ftpClient.transport.buffer)
1508
1509 def testNoPasswordNeeded(self):
1510 """Receiving a 230 response to USER prevents PASS from being sent."""
1511 # Create a client, and give it a greeting.
1512 ftpClient = ftp.FTPClientBasic()
1513 ftpClient.transport = BufferingTransport()
1514 ftpClient.lineReceived('220 Welcome to Imaginary FTP.')
1515
1516 # Queue a login with no password
1517 ftpClient.queueLogin('bob', 'secret')
1518 self.failUnlessEqual('USER bob\r\n', ftpClient.transport.buffer)
1519
1520 # Clear the test buffer, acknowledge the USER command with a 230
1521 # response code.
1522 ftpClient.transport.buffer = ''
1523 ftpClient.lineReceived('230 Hello bob. No password needed.')
1524
1525 # The client shouldn't have sent anything more (i.e. it shouldn't have
1526 # sent a PASS command).
1527 self.failUnlessEqual('', ftpClient.transport.buffer)
1528
1529
1530 class PathHandling(unittest.TestCase):
1531 def testNormalizer(self):
1532 for inp, outp in [('a', ['a']),
1533 ('/a', ['a']),
1534 ('/', []),
1535 ('a/b/c', ['a', 'b', 'c']),
1536 ('/a/b/c', ['a', 'b', 'c']),
1537 ('/a/', ['a']),
1538 ('a/', ['a'])]:
1539 self.assertEquals(ftp.toSegments([], inp), outp)
1540
1541 for inp, outp in [('b', ['a', 'b']),
1542 ('b/', ['a', 'b']),
1543 ('/b', ['b']),
1544 ('/b/', ['b']),
1545 ('b/c', ['a', 'b', 'c']),
1546 ('b/c/', ['a', 'b', 'c']),
1547 ('/b/c', ['b', 'c']),
1548 ('/b/c/', ['b', 'c'])]:
1549 self.assertEquals(ftp.toSegments(['a'], inp), outp)
1550
1551 for inp, outp in [('//', []),
1552 ('//a', ['a']),
1553 ('a//', ['a']),
1554 ('a//b', ['a', 'b'])]:
1555 self.assertEquals(ftp.toSegments([], inp), outp)
1556
1557 for inp, outp in [('//', []),
1558 ('//b', ['b']),
1559 ('b//c', ['a', 'b', 'c'])]:
1560 self.assertEquals(ftp.toSegments(['a'], inp), outp)
1561
1562 for inp, outp in [('..', []),
1563 ('../', []),
1564 ('a/..', ['x']),
1565 ('/a/..', []),
1566 ('/a/b/..', ['a']),
1567 ('/a/b/../', ['a']),
1568 ('/a/b/../c', ['a', 'c']),
1569 ('/a/b/../c/', ['a', 'c']),
1570 ('/a/b/../../c', ['c']),
1571 ('/a/b/../../c/', ['c']),
1572 ('/a/b/../../c/..', []),
1573 ('/a/b/../../c/../', [])]:
1574 self.assertEquals(ftp.toSegments(['x'], inp), outp)
1575
1576 for inp in ['..', '../', 'a/../..', 'a/../../',
1577 '/..', '/../', '/a/../..', '/a/../../',
1578 '/a/b/../../..']:
1579 self.assertRaises(ftp.InvalidPath, ftp.toSegments, [], inp)
1580
1581 for inp in ['../..', '../../', '../a/../..']:
1582 self.assertRaises(ftp.InvalidPath, ftp.toSegments, ['x'], inp)
1583
1584
1585
1586 class ErrnoToFailureTestCase(unittest.TestCase):
1587 """
1588 Tests for L{ftp.errnoToFailure} errno checking.
1589 """
1590
1591 def test_notFound(self):
1592 """
1593 C{errno.ENOENT} should be translated to L{ftp.FileNotFoundError}.
1594 """
1595 d = ftp.errnoToFailure(errno.ENOENT, "foo")
1596 return self.assertFailure(d, ftp.FileNotFoundError)
1597
1598
1599 def test_permissionDenied(self):
1600 """
1601 C{errno.EPERM} should be translated to L{ftp.PermissionDeniedError}.
1602 """
1603 d = ftp.errnoToFailure(errno.EPERM, "foo")
1604 return self.assertFailure(d, ftp.PermissionDeniedError)
1605
1606
1607 def test_accessDenied(self):
1608 """
1609 C{errno.EACCES} should be translated to L{ftp.PermissionDeniedError}.
1610 """
1611 d = ftp.errnoToFailure(errno.EACCES, "foo")
1612 return self.assertFailure(d, ftp.PermissionDeniedError)
1613
1614
1615 def test_notDirectory(self):
1616 """
1617 C{errno.ENOTDIR} should be translated to L{ftp.IsNotADirectoryError}.
1618 """
1619 d = ftp.errnoToFailure(errno.ENOTDIR, "foo")
1620 return self.assertFailure(d, ftp.IsNotADirectoryError)
1621
1622
1623 def test_fileExists(self):
1624 """
1625 C{errno.EEXIST} should be translated to L{ftp.FileExistsError}.
1626 """
1627 d = ftp.errnoToFailure(errno.EEXIST, "foo")
1628 return self.assertFailure(d, ftp.FileExistsError)
1629
1630
1631 def test_isDirectory(self):
1632 """
1633 C{errno.EISDIR} should be translated to L{ftp.IsADirectoryError}.
1634 """
1635 d = ftp.errnoToFailure(errno.EISDIR, "foo")
1636 return self.assertFailure(d, ftp.IsADirectoryError)
1637
1638
1639 def test_passThrough(self):
1640 """
1641 If an unknown errno is passed to L{ftp.errnoToFailure}, it should let
1642 the originating exception pass through.
1643 """
1644 try:
1645 raise RuntimeError("bar")
1646 except:
1647 d = ftp.errnoToFailure(-1, "foo")
1648 return self.assertFailure(d, RuntimeError)
1649
1650
1651
1652 class AnonymousFTPShellTestCase(unittest.TestCase):
1653 """
1654 Test anynomous shell properties.
1655 """
1656
1657 def test_anonymousWrite(self):
1658 """
1659 Check that L{ftp.FTPAnonymousShell} returns an error when trying to
1660 open it in write mode.
1661 """
1662 shell = ftp.FTPAnonymousShell('')
1663 d = shell.openForWriting(('foo',))
1664 self.assertFailure(d, ftp.PermissionDeniedError)
1665 return d
1666
1667
1668
1669 class IFTPShellTestsMixin:
1670 """
1671 Generic tests for the C{IFTPShell} interface.
1672 """
1673
1674 def directoryExists(self, path):
1675 """
1676 Test if the directory exists at C{path}.
1677
1678 @param path: the relative path to check.
1679 @type path: C{str}.
1680
1681 @return: C{True} if C{path} exists and is a directory, C{False} if
1682 it's not the case
1683 @rtype: C{bool}
1684 """
1685 raise NotImplementedError()
1686
1687
1688 def createDirectory(self, path):
1689 """
1690 Create a directory in C{path}.
1691
1692 @param path: the relative path of the directory to create, with one
1693 segment.
1694 @type path: C{str}
1695 """
1696 raise NotImplementedError()
1697
1698
1699 def fileExists(self, path):
1700 """
1701 Test if the file exists at C{path}.
1702
1703 @param path: the relative path to check.
1704 @type path: C{str}.
1705
1706 @return: C{True} if C{path} exists and is a file, C{False} if it's not
1707 the case.
1708 @rtype: C{bool}
1709 """
1710 raise NotImplementedError()
1711
1712
1713 def createFile(self, path, fileContent=''):
1714 """
1715 Create a file named C{path} with some content.
1716
1717 @param path: the relative path of the file to create, without
1718 directory.
1719 @type path: C{str}
1720
1721 @param fileContent: the content of the file.
1722 @type fileContent: C{str}
1723 """
1724 raise NotImplementedError()
1725
1726
1727 def test_createDirectory(self):
1728 """
1729 C{directoryExists} should report correctly about directory existence,
1730 and C{createDirectory} should create a directory detectable by
1731 C{directoryExists}.
1732 """
1733 self.assertFalse(self.directoryExists('bar'))
1734 self.createDirectory('bar')
1735 self.assertTrue(self.directoryExists('bar'))
1736
1737
1738 def test_createFile(self):
1739 """
1740 C{fileExists} should report correctly about file existence, and
1741 C{createFile} should create a file detectable by C{fileExists}.
1742 """
1743 self.assertFalse(self.fileExists('file.txt'))
1744 self.createFile('file.txt')
1745 self.assertTrue(self.fileExists('file.txt'))
1746
1747
1748 def test_makeDirectory(self):
1749 """
1750 Create a directory and check it ends in the filesystem.
1751 """
1752 d = self.shell.makeDirectory(('foo',))
1753 def cb(result):
1754 self.assertTrue(self.directoryExists('foo'))
1755 return d.addCallback(cb)
1756
1757
1758 def test_makeDirectoryError(self):
1759 """
1760 Creating a directory that already exists should fail with a
1761 C{ftp.FileExistsError}.
1762 """
1763 self.createDirectory('foo')
1764 d = self.shell.makeDirectory(('foo',))
1765 return self.assertFailure(d, ftp.FileExistsError)
1766
1767
1768 def test_removeDirectory(self):
1769 """
1770 Try to remove a directory and check it's removed from the filesystem.
1771 """
1772 self.createDirectory('bar')
1773 d = self.shell.removeDirectory(('bar',))
1774 def cb(result):
1775 self.assertFalse(self.directoryExists('bar'))
1776 return d.addCallback(cb)
1777
1778
1779 def test_removeDirectoryOnFile(self):
1780 """
1781 removeDirectory should not work in file and fail with a
1782 C{ftp.IsNotADirectoryError}.
1783 """
1784 self.createFile('file.txt')
1785 d = self.shell.removeDirectory(('file.txt',))
1786 return self.assertFailure(d, ftp.IsNotADirectoryError)
1787
1788
1789 def test_removeNotExistingDirectory(self):
1790 """
1791 Removing directory that doesn't exist should fail with a
1792 C{ftp.FileNotFoundError}.
1793 """
1794 d = self.shell.removeDirectory(('bar',))
1795 return self.assertFailure(d, ftp.FileNotFoundError)
1796
1797
1798 def test_removeFile(self):
1799 """
1800 Try to remove a file and check it's removed from the filesystem.
1801 """
1802 self.createFile('file.txt')
1803 d = self.shell.removeFile(('file.txt',))
1804 def cb(res):
1805 self.assertFalse(self.fileExists('file.txt'))
1806 d.addCallback(cb)
1807 return d
1808
1809
1810 def test_removeFileOnDirectory(self):
1811 """
1812 removeFile should not work on directory.
1813 """
1814 self.createDirectory('ned')
1815 d = self.shell.removeFile(('ned',))
1816 return self.assertFailure(d, ftp.IsADirectoryError)
1817
1818
1819 def test_removeNotExistingFile(self):
1820 """
1821 Try to remove a non existent file, and check it raises a
1822 L{ivfs.NotFoundError}.
1823 """
1824 d = self.shell.removeFile(('foo',))
1825 return self.assertFailure(d, ftp.FileNotFoundError)
1826
1827
1828 def test_list(self):
1829 """
1830 Check the output of the list method.
1831 """
1832 self.createDirectory('ned')
1833 self.createFile('file.txt')
1834 d = self.shell.list(('.',))
1835 def cb(l):
1836 l.sort()
1837 self.assertEquals(l,
1838 [('file.txt', []), ('ned', [])])
1839 return d.addCallback(cb)
1840
1841
1842 def test_listWithStat(self):
1843 """
1844 Check the output of list with asked stats.
1845 """
1846 self.createDirectory('ned')
1847 self.createFile('file.txt')
1848 d = self.shell.list(('.',), ('size', 'permissions',))
1849 def cb(l):
1850 l.sort()
1851 self.assertEquals(len(l), 2)
1852 self.assertEquals(l[0][0], 'file.txt')
1853 self.assertEquals(l[1][0], 'ned')
1854 # Size and permissions are reported differently between platforms
1855 # so just check they are present
1856 self.assertEquals(len(l[0][1]), 2)
1857 self.assertEquals(len(l[1][1]), 2)
1858 return d.addCallback(cb)
1859
1860
1861 def test_listWithInvalidStat(self):
1862 """
1863 Querying an invalid stat should result to a C{AttributeError}.
1864 """
1865 self.createDirectory('ned')
1866 d = self.shell.list(('.',), ('size', 'whateverstat',))
1867 return self.assertFailure(d, AttributeError)
1868
1869
1870 def test_listFile(self):
1871 """
1872 Check the output of the list method on a file.
1873 """
1874 self.createFile('file.txt')
1875 d = self.shell.list(('file.txt',))
1876 def cb(l):
1877 l.sort()
1878 self.assertEquals(l,
1879 [('file.txt', [])])
1880 return d.addCallback(cb)
1881
1882
1883 def test_listNotExistingDirectory(self):
1884 """
1885 list on a directory that doesn't exist should fail with a
1886 L{ftp.FileNotFoundError}.
1887 """
1888 d = self.shell.list(('foo',))
1889 return self.assertFailure(d, ftp.FileNotFoundError)
1890
1891
1892 def test_access(self):
1893 """
1894 Try to access a resource.
1895 """
1896 self.createDirectory('ned')
1897 d = self.shell.access(('ned',))
1898 return d
1899
1900
1901 def test_accessNotFound(self):
1902 """
1903 access should fail on a resource that doesn't exist.
1904 """
1905 d = self.shell.access(('foo',))
1906 return self.assertFailure(d, ftp.FileNotFoundError)
1907
1908
1909 def test_openForReading(self):
1910 """
1911 Check that openForReading returns an object providing C{ftp.IReadFile}.
1912 """
1913 self.createFile('file.txt')
1914 d = self.shell.openForReading(('file.txt',))
1915 def cb(res):
1916 self.assertTrue(ftp.IReadFile.providedBy(res))
1917 d.addCallback(cb)
1918 return d
1919
1920
1921 def test_openForReadingNotFound(self):
1922 """
1923 openForReading should fail with a C{ftp.FileNotFoundError} on a file
1924 that doesn't exist.
1925 """
1926 d = self.shell.openForReading(('ned',))
1927 return self.assertFailure(d, ftp.FileNotFoundError)
1928
1929
1930 def test_openForReadingOnDirectory(self):
1931 """
1932 openForReading should not work on directory.
1933 """
1934 self.createDirectory('ned')
1935 d = self.shell.openForReading(('ned',))
1936 return self.assertFailure(d, ftp.IsADirectoryError)
1937
1938
1939 def test_openForWriting(self):
1940 """
1941 Check that openForWriting returns an object providing C{ftp.IWriteFile}.
1942 """
1943 d = self.shell.openForWriting(('foo',))
1944 def cb1(res):
1945 self.assertTrue(ftp.IWriteFile.providedBy(res))
1946 return res.receive().addCallback(cb2)
1947 def cb2(res):
1948 self.assertTrue(IConsumer.providedBy(res))
1949 d.addCallback(cb1)
1950 return d
1951
1952
1953 def test_openForWritingExistingDirectory(self):
1954 """
1955 openForWriting should not be able to open a directory that already
1956 exists.
1957 """
1958 self.createDirectory('ned')
1959 d = self.shell.openForWriting(('ned',))
1960 return self.assertFailure(d, ftp.IsADirectoryError)
1961
1962
1963 def test_openForWritingInNotExistingDirectory(self):
1964 """
1965 openForWring should fail with a L{ftp.FileNotFoundError} if you specify
1966 a file in a directory that doesn't exist.
1967 """
1968 self.createDirectory('ned')
1969 d = self.shell.openForWriting(('ned', 'idonotexist', 'foo'))
1970 return self.assertFailure(d, ftp.FileNotFoundError)
1971
1972
1973 def test_statFile(self):
1974 """
1975 Check the output of the stat method on a file.
1976 """
1977 fileContent = 'wobble\n'
1978 self.createFile('file.txt', fileContent)
1979 d = self.shell.stat(('file.txt',), ('size', 'directory'))
1980 def cb(res):
1981 self.assertEquals(res[0], len(fileContent))
1982 self.assertFalse(res[1])
1983 d.addCallback(cb)
1984 return d
1985
1986
1987 def test_statDirectory(self):
1988 """
1989 Check the output of the stat method on a directory.
1990 """
1991 self.createDirectory('ned')
1992 d = self.shell.stat(('ned',), ('size', 'directory'))
1993 def cb(res):
1994 self.assertTrue(res[1])
1995 d.addCallback(cb)
1996 return d
1997
1998
1999 def test_statOwnerGroup(self):
2000 """
2001 Check the owner and groups stats.
2002 """
2003 self.createDirectory('ned')
2004 d = self.shell.stat(('ned',), ('owner', 'group'))
2005 def cb(res):
2006 self.assertEquals(len(res), 2)
2007 d.addCallback(cb)
2008 return d
2009
2010
2011 def test_statNotExisting(self):
2012 """
2013 stat should fail with L{ftp.FileNotFoundError} on a file that doesn't
2014 exist.
2015 """
2016 d = self.shell.stat(('foo',), ('size', 'directory'))
2017 return self.assertFailure(d, ftp.FileNotFoundError)
2018
2019
2020 def test_invalidStat(self):
2021 """
2022 Querying an invalid stat should result to a C{AttributeError}.
2023 """
2024 self.createDirectory('ned')
2025 d = self.shell.stat(('ned',), ('size', 'whateverstat'))
2026 return self.assertFailure(d, AttributeError)
2027
2028
2029 def test_rename(self):
2030 """
2031 Try to rename a directory.
2032 """
2033 self.createDirectory('ned')
2034 d = self.shell.rename(('ned',), ('foo',))
2035 def cb(res):
2036 self.assertTrue(self.directoryExists('foo'))
2037 self.assertFalse(self.directoryExists('ned'))
2038 return d.addCallback(cb)
2039
2040
2041 def test_renameNotExisting(self):
2042 """
2043 Renaming a directory that doesn't exist should fail with
2044 L{ftp.FileNotFoundError}.
2045 """
2046 d = self.shell.rename(('foo',), ('bar',))
2047 return self.assertFailure(d, ftp.FileNotFoundError)
2048
2049
2050
2051 class FTPShellTestCase(unittest.TestCase, IFTPShellTestsMixin):
2052 """
2053 Tests for the C{ftp.FTPShell} object.
2054 """
2055
2056 def setUp(self):
2057 """
2058 Create a root directory and instantiate a shell.
2059 """
2060 self.root = filepath.FilePath(self.mktemp())
2061 self.root.createDirectory()
2062 self.shell = ftp.FTPShell(self.root)
2063
2064
2065 def directoryExists(self, path):
2066 """
2067 Test if the directory exists at C{path}.
2068 """
2069 return self.root.child(path).isdir()
2070
2071
2072 def createDirectory(self, path):
2073 """
2074 Create a directory in C{path}.
2075 """
2076 return self.root.child(path).createDirectory()
2077
2078
2079 def fileExists(self, path):
2080 """
2081 Test if the file exists at C{path}.
2082 """
2083 return self.root.child(path).isfile()
2084
2085
2086 def createFile(self, path, fileContent=''):
2087 """
2088 Create a file named C{path} with some content.
2089 """
2090 return self.root.child(path).setContent(fileContent)
2091
2092
2093
2094 class TestConsumer(object):
2095 """
2096 A simple consumer for tests. It only works with non-streaming producers.
2097
2098 @ivar producer: an object providing
2099 L{twisted.internet.interfaces.IPullProducer}.
2100 """
2101
2102 implements(IConsumer)
2103 producer = None
2104
2105 def registerProducer(self, producer, streaming):
2106 """
2107 Simple register of producer, checks that no register has happened
2108 before.
2109 """
2110 assert self.producer is None
2111 self.buffer = []
2112 self.producer = producer
2113 self.producer.resumeProducing()
2114
2115
2116 def unregisterProducer(self):
2117 """
2118 Unregister the producer, it should be done after a register.
2119 """
2120 assert self.producer is not None
2121 self.producer = None
2122
2123
2124 def write(self, data):
2125 """
2126 Save the data received.
2127 """
2128 self.buffer.append(data)
2129 self.producer.resumeProducing()
2130
2131
2132
2133 class TestProducer(object):
2134 """
2135 A dumb producer.
2136 """
2137
2138 def __init__(self, toProduce, consumer):
2139 """
2140 @param toProduce: data to write
2141 @type toProduce: C{str}
2142 @param consumer: the consumer of data.
2143 @type consumer: C{IConsumer}
2144 """
2145 self.toProduce = toProduce
2146 self.consumer = consumer
2147
2148
2149 def start(self):
2150 """
2151 Send the data to consume.
2152 """
2153 self.consumer.write(self.toProduce)
2154
2155
2156
2157 class IReadWriteTestsMixin:
2158 """
2159 Generic tests for the C{IReadFile} and C{IWriteFile} interfaces.
2160 """
2161
2162 def getFileReader(self, content):
2163 """
2164 Return an object providing C{IReadFile}, ready to send data C{content}.
2165 """
2166 raise NotImplementedError()
2167
2168
2169 def getFileWriter(self):
2170 """
2171 Return an object providing C{IWriteFile}, ready to receive data.
2172 """
2173 raise NotImplementedError()
2174
2175
2176 def getFileContent(self):
2177 """
2178 Return the content of the file used.
2179 """
2180 raise NotImplementedError()
2181
2182
2183 def test_read(self):
2184 """
2185 Test L{ftp.IReadFile}: the implementation should have a send method
2186 returning a C{Deferred} which fires when all the data has been sent
2187 to the consumer, and the data should be correctly send to the consumer.
2188 """
2189 content = 'wobble\n'
2190 consumer = TestConsumer()
2191 def cbGet(reader):
2192 return reader.send(consumer).addCallback(cbSend)
2193 def cbSend(res):
2194 self.assertEquals("".join(consumer.buffer), content)
2195 return self.getFileReader(content).addCallback(cbGet)
2196
2197
2198 def test_write(self):
2199 """
2200 Test L{ftp.IWriteFile}: the implementation should have a receive method
2201 returning a C{Deferred} with fires with a consumer ready to receive
2202 data to be written.
2203 """
2204 content = 'elbbow\n'
2205 def cbGet(writer):
2206 return writer.receive().addCallback(cbReceive)
2207 def cbReceive(consumer):
2208 producer = TestProducer(content, consumer)
2209 consumer.registerProducer(None, True)
2210 producer.start()
2211 consumer.unregisterProducer()
2212 self.assertEquals(self.getFileContent(), content)
2213 return self.getFileWriter().addCallback(cbGet)
2214
2215
2216
2217 class FTPReadWriteTestCase(unittest.TestCase, IReadWriteTestsMixin):
2218 """
2219 Tests for C{ftp._FileReader} and C{ftp._FileWriter}, the objects returned
2220 by the shell in C{openForReading}/C{openForWriting}.
2221 """
2222
2223 def setUp(self):
2224 """
2225 Create a temporary file used later.
2226 """
2227 self.root = filepath.FilePath(self.mktemp())
2228 self.root.createDirectory()
2229 self.shell = ftp.FTPShell(self.root)
2230 self.filename = "file.txt"
2231
2232
2233 def getFileReader(self, content):
2234 """
2235 Return a C{ftp._FileReader} instance with a file opened for reading.
2236 """
2237 self.root.child(self.filename).setContent(content)
2238 return self.shell.openForReading((self.filename,))
2239
2240
2241 def getFileWriter(self):
2242 """
2243 Return a C{ftp._FileWriter} instance with a file opened for writing.
2244 """
2245 return self.shell.openForWriting((self.filename,))
2246
2247
2248 def getFileContent(self):
2249 """
2250 Return the content of the temporary file.
2251 """
2252 return self.root.child(self.filename).getContent()
2253
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_formmethod.py ('k') | third_party/twisted_8_1/twisted/test/test_hook.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698