OLD | NEW |
| (Empty) |
1 import sys, os | |
2 | |
3 from twisted.trial import unittest | |
4 from twisted.internet import reactor, interfaces | |
5 from twisted.python import util | |
6 from twisted.web import static, twcgi, server, resource | |
7 from twisted.web import client | |
8 | |
9 DUMMY_CGI = '''\ | |
10 print "Header: OK" | |
11 print | |
12 print "cgi output" | |
13 ''' | |
14 | |
15 READINPUT_CGI = '''\ | |
16 # this is an example of a correctly-written CGI script which reads a body | |
17 # from stdin, which only reads env['CONTENT_LENGTH'] bytes. | |
18 | |
19 import os, sys | |
20 | |
21 body_length = int(os.environ.get('CONTENT_LENGTH',0)) | |
22 indata = sys.stdin.read(body_length) | |
23 print "Header: OK" | |
24 print | |
25 print "readinput ok" | |
26 ''' | |
27 | |
28 READALLINPUT_CGI = '''\ | |
29 # this is an example of the typical (incorrect) CGI script which expects | |
30 # the server to close stdin when the body of the request is complete. | |
31 # A correct CGI should only read env['CONTENT_LENGTH'] bytes. | |
32 | |
33 import sys | |
34 | |
35 indata = sys.stdin.read() | |
36 print "Header: OK" | |
37 print | |
38 print "readallinput ok" | |
39 ''' | |
40 | |
41 class PythonScript(twcgi.FilteredScript): | |
42 filter = sys.executable | |
43 filters = sys.executable, # web2's version | |
44 | |
45 class CGI(unittest.TestCase): | |
46 def startServer(self, cgi): | |
47 root = resource.Resource() | |
48 cgipath = util.sibpath(__file__, cgi) | |
49 root.putChild("cgi", PythonScript(cgipath)) | |
50 site = server.Site(root) | |
51 self.p = reactor.listenTCP(0, site) | |
52 return self.p.getHost().port | |
53 | |
54 def tearDown(self): | |
55 if self.p: | |
56 return self.p.stopListening() | |
57 | |
58 | |
59 def testCGI(self): | |
60 cgiFilename = os.path.abspath(self.mktemp()) | |
61 cgiFile = file(cgiFilename, 'wt') | |
62 cgiFile.write(DUMMY_CGI) | |
63 cgiFile.close() | |
64 | |
65 portnum = self.startServer(cgiFilename) | |
66 d = client.getPage("http://localhost:%d/cgi" % portnum) | |
67 d.addCallback(self._testCGI_1) | |
68 return d | |
69 def _testCGI_1(self, res): | |
70 self.failUnlessEqual(res, "cgi output" + os.linesep) | |
71 | |
72 | |
73 def testReadEmptyInput(self): | |
74 cgiFilename = os.path.abspath(self.mktemp()) | |
75 cgiFile = file(cgiFilename, 'wt') | |
76 cgiFile.write(READINPUT_CGI) | |
77 cgiFile.close() | |
78 | |
79 portnum = self.startServer(cgiFilename) | |
80 d = client.getPage("http://localhost:%d/cgi" % portnum) | |
81 d.addCallback(self._testReadEmptyInput_1) | |
82 return d | |
83 testReadEmptyInput.timeout = 5 | |
84 def _testReadEmptyInput_1(self, res): | |
85 self.failUnlessEqual(res, "readinput ok%s" % os.linesep) | |
86 | |
87 def testReadInput(self): | |
88 cgiFilename = os.path.abspath(self.mktemp()) | |
89 cgiFile = file(cgiFilename, 'wt') | |
90 cgiFile.write(READINPUT_CGI) | |
91 cgiFile.close() | |
92 | |
93 portnum = self.startServer(cgiFilename) | |
94 d = client.getPage("http://localhost:%d/cgi" % portnum, | |
95 method="POST", | |
96 postdata="Here is your stdin") | |
97 d.addCallback(self._testReadInput_1) | |
98 return d | |
99 testReadInput.timeout = 5 | |
100 def _testReadInput_1(self, res): | |
101 self.failUnlessEqual(res, "readinput ok%s" % os.linesep) | |
102 | |
103 | |
104 def testReadAllInput(self): | |
105 cgiFilename = os.path.abspath(self.mktemp()) | |
106 cgiFile = file(cgiFilename, 'wt') | |
107 cgiFile.write(READALLINPUT_CGI) | |
108 cgiFile.close() | |
109 | |
110 portnum = self.startServer(cgiFilename) | |
111 d = client.getPage("http://localhost:%d/cgi" % portnum, | |
112 method="POST", | |
113 postdata="Here is your stdin") | |
114 d.addCallback(self._testReadAllInput_1) | |
115 return d | |
116 testReadAllInput.timeout = 5 | |
117 def _testReadAllInput_1(self, res): | |
118 self.failUnlessEqual(res, "readallinput ok%s" % os.linesep) | |
119 | |
120 if not interfaces.IReactorProcess.providedBy(reactor): | |
121 CGI.skip = "CGI tests require a functional reactor.spawnProcess()" | |
OLD | NEW |