| Index: third_party/buildbot_7_12/buildbot/status/persistent_queue.py
|
| diff --git a/third_party/buildbot_7_12/buildbot/status/persistent_queue.py b/third_party/buildbot_7_12/buildbot/status/persistent_queue.py
|
| deleted file mode 100644
|
| index b678682d08bb122634e132f7cc650a1a2890278b..0000000000000000000000000000000000000000
|
| --- a/third_party/buildbot_7_12/buildbot/status/persistent_queue.py
|
| +++ /dev/null
|
| @@ -1,429 +0,0 @@
|
| -# -*- test-case-name: buildbot.test.test_persistent_queue -*-
|
| -
|
| -try:
|
| - # Python 2.4+
|
| - from collections import deque
|
| -except ImportError:
|
| - deque = None
|
| -import os
|
| -import pickle
|
| -
|
| -from zope.interface import implements, Interface
|
| -
|
| -
|
| -def ReadFile(path):
|
| - f = open(path, 'rb')
|
| - try:
|
| - return f.read()
|
| - finally:
|
| - f.close()
|
| -
|
| -
|
| -def WriteFile(path, buf):
|
| - f = open(path, 'wb')
|
| - try:
|
| - f.write(buf)
|
| - finally:
|
| - f.close()
|
| -
|
| -
|
| -class IQueue(Interface):
|
| - """Abstraction of a queue."""
|
| - def pushItem(item):
|
| - """Adds an individual item to the end of the queue.
|
| -
|
| - Returns an item if it was overflowed."""
|
| -
|
| - def insertBackChunk(items):
|
| - """Adds a list of items as the oldest entries.
|
| -
|
| - Normally called in case of failure to process the data, queue the data
|
| - back so it can be retrieved at a later time.
|
| -
|
| - Returns a list of items if it was overflowed."""
|
| -
|
| - def popChunk(nbItems=None):
|
| - """Pop many items at once. Defaults to self.maxItems()."""
|
| -
|
| - def save():
|
| - """Save the queue to storage if implemented."""
|
| -
|
| - def items():
|
| - """Returns items in the queue.
|
| -
|
| - Warning: Can be extremely slow for queue on disk."""
|
| -
|
| - def nbItems():
|
| - """Returns the number of items in the queue."""
|
| -
|
| - def maxItems():
|
| - """Returns the maximum number of items this queue can hold."""
|
| -
|
| -
|
| -# Available for python 2.3 and earlier.
|
| -class ListMemoryQueue(object):
|
| - """Simple length bounded queue using list."""
|
| - implements(IQueue)
|
| -
|
| - def __init__(self, maxItems=None):
|
| - self._maxItems = maxItems
|
| - if self._maxItems is None:
|
| - self._maxItems = 10000
|
| - self._items = []
|
| -
|
| - def pushItem(self, item):
|
| - self._items.append(item)
|
| - if len(self._items) > self._maxItems:
|
| - return self._items.pop(0)
|
| -
|
| - def insertBackChunk(self, chunk):
|
| - ret = None
|
| - excess = len(self._items) + len(chunk) - self._maxItems
|
| - if excess > 0:
|
| - ret = chunk[0:excess]
|
| - chunk = chunk[excess:]
|
| - self._items = chunk + self._items
|
| - return ret
|
| -
|
| - def popChunk(self, nbItems=None):
|
| - if nbItems is None:
|
| - nbItems = self._maxItems
|
| - if nbItems > len(self._items):
|
| - items = self._items
|
| - self._items = []
|
| - else:
|
| - items = self._items[0:nbItems]
|
| - del self._items[0:nbItems]
|
| - return items
|
| -
|
| - def save(self):
|
| - pass
|
| -
|
| - def items(self):
|
| - return self._items
|
| -
|
| - def nbItems(self):
|
| - return len(self._items)
|
| -
|
| - def maxItems(self):
|
| - return self._maxItems
|
| -
|
| -if deque:
|
| - class DequeMemoryQueue(object):
|
| - """Simple length bounded queue using deque.
|
| -
|
| - list.pop(0) operation is O(n) so for a 10000 items list, it can start to
|
| - be real slow. On the contrary, deque.popleft() is O(1) most of the time.
|
| - See http://docs.python.org/library/collections.html for more
|
| - information.
|
| - """
|
| - implements(IQueue)
|
| -
|
| - def __init__(self, maxItems=None):
|
| - self._maxItems = maxItems
|
| - if self._maxItems is None:
|
| - self._maxItems = 10000
|
| - self._items = deque()
|
| -
|
| - def pushItem(self, item):
|
| - ret = None
|
| - if len(self._items) == self._maxItems:
|
| - ret = self._items.popleft()
|
| - self._items.append(item)
|
| - return ret
|
| -
|
| - def insertBackChunk(self, chunk):
|
| - ret = None
|
| - excess = len(self._items) + len(chunk) - self._maxItems
|
| - if excess > 0:
|
| - ret = chunk[0:excess]
|
| - chunk = chunk[excess:]
|
| - self._items.extendleft(reversed(chunk))
|
| - return ret
|
| -
|
| - def popChunk(self, nbItems=None):
|
| - if nbItems is None:
|
| - nbItems = self._maxItems
|
| - if nbItems > len(self._items):
|
| - items = list(self._items)
|
| - self._items = deque()
|
| - else:
|
| - items = []
|
| - for i in range(nbItems):
|
| - items.append(self._items.popleft())
|
| - return items
|
| -
|
| - def save(self):
|
| - pass
|
| -
|
| - def items(self):
|
| - return list(self._items)
|
| -
|
| - def nbItems(self):
|
| - return len(self._items)
|
| -
|
| - def maxItems(self):
|
| - return self._maxItems
|
| -
|
| - MemoryQueue = DequeMemoryQueue
|
| -else:
|
| - MemoryQueue = ListMemoryQueue
|
| -
|
| -
|
| -class DiskQueue(object):
|
| - """Keeps a list of abstract items and serializes it to the disk.
|
| -
|
| - Use pickle for serialization."""
|
| - implements(IQueue)
|
| -
|
| - def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
|
| - unpickleFn=pickle.loads):
|
| - """
|
| - @path: directory to save the items.
|
| - @maxItems: maximum number of items to keep on disk, flush the
|
| - older ones.
|
| - @pickleFn: function used to pack the items to disk.
|
| - @unpickleFn: function used to unpack items from disk.
|
| - """
|
| - self.path = path
|
| - self._maxItems = maxItems
|
| - if self._maxItems is None:
|
| - self._maxItems = 100000
|
| - if not os.path.isdir(self.path):
|
| - os.mkdir(self.path)
|
| - self.pickleFn = pickleFn
|
| - self.unpickleFn = unpickleFn
|
| -
|
| - # Total number of items.
|
| - self._nbItems = 0
|
| - # The actual items id start at one.
|
| - self.firstItemId = 0
|
| - self.lastItemId = 0
|
| - self._loadFromDisk()
|
| -
|
| - def pushItem(self, item):
|
| - ret = None
|
| - if self._nbItems == self._maxItems:
|
| - id = self._findNext(self.firstItemId)
|
| - path = os.path.join(self.path, str(id))
|
| - ret = self.unpickleFn(ReadFile(path))
|
| - os.remove(path)
|
| - self.firstItemId = id + 1
|
| - else:
|
| - self._nbItems += 1
|
| - self.lastItemId += 1
|
| - path = os.path.join(self.path, str(self.lastItemId))
|
| - if os.path.exists(path):
|
| - raise IOError('%s already exists.' % path)
|
| - WriteFile(path, self.pickleFn(item))
|
| - return ret
|
| -
|
| - def insertBackChunk(self, chunk):
|
| - ret = None
|
| - excess = self._nbItems + len(chunk) - self._maxItems
|
| - if excess > 0:
|
| - ret = chunk[0:excess]
|
| - chunk = chunk[excess:]
|
| - for i in reversed(chunk):
|
| - self.firstItemId -= 1
|
| - path = os.path.join(self.path, str(self.firstItemId))
|
| - if os.path.exists(path):
|
| - raise IOError('%s already exists.' % path)
|
| - WriteFile(path, self.pickleFn(i))
|
| - self._nbItems += 1
|
| - return ret
|
| -
|
| - def popChunk(self, nbItems=None):
|
| - if nbItems is None:
|
| - nbItems = self._maxItems
|
| - ret = []
|
| - for i in range(nbItems):
|
| - if self._nbItems == 0:
|
| - break
|
| - id = self._findNext(self.firstItemId)
|
| - path = os.path.join(self.path, str(id))
|
| - ret.append(self.unpickleFn(ReadFile(path)))
|
| - os.remove(path)
|
| - self._nbItems -= 1
|
| - self.firstItemId = id + 1
|
| - return ret
|
| -
|
| - def save(self):
|
| - pass
|
| -
|
| - def items(self):
|
| - """Warning, very slow."""
|
| - ret = []
|
| - for id in range(self.firstItemId, self.lastItemId + 1):
|
| - path = os.path.join(self.path, str(id))
|
| - if os.path.exists(path):
|
| - ret.append(self.unpickleFn(ReadFile(path)))
|
| - return ret
|
| -
|
| - def nbItems(self):
|
| - return self._nbItems
|
| -
|
| - def maxItems(self):
|
| - return self._maxItems
|
| -
|
| - #### Protected functions
|
| -
|
| - def _findNext(self, id):
|
| - while True:
|
| - path = os.path.join(self.path, str(id))
|
| - if os.path.isfile(path):
|
| - return id
|
| - id += 1
|
| - return None
|
| -
|
| - def _loadFromDisk(self):
|
| - """Loads the queue from disk upto self.maxMemoryItems items into
|
| - self.items.
|
| - """
|
| - def SafeInt(item):
|
| - try:
|
| - return int(item)
|
| - except ValueError:
|
| - return None
|
| -
|
| - files = filter(None, [SafeInt(x) for x in os.listdir(self.path)])
|
| - files.sort()
|
| - self._nbItems = len(files)
|
| - if self._nbItems:
|
| - self.firstItemId = files[0]
|
| - self.lastItemId = files[-1]
|
| -
|
| -
|
| -class PersistentQueue(object):
|
| - """Keeps a list of abstract items and serializes it to the disk.
|
| -
|
| - It has 2 layers of queue, normally an in-memory queue and an on-disk queue.
|
| - When the number of items in the primary queue gets too large, the new items
|
| - are automatically saved to the secondary queue. The older items are kept in
|
| - the primary queue.
|
| - """
|
| - implements(IQueue)
|
| -
|
| - def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
|
| - """
|
| - @primaryQueue: memory queue to use before buffering to disk.
|
| - @secondaryQueue: disk queue to use as permanent buffer.
|
| - @path: path is a shortcut when using default DiskQueue settings.
|
| - """
|
| - self.primaryQueue = primaryQueue
|
| - if self.primaryQueue is None:
|
| - self.primaryQueue = MemoryQueue()
|
| - self.secondaryQueue = secondaryQueue
|
| - if self.secondaryQueue is None:
|
| - self.secondaryQueue = DiskQueue(path)
|
| - # Preload data from the secondary queue only if we know we won't start
|
| - # using the secondary queue right away.
|
| - if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
|
| - self.primaryQueue.insertBackChunk(
|
| - self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))
|
| -
|
| - def pushItem(self, item):
|
| - # If there is already items in secondaryQueue, we'd need to pop them
|
| - # all to start inserting them into primaryQueue so don't bother and
|
| - # just push it in secondaryQueue.
|
| - if (self.secondaryQueue.nbItems() or
|
| - self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
|
| - item = self.secondaryQueue.pushItem(item)
|
| - if item is None:
|
| - return item
|
| - # If item is not None, secondaryQueue overflowed. We need to push it
|
| - # back to primaryQueue so the oldest item is dumped.
|
| - # Or everything fit in the primaryQueue.
|
| - return self.primaryQueue.pushItem(item)
|
| -
|
| - def insertBackChunk(self, chunk):
|
| - ret = None
|
| - # Overall excess
|
| - excess = self.nbItems() + len(chunk) - self.maxItems()
|
| - if excess > 0:
|
| - ret = chunk[0:excess]
|
| - chunk = chunk[excess:]
|
| - # Memory excess
|
| - excess = (self.primaryQueue.nbItems() + len(chunk) -
|
| - self.primaryQueue.maxItems())
|
| - if excess > 0:
|
| - chunk2 = []
|
| - for i in range(excess):
|
| - chunk2.append(self.primaryQueue.items().pop())
|
| - chunk2.reverse()
|
| - x = self.primaryQueue.insertBackChunk(chunk)
|
| - assert x is None, "primaryQueue.insertBackChunk did not return None"
|
| - if excess > 0:
|
| - x = self.secondaryQueue.insertBackChunk(chunk2)
|
| - assert x is None, ("secondaryQueue.insertBackChunk did not return "
|
| - " None")
|
| - return ret
|
| -
|
| - def popChunk(self, nbItems=None):
|
| - if nbItems is None:
|
| - nbItems = self.primaryQueue.maxItems()
|
| - ret = self.primaryQueue.popChunk(nbItems)
|
| - nbItems -= len(ret)
|
| - if nbItems and self.secondaryQueue.nbItems():
|
| - ret.extend(self.secondaryQueue.popChunk(nbItems))
|
| - return ret
|
| -
|
| - def save(self):
|
| - self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())
|
| -
|
| - def items(self):
|
| - return self.primaryQueue.items() + self.secondaryQueue.items()
|
| -
|
| - def nbItems(self):
|
| - return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()
|
| -
|
| - def maxItems(self):
|
| - return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()
|
| -
|
| -
|
| -class IndexedQueue(object):
|
| - """Adds functionality to a IQueue object to track its usage.
|
| -
|
| - Adds a new member function getIndex() and modify popChunk() and
|
| - insertBackChunk() to keep a virtual pointer to the queue's first entry
|
| - index."""
|
| - implements(IQueue)
|
| -
|
| - def __init__(self, queue):
|
| - # Copy all the member functions from the other object that this class
|
| - # doesn't already define.
|
| - assert IQueue.providedBy(queue)
|
| - def Filter(m):
|
| - return (m[0] != '_' and callable(getattr(queue, m))
|
| - and not hasattr(self, m))
|
| - for member in filter(Filter, dir(queue)):
|
| - setattr(self, member, getattr(queue, member))
|
| - self.queue = queue
|
| - self._index = 0
|
| -
|
| - def getIndex(self):
|
| - return self._index
|
| -
|
| - def popChunk(self, *args, **kwargs):
|
| - items = self.queue.popChunk(*args, **kwargs)
|
| - if items:
|
| - self._index += len(items)
|
| - return items
|
| -
|
| - def insertBackChunk(self, items):
|
| - self._index -= len(items)
|
| - ret = self.queue.insertBackChunk(items)
|
| - if ret:
|
| - self._index += len(ret)
|
| - return ret
|
| -
|
| -
|
| -def ToIndexedQueue(queue):
|
| - """If the IQueue wasn't already a IndexedQueue, makes it an IndexedQueue."""
|
| - if not IQueue.providedBy(queue):
|
| - raise TypeError("queue doesn't implement IQueue", queue)
|
| - if isinstance(queue, IndexedQueue):
|
| - return queue
|
| - return IndexedQueue(queue)
|
| -
|
| -# vim: set ts=4 sts=4 sw=4 et:
|
|
|