OLD | NEW |
| (Empty) |
1 package db | |
2 | |
3 import ( | |
4 "fmt" | |
5 "sync" | |
6 "time" | |
7 | |
8 "github.com/skia-dev/glog" | |
9 ) | |
10 | |
11 type TaskCache interface { | |
12 | |
13 // GetTask returns the task with the given ID, or an error if no such ta
sk exists. | |
14 GetTask(string) (*Task, error) | |
15 | |
16 // GetTaskForCommit retrieves the task with the given name which ran at
the | |
17 // given commit, or nil if no such task exists. | |
18 GetTaskForCommit(string, string, string) (*Task, error) | |
19 | |
20 // GetTasksForCommits retrieves all tasks which included[1] each of the | |
21 // given commits. Returns a map whose keys are commit hashes and values
are | |
22 // sub-maps whose keys are task spec names and values are tasks. | |
23 // | |
24 // 1) Blamelist calculation is outside the scope of the taskCache, but t
he | |
25 // implied assumption here is that there is at most one task for each | |
26 // task spec which has a given commit in its blamelist. The user is | |
27 // responsible for inserting tasks into the database so that this inv
ariant | |
28 // is maintained. Generally, a more recent task will "steal" commits
from an | |
29 // earlier task's blamelist, if the blamelists overlap. There are thr
ee | |
30 // cases to consider: | |
31 // 1. The newer task ran at a newer commit than the older task. It
s | |
32 // blamelist consists of all commits not covered by the previou
s task, | |
33 // and therefore does not overlap with the older task's blameli
st. | |
34 // 2. The newer task ran at the same commit as the older task. Its | |
35 // blamelist is the same as the previous task's blamelist, and | |
36 // therefore it "steals" all commits from the previous task, wh
ose | |
37 // blamelist becomes empty. | |
38 // 3. The newer task ran at a commit which was in the previous tas
k's | |
39 // blamelist. Its blamelist consists of the commits in the prev
ious | |
40 // task's blamelist which it also covered. Those commits move o
ut of | |
41 // the previous task's blamelist and into the newer task's blam
elist. | |
42 GetTasksForCommits(string, []string) (map[string]map[string]*Task, error
) | |
43 | |
44 // KnownTaskName returns true iff the given task name has been seen befo
re. | |
45 KnownTaskName(string, string) bool | |
46 | |
47 // UnfinishedTasks returns a list of tasks which were not finished at | |
48 // the time of the last cache update. | |
49 UnfinishedTasks() ([]*Task, error) | |
50 | |
51 // Update loads new tasks from the database. | |
52 Update() error | |
53 } | |
54 | |
55 type taskCache struct { | |
56 db DB | |
57 // map[repo_name][task_spec_name]bool | |
58 knownTaskNames map[string]map[string]bool | |
59 mtx sync.RWMutex | |
60 queryId string | |
61 tasks map[string]*Task | |
62 // map[repo_name][commit_hash][task_spec_name]*Task | |
63 tasksByCommit map[string]map[string]map[string]*Task | |
64 timePeriod time.Duration | |
65 unfinished map[string]*Task | |
66 } | |
67 | |
68 // See documentation for TaskCache interface. | |
69 func (c *taskCache) GetTask(id string) (*Task, error) { | |
70 c.mtx.RLock() | |
71 defer c.mtx.RUnlock() | |
72 | |
73 if t, ok := c.tasks[id]; ok { | |
74 return t.Copy(), nil | |
75 } | |
76 return nil, fmt.Errorf("No such task!") | |
77 } | |
78 | |
79 // See documentation for TaskCache interface. | |
80 func (c *taskCache) GetTasksForCommits(repo string, commits []string) (map[strin
g]map[string]*Task, error) { | |
81 c.mtx.RLock() | |
82 defer c.mtx.RUnlock() | |
83 | |
84 rv := make(map[string]map[string]*Task, len(commits)) | |
85 commitMap := c.tasksByCommit[repo] | |
86 for _, commit := range commits { | |
87 if tasks, ok := commitMap[commit]; ok { | |
88 rv[commit] = make(map[string]*Task, len(tasks)) | |
89 for k, v := range tasks { | |
90 rv[commit][k] = v.Copy() | |
91 } | |
92 } else { | |
93 rv[commit] = map[string]*Task{} | |
94 } | |
95 } | |
96 return rv, nil | |
97 } | |
98 | |
99 // See documentation for TaskCache interface. | |
100 func (c *taskCache) KnownTaskName(repo, name string) bool { | |
101 c.mtx.RLock() | |
102 defer c.mtx.RUnlock() | |
103 _, ok := c.knownTaskNames[repo][name] | |
104 return ok | |
105 } | |
106 | |
107 // See documentation for TaskCache interface. | |
108 func (c *taskCache) GetTaskForCommit(repo, commit, name string) (*Task, error) { | |
109 c.mtx.RLock() | |
110 defer c.mtx.RUnlock() | |
111 | |
112 commitMap, ok := c.tasksByCommit[repo] | |
113 if !ok { | |
114 return nil, nil | |
115 } | |
116 if tasks, ok := commitMap[commit]; ok { | |
117 if t, ok := tasks[name]; ok { | |
118 return t.Copy(), nil | |
119 } | |
120 } | |
121 return nil, nil | |
122 } | |
123 | |
124 // See documentation for TaskCache interface. | |
125 func (c *taskCache) UnfinishedTasks() ([]*Task, error) { | |
126 c.mtx.RLock() | |
127 defer c.mtx.RUnlock() | |
128 | |
129 rv := make([]*Task, 0, len(c.unfinished)) | |
130 for _, t := range c.unfinished { | |
131 rv = append(rv, t.Copy()) | |
132 } | |
133 return rv, nil | |
134 } | |
135 | |
136 // update inserts the new/updated tasks into the cache. Assumes the caller | |
137 // holds a lock. | |
138 func (c *taskCache) update(tasks []*Task) error { | |
139 for _, t := range tasks { | |
140 repo := t.Repo | |
141 commitMap, ok := c.tasksByCommit[repo] | |
142 if !ok { | |
143 commitMap = map[string]map[string]*Task{} | |
144 c.tasksByCommit[repo] = commitMap | |
145 } | |
146 | |
147 // If we already know about this task, the blamelist might, | |
148 // have changed, so we need to remove it from tasksByCommit | |
149 // and re-insert where needed. | |
150 if old, ok := c.tasks[t.Id]; ok { | |
151 for _, commit := range old.Commits { | |
152 delete(commitMap[commit], t.Name) | |
153 } | |
154 } | |
155 | |
156 // Insert the new task into the main map. | |
157 cpy := t.Copy() | |
158 c.tasks[t.Id] = cpy | |
159 | |
160 // Insert the task into tasksByCommits. | |
161 for _, commit := range t.Commits { | |
162 if _, ok := commitMap[commit]; !ok { | |
163 commitMap[commit] = map[string]*Task{} | |
164 } | |
165 commitMap[commit][t.Name] = cpy | |
166 } | |
167 | |
168 // Unfinished tasks. | |
169 if _, ok := c.unfinished[t.Id]; ok { | |
170 delete(c.unfinished, t.Id) | |
171 } | |
172 if !t.Done() { | |
173 c.unfinished[t.Id] = cpy | |
174 } | |
175 | |
176 // Known task names. | |
177 if nameMap, ok := c.knownTaskNames[repo]; ok { | |
178 nameMap[t.Name] = true | |
179 } else { | |
180 c.knownTaskNames[repo] = map[string]bool{t.Name: true} | |
181 } | |
182 } | |
183 return nil | |
184 } | |
185 | |
186 // reset re-initializes c. Assumes the caller holds a lock. | |
187 func (c *taskCache) reset() error { | |
188 c.db.StopTrackingModifiedTasks(c.queryId) | |
189 queryId, err := c.db.StartTrackingModifiedTasks() | |
190 if err != nil { | |
191 return err | |
192 } | |
193 now := time.Now() | |
194 start := now.Add(-c.timePeriod) | |
195 glog.Infof("Reading Tasks from %s to %s.", start, now) | |
196 tasks, err := c.db.GetTasksFromDateRange(start, now) | |
197 if err != nil { | |
198 c.db.StopTrackingModifiedTasks(queryId) | |
199 return err | |
200 } | |
201 c.knownTaskNames = map[string]map[string]bool{} | |
202 c.queryId = queryId | |
203 c.tasks = map[string]*Task{} | |
204 c.tasksByCommit = map[string]map[string]map[string]*Task{} | |
205 c.unfinished = map[string]*Task{} | |
206 if err := c.update(tasks); err != nil { | |
207 return err | |
208 } | |
209 return nil | |
210 } | |
211 | |
212 // See documentation for TaskCache interface. | |
213 func (c *taskCache) Update() error { | |
214 newTasks, err := c.db.GetModifiedTasks(c.queryId) | |
215 c.mtx.Lock() | |
216 defer c.mtx.Unlock() | |
217 if IsUnknownId(err) { | |
218 glog.Warningf("Connection to db lost; re-initializing cache from
scratch.") | |
219 if err := c.reset(); err != nil { | |
220 return err | |
221 } | |
222 return nil | |
223 } else if err != nil { | |
224 return err | |
225 } | |
226 if err := c.update(newTasks); err == nil { | |
227 return nil | |
228 } else { | |
229 return err | |
230 } | |
231 } | |
232 | |
233 // NewTaskCache returns a local cache which provides more convenient views of | |
234 // task data than the database can provide. | |
235 func NewTaskCache(db DB, timePeriod time.Duration) (TaskCache, error) { | |
236 tc := &taskCache{ | |
237 db: db, | |
238 timePeriod: timePeriod, | |
239 } | |
240 if err := tc.reset(); err != nil { | |
241 return nil, err | |
242 } | |
243 return tc, nil | |
244 } | |
OLD | NEW |