Index: third_party/google-endpoints/dogpile/util/readwrite_lock.py |
diff --git a/third_party/google-endpoints/dogpile/util/readwrite_lock.py b/third_party/google-endpoints/dogpile/util/readwrite_lock.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2196ed7d70601c35d9431e2525fcd9d064476377 |
--- /dev/null |
+++ b/third_party/google-endpoints/dogpile/util/readwrite_lock.py |
@@ -0,0 +1,132 @@ |
+from .compat import threading |
+ |
+import logging |
+log = logging.getLogger(__name__) |
+ |
+ |
+class LockError(Exception): |
+ pass |
+ |
+ |
+class ReadWriteMutex(object): |
+ """A mutex which allows multiple readers, single writer. |
+ |
+ :class:`.ReadWriteMutex` uses a Python ``threading.Condition`` |
+ to provide this functionality across threads within a process. |
+ |
+ The Beaker package also contained a file-lock based version |
+ of this concept, so that readers/writers could be synchronized |
+ across processes with a common filesystem. A future Dogpile |
+ release may include this additional class at some point. |
+ |
+ """ |
+ |
+ def __init__(self): |
+ # counts how many asynchronous methods are executing |
+ self.async = 0 |
+ |
+ # pointer to thread that is the current sync operation |
+ self.current_sync_operation = None |
+ |
+ # condition object to lock on |
+ self.condition = threading.Condition(threading.Lock()) |
+ |
+ def acquire_read_lock(self, wait = True): |
+ """Acquire the 'read' lock.""" |
+ self.condition.acquire() |
+ try: |
+ # see if a synchronous operation is waiting to start |
+ # or is already running, in which case we wait (or just |
+ # give up and return) |
+ if wait: |
+ while self.current_sync_operation is not None: |
+ self.condition.wait() |
+ else: |
+ if self.current_sync_operation is not None: |
+ return False |
+ |
+ self.async += 1 |
+ log.debug("%s acquired read lock", self) |
+ finally: |
+ self.condition.release() |
+ |
+ if not wait: |
+ return True |
+ |
+ def release_read_lock(self): |
+ """Release the 'read' lock.""" |
+ self.condition.acquire() |
+ try: |
+ self.async -= 1 |
+ |
+ # check if we are the last asynchronous reader thread |
+ # out the door. |
+ if self.async == 0: |
+ # yes. so if a sync operation is waiting, notifyAll to wake |
+ # it up |
+ if self.current_sync_operation is not None: |
+ self.condition.notifyAll() |
+ elif self.async < 0: |
+ raise LockError("Synchronizer error - too many " |
+ "release_read_locks called") |
+ log.debug("%s released read lock", self) |
+ finally: |
+ self.condition.release() |
+ |
+ def acquire_write_lock(self, wait = True): |
+ """Acquire the 'write' lock.""" |
+ self.condition.acquire() |
+ try: |
+ # here, we are not a synchronous reader, and after returning, |
+ # assuming waiting or immediate availability, we will be. |
+ |
+ if wait: |
+ # if another sync is working, wait |
+ while self.current_sync_operation is not None: |
+ self.condition.wait() |
+ else: |
+ # if another sync is working, |
+ # we dont want to wait, so forget it |
+ if self.current_sync_operation is not None: |
+ return False |
+ |
+ # establish ourselves as the current sync |
+ # this indicates to other read/write operations |
+ # that they should wait until this is None again |
+ self.current_sync_operation = threading.currentThread() |
+ |
+ # now wait again for asyncs to finish |
+ if self.async > 0: |
+ if wait: |
+ # wait |
+ self.condition.wait() |
+ else: |
+ # we dont want to wait, so forget it |
+ self.current_sync_operation = None |
+ return False |
+ log.debug("%s acquired write lock", self) |
+ finally: |
+ self.condition.release() |
+ |
+ if not wait: |
+ return True |
+ |
+ def release_write_lock(self): |
+ """Release the 'write' lock.""" |
+ self.condition.acquire() |
+ try: |
+ if self.current_sync_operation is not threading.currentThread(): |
+ raise LockError("Synchronizer error - current thread doesn't " |
+ "have the write lock") |
+ |
+ # reset the current sync operation so |
+ # another can get it |
+ self.current_sync_operation = None |
+ |
+ # tell everyone to get ready |
+ self.condition.notifyAll() |
+ |
+ log.debug("%s released write lock", self) |
+ finally: |
+ # everyone go !! |
+ self.condition.release() |