OLD | NEW |
| (Empty) |
1 #!/usr/bin/python | |
2 # -*- test-case-name: twisted.mail.test.test_pop3client -*- | |
3 | |
4 from twisted.internet.protocol import Factory | |
5 from twisted.protocols import basic | |
6 from twisted.internet import reactor | |
7 import sys, time | |
8 | |
9 USER = "test" | |
10 PASS = "twisted" | |
11 | |
12 PORT = 1100 | |
13 | |
14 SSL_SUPPORT = True | |
15 UIDL_SUPPORT = True | |
16 INVALID_SERVER_RESPONSE = False | |
17 INVALID_CAPABILITY_RESPONSE = False | |
18 INVALID_LOGIN_RESPONSE = False | |
19 DENY_CONNECTION = False | |
20 DROP_CONNECTION = False | |
21 BAD_TLS_RESPONSE = False | |
22 TIMEOUT_RESPONSE = False | |
23 TIMEOUT_DEFERRED = False | |
24 SLOW_GREETING = False | |
25 | |
26 """Commands""" | |
27 CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready" | |
28 | |
29 CAPABILITIES = [ | |
30 "TOP", | |
31 "LOGIN-DELAY 180", | |
32 "USER", | |
33 "SASL LOGIN" | |
34 ] | |
35 | |
36 CAPABILITIES_SSL = "STLS" | |
37 CAPABILITIES_UIDL = "UIDL" | |
38 | |
39 | |
40 INVALID_RESPONSE = "-ERR Unknown request" | |
41 VALID_RESPONSE = "+OK Command Completed" | |
42 AUTH_DECLINED = "-ERR LOGIN failed" | |
43 AUTH_ACCEPTED = "+OK Mailbox open, 0 messages" | |
44 TLS_ERROR = "-ERR server side error start TLS handshake" | |
45 LOGOUT_COMPLETE = "+OK quit completed" | |
46 NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command" | |
47 STAT = "+OK 0 0" | |
48 UIDL = "+OK Unique-ID listing follows\r\n." | |
49 LIST = "+OK Mailbox scan listing follows\r\n." | |
50 CAP_START = "+OK Capability list follows:" | |
51 | |
52 | |
53 class POP3TestServer(basic.LineReceiver): | |
54 def __init__(self, contextFactory = None): | |
55 self.loggedIn = False | |
56 self.caps = None | |
57 self.tmpUser = None | |
58 self.ctx = contextFactory | |
59 | |
60 def sendSTATResp(self, req): | |
61 self.sendLine(STAT) | |
62 | |
63 def sendUIDLResp(self, req): | |
64 self.sendLine(UIDL) | |
65 | |
66 def sendLISTResp(self, req): | |
67 self.sendLine(LIST) | |
68 | |
69 def sendCapabilities(self): | |
70 if self.caps is None: | |
71 self.caps = [CAP_START] | |
72 | |
73 if UIDL_SUPPORT: | |
74 self.caps.append(CAPABILITIES_UIDL) | |
75 | |
76 if SSL_SUPPORT: | |
77 self.caps.append(CAPABILITIES_SSL) | |
78 | |
79 for cap in CAPABILITIES: | |
80 self.caps.append(cap) | |
81 resp = '\r\n'.join(self.caps) | |
82 resp += "\r\n." | |
83 | |
84 self.sendLine(resp) | |
85 | |
86 | |
87 def connectionMade(self): | |
88 if DENY_CONNECTION: | |
89 self.disconnect() | |
90 return | |
91 | |
92 if SLOW_GREETING: | |
93 reactor.callLater(20, self.sendGreeting) | |
94 | |
95 else: | |
96 self.sendGreeting() | |
97 | |
98 def sendGreeting(self): | |
99 self.sendLine(CONNECTION_MADE) | |
100 | |
101 def lineReceived(self, line): | |
102 """Error Conditions""" | |
103 | |
104 uline = line.upper() | |
105 find = lambda s: uline.find(s) != -1 | |
106 | |
107 if TIMEOUT_RESPONSE: | |
108 # Do not respond to clients request | |
109 return | |
110 | |
111 if DROP_CONNECTION: | |
112 self.disconnect() | |
113 return | |
114 | |
115 elif find("CAPA"): | |
116 if INVALID_CAPABILITY_RESPONSE: | |
117 self.sendLine(INVALID_RESPONSE) | |
118 else: | |
119 self.sendCapabilities() | |
120 | |
121 elif find("STLS") and SSL_SUPPORT: | |
122 self.startTLS() | |
123 | |
124 elif find("USER"): | |
125 if INVALID_LOGIN_RESPONSE: | |
126 self.sendLine(INVALID_RESPONSE) | |
127 return | |
128 | |
129 resp = None | |
130 try: | |
131 self.tmpUser = line.split(" ")[1] | |
132 resp = VALID_RESPONSE | |
133 except: | |
134 resp = AUTH_DECLINED | |
135 | |
136 self.sendLine(resp) | |
137 | |
138 elif find("PASS"): | |
139 resp = None | |
140 try: | |
141 pwd = line.split(" ")[1] | |
142 | |
143 if self.tmpUser is None or pwd is None: | |
144 resp = AUTH_DECLINED | |
145 elif self.tmpUser == USER and pwd == PASS: | |
146 resp = AUTH_ACCEPTED | |
147 self.loggedIn = True | |
148 else: | |
149 resp = AUTH_DECLINED | |
150 except: | |
151 resp = AUTH_DECLINED | |
152 | |
153 self.sendLine(resp) | |
154 | |
155 elif find("QUIT"): | |
156 self.loggedIn = False | |
157 self.sendLine(LOGOUT_COMPLETE) | |
158 self.disconnect() | |
159 | |
160 elif INVALID_SERVER_RESPONSE: | |
161 self.sendLine(INVALID_RESPONSE) | |
162 | |
163 elif not self.loggedIn: | |
164 self.sendLine(NOT_LOGGED_IN) | |
165 | |
166 elif find("NOOP"): | |
167 self.sendLine(VALID_RESPONSE) | |
168 | |
169 elif find("STAT"): | |
170 if TIMEOUT_DEFERRED: | |
171 return | |
172 self.sendLine(STAT) | |
173 | |
174 elif find("LIST"): | |
175 if TIMEOUT_DEFERRED: | |
176 return | |
177 self.sendLine(LIST) | |
178 | |
179 elif find("UIDL"): | |
180 if TIMEOUT_DEFERRED: | |
181 return | |
182 elif not UIDL_SUPPORT: | |
183 self.sendLine(INVALID_RESPONSE) | |
184 return | |
185 | |
186 self.sendLine(UIDL) | |
187 | |
188 def startTLS(self): | |
189 if self.ctx is None: | |
190 self.getContext() | |
191 | |
192 if SSL_SUPPORT and self.ctx is not None: | |
193 self.sendLine('+OK Begin TLS negotiation now') | |
194 self.transport.startTLS(self.ctx) | |
195 else: | |
196 self.sendLine('-ERR TLS not available') | |
197 | |
198 def disconnect(self): | |
199 self.transport.loseConnection() | |
200 | |
201 def getContext(self): | |
202 try: | |
203 from twisted.internet import ssl | |
204 except ImportError: | |
205 self.ctx = None | |
206 else: | |
207 self.ctx = ssl.ClientContextFactory() | |
208 self.ctx.method = ssl.SSL.TLSv1_METHOD | |
209 | |
210 | |
211 usage = """popServer.py [arg] (default is Standard POP Server with no messages) | |
212 no_ssl - Start with no SSL support | |
213 no_uidl - Start with no UIDL support | |
214 bad_resp - Send a non-RFC compliant response to the Client | |
215 bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABI
LITY' request | |
216 bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGI
N' request | |
217 deny - Deny the connection | |
218 drop - Drop the connection after sending the greeting | |
219 bad_tls - Send a bad response to a STARTTLS | |
220 timeout - Do not return a response to a Client request | |
221 to_deferred - Do not return a response on a 'Select' request. This | |
222 will test Deferred callback handling | |
223 slow - Wait 20 seconds after the connection is made to return a Server Greeting | |
224 """ | |
225 | |
226 def printMessage(msg): | |
227 print "Server Starting in %s mode" % msg | |
228 | |
229 def processArg(arg): | |
230 | |
231 if arg.lower() == 'no_ssl': | |
232 global SSL_SUPPORT | |
233 SSL_SUPPORT = False | |
234 printMessage("NON-SSL") | |
235 | |
236 elif arg.lower() == 'no_uidl': | |
237 global UIDL_SUPPORT | |
238 UIDL_SUPPORT = False | |
239 printMessage("NON-UIDL") | |
240 | |
241 elif arg.lower() == 'bad_resp': | |
242 global INVALID_SERVER_RESPONSE | |
243 INVALID_SERVER_RESPONSE = True | |
244 printMessage("Invalid Server Response") | |
245 | |
246 elif arg.lower() == 'bad_cap_resp': | |
247 global INVALID_CAPABILITY_RESPONSE | |
248 INVALID_CAPABILITY_RESPONSE = True | |
249 printMessage("Invalid Capability Response") | |
250 | |
251 elif arg.lower() == 'bad_login_resp': | |
252 global INVALID_LOGIN_RESPONSE | |
253 INVALID_LOGIN_RESPONSE = True | |
254 printMessage("Invalid Capability Response") | |
255 | |
256 elif arg.lower() == 'deny': | |
257 global DENY_CONNECTION | |
258 DENY_CONNECTION = True | |
259 printMessage("Deny Connection") | |
260 | |
261 elif arg.lower() == 'drop': | |
262 global DROP_CONNECTION | |
263 DROP_CONNECTION = True | |
264 printMessage("Drop Connection") | |
265 | |
266 | |
267 elif arg.lower() == 'bad_tls': | |
268 global BAD_TLS_RESPONSE | |
269 BAD_TLS_RESPONSE = True | |
270 printMessage("Bad TLS Response") | |
271 | |
272 elif arg.lower() == 'timeout': | |
273 global TIMEOUT_RESPONSE | |
274 TIMEOUT_RESPONSE = True | |
275 printMessage("Timeout Response") | |
276 | |
277 elif arg.lower() == 'to_deferred': | |
278 global TIMEOUT_DEFERRED | |
279 TIMEOUT_DEFERRED = True | |
280 printMessage("Timeout Deferred Response") | |
281 | |
282 elif arg.lower() == 'slow': | |
283 global SLOW_GREETING | |
284 SLOW_GREETING = True | |
285 printMessage("Slow Greeting") | |
286 | |
287 elif arg.lower() == '--help': | |
288 print usage | |
289 sys.exit() | |
290 | |
291 else: | |
292 print usage | |
293 sys.exit() | |
294 | |
295 def main(): | |
296 | |
297 if len(sys.argv) < 2: | |
298 printMessage("POP3 with no messages") | |
299 else: | |
300 args = sys.argv[1:] | |
301 | |
302 for arg in args: | |
303 processArg(arg) | |
304 | |
305 f = Factory() | |
306 f.protocol = POP3TestServer | |
307 reactor.listenTCP(PORT, f) | |
308 reactor.run() | |
309 | |
310 if __name__ == '__main__': | |
311 main() | |
OLD | NEW |