| OLD | NEW | 
|---|
|  | (Empty) | 
| 1 import sys, os |  | 
| 2 |  | 
| 3 from twisted.trial import unittest |  | 
| 4 from twisted.internet import reactor, interfaces |  | 
| 5 from twisted.python import util |  | 
| 6 from twisted.web import static, twcgi, server, resource |  | 
| 7 from twisted.web import client |  | 
| 8 |  | 
| 9 DUMMY_CGI = '''\ |  | 
| 10 print "Header: OK" |  | 
| 11 print |  | 
| 12 print "cgi output" |  | 
| 13 ''' |  | 
| 14 |  | 
| 15 READINPUT_CGI = '''\ |  | 
| 16 # this is an example of a correctly-written CGI script which reads a body |  | 
| 17 # from stdin, which only reads env['CONTENT_LENGTH'] bytes. |  | 
| 18 |  | 
| 19 import os, sys |  | 
| 20 |  | 
| 21 body_length = int(os.environ.get('CONTENT_LENGTH',0)) |  | 
| 22 indata = sys.stdin.read(body_length) |  | 
| 23 print "Header: OK" |  | 
| 24 print |  | 
| 25 print "readinput ok" |  | 
| 26 ''' |  | 
| 27 |  | 
| 28 READALLINPUT_CGI = '''\ |  | 
| 29 # this is an example of the typical (incorrect) CGI script which expects |  | 
| 30 # the server to close stdin when the body of the request is complete. |  | 
| 31 # A correct CGI should only read env['CONTENT_LENGTH'] bytes. |  | 
| 32 |  | 
| 33 import sys |  | 
| 34 |  | 
| 35 indata = sys.stdin.read() |  | 
| 36 print "Header: OK" |  | 
| 37 print |  | 
| 38 print "readallinput ok" |  | 
| 39 ''' |  | 
| 40 |  | 
| 41 class PythonScript(twcgi.FilteredScript): |  | 
| 42     filter = sys.executable |  | 
| 43     filters = sys.executable,  # web2's version |  | 
| 44 |  | 
| 45 class CGI(unittest.TestCase): |  | 
| 46     def startServer(self, cgi): |  | 
| 47         root = resource.Resource() |  | 
| 48         cgipath = util.sibpath(__file__, cgi) |  | 
| 49         root.putChild("cgi", PythonScript(cgipath)) |  | 
| 50         site = server.Site(root) |  | 
| 51         self.p = reactor.listenTCP(0, site) |  | 
| 52         return self.p.getHost().port |  | 
| 53 |  | 
| 54     def tearDown(self): |  | 
| 55         if self.p: |  | 
| 56             return self.p.stopListening() |  | 
| 57 |  | 
| 58 |  | 
| 59     def testCGI(self): |  | 
| 60         cgiFilename = os.path.abspath(self.mktemp()) |  | 
| 61         cgiFile = file(cgiFilename, 'wt') |  | 
| 62         cgiFile.write(DUMMY_CGI) |  | 
| 63         cgiFile.close() |  | 
| 64 |  | 
| 65         portnum = self.startServer(cgiFilename) |  | 
| 66         d = client.getPage("http://localhost:%d/cgi" % portnum) |  | 
| 67         d.addCallback(self._testCGI_1) |  | 
| 68         return d |  | 
| 69     def _testCGI_1(self, res): |  | 
| 70         self.failUnlessEqual(res, "cgi output" + os.linesep) |  | 
| 71 |  | 
| 72 |  | 
| 73     def testReadEmptyInput(self): |  | 
| 74         cgiFilename = os.path.abspath(self.mktemp()) |  | 
| 75         cgiFile = file(cgiFilename, 'wt') |  | 
| 76         cgiFile.write(READINPUT_CGI) |  | 
| 77         cgiFile.close() |  | 
| 78 |  | 
| 79         portnum = self.startServer(cgiFilename) |  | 
| 80         d = client.getPage("http://localhost:%d/cgi" % portnum) |  | 
| 81         d.addCallback(self._testReadEmptyInput_1) |  | 
| 82         return d |  | 
| 83     testReadEmptyInput.timeout = 5 |  | 
| 84     def _testReadEmptyInput_1(self, res): |  | 
| 85         self.failUnlessEqual(res, "readinput ok%s" % os.linesep) |  | 
| 86 |  | 
| 87     def testReadInput(self): |  | 
| 88         cgiFilename = os.path.abspath(self.mktemp()) |  | 
| 89         cgiFile = file(cgiFilename, 'wt') |  | 
| 90         cgiFile.write(READINPUT_CGI) |  | 
| 91         cgiFile.close() |  | 
| 92 |  | 
| 93         portnum = self.startServer(cgiFilename) |  | 
| 94         d = client.getPage("http://localhost:%d/cgi" % portnum, |  | 
| 95                            method="POST", |  | 
| 96                            postdata="Here is your stdin") |  | 
| 97         d.addCallback(self._testReadInput_1) |  | 
| 98         return d |  | 
| 99     testReadInput.timeout = 5 |  | 
| 100     def _testReadInput_1(self, res): |  | 
| 101         self.failUnlessEqual(res, "readinput ok%s" % os.linesep) |  | 
| 102 |  | 
| 103 |  | 
| 104     def testReadAllInput(self): |  | 
| 105         cgiFilename = os.path.abspath(self.mktemp()) |  | 
| 106         cgiFile = file(cgiFilename, 'wt') |  | 
| 107         cgiFile.write(READALLINPUT_CGI) |  | 
| 108         cgiFile.close() |  | 
| 109 |  | 
| 110         portnum = self.startServer(cgiFilename) |  | 
| 111         d = client.getPage("http://localhost:%d/cgi" % portnum, |  | 
| 112                            method="POST", |  | 
| 113                            postdata="Here is your stdin") |  | 
| 114         d.addCallback(self._testReadAllInput_1) |  | 
| 115         return d |  | 
| 116     testReadAllInput.timeout = 5 |  | 
| 117     def _testReadAllInput_1(self, res): |  | 
| 118         self.failUnlessEqual(res, "readallinput ok%s" % os.linesep) |  | 
| 119 |  | 
| 120 if not interfaces.IReactorProcess.providedBy(reactor): |  | 
| 121     CGI.skip = "CGI tests require a functional reactor.spawnProcess()" |  | 
| OLD | NEW | 
|---|