| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: buildbot.test.test_persistent_queue -*- | |
| 2 | |
| 3 try: | |
| 4 # Python 2.4+ | |
| 5 from collections import deque | |
| 6 except ImportError: | |
| 7 deque = None | |
| 8 import os | |
| 9 import pickle | |
| 10 | |
| 11 from zope.interface import implements, Interface | |
| 12 | |
| 13 | |
| 14 def ReadFile(path): | |
| 15 f = open(path, 'rb') | |
| 16 try: | |
| 17 return f.read() | |
| 18 finally: | |
| 19 f.close() | |
| 20 | |
| 21 | |
| 22 def WriteFile(path, buf): | |
| 23 f = open(path, 'wb') | |
| 24 try: | |
| 25 f.write(buf) | |
| 26 finally: | |
| 27 f.close() | |
| 28 | |
| 29 | |
| 30 class IQueue(Interface): | |
| 31 """Abstraction of a queue.""" | |
| 32 def pushItem(item): | |
| 33 """Adds an individual item to the end of the queue. | |
| 34 | |
| 35 Returns an item if it was overflowed.""" | |
| 36 | |
| 37 def insertBackChunk(items): | |
| 38 """Adds a list of items as the oldest entries. | |
| 39 | |
| 40 Normally called in case of failure to process the data, queue the data | |
| 41 back so it can be retrieved at a later time. | |
| 42 | |
| 43 Returns a list of items if it was overflowed.""" | |
| 44 | |
| 45 def popChunk(nbItems=None): | |
| 46 """Pop many items at once. Defaults to self.maxItems().""" | |
| 47 | |
| 48 def save(): | |
| 49 """Save the queue to storage if implemented.""" | |
| 50 | |
| 51 def items(): | |
| 52 """Returns items in the queue. | |
| 53 | |
| 54 Warning: Can be extremely slow for queue on disk.""" | |
| 55 | |
| 56 def nbItems(): | |
| 57 """Returns the number of items in the queue.""" | |
| 58 | |
| 59 def maxItems(): | |
| 60 """Returns the maximum number of items this queue can hold.""" | |
| 61 | |
| 62 | |
| 63 # Available for python 2.3 and earlier. | |
| 64 class ListMemoryQueue(object): | |
| 65 """Simple length bounded queue using list.""" | |
| 66 implements(IQueue) | |
| 67 | |
| 68 def __init__(self, maxItems=None): | |
| 69 self._maxItems = maxItems | |
| 70 if self._maxItems is None: | |
| 71 self._maxItems = 10000 | |
| 72 self._items = [] | |
| 73 | |
| 74 def pushItem(self, item): | |
| 75 self._items.append(item) | |
| 76 if len(self._items) > self._maxItems: | |
| 77 return self._items.pop(0) | |
| 78 | |
| 79 def insertBackChunk(self, chunk): | |
| 80 ret = None | |
| 81 excess = len(self._items) + len(chunk) - self._maxItems | |
| 82 if excess > 0: | |
| 83 ret = chunk[0:excess] | |
| 84 chunk = chunk[excess:] | |
| 85 self._items = chunk + self._items | |
| 86 return ret | |
| 87 | |
| 88 def popChunk(self, nbItems=None): | |
| 89 if nbItems is None: | |
| 90 nbItems = self._maxItems | |
| 91 if nbItems > len(self._items): | |
| 92 items = self._items | |
| 93 self._items = [] | |
| 94 else: | |
| 95 items = self._items[0:nbItems] | |
| 96 del self._items[0:nbItems] | |
| 97 return items | |
| 98 | |
| 99 def save(self): | |
| 100 pass | |
| 101 | |
| 102 def items(self): | |
| 103 return self._items | |
| 104 | |
| 105 def nbItems(self): | |
| 106 return len(self._items) | |
| 107 | |
| 108 def maxItems(self): | |
| 109 return self._maxItems | |
| 110 | |
| 111 if deque: | |
| 112 class DequeMemoryQueue(object): | |
| 113 """Simple length bounded queue using deque. | |
| 114 | |
| 115 list.pop(0) operation is O(n) so for a 10000 items list, it can start to | |
| 116 be real slow. On the contrary, deque.popleft() is O(1) most of the time. | |
| 117 See http://docs.python.org/library/collections.html for more | |
| 118 information. | |
| 119 """ | |
| 120 implements(IQueue) | |
| 121 | |
| 122 def __init__(self, maxItems=None): | |
| 123 self._maxItems = maxItems | |
| 124 if self._maxItems is None: | |
| 125 self._maxItems = 10000 | |
| 126 self._items = deque() | |
| 127 | |
| 128 def pushItem(self, item): | |
| 129 ret = None | |
| 130 if len(self._items) == self._maxItems: | |
| 131 ret = self._items.popleft() | |
| 132 self._items.append(item) | |
| 133 return ret | |
| 134 | |
| 135 def insertBackChunk(self, chunk): | |
| 136 ret = None | |
| 137 excess = len(self._items) + len(chunk) - self._maxItems | |
| 138 if excess > 0: | |
| 139 ret = chunk[0:excess] | |
| 140 chunk = chunk[excess:] | |
| 141 self._items.extendleft(reversed(chunk)) | |
| 142 return ret | |
| 143 | |
| 144 def popChunk(self, nbItems=None): | |
| 145 if nbItems is None: | |
| 146 nbItems = self._maxItems | |
| 147 if nbItems > len(self._items): | |
| 148 items = list(self._items) | |
| 149 self._items = deque() | |
| 150 else: | |
| 151 items = [] | |
| 152 for i in range(nbItems): | |
| 153 items.append(self._items.popleft()) | |
| 154 return items | |
| 155 | |
| 156 def save(self): | |
| 157 pass | |
| 158 | |
| 159 def items(self): | |
| 160 return list(self._items) | |
| 161 | |
| 162 def nbItems(self): | |
| 163 return len(self._items) | |
| 164 | |
| 165 def maxItems(self): | |
| 166 return self._maxItems | |
| 167 | |
| 168 MemoryQueue = DequeMemoryQueue | |
| 169 else: | |
| 170 MemoryQueue = ListMemoryQueue | |
| 171 | |
| 172 | |
| 173 class DiskQueue(object): | |
| 174 """Keeps a list of abstract items and serializes it to the disk. | |
| 175 | |
| 176 Use pickle for serialization.""" | |
| 177 implements(IQueue) | |
| 178 | |
| 179 def __init__(self, path, maxItems=None, pickleFn=pickle.dumps, | |
| 180 unpickleFn=pickle.loads): | |
| 181 """ | |
| 182 @path: directory to save the items. | |
| 183 @maxItems: maximum number of items to keep on disk, flush the | |
| 184 older ones. | |
| 185 @pickleFn: function used to pack the items to disk. | |
| 186 @unpickleFn: function used to unpack items from disk. | |
| 187 """ | |
| 188 self.path = path | |
| 189 self._maxItems = maxItems | |
| 190 if self._maxItems is None: | |
| 191 self._maxItems = 100000 | |
| 192 if not os.path.isdir(self.path): | |
| 193 os.mkdir(self.path) | |
| 194 self.pickleFn = pickleFn | |
| 195 self.unpickleFn = unpickleFn | |
| 196 | |
| 197 # Total number of items. | |
| 198 self._nbItems = 0 | |
| 199 # The actual items id start at one. | |
| 200 self.firstItemId = 0 | |
| 201 self.lastItemId = 0 | |
| 202 self._loadFromDisk() | |
| 203 | |
| 204 def pushItem(self, item): | |
| 205 ret = None | |
| 206 if self._nbItems == self._maxItems: | |
| 207 id = self._findNext(self.firstItemId) | |
| 208 path = os.path.join(self.path, str(id)) | |
| 209 ret = self.unpickleFn(ReadFile(path)) | |
| 210 os.remove(path) | |
| 211 self.firstItemId = id + 1 | |
| 212 else: | |
| 213 self._nbItems += 1 | |
| 214 self.lastItemId += 1 | |
| 215 path = os.path.join(self.path, str(self.lastItemId)) | |
| 216 if os.path.exists(path): | |
| 217 raise IOError('%s already exists.' % path) | |
| 218 WriteFile(path, self.pickleFn(item)) | |
| 219 return ret | |
| 220 | |
| 221 def insertBackChunk(self, chunk): | |
| 222 ret = None | |
| 223 excess = self._nbItems + len(chunk) - self._maxItems | |
| 224 if excess > 0: | |
| 225 ret = chunk[0:excess] | |
| 226 chunk = chunk[excess:] | |
| 227 for i in reversed(chunk): | |
| 228 self.firstItemId -= 1 | |
| 229 path = os.path.join(self.path, str(self.firstItemId)) | |
| 230 if os.path.exists(path): | |
| 231 raise IOError('%s already exists.' % path) | |
| 232 WriteFile(path, self.pickleFn(i)) | |
| 233 self._nbItems += 1 | |
| 234 return ret | |
| 235 | |
| 236 def popChunk(self, nbItems=None): | |
| 237 if nbItems is None: | |
| 238 nbItems = self._maxItems | |
| 239 ret = [] | |
| 240 for i in range(nbItems): | |
| 241 if self._nbItems == 0: | |
| 242 break | |
| 243 id = self._findNext(self.firstItemId) | |
| 244 path = os.path.join(self.path, str(id)) | |
| 245 ret.append(self.unpickleFn(ReadFile(path))) | |
| 246 os.remove(path) | |
| 247 self._nbItems -= 1 | |
| 248 self.firstItemId = id + 1 | |
| 249 return ret | |
| 250 | |
| 251 def save(self): | |
| 252 pass | |
| 253 | |
| 254 def items(self): | |
| 255 """Warning, very slow.""" | |
| 256 ret = [] | |
| 257 for id in range(self.firstItemId, self.lastItemId + 1): | |
| 258 path = os.path.join(self.path, str(id)) | |
| 259 if os.path.exists(path): | |
| 260 ret.append(self.unpickleFn(ReadFile(path))) | |
| 261 return ret | |
| 262 | |
| 263 def nbItems(self): | |
| 264 return self._nbItems | |
| 265 | |
| 266 def maxItems(self): | |
| 267 return self._maxItems | |
| 268 | |
| 269 #### Protected functions | |
| 270 | |
| 271 def _findNext(self, id): | |
| 272 while True: | |
| 273 path = os.path.join(self.path, str(id)) | |
| 274 if os.path.isfile(path): | |
| 275 return id | |
| 276 id += 1 | |
| 277 return None | |
| 278 | |
| 279 def _loadFromDisk(self): | |
| 280 """Loads the queue from disk upto self.maxMemoryItems items into | |
| 281 self.items. | |
| 282 """ | |
| 283 def SafeInt(item): | |
| 284 try: | |
| 285 return int(item) | |
| 286 except ValueError: | |
| 287 return None | |
| 288 | |
| 289 files = filter(None, [SafeInt(x) for x in os.listdir(self.path)]) | |
| 290 files.sort() | |
| 291 self._nbItems = len(files) | |
| 292 if self._nbItems: | |
| 293 self.firstItemId = files[0] | |
| 294 self.lastItemId = files[-1] | |
| 295 | |
| 296 | |
| 297 class PersistentQueue(object): | |
| 298 """Keeps a list of abstract items and serializes it to the disk. | |
| 299 | |
| 300 It has 2 layers of queue, normally an in-memory queue and an on-disk queue. | |
| 301 When the number of items in the primary queue gets too large, the new items | |
| 302 are automatically saved to the secondary queue. The older items are kept in | |
| 303 the primary queue. | |
| 304 """ | |
| 305 implements(IQueue) | |
| 306 | |
| 307 def __init__(self, primaryQueue=None, secondaryQueue=None, path=None): | |
| 308 """ | |
| 309 @primaryQueue: memory queue to use before buffering to disk. | |
| 310 @secondaryQueue: disk queue to use as permanent buffer. | |
| 311 @path: path is a shortcut when using default DiskQueue settings. | |
| 312 """ | |
| 313 self.primaryQueue = primaryQueue | |
| 314 if self.primaryQueue is None: | |
| 315 self.primaryQueue = MemoryQueue() | |
| 316 self.secondaryQueue = secondaryQueue | |
| 317 if self.secondaryQueue is None: | |
| 318 self.secondaryQueue = DiskQueue(path) | |
| 319 # Preload data from the secondary queue only if we know we won't start | |
| 320 # using the secondary queue right away. | |
| 321 if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems(): | |
| 322 self.primaryQueue.insertBackChunk( | |
| 323 self.secondaryQueue.popChunk(self.primaryQueue.maxItems())) | |
| 324 | |
| 325 def pushItem(self, item): | |
| 326 # If there is already items in secondaryQueue, we'd need to pop them | |
| 327 # all to start inserting them into primaryQueue so don't bother and | |
| 328 # just push it in secondaryQueue. | |
| 329 if (self.secondaryQueue.nbItems() or | |
| 330 self.primaryQueue.nbItems() == self.primaryQueue.maxItems()): | |
| 331 item = self.secondaryQueue.pushItem(item) | |
| 332 if item is None: | |
| 333 return item | |
| 334 # If item is not None, secondaryQueue overflowed. We need to push it | |
| 335 # back to primaryQueue so the oldest item is dumped. | |
| 336 # Or everything fit in the primaryQueue. | |
| 337 return self.primaryQueue.pushItem(item) | |
| 338 | |
| 339 def insertBackChunk(self, chunk): | |
| 340 ret = None | |
| 341 # Overall excess | |
| 342 excess = self.nbItems() + len(chunk) - self.maxItems() | |
| 343 if excess > 0: | |
| 344 ret = chunk[0:excess] | |
| 345 chunk = chunk[excess:] | |
| 346 # Memory excess | |
| 347 excess = (self.primaryQueue.nbItems() + len(chunk) - | |
| 348 self.primaryQueue.maxItems()) | |
| 349 if excess > 0: | |
| 350 chunk2 = [] | |
| 351 for i in range(excess): | |
| 352 chunk2.append(self.primaryQueue.items().pop()) | |
| 353 chunk2.reverse() | |
| 354 x = self.primaryQueue.insertBackChunk(chunk) | |
| 355 assert x is None, "primaryQueue.insertBackChunk did not return None" | |
| 356 if excess > 0: | |
| 357 x = self.secondaryQueue.insertBackChunk(chunk2) | |
| 358 assert x is None, ("secondaryQueue.insertBackChunk did not return " | |
| 359 " None") | |
| 360 return ret | |
| 361 | |
| 362 def popChunk(self, nbItems=None): | |
| 363 if nbItems is None: | |
| 364 nbItems = self.primaryQueue.maxItems() | |
| 365 ret = self.primaryQueue.popChunk(nbItems) | |
| 366 nbItems -= len(ret) | |
| 367 if nbItems and self.secondaryQueue.nbItems(): | |
| 368 ret.extend(self.secondaryQueue.popChunk(nbItems)) | |
| 369 return ret | |
| 370 | |
| 371 def save(self): | |
| 372 self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk()) | |
| 373 | |
| 374 def items(self): | |
| 375 return self.primaryQueue.items() + self.secondaryQueue.items() | |
| 376 | |
| 377 def nbItems(self): | |
| 378 return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems() | |
| 379 | |
| 380 def maxItems(self): | |
| 381 return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems() | |
| 382 | |
| 383 | |
| 384 class IndexedQueue(object): | |
| 385 """Adds functionality to a IQueue object to track its usage. | |
| 386 | |
| 387 Adds a new member function getIndex() and modify popChunk() and | |
| 388 insertBackChunk() to keep a virtual pointer to the queue's first entry | |
| 389 index.""" | |
| 390 implements(IQueue) | |
| 391 | |
| 392 def __init__(self, queue): | |
| 393 # Copy all the member functions from the other object that this class | |
| 394 # doesn't already define. | |
| 395 assert IQueue.providedBy(queue) | |
| 396 def Filter(m): | |
| 397 return (m[0] != '_' and callable(getattr(queue, m)) | |
| 398 and not hasattr(self, m)) | |
| 399 for member in filter(Filter, dir(queue)): | |
| 400 setattr(self, member, getattr(queue, member)) | |
| 401 self.queue = queue | |
| 402 self._index = 0 | |
| 403 | |
| 404 def getIndex(self): | |
| 405 return self._index | |
| 406 | |
| 407 def popChunk(self, *args, **kwargs): | |
| 408 items = self.queue.popChunk(*args, **kwargs) | |
| 409 if items: | |
| 410 self._index += len(items) | |
| 411 return items | |
| 412 | |
| 413 def insertBackChunk(self, items): | |
| 414 self._index -= len(items) | |
| 415 ret = self.queue.insertBackChunk(items) | |
| 416 if ret: | |
| 417 self._index += len(ret) | |
| 418 return ret | |
| 419 | |
| 420 | |
| 421 def ToIndexedQueue(queue): | |
| 422 """If the IQueue wasn't already a IndexedQueue, makes it an IndexedQueue.""" | |
| 423 if not IQueue.providedBy(queue): | |
| 424 raise TypeError("queue doesn't implement IQueue", queue) | |
| 425 if isinstance(queue, IndexedQueue): | |
| 426 return queue | |
| 427 return IndexedQueue(queue) | |
| 428 | |
| 429 # vim: set ts=4 sts=4 sw=4 et: | |
| OLD | NEW |