OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark Evans (cce@clarkevans.com) | |
6 # | |
7 | |
8 """ | |
9 flow.protocol | |
10 | |
11 This allows one to use flow module to create protocols, a protocol is actually | |
12 a controller, but it is specialized enough to deserve its own module. | |
13 """ | |
14 | |
15 import types | |
16 from base import * | |
17 from wrap import wrap | |
18 from stage import Callback | |
19 from twisted.internet import protocol | |
20 from twisted.internet.error import ConnectionLost, ConnectionDone | |
21 | |
22 def makeProtocol(controller, baseClass = protocol.Protocol, | |
23 *callbacks, **kwargs): | |
24 """ | |
25 Construct a flow based protocol | |
26 | |
27 This takes a base protocol class, and a set of callbacks and creates a | |
28 connection flow based on the two. For example, the following would build a | |
29 simple 'echo' protocol:: | |
30 | |
31 from __future__ import generators | |
32 from twisted.internet import reactor, protocol | |
33 from twisted.flow import flow | |
34 PORT = 8392 | |
35 | |
36 def echoServer(conn): | |
37 yield conn | |
38 for data in conn: | |
39 conn.write(data) | |
40 yield conn | |
41 | |
42 def echoClient(conn): | |
43 conn.write("hello, world!") | |
44 yield conn | |
45 print "server said: ", conn.next() | |
46 reactor.callLater(0,reactor.stop) | |
47 | |
48 server = protocol.ServerFactory() | |
49 server.protocol = flow.makeProtocol(echoServer) | |
50 reactor.listenTCP(PORT,server) | |
51 client = protocol.ClientFactory() | |
52 client.protocol = flow.makeProtocol(echoClient) | |
53 reactor.connectTCP("localhost", PORT, client) | |
54 reactor.run() | |
55 | |
56 Of course, the best part about flow is that you can nest stages. Therefore | |
57 it is quite easy to make a lineBreaker generator which takes an input | |
58 connection and produces and output connection. Anyway, the code is almost | |
59 identical as far as the client/server is concerned:: | |
60 | |
61 # this is a filter generator, it consumes from the | |
62 # incoming connection, and yields results to | |
63 # the next stage, the echoServer below | |
64 def lineBreaker(conn, lineEnding = "\\n"): | |
65 lst = [] | |
66 yield conn | |
67 for chunk in conn: | |
68 pos = chunk.find(lineEnding) | |
69 if pos > -1: | |
70 lst.append(chunk[:pos]) | |
71 yield "".join(lst) | |
72 lst = [chunk[pos+1:]] | |
73 else: | |
74 lst.append(chunk) | |
75 yield conn | |
76 yield "".join(lst) | |
77 | |
78 # note that this class is only slightly modified, | |
79 # simply comment out the line breaker line to see | |
80 # how the server behaves without the filter... | |
81 def echoServer(conn): | |
82 lines = flow.wrap(lineBreaker(conn)) | |
83 yield lines | |
84 for data in lines: | |
85 conn.write(data) | |
86 yield lines | |
87 | |
88 # and the only thing that is changed is that we | |
89 # are sending data in strange chunks, and even | |
90 # putting the last chunk on hold for 2 seconds. | |
91 def echoClient(conn): | |
92 conn.write("Good Morning!\\nPlease ") | |
93 yield conn | |
94 print "server said: ", conn.next() | |
95 conn.write("do not disregard ") | |
96 reactor.callLater(2, conn.write, "this.\\n") | |
97 yield conn | |
98 print "server said: ", conn.next() | |
99 reactor.callLater(0,reactor.stop) | |
100 """ | |
101 if not callbacks: | |
102 callbacks = ('dataReceived',) | |
103 trap = kwargs.get("trap", tuple()) | |
104 class _Protocol(Controller, Callback, baseClass): | |
105 def __init__(self): | |
106 Callback.__init__(self, *trap) | |
107 setattr(self, callbacks[0], self) | |
108 # TODO: support more than one callback via Concurrent | |
109 def _execute(self, dummy = None): | |
110 cmd = self._controller | |
111 self.write = self.transport.write | |
112 while True: | |
113 instruction = cmd._yield() | |
114 if instruction: | |
115 if isinstance(instruction, CallLater): | |
116 instruction.callLater(self._execute) | |
117 return | |
118 raise Unsupported(instruction) | |
119 if cmd.stop: | |
120 self.transport.loseConnection() | |
121 return | |
122 if cmd.failure: | |
123 self.transport.loseConnection() | |
124 cmd.failure.trap() | |
125 return | |
126 if cmd.results: | |
127 self.transport.writeSequence(cmd.results) | |
128 cmd.results = [] | |
129 def connectionMade(self): | |
130 if types.ClassType == type(self.controller): | |
131 self._controller = wrap(self.controller(self)) | |
132 else: | |
133 self._controller = wrap(self.controller()) | |
134 self._execute() | |
135 def connectionLost(self, reason=protocol.connectionDone): | |
136 if isinstance(reason.value, ConnectionDone) or \ | |
137 (isinstance(reason.value, ConnectionLost) and \ | |
138 self.finishOnConnectionLost): | |
139 self.finish() | |
140 else: | |
141 self.errback(reason) | |
142 self._execute() | |
143 _Protocol.finishOnConnectionLost = kwargs.get("finishOnConnectionLost",True) | |
144 _Protocol.controller = controller | |
145 return _Protocol | |
146 | |
147 def _NotImplController(protocol): | |
148 raise NotImplementedError | |
149 Protocol = makeProtocol(_NotImplController) | |
150 Protocol.__doc__ = """ A concrete flow.Protocol for inheritance """ | |
151 | |
OLD | NEW |