| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_journal -*- | |
| 2 # | |
| 3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 # | |
| 7 | |
| 8 | |
| 9 """Basic classes and interfaces for journal.""" | |
| 10 | |
| 11 from __future__ import nested_scopes | |
| 12 | |
| 13 # system imports | |
| 14 import os, time | |
| 15 | |
| 16 try: | |
| 17 import cPickle as pickle | |
| 18 except ImportError: | |
| 19 import pickle | |
| 20 | |
| 21 # twisted imports | |
| 22 from zope.interface import implements, Interface | |
| 23 | |
| 24 | |
| 25 class Journal: | |
| 26 """All commands to the system get routed through here. | |
| 27 | |
| 28 Subclasses should implement the actual snapshotting capability. | |
| 29 """ | |
| 30 | |
| 31 def __init__(self, log, journaledService): | |
| 32 self.log = log | |
| 33 self.journaledService = journaledService | |
| 34 self.latestIndex = self.log.getCurrentIndex() | |
| 35 | |
| 36 def updateFromLog(self): | |
| 37 """Run all commands from log that haven't been run yet. | |
| 38 | |
| 39 This method should be run on startup to ensure the snapshot | |
| 40 is up-to-date. | |
| 41 """ | |
| 42 snapshotIndex = self.getLastSnapshot() | |
| 43 if snapshotIndex < self.latestIndex: | |
| 44 for cmdtime, command in self.log.getCommandsSince(snapshotIndex + 1)
: | |
| 45 command.execute(self.journaledService, cmdtime) | |
| 46 | |
| 47 def executeCommand(self, command): | |
| 48 """Log and execute a command.""" | |
| 49 runTime = time.time() | |
| 50 d = self.log.logCommand(command, runTime) | |
| 51 d.addCallback(self._reallyExecute, command, runTime) | |
| 52 return d | |
| 53 | |
| 54 def _reallyExecute(self, index, command, runTime): | |
| 55 """Callback called when logging command is done.""" | |
| 56 result = command.execute(self.journaledService, runTime) | |
| 57 self.latestIndex = index | |
| 58 return result | |
| 59 | |
| 60 def getLastSnapshot(self): | |
| 61 """Return command index of the last snapshot taken.""" | |
| 62 raise NotImplementedError | |
| 63 | |
| 64 def sync(self, *args, **kwargs): | |
| 65 """Save journal to disk, returns Deferred of finish status. | |
| 66 | |
| 67 Subclasses may choose whatever signature is appropriate, or may | |
| 68 not implement this at all. | |
| 69 """ | |
| 70 raise NotImplementedError | |
| 71 | |
| 72 | |
| 73 | |
| 74 class MemoryJournal(Journal): | |
| 75 """Prevayler-like journal that dumps from memory to disk.""" | |
| 76 | |
| 77 def __init__(self, log, journaledService, path, loadedCallback): | |
| 78 self.path = path | |
| 79 if os.path.exists(path): | |
| 80 try: | |
| 81 self.lastSync, obj = pickle.load(open(path, "rb")) | |
| 82 except (IOError, OSError, pickle.UnpicklingError): | |
| 83 self.lastSync, obj = 0, None | |
| 84 loadedCallback(obj) | |
| 85 else: | |
| 86 self.lastSync = 0 | |
| 87 loadedCallback(None) | |
| 88 Journal.__init__(self, log, journaledService) | |
| 89 | |
| 90 def getLastSnapshot(self): | |
| 91 return self.lastSync | |
| 92 | |
| 93 def sync(self, obj): | |
| 94 # make this more reliable at some point | |
| 95 f = open(self.path, "wb") | |
| 96 pickle.dump((self.latestIndex, obj), f, 1) | |
| 97 f.close() | |
| 98 self.lastSync = self.latestIndex | |
| 99 | |
| 100 | |
| 101 class ICommand(Interface): | |
| 102 """A serializable command which interacts with a journaled service.""" | |
| 103 | |
| 104 def execute(journaledService, runTime): | |
| 105 """Run the command and return result.""" | |
| 106 | |
| 107 | |
| 108 class ICommandLog(Interface): | |
| 109 """Interface for command log.""" | |
| 110 | |
| 111 def logCommand(command, runTime): | |
| 112 """Add a command and its run time to the log. | |
| 113 | |
| 114 @return: Deferred of command index. | |
| 115 """ | |
| 116 | |
| 117 def getCurrentIndex(): | |
| 118 """Return index of last command that was logged.""" | |
| 119 | |
| 120 def getCommandsSince(index): | |
| 121 """Return commands who's index >= the given one. | |
| 122 | |
| 123 @return: list of (time, command) tuples, sorted with ascending times. | |
| 124 """ | |
| 125 | |
| 126 | |
| 127 class LoadingService: | |
| 128 """Base class for journalled service used with Wrappables.""" | |
| 129 | |
| 130 def loadObject(self, objType, objId): | |
| 131 """Return object of specified type and id.""" | |
| 132 raise NotImplementedError | |
| 133 | |
| 134 | |
| 135 class Wrappable: | |
| 136 """Base class for objects used with LoadingService.""" | |
| 137 | |
| 138 objectType = None # override in base class | |
| 139 | |
| 140 def getUid(self): | |
| 141 """Return uid for loading with LoadingService.loadObject""" | |
| 142 raise NotImplementedError | |
| 143 | |
| 144 | |
| 145 class WrapperCommand: | |
| 146 | |
| 147 implements(ICommand) | |
| 148 | |
| 149 def __init__(self, methodName, obj, args=(), kwargs={}): | |
| 150 self.obj = obj | |
| 151 self.objId = obj.getUid() | |
| 152 self.objType = obj.objectType | |
| 153 self.methodName = methodName | |
| 154 self.args = args | |
| 155 self.kwargs = kwargs | |
| 156 | |
| 157 def execute(self, svc, commandTime): | |
| 158 if not hasattr(self, "obj"): | |
| 159 obj = svc.loadObject(self.objType, self.objId) | |
| 160 else: | |
| 161 obj = self.obj | |
| 162 return getattr(obj, self.methodName)(*self.args, **self.kwargs) | |
| 163 | |
| 164 def __getstate__(self): | |
| 165 d = self.__dict__.copy() | |
| 166 del d["obj"] | |
| 167 return d | |
| 168 | |
| 169 | |
| 170 def command(methodName, cmdClass=WrapperCommand): | |
| 171 """Wrap a method so it gets turned into command automatically. | |
| 172 | |
| 173 For use with Wrappables. | |
| 174 | |
| 175 Usage:: | |
| 176 | |
| 177 | class Foo(Wrappable): | |
| 178 | objectType = "foo" | |
| 179 | def getUid(self): | |
| 180 | return self.id | |
| 181 | def _bar(self, x): | |
| 182 | return x + 1 | |
| 183 | | |
| 184 | bar = command('_bar') | |
| 185 | |
| 186 The resulting callable will have signature identical to wrapped | |
| 187 function, except that it expects journal as first argument, and | |
| 188 returns a Deferred. | |
| 189 """ | |
| 190 def wrapper(obj, journal, *args, **kwargs): | |
| 191 return journal.executeCommand(cmdClass(methodName, obj, args, kwargs)) | |
| 192 return wrapper | |
| 193 | |
| 194 | |
| 195 class ServiceWrapperCommand: | |
| 196 | |
| 197 implements(ICommand) | |
| 198 | |
| 199 def __init__(self, methodName, args=(), kwargs={}): | |
| 200 self.methodName = methodName | |
| 201 self.args = args | |
| 202 self.kwargs = kwargs | |
| 203 | |
| 204 def execute(self, svc, commandTime): | |
| 205 return getattr(svc, self.methodName)(*self.args, **self.kwargs) | |
| 206 | |
| 207 def __repr__(self): | |
| 208 return "<ServiceWrapperCommand: %s, %s, %s>" % (self.methodName, self.ar
gs, self.kwargs) | |
| 209 | |
| 210 def __cmp__(self, other): | |
| 211 if hasattr(other, "__dict__"): | |
| 212 return cmp(self.__dict__, other.__dict__) | |
| 213 else: | |
| 214 return 0 | |
| 215 | |
| 216 | |
| 217 def serviceCommand(methodName, cmdClass=ServiceWrapperCommand): | |
| 218 """Wrap methods into commands for a journalled service. | |
| 219 | |
| 220 The resulting callable will have signature identical to wrapped | |
| 221 function, except that it expects journal as first argument, and | |
| 222 returns a Deferred. | |
| 223 """ | |
| 224 def wrapper(obj, journal, *args, **kwargs): | |
| 225 return journal.executeCommand(cmdClass(methodName, args, kwargs)) | |
| 226 return wrapper | |
| OLD | NEW |