Index: third_party/twisted_8_1/twisted/flow/stage.py |
diff --git a/third_party/twisted_8_1/twisted/flow/stage.py b/third_party/twisted_8_1/twisted/flow/stage.py |
deleted file mode 100644 |
index 86cf79f3479ac17b54cbe8d47377239c869575d8..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/flow/stage.py |
+++ /dev/null |
@@ -1,239 +0,0 @@ |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-# |
-# Author: Clark Evans (cce@clarkevans.com) |
-# |
- |
-""" |
-flow.stage |
- |
-Various stages for manipulating data flows, in particular, those stages which |
-take more than one input stages or alternative input, such as a callback. |
-""" |
- |
-from base import * |
-from wrap import wrap |
-from twisted.python.failure import Failure |
- |
-class Map(Stage): |
- """ |
- flow equivalent to map: Map(function, stage, ... ) |
- |
- Apply a function to every item yielded and yield the results. If |
- additional stages are passed, the function must take that many arguments |
- and is applied to the items of all lists in parallel. If a list is shorter |
- than another, it is assumed to be extended with None items. If the |
- function is None, the identity function is assumed; if there are multiple |
- list arguments, Map stage returns a sequence consisting of tuples |
- containing the corresponding items from all lists. |
- |
- For example:: |
- |
- def fn(val): |
- return val + 10 |
- |
- source = flow.Map(fn,range(4)) |
- printFlow(source) |
- """ |
- def __init__(self, func, stage, *stages): |
- Stage.__init__(self) |
- self.func = func |
- self._stage = [wrap(stage)] |
- for stage in stages: |
- self._stage.append(wrap(stage)) |
- self._index = 0 |
- |
- def _yield(self): |
- if self.results or self.stop or self.failure: |
- return |
- if not self._index: |
- self._curr = [] |
- self._done = True |
- while self._index < len(self._stage): |
- idx = self._index |
- curr = self._stage[idx] |
- instruction = curr._yield() |
- if instruction: |
- return instruction |
- if curr.results: |
- self._curr.append(curr.results.pop(0)) |
- self._index += 1 |
- self._done = False |
- continue |
- if curr.stop: |
- self._curr.append(None) |
- self._index += 1 |
- continue |
- if curr.failure: |
- self.failure = curr.failure |
- return |
- raise AssertionError("flow.Map ; no results, stop or failure?") |
- if self._done: |
- self.stop = 1 |
- return |
- curr = tuple(self._curr) |
- if self.func: |
- try: |
- curr = self.func(*curr) |
- except Failure, fail: |
- self.failure = fail |
- return |
- except: |
- self.failure = Failure() |
- return |
- self.results.append(curr) |
- self._index = 0 |
- |
-class Zip(Map): |
- """ |
- Zips two or more stages into a stream of N tuples |
- |
- For example:: |
- |
- source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"]) |
- printFlow(source) |
- |
- """ |
- def __init__(self, *stages): |
- Map.__init__(self, None, stages[0], *stages[1:]) |
- |
-class Concurrent(Stage): |
- """ |
- Executes stages concurrently |
- |
- This stage allows two or more stages (branches) to be executed at the same |
- time. It returns each stage as it becomes available. This can be used if |
- you have N callbacks, and you want to yield and wait for the first |
- available one that produces results. Once a stage is retuned, its next() |
- method should be used to extract the value for the stage. |
- """ |
- |
- class Instruction(CallLater): |
- def __init__(self, inst): |
- self.inst = inst |
- def callLater(self, callable): |
- for inst in self.inst: |
- inst.callLater(callable) |
- |
- def __init__(self, *stages): |
- Stage.__init__(self) |
- self._stages = [] |
- for stage in stages: |
- self._stages.append(wrap(stage)) |
- |
- def _yield(self): |
- if self.results or self.stop or self.failure: |
- return |
- stages = self._stages |
- later = [] |
- exit = None |
- while stages: |
- if stages[0] is exit: |
- if self.results: |
- return |
- break |
- curr = stages.pop(0) |
- instruction = curr._yield() |
- if curr.results: |
- self.results.append(curr) |
- if curr.failure: |
- self.failure = curr.failure |
- return |
- if curr.stop: |
- exit = None |
- if self.results: |
- return |
- continue |
- stages.append(curr) |
- if not exit: |
- exit = curr |
- if instruction: |
- if isinstance(instruction, CallLater): |
- if instruction not in later: |
- later.append(instruction) |
- continue |
- raise Unsupported(instruction) |
- if later: |
- return Concurrent.Instruction(later) |
- self.stop = True |
- |
-class Merge(Stage): |
- """ |
- Merges two or more Stages results into a single stream |
- |
- For example:: |
- |
- source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"]) |
- printFlow(source) |
- """ |
- def __init__(self, *stages): |
- Stage.__init__(self) |
- self.concurrent = Concurrent(*stages) |
- |
- def _yield(self): |
- if self.results or self.stop or self.failure: |
- return |
- instruction = self.concurrent._yield() |
- if instruction: |
- return instruction |
- for stage in self.concurrent.results: |
- self.results.extend(stage.results) |
- stage.results = [] |
- self.concurrent.results = [] |
- if self.concurrent.stop: |
- self.stop = True |
- self.failure = self.concurrent.failure |
- |
-class Callback(Stage): |
- """ |
- Converts a single-thread push interface into a pull interface. |
- |
- Once this stage is constructed, its result, errback, and finish member |
- variables may be called by a producer. The results of which can be |
- obtained by yielding the Callback and then calling next(). |
- |
- For example:: |
- |
- source = flow.Callback() |
- reactor.callLater(0, lambda: source.result("one")) |
- reactor.callLater(.5, lambda: source.result("two")) |
- reactor.callLater(1, lambda: source.finish()) |
- printFlow(source) |
- |
- """ |
- # TODO: Potentially rename this 'Consumer' and make it |
- # comply with protocols.IConsumer |
- # TODO: Make the inverse stage, which is an IProducer |
- class Instruction(CallLater): |
- def __init__(self): |
- self.flow = lambda: True |
- def callLater(self, callable): |
- self.flow = callable |
- def __init__(self, *trap): |
- Stage.__init__(self, *trap) |
- self._finished = False |
- self._cooperate = Callback.Instruction() |
- def result(self,result): |
- """ called by the producer to indicate a successful result """ |
- self.results.append(result) |
- self._cooperate.flow() |
- def finish(self): |
- """ called by producer to indicate successful stream completion """ |
- assert not self.failure, "failed streams should not be finished" |
- self._finished = True |
- self._cooperate.flow() |
- def errback(self, fail): |
- """ called by the producer in case of Failure """ |
- self.failure = fail |
- self._cooperate.flow() |
- def _yield(self): |
- if self.results or self.stop or self.failure: |
- return |
- if not self.results: |
- if self._finished: |
- self.stop = True |
- return |
- return self._cooperate |
- __call__ = result |
- |