| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 # Author: Clark Evans (cce@clarkevans.com) | |
| 6 | |
| 7 """ | |
| 8 flow.pipe | |
| 9 | |
| 10 This contains various filter stages which have exactly one input stage. These | |
| 11 stages take a single input and modify its results, ie a rewrite stage. | |
| 12 """ | |
| 13 | |
| 14 from base import * | |
| 15 from wrap import wrap | |
| 16 from twisted.python.failure import Failure | |
| 17 | |
| 18 class Pipe(Stage): | |
| 19 """ abstract stage which takes a single input stage """ | |
| 20 def __init__(self, source, *trap): | |
| 21 Stage.__init__(self, *trap) | |
| 22 self._source = wrap(source) | |
| 23 | |
| 24 def _yield(self): | |
| 25 while not self.results \ | |
| 26 and not self.stop \ | |
| 27 and not self.failure: | |
| 28 source = self._source | |
| 29 instruction = source._yield() | |
| 30 if instruction: | |
| 31 return instruction | |
| 32 if source.failure: | |
| 33 self.failure = source.failure | |
| 34 return | |
| 35 results = source.results | |
| 36 stop = source.stop | |
| 37 if stop: | |
| 38 self.stop = True | |
| 39 source.results = [] | |
| 40 self.process(results, stop) | |
| 41 | |
| 42 def process(self, results): | |
| 43 """ process implemented by the pipe | |
| 44 | |
| 45 Take a set of possibly empty results and sets the member | |
| 46 variables: results, stop, or failure appropriately | |
| 47 """ | |
| 48 raise NotImplementedError | |
| 49 | |
| 50 class Filter(Pipe): | |
| 51 """ | |
| 52 flow equivalent to filter: Filter(function, source, ... ) | |
| 53 | |
| 54 Yield those elements from a source stage for which a function returns true. | |
| 55 If the function is None, the identity function is assumed, that is, all | |
| 56 items yielded that are false (zero or empty) are discarded. | |
| 57 | |
| 58 For example:: | |
| 59 | |
| 60 def odd(val): | |
| 61 if val % 2: | |
| 62 return True | |
| 63 | |
| 64 def range(): | |
| 65 yield 1 | |
| 66 yield 2 | |
| 67 yield 3 | |
| 68 yield 4 | |
| 69 | |
| 70 source = flow.Filter(odd,range) | |
| 71 printFlow(source) | |
| 72 """ | |
| 73 def __init__(self, func, source, *trap): | |
| 74 Pipe.__init__(self, source, *trap) | |
| 75 self._func = func | |
| 76 | |
| 77 def process(self, results, stop): | |
| 78 self.results.extend(filter(self._func,results)) | |
| 79 | |
| 80 class LineBreak(Pipe): | |
| 81 """ pipe stage which breaks its input into lines """ | |
| 82 def __init__(self, source, *trap, **kwargs): | |
| 83 Pipe.__init__(self, source, *trap) | |
| 84 self._delimiter = kwargs.get('delimiter','\r\n') | |
| 85 self._maxlen = int(kwargs.get('maxlength', 16384))+1 | |
| 86 self._trailer = int(kwargs.get('trailer',False)) | |
| 87 self._buffer = [] | |
| 88 self._currlen = 0 | |
| 89 | |
| 90 def process(self, results, stop): | |
| 91 for block in results: | |
| 92 lines = str(block).split(self._delimiter) | |
| 93 if len(lines) < 2: | |
| 94 tail = lines[0] | |
| 95 else: | |
| 96 tail = lines.pop() | |
| 97 if self._buffer: | |
| 98 self._buffer.append(lines.pop(0)) | |
| 99 self.results.append("".join(self._buffer)) | |
| 100 self._buffer = [] | |
| 101 self.results.extend(lines) | |
| 102 self._currlen = 0 | |
| 103 if tail: | |
| 104 self._currlen += len(tail) | |
| 105 self._buffer.append(tail) | |
| 106 if stop and self._buffer: | |
| 107 tail = "".join(self._buffer) | |
| 108 if self._trailer: | |
| 109 self.results.append(tail) | |
| 110 else: | |
| 111 raise RuntimeError, "trailing data remains: '%s'" % tail[:10] | |
| 112 | |
| OLD | NEW |