OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 | |
6 """Journal using twisted.enterprise.row RDBMS support. | |
7 | |
8 You're going to need the following table in your database:: | |
9 | |
10 | CREATE TABLE journalinfo | |
11 | ( | |
12 | commandIndex int | |
13 | ); | |
14 | INSERT INTO journalinfo VALUES (0); | |
15 | |
16 """ | |
17 | |
18 from __future__ import nested_scopes | |
19 | |
20 # twisted imports | |
21 from twisted.internet import defer | |
22 | |
23 # sibling imports | |
24 import base | |
25 | |
26 | |
27 # constants for command list | |
28 INSERT, DELETE, UPDATE = range(3) | |
29 | |
30 | |
31 class RowJournal(base.Journal): | |
32 """Journal that stores data 'snapshot' in using twisted.enterprise.row. | |
33 | |
34 Use this as the reflector instead of the original reflector. | |
35 | |
36 It may block on creation, if it has to run recovery. | |
37 """ | |
38 | |
39 def __init__(self, log, journaledService, reflector): | |
40 self.reflector = reflector | |
41 self.commands = [] | |
42 self.syncing = 0 | |
43 base.Journal.__init__(self, log, journaledService) | |
44 | |
45 def updateRow(self, obj): | |
46 """Mark on object for updating when sync()ing.""" | |
47 self.commands.append((UPDATE, obj)) | |
48 | |
49 def insertRow(self, obj): | |
50 """Mark on object for inserting when sync()ing.""" | |
51 self.commands.append((INSERT, obj)) | |
52 | |
53 def deleteRow(self, obj): | |
54 """Mark on object for deleting when sync()ing.""" | |
55 self.commands.append((DELETE, obj)) | |
56 | |
57 def loadObjectsFrom(self, tableName, parentRow=None, data=None, whereClause=
None, forceChildren=0): | |
58 """Flush all objects to the database and then load objects.""" | |
59 d = self.sync() | |
60 d.addCallback(lambda result: self.reflector.loadObjectsFrom( | |
61 tableName, parentRow=parentRow, data=data, whereClause=whereClause, | |
62 forceChildren=forceChildren)) | |
63 return d | |
64 | |
65 def sync(self): | |
66 """Commit changes to database.""" | |
67 if self.syncing: | |
68 raise ValueError, "sync already in progress" | |
69 comandMap = {INSERT : self.reflector.insertRowSQL, | |
70 UPDATE : self.reflector.updateRowSQL, | |
71 DELETE : self.reflector.deleteRowSQL} | |
72 sqlCommands = [] | |
73 for kind, obj in self.commands: | |
74 sqlCommands.append(comandMap[kind](obj)) | |
75 self.commands = [] | |
76 if sqlCommands: | |
77 self.syncing = 1 | |
78 d = self.reflector.dbpool.runInteraction(self._sync, self.latestInde
x, sqlCommands) | |
79 d.addCallback(self._syncDone) | |
80 return d | |
81 else: | |
82 return defer.succeed(1) | |
83 | |
84 def _sync(self, txn, index, commands): | |
85 """Do the actual database synchronization.""" | |
86 for c in commands: | |
87 txn.execute(c) | |
88 txn.update("UPDATE journalinfo SET commandIndex = %d" % index) | |
89 | |
90 def _syncDone(self, result): | |
91 self.syncing = 0 | |
92 return result | |
93 | |
94 def getLastSnapshot(self): | |
95 """Return command index of last snapshot.""" | |
96 conn = self.reflector.dbpool.connect() | |
97 cursor = conn.cursor() | |
98 cursor.execute("SELECT commandIndex FROM journalinfo") | |
99 return cursor.fetchall()[0][0] | |
OLD | NEW |