OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_journal -*- | |
2 # | |
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 # | |
7 | |
8 | |
9 """Basic classes and interfaces for journal.""" | |
10 | |
11 from __future__ import nested_scopes | |
12 | |
13 # system imports | |
14 import os, time | |
15 | |
16 try: | |
17 import cPickle as pickle | |
18 except ImportError: | |
19 import pickle | |
20 | |
21 # twisted imports | |
22 from zope.interface import implements, Interface | |
23 | |
24 | |
25 class Journal: | |
26 """All commands to the system get routed through here. | |
27 | |
28 Subclasses should implement the actual snapshotting capability. | |
29 """ | |
30 | |
31 def __init__(self, log, journaledService): | |
32 self.log = log | |
33 self.journaledService = journaledService | |
34 self.latestIndex = self.log.getCurrentIndex() | |
35 | |
36 def updateFromLog(self): | |
37 """Run all commands from log that haven't been run yet. | |
38 | |
39 This method should be run on startup to ensure the snapshot | |
40 is up-to-date. | |
41 """ | |
42 snapshotIndex = self.getLastSnapshot() | |
43 if snapshotIndex < self.latestIndex: | |
44 for cmdtime, command in self.log.getCommandsSince(snapshotIndex + 1)
: | |
45 command.execute(self.journaledService, cmdtime) | |
46 | |
47 def executeCommand(self, command): | |
48 """Log and execute a command.""" | |
49 runTime = time.time() | |
50 d = self.log.logCommand(command, runTime) | |
51 d.addCallback(self._reallyExecute, command, runTime) | |
52 return d | |
53 | |
54 def _reallyExecute(self, index, command, runTime): | |
55 """Callback called when logging command is done.""" | |
56 result = command.execute(self.journaledService, runTime) | |
57 self.latestIndex = index | |
58 return result | |
59 | |
60 def getLastSnapshot(self): | |
61 """Return command index of the last snapshot taken.""" | |
62 raise NotImplementedError | |
63 | |
64 def sync(self, *args, **kwargs): | |
65 """Save journal to disk, returns Deferred of finish status. | |
66 | |
67 Subclasses may choose whatever signature is appropriate, or may | |
68 not implement this at all. | |
69 """ | |
70 raise NotImplementedError | |
71 | |
72 | |
73 | |
74 class MemoryJournal(Journal): | |
75 """Prevayler-like journal that dumps from memory to disk.""" | |
76 | |
77 def __init__(self, log, journaledService, path, loadedCallback): | |
78 self.path = path | |
79 if os.path.exists(path): | |
80 try: | |
81 self.lastSync, obj = pickle.load(open(path, "rb")) | |
82 except (IOError, OSError, pickle.UnpicklingError): | |
83 self.lastSync, obj = 0, None | |
84 loadedCallback(obj) | |
85 else: | |
86 self.lastSync = 0 | |
87 loadedCallback(None) | |
88 Journal.__init__(self, log, journaledService) | |
89 | |
90 def getLastSnapshot(self): | |
91 return self.lastSync | |
92 | |
93 def sync(self, obj): | |
94 # make this more reliable at some point | |
95 f = open(self.path, "wb") | |
96 pickle.dump((self.latestIndex, obj), f, 1) | |
97 f.close() | |
98 self.lastSync = self.latestIndex | |
99 | |
100 | |
101 class ICommand(Interface): | |
102 """A serializable command which interacts with a journaled service.""" | |
103 | |
104 def execute(journaledService, runTime): | |
105 """Run the command and return result.""" | |
106 | |
107 | |
108 class ICommandLog(Interface): | |
109 """Interface for command log.""" | |
110 | |
111 def logCommand(command, runTime): | |
112 """Add a command and its run time to the log. | |
113 | |
114 @return: Deferred of command index. | |
115 """ | |
116 | |
117 def getCurrentIndex(): | |
118 """Return index of last command that was logged.""" | |
119 | |
120 def getCommandsSince(index): | |
121 """Return commands who's index >= the given one. | |
122 | |
123 @return: list of (time, command) tuples, sorted with ascending times. | |
124 """ | |
125 | |
126 | |
127 class LoadingService: | |
128 """Base class for journalled service used with Wrappables.""" | |
129 | |
130 def loadObject(self, objType, objId): | |
131 """Return object of specified type and id.""" | |
132 raise NotImplementedError | |
133 | |
134 | |
135 class Wrappable: | |
136 """Base class for objects used with LoadingService.""" | |
137 | |
138 objectType = None # override in base class | |
139 | |
140 def getUid(self): | |
141 """Return uid for loading with LoadingService.loadObject""" | |
142 raise NotImplementedError | |
143 | |
144 | |
145 class WrapperCommand: | |
146 | |
147 implements(ICommand) | |
148 | |
149 def __init__(self, methodName, obj, args=(), kwargs={}): | |
150 self.obj = obj | |
151 self.objId = obj.getUid() | |
152 self.objType = obj.objectType | |
153 self.methodName = methodName | |
154 self.args = args | |
155 self.kwargs = kwargs | |
156 | |
157 def execute(self, svc, commandTime): | |
158 if not hasattr(self, "obj"): | |
159 obj = svc.loadObject(self.objType, self.objId) | |
160 else: | |
161 obj = self.obj | |
162 return getattr(obj, self.methodName)(*self.args, **self.kwargs) | |
163 | |
164 def __getstate__(self): | |
165 d = self.__dict__.copy() | |
166 del d["obj"] | |
167 return d | |
168 | |
169 | |
170 def command(methodName, cmdClass=WrapperCommand): | |
171 """Wrap a method so it gets turned into command automatically. | |
172 | |
173 For use with Wrappables. | |
174 | |
175 Usage:: | |
176 | |
177 | class Foo(Wrappable): | |
178 | objectType = "foo" | |
179 | def getUid(self): | |
180 | return self.id | |
181 | def _bar(self, x): | |
182 | return x + 1 | |
183 | | |
184 | bar = command('_bar') | |
185 | |
186 The resulting callable will have signature identical to wrapped | |
187 function, except that it expects journal as first argument, and | |
188 returns a Deferred. | |
189 """ | |
190 def wrapper(obj, journal, *args, **kwargs): | |
191 return journal.executeCommand(cmdClass(methodName, obj, args, kwargs)) | |
192 return wrapper | |
193 | |
194 | |
195 class ServiceWrapperCommand: | |
196 | |
197 implements(ICommand) | |
198 | |
199 def __init__(self, methodName, args=(), kwargs={}): | |
200 self.methodName = methodName | |
201 self.args = args | |
202 self.kwargs = kwargs | |
203 | |
204 def execute(self, svc, commandTime): | |
205 return getattr(svc, self.methodName)(*self.args, **self.kwargs) | |
206 | |
207 def __repr__(self): | |
208 return "<ServiceWrapperCommand: %s, %s, %s>" % (self.methodName, self.ar
gs, self.kwargs) | |
209 | |
210 def __cmp__(self, other): | |
211 if hasattr(other, "__dict__"): | |
212 return cmp(self.__dict__, other.__dict__) | |
213 else: | |
214 return 0 | |
215 | |
216 | |
217 def serviceCommand(methodName, cmdClass=ServiceWrapperCommand): | |
218 """Wrap methods into commands for a journalled service. | |
219 | |
220 The resulting callable will have signature identical to wrapped | |
221 function, except that it expects journal as first argument, and | |
222 returns a Deferred. | |
223 """ | |
224 def wrapper(obj, journal, *args, **kwargs): | |
225 return journal.executeCommand(cmdClass(methodName, args, kwargs)) | |
226 return wrapper | |
OLD | NEW |