OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_htb -*- | |
2 # | |
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """Hierarchical Token Bucket traffic shaping. | |
8 | |
9 Patterned after U{Martin Devera's Hierarchical Token Bucket traffic | |
10 shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}. | |
11 | |
12 @seealso: U{HTB Linux queuing discipline manual - user guide | |
13 <http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>} | |
14 @seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control | |
15 HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>} | |
16 @author: U{Kevin Turner<mailto:acapnotic@twistedmatrix.com>} | |
17 """ | |
18 | |
19 from __future__ import nested_scopes | |
20 | |
21 __version__ = '$Revision: 1.5 $'[11:-2] | |
22 | |
23 | |
24 # TODO: Investigate whether we should be using os.times()[-1] instead of | |
25 # time.time. time.time, it has been pointed out, can go backwards. Is | |
26 # the same true of os.times? | |
27 from time import time | |
28 from zope.interface import implements, Interface | |
29 | |
30 from twisted.protocols import pcp | |
31 | |
32 | |
33 class Bucket: | |
34 """Token bucket, or something like it. | |
35 | |
36 I can hold up to a certain number of tokens, and I drain over time. | |
37 | |
38 @cvar maxburst: Size of the bucket, in bytes. If None, the bucket is | |
39 never full. | |
40 @type maxburst: int | |
41 @cvar rate: Rate the bucket drains, in bytes per second. If None, | |
42 the bucket drains instantaneously. | |
43 @type rate: int | |
44 """ | |
45 | |
46 maxburst = None | |
47 rate = None | |
48 | |
49 _refcount = 0 | |
50 | |
51 def __init__(self, parentBucket=None): | |
52 self.content = 0 | |
53 self.parentBucket=parentBucket | |
54 self.lastDrip = time() | |
55 | |
56 def add(self, amount): | |
57 """Add tokens to me. | |
58 | |
59 @param amount: A quanity of tokens to add. | |
60 @type amount: int | |
61 | |
62 @returns: The number of tokens that fit. | |
63 @returntype: int | |
64 """ | |
65 self.drip() | |
66 if self.maxburst is None: | |
67 allowable = amount | |
68 else: | |
69 allowable = min(amount, self.maxburst - self.content) | |
70 | |
71 if self.parentBucket is not None: | |
72 allowable = self.parentBucket.add(allowable) | |
73 self.content += allowable | |
74 return allowable | |
75 | |
76 def drip(self): | |
77 """Let some of the bucket drain. | |
78 | |
79 How much of the bucket drains depends on how long it has been | |
80 since I was last called. | |
81 | |
82 @returns: True if I am now empty. | |
83 @returntype: bool | |
84 """ | |
85 if self.parentBucket is not None: | |
86 self.parentBucket.drip() | |
87 | |
88 if self.rate is None: | |
89 self.content = 0 | |
90 return True | |
91 else: | |
92 now = time() | |
93 deltaT = now - self.lastDrip | |
94 self.content = long(max(0, self.content - deltaT * self.rate)) | |
95 self.lastDrip = now | |
96 return False | |
97 | |
98 | |
99 class IBucketFilter(Interface): | |
100 def getBucketFor(*somethings, **some_kw): | |
101 """I'll give you a bucket for something. | |
102 | |
103 @returntype: L{Bucket} | |
104 """ | |
105 | |
106 class HierarchicalBucketFilter: | |
107 """I filter things into buckets, and I am nestable. | |
108 | |
109 @cvar bucketFactory: Class of buckets to make. | |
110 @type bucketFactory: L{Bucket} class | |
111 @cvar sweepInterval: Seconds between sweeping out the bucket cache. | |
112 @type sweepInterval: int | |
113 """ | |
114 | |
115 implements(IBucketFilter) | |
116 | |
117 bucketFactory = Bucket | |
118 sweepInterval = None | |
119 | |
120 def __init__(self, parentFilter=None): | |
121 self.buckets = {} | |
122 self.parentFilter = parentFilter | |
123 self.lastSweep = time() | |
124 | |
125 def getBucketFor(self, *a, **kw): | |
126 """You want a bucket for that? I'll give you a bucket. | |
127 | |
128 Any parameters are passed on to L{getBucketKey}, from them it | |
129 decides which bucket you get. | |
130 | |
131 @returntype: L{Bucket} | |
132 """ | |
133 if ((self.sweepInterval is not None) | |
134 and ((time() - self.lastSweep) > self.sweepInterval)): | |
135 self.sweep() | |
136 | |
137 if self.parentFilter: | |
138 parentBucket = self.parentFilter.getBucketFor(self, *a, **kw) | |
139 else: | |
140 parentBucket = None | |
141 | |
142 key = self.getBucketKey(*a, **kw) | |
143 bucket = self.buckets.get(key) | |
144 if bucket is None: | |
145 bucket = self.bucketFactory(parentBucket) | |
146 self.buckets[key] = bucket | |
147 return bucket | |
148 | |
149 def getBucketKey(self, *a, **kw): | |
150 """I determine who gets which bucket. | |
151 | |
152 Unless I'm overridden, everything gets the same bucket. | |
153 | |
154 @returns: something to be used as a key in the bucket cache. | |
155 """ | |
156 return None | |
157 | |
158 def sweep(self): | |
159 """I throw away references to empty buckets.""" | |
160 for key, bucket in self.buckets.items(): | |
161 if (bucket._refcount == 0) and bucket.drip(): | |
162 del self.buckets[key] | |
163 | |
164 self.lastSweep = time() | |
165 | |
166 | |
167 class FilterByHost(HierarchicalBucketFilter): | |
168 """A bucket filter with a bucket for each host. | |
169 """ | |
170 sweepInterval = 60 * 20 | |
171 | |
172 def getBucketKey(self, transport): | |
173 return transport.getPeer()[1] | |
174 | |
175 | |
176 class FilterByServer(HierarchicalBucketFilter): | |
177 """A bucket filter with a bucket for each service. | |
178 """ | |
179 sweepInterval = None | |
180 | |
181 def getBucketKey(self, transport): | |
182 return transport.getHost()[2] | |
183 | |
184 | |
185 class ShapedConsumer(pcp.ProducerConsumerProxy): | |
186 """I wrap a Consumer and shape the rate at which it receives data. | |
187 """ | |
188 # Providing a Pull interface means I don't have to try to schedule | |
189 # traffic with callLaters. | |
190 iAmStreaming = False | |
191 | |
192 def __init__(self, consumer, bucket): | |
193 pcp.ProducerConsumerProxy.__init__(self, consumer) | |
194 self.bucket = bucket | |
195 self.bucket._refcount += 1 | |
196 | |
197 def _writeSomeData(self, data): | |
198 # In practice, this actually results in obscene amounts of | |
199 # overhead, as a result of generating lots and lots of packets | |
200 # with twelve-byte payloads. We may need to do a version of | |
201 # this with scheduled writes after all. | |
202 amount = self.bucket.add(len(data)) | |
203 return pcp.ProducerConsumerProxy._writeSomeData(self, data[:amount]) | |
204 | |
205 def stopProducing(self): | |
206 pcp.ProducerConsumerProxy.stopProducing(self) | |
207 self.bucket._refcount -= 1 | |
208 | |
209 | |
210 class ShapedTransport(ShapedConsumer): | |
211 """I wrap a Transport and shape the rate at which it receives data. | |
212 | |
213 I am a L{ShapedConsumer} with a little bit of magic to provide for | |
214 the case where the consumer I wrap is also a Transport and people | |
215 will be attempting to access attributes I do not proxy as a | |
216 Consumer (e.g. loseConnection). | |
217 """ | |
218 # Ugh. We only wanted to filter IConsumer, not ITransport. | |
219 | |
220 iAmStreaming = False | |
221 def __getattr__(self, name): | |
222 # Because people will be doing things like .getPeer and | |
223 # .loseConnection on me. | |
224 return getattr(self.consumer, name) | |
225 | |
226 | |
227 class ShapedProtocolFactory: | |
228 """I dispense Protocols with traffic shaping on their transports. | |
229 | |
230 Usage:: | |
231 | |
232 myserver = SomeFactory() | |
233 myserver.protocol = ShapedProtocolFactory(myserver.protocol, | |
234 bucketFilter) | |
235 | |
236 Where SomeServerFactory is a L{twisted.internet.protocol.Factory}, and | |
237 bucketFilter is an instance of L{HierarchicalBucketFilter}. | |
238 """ | |
239 def __init__(self, protoClass, bucketFilter): | |
240 """Tell me what to wrap and where to get buckets. | |
241 | |
242 @param protoClass: The class of Protocol I will generate | |
243 wrapped instances of. | |
244 @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>} | |
245 class | |
246 @param bucketFilter: The filter which will determine how | |
247 traffic is shaped. | |
248 @type bucketFilter: L{HierarchicalBucketFilter}. | |
249 """ | |
250 # More precisely, protoClass can be any callable that will return | |
251 # instances of something that implements IProtocol. | |
252 self.protocol = protoClass | |
253 self.bucketFilter = bucketFilter | |
254 | |
255 def __call__(self, *a, **kw): | |
256 """Make a Protocol instance with a shaped transport. | |
257 | |
258 Any parameters will be passed on to the protocol's initializer. | |
259 | |
260 @returns: a Protocol instance with a L{ShapedTransport}. | |
261 """ | |
262 proto = self.protocol(*a, **kw) | |
263 origMakeConnection = proto.makeConnection | |
264 def makeConnection(transport): | |
265 bucket = self.bucketFilter.getBucketFor(transport) | |
266 shapedTransport = ShapedTransport(transport, bucket) | |
267 return origMakeConnection(shapedTransport) | |
268 proto.makeConnection = makeConnection | |
269 return proto | |
OLD | NEW |