OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark Evans (cce@clarkevans.com) | |
6 | |
7 """ | |
8 flow.pipe | |
9 | |
10 This contains various filter stages which have exactly one input stage. These | |
11 stages take a single input and modify its results, ie a rewrite stage. | |
12 """ | |
13 | |
14 from base import * | |
15 from wrap import wrap | |
16 from twisted.python.failure import Failure | |
17 | |
18 class Pipe(Stage): | |
19 """ abstract stage which takes a single input stage """ | |
20 def __init__(self, source, *trap): | |
21 Stage.__init__(self, *trap) | |
22 self._source = wrap(source) | |
23 | |
24 def _yield(self): | |
25 while not self.results \ | |
26 and not self.stop \ | |
27 and not self.failure: | |
28 source = self._source | |
29 instruction = source._yield() | |
30 if instruction: | |
31 return instruction | |
32 if source.failure: | |
33 self.failure = source.failure | |
34 return | |
35 results = source.results | |
36 stop = source.stop | |
37 if stop: | |
38 self.stop = True | |
39 source.results = [] | |
40 self.process(results, stop) | |
41 | |
42 def process(self, results): | |
43 """ process implemented by the pipe | |
44 | |
45 Take a set of possibly empty results and sets the member | |
46 variables: results, stop, or failure appropriately | |
47 """ | |
48 raise NotImplementedError | |
49 | |
50 class Filter(Pipe): | |
51 """ | |
52 flow equivalent to filter: Filter(function, source, ... ) | |
53 | |
54 Yield those elements from a source stage for which a function returns true. | |
55 If the function is None, the identity function is assumed, that is, all | |
56 items yielded that are false (zero or empty) are discarded. | |
57 | |
58 For example:: | |
59 | |
60 def odd(val): | |
61 if val % 2: | |
62 return True | |
63 | |
64 def range(): | |
65 yield 1 | |
66 yield 2 | |
67 yield 3 | |
68 yield 4 | |
69 | |
70 source = flow.Filter(odd,range) | |
71 printFlow(source) | |
72 """ | |
73 def __init__(self, func, source, *trap): | |
74 Pipe.__init__(self, source, *trap) | |
75 self._func = func | |
76 | |
77 def process(self, results, stop): | |
78 self.results.extend(filter(self._func,results)) | |
79 | |
80 class LineBreak(Pipe): | |
81 """ pipe stage which breaks its input into lines """ | |
82 def __init__(self, source, *trap, **kwargs): | |
83 Pipe.__init__(self, source, *trap) | |
84 self._delimiter = kwargs.get('delimiter','\r\n') | |
85 self._maxlen = int(kwargs.get('maxlength', 16384))+1 | |
86 self._trailer = int(kwargs.get('trailer',False)) | |
87 self._buffer = [] | |
88 self._currlen = 0 | |
89 | |
90 def process(self, results, stop): | |
91 for block in results: | |
92 lines = str(block).split(self._delimiter) | |
93 if len(lines) < 2: | |
94 tail = lines[0] | |
95 else: | |
96 tail = lines.pop() | |
97 if self._buffer: | |
98 self._buffer.append(lines.pop(0)) | |
99 self.results.append("".join(self._buffer)) | |
100 self._buffer = [] | |
101 self.results.extend(lines) | |
102 self._currlen = 0 | |
103 if tail: | |
104 self._currlen += len(tail) | |
105 self._buffer.append(tail) | |
106 if stop and self._buffer: | |
107 tail = "".join(self._buffer) | |
108 if self._trailer: | |
109 self.results.append(tail) | |
110 else: | |
111 raise RuntimeError, "trailing data remains: '%s'" % tail[:10] | |
112 | |
OLD | NEW |