OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_stdio -*- | |
2 | |
3 """Standard input/out/err support. | |
4 | |
5 Future Plans:: | |
6 | |
7 support for stderr, perhaps | |
8 Rewrite to use the reactor instead of an ad-hoc mechanism for connecting | |
9 protocols to transport. | |
10 | |
11 Maintainer: U{James Y Knight <mailto:foom@fuhm.net>} | |
12 """ | |
13 | |
14 import warnings | |
15 from zope.interface import implements | |
16 | |
17 from twisted.internet import process, error, interfaces | |
18 from twisted.python import log, failure | |
19 | |
20 | |
21 class PipeAddress(object): | |
22 implements(interfaces.IAddress) | |
23 | |
24 | |
25 class StandardIO(object): | |
26 implements(interfaces.ITransport, interfaces.IProducer, interfaces.IConsumer
, interfaces.IHalfCloseableDescriptor) | |
27 _reader = None | |
28 _writer = None | |
29 disconnected = False | |
30 disconnecting = False | |
31 | |
32 def __init__(self, proto, stdin=0, stdout=1): | |
33 from twisted.internet import reactor | |
34 self.protocol = proto | |
35 | |
36 self._reader=process.ProcessReader(reactor, self, 'read', stdin) | |
37 self._reader.startReading() | |
38 self._writer=process.ProcessWriter(reactor, self, 'write', stdout) | |
39 self._writer.startReading() | |
40 self.protocol.makeConnection(self) | |
41 | |
42 # ITransport | |
43 def loseWriteConnection(self): | |
44 if self._writer is not None: | |
45 self._writer.loseConnection() | |
46 | |
47 def write(self, data): | |
48 if self._writer is not None: | |
49 self._writer.write(data) | |
50 | |
51 def writeSequence(self, data): | |
52 if self._writer is not None: | |
53 self._writer.writeSequence(data) | |
54 | |
55 def loseConnection(self): | |
56 self.disconnecting = True | |
57 | |
58 if self._writer is not None: | |
59 self._writer.loseConnection() | |
60 if self._reader is not None: | |
61 # Don't loseConnection, because we don't want to SIGPIPE it. | |
62 self._reader.stopReading() | |
63 | |
64 def getPeer(self): | |
65 return PipeAddress() | |
66 | |
67 def getHost(self): | |
68 return PipeAddress() | |
69 | |
70 | |
71 # Callbacks from process.ProcessReader/ProcessWriter | |
72 def childDataReceived(self, fd, data): | |
73 self.protocol.dataReceived(data) | |
74 | |
75 def childConnectionLost(self, fd, reason): | |
76 if self.disconnected: | |
77 return | |
78 | |
79 if reason.value.__class__ == error.ConnectionDone: | |
80 # Normal close | |
81 if fd == 'read': | |
82 self._readConnectionLost(reason) | |
83 else: | |
84 self._writeConnectionLost(reason) | |
85 else: | |
86 self.connectionLost(reason) | |
87 | |
88 def connectionLost(self, reason): | |
89 self.disconnected = True | |
90 | |
91 # Make sure to cleanup the other half | |
92 _reader = self._reader | |
93 _writer = self._writer | |
94 protocol = self.protocol | |
95 self._reader = self._writer = None | |
96 self.protocol = None | |
97 | |
98 if _writer is not None and not _writer.disconnected: | |
99 _writer.connectionLost(reason) | |
100 | |
101 if _reader is not None and not _reader.disconnected: | |
102 _reader.connectionLost(reason) | |
103 | |
104 try: | |
105 protocol.connectionLost(reason) | |
106 except: | |
107 log.err() | |
108 | |
109 def _writeConnectionLost(self, reason): | |
110 self._writer=None | |
111 if self.disconnecting: | |
112 self.connectionLost(reason) | |
113 return | |
114 | |
115 p = interfaces.IHalfCloseableProtocol(self.protocol, None) | |
116 if p: | |
117 try: | |
118 p.writeConnectionLost() | |
119 except: | |
120 log.err() | |
121 self.connectionLost(failure.Failure()) | |
122 | |
123 def _readConnectionLost(self, reason): | |
124 self._reader=None | |
125 p = interfaces.IHalfCloseableProtocol(self.protocol, None) | |
126 if p: | |
127 try: | |
128 p.readConnectionLost() | |
129 except: | |
130 log.err() | |
131 self.connectionLost(failure.Failure()) | |
132 else: | |
133 self.connectionLost(reason) | |
134 | |
135 # IConsumer | |
136 def registerProducer(self, producer, streaming): | |
137 if self._writer is None: | |
138 producer.stopProducing() | |
139 else: | |
140 self._writer.registerProducer(producer, streaming) | |
141 | |
142 def unregisterProducer(self): | |
143 if self._writer is not None: | |
144 self._writer.unregisterProducer() | |
145 | |
146 # IProducer | |
147 def stopProducing(self): | |
148 self.loseConnection() | |
149 | |
150 def pauseProducing(self): | |
151 if self._reader is not None: | |
152 self._reader.pauseProducing() | |
153 | |
154 def resumeProducing(self): | |
155 if self._reader is not None: | |
156 self._reader.resumeProducing() | |
157 | |
158 # Stupid compatibility: | |
159 def closeStdin(self): | |
160 """Compatibility only, don't use. Same as loseWriteConnection.""" | |
161 warnings.warn("This function is deprecated, use loseWriteConnection inst
ead.", | |
162 category=DeprecationWarning, stacklevel=2) | |
163 self.loseWriteConnection() | |
164 | |
165 def stopReading(self): | |
166 """Compatibility only, don't use. Call pauseProducing.""" | |
167 self.pauseProducing() | |
168 | |
169 def startReading(self): | |
170 """Compatibility only, don't use. Call resumeProducing.""" | |
171 self.resumeProducing() | |
OLD | NEW |