| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 | |
| 6 """Journal using twisted.enterprise.row RDBMS support. | |
| 7 | |
| 8 You're going to need the following table in your database:: | |
| 9 | |
| 10 | CREATE TABLE journalinfo | |
| 11 | ( | |
| 12 | commandIndex int | |
| 13 | ); | |
| 14 | INSERT INTO journalinfo VALUES (0); | |
| 15 | |
| 16 """ | |
| 17 | |
| 18 from __future__ import nested_scopes | |
| 19 | |
| 20 # twisted imports | |
| 21 from twisted.internet import defer | |
| 22 | |
| 23 # sibling imports | |
| 24 import base | |
| 25 | |
| 26 | |
| 27 # constants for command list | |
| 28 INSERT, DELETE, UPDATE = range(3) | |
| 29 | |
| 30 | |
| 31 class RowJournal(base.Journal): | |
| 32 """Journal that stores data 'snapshot' in using twisted.enterprise.row. | |
| 33 | |
| 34 Use this as the reflector instead of the original reflector. | |
| 35 | |
| 36 It may block on creation, if it has to run recovery. | |
| 37 """ | |
| 38 | |
| 39 def __init__(self, log, journaledService, reflector): | |
| 40 self.reflector = reflector | |
| 41 self.commands = [] | |
| 42 self.syncing = 0 | |
| 43 base.Journal.__init__(self, log, journaledService) | |
| 44 | |
| 45 def updateRow(self, obj): | |
| 46 """Mark on object for updating when sync()ing.""" | |
| 47 self.commands.append((UPDATE, obj)) | |
| 48 | |
| 49 def insertRow(self, obj): | |
| 50 """Mark on object for inserting when sync()ing.""" | |
| 51 self.commands.append((INSERT, obj)) | |
| 52 | |
| 53 def deleteRow(self, obj): | |
| 54 """Mark on object for deleting when sync()ing.""" | |
| 55 self.commands.append((DELETE, obj)) | |
| 56 | |
| 57 def loadObjectsFrom(self, tableName, parentRow=None, data=None, whereClause=
None, forceChildren=0): | |
| 58 """Flush all objects to the database and then load objects.""" | |
| 59 d = self.sync() | |
| 60 d.addCallback(lambda result: self.reflector.loadObjectsFrom( | |
| 61 tableName, parentRow=parentRow, data=data, whereClause=whereClause, | |
| 62 forceChildren=forceChildren)) | |
| 63 return d | |
| 64 | |
| 65 def sync(self): | |
| 66 """Commit changes to database.""" | |
| 67 if self.syncing: | |
| 68 raise ValueError, "sync already in progress" | |
| 69 comandMap = {INSERT : self.reflector.insertRowSQL, | |
| 70 UPDATE : self.reflector.updateRowSQL, | |
| 71 DELETE : self.reflector.deleteRowSQL} | |
| 72 sqlCommands = [] | |
| 73 for kind, obj in self.commands: | |
| 74 sqlCommands.append(comandMap[kind](obj)) | |
| 75 self.commands = [] | |
| 76 if sqlCommands: | |
| 77 self.syncing = 1 | |
| 78 d = self.reflector.dbpool.runInteraction(self._sync, self.latestInde
x, sqlCommands) | |
| 79 d.addCallback(self._syncDone) | |
| 80 return d | |
| 81 else: | |
| 82 return defer.succeed(1) | |
| 83 | |
| 84 def _sync(self, txn, index, commands): | |
| 85 """Do the actual database synchronization.""" | |
| 86 for c in commands: | |
| 87 txn.execute(c) | |
| 88 txn.update("UPDATE journalinfo SET commandIndex = %d" % index) | |
| 89 | |
| 90 def _syncDone(self, result): | |
| 91 self.syncing = 0 | |
| 92 return result | |
| 93 | |
| 94 def getLastSnapshot(self): | |
| 95 """Return command index of last snapshot.""" | |
| 96 conn = self.reflector.dbpool.connect() | |
| 97 cursor = conn.cursor() | |
| 98 cursor.execute("SELECT commandIndex FROM journalinfo") | |
| 99 return cursor.fetchall()[0][0] | |
| OLD | NEW |