| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_htb -*- | |
| 2 # | |
| 3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """Hierarchical Token Bucket traffic shaping. | |
| 8 | |
| 9 Patterned after U{Martin Devera's Hierarchical Token Bucket traffic | |
| 10 shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}. | |
| 11 | |
| 12 @seealso: U{HTB Linux queuing discipline manual - user guide | |
| 13 <http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>} | |
| 14 @seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control | |
| 15 HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>} | |
| 16 @author: U{Kevin Turner<mailto:acapnotic@twistedmatrix.com>} | |
| 17 """ | |
| 18 | |
| 19 from __future__ import nested_scopes | |
| 20 | |
| 21 __version__ = '$Revision: 1.5 $'[11:-2] | |
| 22 | |
| 23 | |
| 24 # TODO: Investigate whether we should be using os.times()[-1] instead of | |
| 25 # time.time. time.time, it has been pointed out, can go backwards. Is | |
| 26 # the same true of os.times? | |
| 27 from time import time | |
| 28 from zope.interface import implements, Interface | |
| 29 | |
| 30 from twisted.protocols import pcp | |
| 31 | |
| 32 | |
| 33 class Bucket: | |
| 34 """Token bucket, or something like it. | |
| 35 | |
| 36 I can hold up to a certain number of tokens, and I drain over time. | |
| 37 | |
| 38 @cvar maxburst: Size of the bucket, in bytes. If None, the bucket is | |
| 39 never full. | |
| 40 @type maxburst: int | |
| 41 @cvar rate: Rate the bucket drains, in bytes per second. If None, | |
| 42 the bucket drains instantaneously. | |
| 43 @type rate: int | |
| 44 """ | |
| 45 | |
| 46 maxburst = None | |
| 47 rate = None | |
| 48 | |
| 49 _refcount = 0 | |
| 50 | |
| 51 def __init__(self, parentBucket=None): | |
| 52 self.content = 0 | |
| 53 self.parentBucket=parentBucket | |
| 54 self.lastDrip = time() | |
| 55 | |
| 56 def add(self, amount): | |
| 57 """Add tokens to me. | |
| 58 | |
| 59 @param amount: A quanity of tokens to add. | |
| 60 @type amount: int | |
| 61 | |
| 62 @returns: The number of tokens that fit. | |
| 63 @returntype: int | |
| 64 """ | |
| 65 self.drip() | |
| 66 if self.maxburst is None: | |
| 67 allowable = amount | |
| 68 else: | |
| 69 allowable = min(amount, self.maxburst - self.content) | |
| 70 | |
| 71 if self.parentBucket is not None: | |
| 72 allowable = self.parentBucket.add(allowable) | |
| 73 self.content += allowable | |
| 74 return allowable | |
| 75 | |
| 76 def drip(self): | |
| 77 """Let some of the bucket drain. | |
| 78 | |
| 79 How much of the bucket drains depends on how long it has been | |
| 80 since I was last called. | |
| 81 | |
| 82 @returns: True if I am now empty. | |
| 83 @returntype: bool | |
| 84 """ | |
| 85 if self.parentBucket is not None: | |
| 86 self.parentBucket.drip() | |
| 87 | |
| 88 if self.rate is None: | |
| 89 self.content = 0 | |
| 90 return True | |
| 91 else: | |
| 92 now = time() | |
| 93 deltaT = now - self.lastDrip | |
| 94 self.content = long(max(0, self.content - deltaT * self.rate)) | |
| 95 self.lastDrip = now | |
| 96 return False | |
| 97 | |
| 98 | |
| 99 class IBucketFilter(Interface): | |
| 100 def getBucketFor(*somethings, **some_kw): | |
| 101 """I'll give you a bucket for something. | |
| 102 | |
| 103 @returntype: L{Bucket} | |
| 104 """ | |
| 105 | |
| 106 class HierarchicalBucketFilter: | |
| 107 """I filter things into buckets, and I am nestable. | |
| 108 | |
| 109 @cvar bucketFactory: Class of buckets to make. | |
| 110 @type bucketFactory: L{Bucket} class | |
| 111 @cvar sweepInterval: Seconds between sweeping out the bucket cache. | |
| 112 @type sweepInterval: int | |
| 113 """ | |
| 114 | |
| 115 implements(IBucketFilter) | |
| 116 | |
| 117 bucketFactory = Bucket | |
| 118 sweepInterval = None | |
| 119 | |
| 120 def __init__(self, parentFilter=None): | |
| 121 self.buckets = {} | |
| 122 self.parentFilter = parentFilter | |
| 123 self.lastSweep = time() | |
| 124 | |
| 125 def getBucketFor(self, *a, **kw): | |
| 126 """You want a bucket for that? I'll give you a bucket. | |
| 127 | |
| 128 Any parameters are passed on to L{getBucketKey}, from them it | |
| 129 decides which bucket you get. | |
| 130 | |
| 131 @returntype: L{Bucket} | |
| 132 """ | |
| 133 if ((self.sweepInterval is not None) | |
| 134 and ((time() - self.lastSweep) > self.sweepInterval)): | |
| 135 self.sweep() | |
| 136 | |
| 137 if self.parentFilter: | |
| 138 parentBucket = self.parentFilter.getBucketFor(self, *a, **kw) | |
| 139 else: | |
| 140 parentBucket = None | |
| 141 | |
| 142 key = self.getBucketKey(*a, **kw) | |
| 143 bucket = self.buckets.get(key) | |
| 144 if bucket is None: | |
| 145 bucket = self.bucketFactory(parentBucket) | |
| 146 self.buckets[key] = bucket | |
| 147 return bucket | |
| 148 | |
| 149 def getBucketKey(self, *a, **kw): | |
| 150 """I determine who gets which bucket. | |
| 151 | |
| 152 Unless I'm overridden, everything gets the same bucket. | |
| 153 | |
| 154 @returns: something to be used as a key in the bucket cache. | |
| 155 """ | |
| 156 return None | |
| 157 | |
| 158 def sweep(self): | |
| 159 """I throw away references to empty buckets.""" | |
| 160 for key, bucket in self.buckets.items(): | |
| 161 if (bucket._refcount == 0) and bucket.drip(): | |
| 162 del self.buckets[key] | |
| 163 | |
| 164 self.lastSweep = time() | |
| 165 | |
| 166 | |
| 167 class FilterByHost(HierarchicalBucketFilter): | |
| 168 """A bucket filter with a bucket for each host. | |
| 169 """ | |
| 170 sweepInterval = 60 * 20 | |
| 171 | |
| 172 def getBucketKey(self, transport): | |
| 173 return transport.getPeer()[1] | |
| 174 | |
| 175 | |
| 176 class FilterByServer(HierarchicalBucketFilter): | |
| 177 """A bucket filter with a bucket for each service. | |
| 178 """ | |
| 179 sweepInterval = None | |
| 180 | |
| 181 def getBucketKey(self, transport): | |
| 182 return transport.getHost()[2] | |
| 183 | |
| 184 | |
| 185 class ShapedConsumer(pcp.ProducerConsumerProxy): | |
| 186 """I wrap a Consumer and shape the rate at which it receives data. | |
| 187 """ | |
| 188 # Providing a Pull interface means I don't have to try to schedule | |
| 189 # traffic with callLaters. | |
| 190 iAmStreaming = False | |
| 191 | |
| 192 def __init__(self, consumer, bucket): | |
| 193 pcp.ProducerConsumerProxy.__init__(self, consumer) | |
| 194 self.bucket = bucket | |
| 195 self.bucket._refcount += 1 | |
| 196 | |
| 197 def _writeSomeData(self, data): | |
| 198 # In practice, this actually results in obscene amounts of | |
| 199 # overhead, as a result of generating lots and lots of packets | |
| 200 # with twelve-byte payloads. We may need to do a version of | |
| 201 # this with scheduled writes after all. | |
| 202 amount = self.bucket.add(len(data)) | |
| 203 return pcp.ProducerConsumerProxy._writeSomeData(self, data[:amount]) | |
| 204 | |
| 205 def stopProducing(self): | |
| 206 pcp.ProducerConsumerProxy.stopProducing(self) | |
| 207 self.bucket._refcount -= 1 | |
| 208 | |
| 209 | |
| 210 class ShapedTransport(ShapedConsumer): | |
| 211 """I wrap a Transport and shape the rate at which it receives data. | |
| 212 | |
| 213 I am a L{ShapedConsumer} with a little bit of magic to provide for | |
| 214 the case where the consumer I wrap is also a Transport and people | |
| 215 will be attempting to access attributes I do not proxy as a | |
| 216 Consumer (e.g. loseConnection). | |
| 217 """ | |
| 218 # Ugh. We only wanted to filter IConsumer, not ITransport. | |
| 219 | |
| 220 iAmStreaming = False | |
| 221 def __getattr__(self, name): | |
| 222 # Because people will be doing things like .getPeer and | |
| 223 # .loseConnection on me. | |
| 224 return getattr(self.consumer, name) | |
| 225 | |
| 226 | |
| 227 class ShapedProtocolFactory: | |
| 228 """I dispense Protocols with traffic shaping on their transports. | |
| 229 | |
| 230 Usage:: | |
| 231 | |
| 232 myserver = SomeFactory() | |
| 233 myserver.protocol = ShapedProtocolFactory(myserver.protocol, | |
| 234 bucketFilter) | |
| 235 | |
| 236 Where SomeServerFactory is a L{twisted.internet.protocol.Factory}, and | |
| 237 bucketFilter is an instance of L{HierarchicalBucketFilter}. | |
| 238 """ | |
| 239 def __init__(self, protoClass, bucketFilter): | |
| 240 """Tell me what to wrap and where to get buckets. | |
| 241 | |
| 242 @param protoClass: The class of Protocol I will generate | |
| 243 wrapped instances of. | |
| 244 @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>} | |
| 245 class | |
| 246 @param bucketFilter: The filter which will determine how | |
| 247 traffic is shaped. | |
| 248 @type bucketFilter: L{HierarchicalBucketFilter}. | |
| 249 """ | |
| 250 # More precisely, protoClass can be any callable that will return | |
| 251 # instances of something that implements IProtocol. | |
| 252 self.protocol = protoClass | |
| 253 self.bucketFilter = bucketFilter | |
| 254 | |
| 255 def __call__(self, *a, **kw): | |
| 256 """Make a Protocol instance with a shaped transport. | |
| 257 | |
| 258 Any parameters will be passed on to the protocol's initializer. | |
| 259 | |
| 260 @returns: a Protocol instance with a L{ShapedTransport}. | |
| 261 """ | |
| 262 proto = self.protocol(*a, **kw) | |
| 263 origMakeConnection = proto.makeConnection | |
| 264 def makeConnection(transport): | |
| 265 bucket = self.bucketFilter.getBucketFor(transport) | |
| 266 shapedTransport = ShapedTransport(transport, bucket) | |
| 267 return origMakeConnection(shapedTransport) | |
| 268 proto.makeConnection = makeConnection | |
| 269 return proto | |
| OLD | NEW |