| OLD | NEW |
| (Empty) |
| 1 import sys, os | |
| 2 | |
| 3 from twisted.trial import unittest | |
| 4 from twisted.internet import reactor, interfaces | |
| 5 from twisted.python import util | |
| 6 from twisted.web import static, twcgi, server, resource | |
| 7 from twisted.web import client | |
| 8 | |
| 9 DUMMY_CGI = '''\ | |
| 10 print "Header: OK" | |
| 11 print | |
| 12 print "cgi output" | |
| 13 ''' | |
| 14 | |
| 15 READINPUT_CGI = '''\ | |
| 16 # this is an example of a correctly-written CGI script which reads a body | |
| 17 # from stdin, which only reads env['CONTENT_LENGTH'] bytes. | |
| 18 | |
| 19 import os, sys | |
| 20 | |
| 21 body_length = int(os.environ.get('CONTENT_LENGTH',0)) | |
| 22 indata = sys.stdin.read(body_length) | |
| 23 print "Header: OK" | |
| 24 print | |
| 25 print "readinput ok" | |
| 26 ''' | |
| 27 | |
| 28 READALLINPUT_CGI = '''\ | |
| 29 # this is an example of the typical (incorrect) CGI script which expects | |
| 30 # the server to close stdin when the body of the request is complete. | |
| 31 # A correct CGI should only read env['CONTENT_LENGTH'] bytes. | |
| 32 | |
| 33 import sys | |
| 34 | |
| 35 indata = sys.stdin.read() | |
| 36 print "Header: OK" | |
| 37 print | |
| 38 print "readallinput ok" | |
| 39 ''' | |
| 40 | |
| 41 class PythonScript(twcgi.FilteredScript): | |
| 42 filter = sys.executable | |
| 43 filters = sys.executable, # web2's version | |
| 44 | |
| 45 class CGI(unittest.TestCase): | |
| 46 def startServer(self, cgi): | |
| 47 root = resource.Resource() | |
| 48 cgipath = util.sibpath(__file__, cgi) | |
| 49 root.putChild("cgi", PythonScript(cgipath)) | |
| 50 site = server.Site(root) | |
| 51 self.p = reactor.listenTCP(0, site) | |
| 52 return self.p.getHost().port | |
| 53 | |
| 54 def tearDown(self): | |
| 55 if self.p: | |
| 56 return self.p.stopListening() | |
| 57 | |
| 58 | |
| 59 def testCGI(self): | |
| 60 cgiFilename = os.path.abspath(self.mktemp()) | |
| 61 cgiFile = file(cgiFilename, 'wt') | |
| 62 cgiFile.write(DUMMY_CGI) | |
| 63 cgiFile.close() | |
| 64 | |
| 65 portnum = self.startServer(cgiFilename) | |
| 66 d = client.getPage("http://localhost:%d/cgi" % portnum) | |
| 67 d.addCallback(self._testCGI_1) | |
| 68 return d | |
| 69 def _testCGI_1(self, res): | |
| 70 self.failUnlessEqual(res, "cgi output" + os.linesep) | |
| 71 | |
| 72 | |
| 73 def testReadEmptyInput(self): | |
| 74 cgiFilename = os.path.abspath(self.mktemp()) | |
| 75 cgiFile = file(cgiFilename, 'wt') | |
| 76 cgiFile.write(READINPUT_CGI) | |
| 77 cgiFile.close() | |
| 78 | |
| 79 portnum = self.startServer(cgiFilename) | |
| 80 d = client.getPage("http://localhost:%d/cgi" % portnum) | |
| 81 d.addCallback(self._testReadEmptyInput_1) | |
| 82 return d | |
| 83 testReadEmptyInput.timeout = 5 | |
| 84 def _testReadEmptyInput_1(self, res): | |
| 85 self.failUnlessEqual(res, "readinput ok%s" % os.linesep) | |
| 86 | |
| 87 def testReadInput(self): | |
| 88 cgiFilename = os.path.abspath(self.mktemp()) | |
| 89 cgiFile = file(cgiFilename, 'wt') | |
| 90 cgiFile.write(READINPUT_CGI) | |
| 91 cgiFile.close() | |
| 92 | |
| 93 portnum = self.startServer(cgiFilename) | |
| 94 d = client.getPage("http://localhost:%d/cgi" % portnum, | |
| 95 method="POST", | |
| 96 postdata="Here is your stdin") | |
| 97 d.addCallback(self._testReadInput_1) | |
| 98 return d | |
| 99 testReadInput.timeout = 5 | |
| 100 def _testReadInput_1(self, res): | |
| 101 self.failUnlessEqual(res, "readinput ok%s" % os.linesep) | |
| 102 | |
| 103 | |
| 104 def testReadAllInput(self): | |
| 105 cgiFilename = os.path.abspath(self.mktemp()) | |
| 106 cgiFile = file(cgiFilename, 'wt') | |
| 107 cgiFile.write(READALLINPUT_CGI) | |
| 108 cgiFile.close() | |
| 109 | |
| 110 portnum = self.startServer(cgiFilename) | |
| 111 d = client.getPage("http://localhost:%d/cgi" % portnum, | |
| 112 method="POST", | |
| 113 postdata="Here is your stdin") | |
| 114 d.addCallback(self._testReadAllInput_1) | |
| 115 return d | |
| 116 testReadAllInput.timeout = 5 | |
| 117 def _testReadAllInput_1(self, res): | |
| 118 self.failUnlessEqual(res, "readallinput ok%s" % os.linesep) | |
| 119 | |
| 120 if not interfaces.IReactorProcess.providedBy(reactor): | |
| 121 CGI.skip = "CGI tests require a functional reactor.spawnProcess()" | |
| OLD | NEW |