OLD | NEW |
| (Empty) |
1 package db | |
2 | |
3 import ( | |
4 "errors" | |
5 "io" | |
6 "time" | |
7 | |
8 "github.com/skia-dev/glog" | |
9 ) | |
10 | |
11 const ( | |
12 // Maximum number of simultaneous GetModifiedTasks users. | |
13 MAX_MODIFIED_TASKS_USERS = 10 | |
14 | |
15 // Expiration for GetModifiedTasks users. | |
16 MODIFIED_TASKS_TIMEOUT = 10 * time.Minute | |
17 | |
18 // Retries attempted by UpdateWithRetries and UpdateTaskWithRetries. | |
19 NUM_RETRIES = 5 | |
20 ) | |
21 | |
22 var ( | |
23 ErrAlreadyExists = errors.New("Object already exists and modification
not allowed.") | |
24 ErrConcurrentUpdate = errors.New("Concurrent update") | |
25 ErrNotFound = errors.New("Task with given ID does not exist") | |
26 ErrTooManyUsers = errors.New("Too many users") | |
27 ErrUnknownId = errors.New("Unknown ID") | |
28 ) | |
29 | |
30 func IsAlreadyExists(e error) bool { | |
31 return e != nil && e.Error() == ErrAlreadyExists.Error() | |
32 } | |
33 | |
34 func IsConcurrentUpdate(e error) bool { | |
35 return e != nil && e.Error() == ErrConcurrentUpdate.Error() | |
36 } | |
37 | |
38 func IsNotFound(e error) bool { | |
39 return e != nil && e.Error() == ErrNotFound.Error() | |
40 } | |
41 | |
42 func IsTooManyUsers(e error) bool { | |
43 return e != nil && e.Error() == ErrTooManyUsers.Error() | |
44 } | |
45 | |
46 func IsUnknownId(e error) bool { | |
47 return e != nil && e.Error() == ErrUnknownId.Error() | |
48 } | |
49 | |
50 // TaskReader is a read-only view of a DB. | |
51 type TaskReader interface { | |
52 io.Closer | |
53 | |
54 // GetModifiedTasks returns all tasks modified since the last time | |
55 // GetModifiedTasks was run with the given id. | |
56 GetModifiedTasks(string) ([]*Task, error) | |
57 | |
58 // GetTaskById returns the task with the given Id field. Returns nil, ni
l if | |
59 // task is not found. | |
60 GetTaskById(string) (*Task, error) | |
61 | |
62 // GetTasksFromDateRange retrieves all tasks which started in the given
date range. | |
63 GetTasksFromDateRange(time.Time, time.Time) ([]*Task, error) | |
64 | |
65 // StartTrackingModifiedTasks initiates tracking of modified tasks for | |
66 // the current caller. Returns a unique ID which can be used by the call
er | |
67 // to retrieve tasks which have been modified since the last query. The
ID | |
68 // expires after a period of inactivity. | |
69 StartTrackingModifiedTasks() (string, error) | |
70 | |
71 // StopTrackingModifiedTasks cancels tracking of modified tasks for the | |
72 // provided ID. | |
73 StopTrackingModifiedTasks(string) | |
74 } | |
75 | |
76 // DB is used by the task scheduler to store Tasks. | |
77 type DB interface { | |
78 TaskReader | |
79 | |
80 // AssignId sets the given task's Id field. Does not insert the task int
o the | |
81 // database. | |
82 AssignId(*Task) error | |
83 | |
84 // PutTask inserts or updates the Task in the database. Task's Id field
must | |
85 // be empty or set with AssignId. PutTask will set Task.DbModified. | |
86 PutTask(*Task) error | |
87 | |
88 // PutTasks inserts or updates the Tasks in the database. Each Task's Id
field | |
89 // must be empty or set with AssignId. Each Task's DbModified field will
be | |
90 // set. | |
91 PutTasks([]*Task) error | |
92 } | |
93 | |
94 // UpdateWithRetries wraps a call to db.PutTasks with retries. It calls | |
95 // db.PutTasks(f()) repeatedly until one of the following happen: | |
96 // - f or db.PutTasks returns an error, which is then returned from | |
97 // UpdateWithRetries; | |
98 // - PutTasks succeeds, in which case UpdateWithRetries returns the updated | |
99 // Tasks returned by f; | |
100 // - retries are exhausted, in which case UpdateWithRetries returns | |
101 // ErrConcurrentUpdate. | |
102 // | |
103 // Within f, tasks should be refreshed from the DB, e.g. with | |
104 // db.GetModifiedTasks or db.GetTaskById. | |
105 func UpdateWithRetries(db DB, f func() ([]*Task, error)) ([]*Task, error) { | |
106 var lastErr error | |
107 for i := 0; i < NUM_RETRIES; i++ { | |
108 t, err := f() | |
109 if err != nil { | |
110 return nil, err | |
111 } | |
112 lastErr = db.PutTasks(t) | |
113 if lastErr == nil { | |
114 return t, nil | |
115 } else if !IsConcurrentUpdate(lastErr) { | |
116 return nil, lastErr | |
117 } | |
118 } | |
119 glog.Warningf("UpdateWithRetries: %d consecutive ErrConcurrentUpdate.",
NUM_RETRIES) | |
120 return nil, lastErr | |
121 } | |
122 | |
123 // UpdateTaskWithRetries reads, updates, and writes a single Task in the DB. It: | |
124 // 1. reads the task with the given id, | |
125 // 2. calls f on that task, and | |
126 // 3. calls db.PutTask() on the updated task | |
127 // 4. repeats from step 1 as long as PutTasks returns ErrConcurrentUpdate and | |
128 // retries have not been exhausted. | |
129 // Returns the updated task if it was successfully updated in the DB. | |
130 // Immediately returns ErrNotFound if db.GetTaskById(id) returns nil. | |
131 // Immediately returns any error returned from f or from PutTasks (except | |
132 // ErrConcurrentUpdate). Returns ErrConcurrentUpdate if retries are exhausted. | |
133 func UpdateTaskWithRetries(db DB, id string, f func(*Task) error) (*Task, error)
{ | |
134 tasks, err := UpdateWithRetries(db, func() ([]*Task, error) { | |
135 t, err := db.GetTaskById(id) | |
136 if err != nil { | |
137 return nil, err | |
138 } | |
139 if t == nil { | |
140 return nil, ErrNotFound | |
141 } | |
142 err = f(t) | |
143 if err != nil { | |
144 return nil, err | |
145 } | |
146 return []*Task{t}, nil | |
147 }) | |
148 if err != nil { | |
149 return nil, err | |
150 } else { | |
151 return tasks[0], nil | |
152 } | |
153 } | |
154 | |
155 // RemoteDB allows retrieving tasks and full access to comments. | |
156 type RemoteDB interface { | |
157 TaskReader | |
158 CommentDB | |
159 } | |
OLD | NEW |