| OLD | NEW | 
|---|
|  | (Empty) | 
| 1 # -*- test-case-name: twisted.test.test_htb -*- |  | 
| 2 # |  | 
| 3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. |  | 
| 4 # See LICENSE for details. |  | 
| 5 |  | 
| 6 |  | 
| 7 """Hierarchical Token Bucket traffic shaping. |  | 
| 8 |  | 
| 9 Patterned after U{Martin Devera's Hierarchical Token Bucket traffic |  | 
| 10 shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}. |  | 
| 11 |  | 
| 12 @seealso: U{HTB Linux queuing discipline manual - user guide |  | 
| 13   <http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>} |  | 
| 14 @seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control |  | 
| 15     HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>} |  | 
| 16 @author: U{Kevin Turner<mailto:acapnotic@twistedmatrix.com>} |  | 
| 17 """ |  | 
| 18 |  | 
| 19 from __future__ import nested_scopes |  | 
| 20 |  | 
| 21 __version__ = '$Revision: 1.5 $'[11:-2] |  | 
| 22 |  | 
| 23 |  | 
| 24 # TODO: Investigate whether we should be using os.times()[-1] instead of |  | 
| 25 # time.time.  time.time, it has been pointed out, can go backwards.  Is |  | 
| 26 # the same true of os.times? |  | 
| 27 from time import time |  | 
| 28 from zope.interface import implements, Interface |  | 
| 29 |  | 
| 30 from twisted.protocols import pcp |  | 
| 31 |  | 
| 32 |  | 
| 33 class Bucket: |  | 
| 34     """Token bucket, or something like it. |  | 
| 35 |  | 
| 36     I can hold up to a certain number of tokens, and I drain over time. |  | 
| 37 |  | 
| 38     @cvar maxburst: Size of the bucket, in bytes.  If None, the bucket is |  | 
| 39         never full. |  | 
| 40     @type maxburst: int |  | 
| 41     @cvar rate: Rate the bucket drains, in bytes per second.  If None, |  | 
| 42         the bucket drains instantaneously. |  | 
| 43     @type rate: int |  | 
| 44     """ |  | 
| 45 |  | 
| 46     maxburst = None |  | 
| 47     rate = None |  | 
| 48 |  | 
| 49     _refcount = 0 |  | 
| 50 |  | 
| 51     def __init__(self, parentBucket=None): |  | 
| 52         self.content = 0 |  | 
| 53         self.parentBucket=parentBucket |  | 
| 54         self.lastDrip = time() |  | 
| 55 |  | 
| 56     def add(self, amount): |  | 
| 57         """Add tokens to me. |  | 
| 58 |  | 
| 59         @param amount: A quanity of tokens to add. |  | 
| 60         @type amount: int |  | 
| 61 |  | 
| 62         @returns: The number of tokens that fit. |  | 
| 63         @returntype: int |  | 
| 64         """ |  | 
| 65         self.drip() |  | 
| 66         if self.maxburst is None: |  | 
| 67             allowable = amount |  | 
| 68         else: |  | 
| 69             allowable = min(amount, self.maxburst - self.content) |  | 
| 70 |  | 
| 71         if self.parentBucket is not None: |  | 
| 72             allowable = self.parentBucket.add(allowable) |  | 
| 73         self.content += allowable |  | 
| 74         return allowable |  | 
| 75 |  | 
| 76     def drip(self): |  | 
| 77         """Let some of the bucket drain. |  | 
| 78 |  | 
| 79         How much of the bucket drains depends on how long it has been |  | 
| 80         since I was last called. |  | 
| 81 |  | 
| 82         @returns: True if I am now empty. |  | 
| 83         @returntype: bool |  | 
| 84         """ |  | 
| 85         if self.parentBucket is not None: |  | 
| 86             self.parentBucket.drip() |  | 
| 87 |  | 
| 88         if self.rate is None: |  | 
| 89             self.content = 0 |  | 
| 90             return True |  | 
| 91         else: |  | 
| 92             now = time() |  | 
| 93             deltaT = now - self.lastDrip |  | 
| 94             self.content = long(max(0, self.content - deltaT * self.rate)) |  | 
| 95             self.lastDrip = now |  | 
| 96             return False |  | 
| 97 |  | 
| 98 |  | 
| 99 class IBucketFilter(Interface): |  | 
| 100     def getBucketFor(*somethings, **some_kw): |  | 
| 101         """I'll give you a bucket for something. |  | 
| 102 |  | 
| 103         @returntype: L{Bucket} |  | 
| 104         """ |  | 
| 105 |  | 
| 106 class HierarchicalBucketFilter: |  | 
| 107     """I filter things into buckets, and I am nestable. |  | 
| 108 |  | 
| 109     @cvar bucketFactory: Class of buckets to make. |  | 
| 110     @type bucketFactory: L{Bucket} class |  | 
| 111     @cvar sweepInterval: Seconds between sweeping out the bucket cache. |  | 
| 112     @type sweepInterval: int |  | 
| 113     """ |  | 
| 114 |  | 
| 115     implements(IBucketFilter) |  | 
| 116 |  | 
| 117     bucketFactory = Bucket |  | 
| 118     sweepInterval = None |  | 
| 119 |  | 
| 120     def __init__(self, parentFilter=None): |  | 
| 121         self.buckets = {} |  | 
| 122         self.parentFilter = parentFilter |  | 
| 123         self.lastSweep = time() |  | 
| 124 |  | 
| 125     def getBucketFor(self, *a, **kw): |  | 
| 126         """You want a bucket for that?  I'll give you a bucket. |  | 
| 127 |  | 
| 128         Any parameters are passed on to L{getBucketKey}, from them it |  | 
| 129         decides which bucket you get. |  | 
| 130 |  | 
| 131         @returntype: L{Bucket} |  | 
| 132         """ |  | 
| 133         if ((self.sweepInterval is not None) |  | 
| 134             and ((time() - self.lastSweep) > self.sweepInterval)): |  | 
| 135             self.sweep() |  | 
| 136 |  | 
| 137         if self.parentFilter: |  | 
| 138             parentBucket = self.parentFilter.getBucketFor(self, *a, **kw) |  | 
| 139         else: |  | 
| 140             parentBucket = None |  | 
| 141 |  | 
| 142         key = self.getBucketKey(*a, **kw) |  | 
| 143         bucket = self.buckets.get(key) |  | 
| 144         if bucket is None: |  | 
| 145             bucket = self.bucketFactory(parentBucket) |  | 
| 146             self.buckets[key] = bucket |  | 
| 147         return bucket |  | 
| 148 |  | 
| 149     def getBucketKey(self, *a, **kw): |  | 
| 150         """I determine who gets which bucket. |  | 
| 151 |  | 
| 152         Unless I'm overridden, everything gets the same bucket. |  | 
| 153 |  | 
| 154         @returns: something to be used as a key in the bucket cache. |  | 
| 155         """ |  | 
| 156         return None |  | 
| 157 |  | 
| 158     def sweep(self): |  | 
| 159         """I throw away references to empty buckets.""" |  | 
| 160         for key, bucket in self.buckets.items(): |  | 
| 161             if (bucket._refcount == 0) and bucket.drip(): |  | 
| 162                 del self.buckets[key] |  | 
| 163 |  | 
| 164         self.lastSweep = time() |  | 
| 165 |  | 
| 166 |  | 
| 167 class FilterByHost(HierarchicalBucketFilter): |  | 
| 168     """A bucket filter with a bucket for each host. |  | 
| 169     """ |  | 
| 170     sweepInterval = 60 * 20 |  | 
| 171 |  | 
| 172     def getBucketKey(self, transport): |  | 
| 173         return transport.getPeer()[1] |  | 
| 174 |  | 
| 175 |  | 
| 176 class FilterByServer(HierarchicalBucketFilter): |  | 
| 177     """A bucket filter with a bucket for each service. |  | 
| 178     """ |  | 
| 179     sweepInterval = None |  | 
| 180 |  | 
| 181     def getBucketKey(self, transport): |  | 
| 182         return transport.getHost()[2] |  | 
| 183 |  | 
| 184 |  | 
| 185 class ShapedConsumer(pcp.ProducerConsumerProxy): |  | 
| 186     """I wrap a Consumer and shape the rate at which it receives data. |  | 
| 187     """ |  | 
| 188     # Providing a Pull interface means I don't have to try to schedule |  | 
| 189     # traffic with callLaters. |  | 
| 190     iAmStreaming = False |  | 
| 191 |  | 
| 192     def __init__(self, consumer, bucket): |  | 
| 193         pcp.ProducerConsumerProxy.__init__(self, consumer) |  | 
| 194         self.bucket = bucket |  | 
| 195         self.bucket._refcount += 1 |  | 
| 196 |  | 
| 197     def _writeSomeData(self, data): |  | 
| 198         # In practice, this actually results in obscene amounts of |  | 
| 199         # overhead, as a result of generating lots and lots of packets |  | 
| 200         # with twelve-byte payloads.  We may need to do a version of |  | 
| 201         # this with scheduled writes after all. |  | 
| 202         amount = self.bucket.add(len(data)) |  | 
| 203         return pcp.ProducerConsumerProxy._writeSomeData(self, data[:amount]) |  | 
| 204 |  | 
| 205     def stopProducing(self): |  | 
| 206         pcp.ProducerConsumerProxy.stopProducing(self) |  | 
| 207         self.bucket._refcount -= 1 |  | 
| 208 |  | 
| 209 |  | 
| 210 class ShapedTransport(ShapedConsumer): |  | 
| 211     """I wrap a Transport and shape the rate at which it receives data. |  | 
| 212 |  | 
| 213     I am a L{ShapedConsumer} with a little bit of magic to provide for |  | 
| 214     the case where the consumer I wrap is also a Transport and people |  | 
| 215     will be attempting to access attributes I do not proxy as a |  | 
| 216     Consumer (e.g. loseConnection). |  | 
| 217     """ |  | 
| 218     # Ugh.  We only wanted to filter IConsumer, not ITransport. |  | 
| 219 |  | 
| 220     iAmStreaming = False |  | 
| 221     def __getattr__(self, name): |  | 
| 222         # Because people will be doing things like .getPeer and |  | 
| 223         # .loseConnection on me. |  | 
| 224         return getattr(self.consumer, name) |  | 
| 225 |  | 
| 226 |  | 
| 227 class ShapedProtocolFactory: |  | 
| 228     """I dispense Protocols with traffic shaping on their transports. |  | 
| 229 |  | 
| 230     Usage:: |  | 
| 231 |  | 
| 232         myserver = SomeFactory() |  | 
| 233         myserver.protocol = ShapedProtocolFactory(myserver.protocol, |  | 
| 234                                                   bucketFilter) |  | 
| 235 |  | 
| 236     Where SomeServerFactory is a L{twisted.internet.protocol.Factory}, and |  | 
| 237     bucketFilter is an instance of L{HierarchicalBucketFilter}. |  | 
| 238     """ |  | 
| 239     def __init__(self, protoClass, bucketFilter): |  | 
| 240         """Tell me what to wrap and where to get buckets. |  | 
| 241 |  | 
| 242         @param protoClass: The class of Protocol I will generate |  | 
| 243           wrapped instances of. |  | 
| 244         @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>} |  | 
| 245           class |  | 
| 246         @param bucketFilter: The filter which will determine how |  | 
| 247           traffic is shaped. |  | 
| 248         @type bucketFilter: L{HierarchicalBucketFilter}. |  | 
| 249         """ |  | 
| 250         # More precisely, protoClass can be any callable that will return |  | 
| 251         # instances of something that implements IProtocol. |  | 
| 252         self.protocol = protoClass |  | 
| 253         self.bucketFilter = bucketFilter |  | 
| 254 |  | 
| 255     def __call__(self, *a, **kw): |  | 
| 256         """Make a Protocol instance with a shaped transport. |  | 
| 257 |  | 
| 258         Any parameters will be passed on to the protocol's initializer. |  | 
| 259 |  | 
| 260         @returns: a Protocol instance with a L{ShapedTransport}. |  | 
| 261         """ |  | 
| 262         proto = self.protocol(*a, **kw) |  | 
| 263         origMakeConnection = proto.makeConnection |  | 
| 264         def makeConnection(transport): |  | 
| 265             bucket = self.bucketFilter.getBucketFor(transport) |  | 
| 266             shapedTransport = ShapedTransport(transport, bucket) |  | 
| 267             return origMakeConnection(shapedTransport) |  | 
| 268         proto.makeConnection = makeConnection |  | 
| 269         return proto |  | 
| OLD | NEW | 
|---|