OLD | NEW |
(Empty) | |
| 1 from .compat import threading |
| 2 |
| 3 import logging |
| 4 log = logging.getLogger(__name__) |
| 5 |
| 6 |
| 7 class LockError(Exception): |
| 8 pass |
| 9 |
| 10 |
| 11 class ReadWriteMutex(object): |
| 12 """A mutex which allows multiple readers, single writer. |
| 13 |
| 14 :class:`.ReadWriteMutex` uses a Python ``threading.Condition`` |
| 15 to provide this functionality across threads within a process. |
| 16 |
| 17 The Beaker package also contained a file-lock based version |
| 18 of this concept, so that readers/writers could be synchronized |
| 19 across processes with a common filesystem. A future Dogpile |
| 20 release may include this additional class at some point. |
| 21 |
| 22 """ |
| 23 |
| 24 def __init__(self): |
| 25 # counts how many asynchronous methods are executing |
| 26 self.async = 0 |
| 27 |
| 28 # pointer to thread that is the current sync operation |
| 29 self.current_sync_operation = None |
| 30 |
| 31 # condition object to lock on |
| 32 self.condition = threading.Condition(threading.Lock()) |
| 33 |
| 34 def acquire_read_lock(self, wait = True): |
| 35 """Acquire the 'read' lock.""" |
| 36 self.condition.acquire() |
| 37 try: |
| 38 # see if a synchronous operation is waiting to start |
| 39 # or is already running, in which case we wait (or just |
| 40 # give up and return) |
| 41 if wait: |
| 42 while self.current_sync_operation is not None: |
| 43 self.condition.wait() |
| 44 else: |
| 45 if self.current_sync_operation is not None: |
| 46 return False |
| 47 |
| 48 self.async += 1 |
| 49 log.debug("%s acquired read lock", self) |
| 50 finally: |
| 51 self.condition.release() |
| 52 |
| 53 if not wait: |
| 54 return True |
| 55 |
| 56 def release_read_lock(self): |
| 57 """Release the 'read' lock.""" |
| 58 self.condition.acquire() |
| 59 try: |
| 60 self.async -= 1 |
| 61 |
| 62 # check if we are the last asynchronous reader thread |
| 63 # out the door. |
| 64 if self.async == 0: |
| 65 # yes. so if a sync operation is waiting, notifyAll to wake |
| 66 # it up |
| 67 if self.current_sync_operation is not None: |
| 68 self.condition.notifyAll() |
| 69 elif self.async < 0: |
| 70 raise LockError("Synchronizer error - too many " |
| 71 "release_read_locks called") |
| 72 log.debug("%s released read lock", self) |
| 73 finally: |
| 74 self.condition.release() |
| 75 |
| 76 def acquire_write_lock(self, wait = True): |
| 77 """Acquire the 'write' lock.""" |
| 78 self.condition.acquire() |
| 79 try: |
| 80 # here, we are not a synchronous reader, and after returning, |
| 81 # assuming waiting or immediate availability, we will be. |
| 82 |
| 83 if wait: |
| 84 # if another sync is working, wait |
| 85 while self.current_sync_operation is not None: |
| 86 self.condition.wait() |
| 87 else: |
| 88 # if another sync is working, |
| 89 # we dont want to wait, so forget it |
| 90 if self.current_sync_operation is not None: |
| 91 return False |
| 92 |
| 93 # establish ourselves as the current sync |
| 94 # this indicates to other read/write operations |
| 95 # that they should wait until this is None again |
| 96 self.current_sync_operation = threading.currentThread() |
| 97 |
| 98 # now wait again for asyncs to finish |
| 99 if self.async > 0: |
| 100 if wait: |
| 101 # wait |
| 102 self.condition.wait() |
| 103 else: |
| 104 # we dont want to wait, so forget it |
| 105 self.current_sync_operation = None |
| 106 return False |
| 107 log.debug("%s acquired write lock", self) |
| 108 finally: |
| 109 self.condition.release() |
| 110 |
| 111 if not wait: |
| 112 return True |
| 113 |
| 114 def release_write_lock(self): |
| 115 """Release the 'write' lock.""" |
| 116 self.condition.acquire() |
| 117 try: |
| 118 if self.current_sync_operation is not threading.currentThread(): |
| 119 raise LockError("Synchronizer error - current thread doesn't " |
| 120 "have the write lock") |
| 121 |
| 122 # reset the current sync operation so |
| 123 # another can get it |
| 124 self.current_sync_operation = None |
| 125 |
| 126 # tell everyone to get ready |
| 127 self.condition.notifyAll() |
| 128 |
| 129 log.debug("%s released write lock", self) |
| 130 finally: |
| 131 # everyone go !! |
| 132 self.condition.release() |
OLD | NEW |