| OLD | NEW |
| (Empty) |
| 1 package db | |
| 2 | |
| 3 import ( | |
| 4 "fmt" | |
| 5 "sync" | |
| 6 "time" | |
| 7 | |
| 8 "github.com/skia-dev/glog" | |
| 9 ) | |
| 10 | |
| 11 type TaskCache interface { | |
| 12 | |
| 13 // GetTask returns the task with the given ID, or an error if no such ta
sk exists. | |
| 14 GetTask(string) (*Task, error) | |
| 15 | |
| 16 // GetTaskForCommit retrieves the task with the given name which ran at
the | |
| 17 // given commit, or nil if no such task exists. | |
| 18 GetTaskForCommit(string, string, string) (*Task, error) | |
| 19 | |
| 20 // GetTasksForCommits retrieves all tasks which included[1] each of the | |
| 21 // given commits. Returns a map whose keys are commit hashes and values
are | |
| 22 // sub-maps whose keys are task spec names and values are tasks. | |
| 23 // | |
| 24 // 1) Blamelist calculation is outside the scope of the taskCache, but t
he | |
| 25 // implied assumption here is that there is at most one task for each | |
| 26 // task spec which has a given commit in its blamelist. The user is | |
| 27 // responsible for inserting tasks into the database so that this inv
ariant | |
| 28 // is maintained. Generally, a more recent task will "steal" commits
from an | |
| 29 // earlier task's blamelist, if the blamelists overlap. There are thr
ee | |
| 30 // cases to consider: | |
| 31 // 1. The newer task ran at a newer commit than the older task. It
s | |
| 32 // blamelist consists of all commits not covered by the previou
s task, | |
| 33 // and therefore does not overlap with the older task's blameli
st. | |
| 34 // 2. The newer task ran at the same commit as the older task. Its | |
| 35 // blamelist is the same as the previous task's blamelist, and | |
| 36 // therefore it "steals" all commits from the previous task, wh
ose | |
| 37 // blamelist becomes empty. | |
| 38 // 3. The newer task ran at a commit which was in the previous tas
k's | |
| 39 // blamelist. Its blamelist consists of the commits in the prev
ious | |
| 40 // task's blamelist which it also covered. Those commits move o
ut of | |
| 41 // the previous task's blamelist and into the newer task's blam
elist. | |
| 42 GetTasksForCommits(string, []string) (map[string]map[string]*Task, error
) | |
| 43 | |
| 44 // KnownTaskName returns true iff the given task name has been seen befo
re. | |
| 45 KnownTaskName(string, string) bool | |
| 46 | |
| 47 // UnfinishedTasks returns a list of tasks which were not finished at | |
| 48 // the time of the last cache update. | |
| 49 UnfinishedTasks() ([]*Task, error) | |
| 50 | |
| 51 // Update loads new tasks from the database. | |
| 52 Update() error | |
| 53 } | |
| 54 | |
| 55 type taskCache struct { | |
| 56 db DB | |
| 57 // map[repo_name][task_spec_name]bool | |
| 58 knownTaskNames map[string]map[string]bool | |
| 59 mtx sync.RWMutex | |
| 60 queryId string | |
| 61 tasks map[string]*Task | |
| 62 // map[repo_name][commit_hash][task_spec_name]*Task | |
| 63 tasksByCommit map[string]map[string]map[string]*Task | |
| 64 timePeriod time.Duration | |
| 65 unfinished map[string]*Task | |
| 66 } | |
| 67 | |
| 68 // See documentation for TaskCache interface. | |
| 69 func (c *taskCache) GetTask(id string) (*Task, error) { | |
| 70 c.mtx.RLock() | |
| 71 defer c.mtx.RUnlock() | |
| 72 | |
| 73 if t, ok := c.tasks[id]; ok { | |
| 74 return t.Copy(), nil | |
| 75 } | |
| 76 return nil, fmt.Errorf("No such task!") | |
| 77 } | |
| 78 | |
| 79 // See documentation for TaskCache interface. | |
| 80 func (c *taskCache) GetTasksForCommits(repo string, commits []string) (map[strin
g]map[string]*Task, error) { | |
| 81 c.mtx.RLock() | |
| 82 defer c.mtx.RUnlock() | |
| 83 | |
| 84 rv := make(map[string]map[string]*Task, len(commits)) | |
| 85 commitMap := c.tasksByCommit[repo] | |
| 86 for _, commit := range commits { | |
| 87 if tasks, ok := commitMap[commit]; ok { | |
| 88 rv[commit] = make(map[string]*Task, len(tasks)) | |
| 89 for k, v := range tasks { | |
| 90 rv[commit][k] = v.Copy() | |
| 91 } | |
| 92 } else { | |
| 93 rv[commit] = map[string]*Task{} | |
| 94 } | |
| 95 } | |
| 96 return rv, nil | |
| 97 } | |
| 98 | |
| 99 // See documentation for TaskCache interface. | |
| 100 func (c *taskCache) KnownTaskName(repo, name string) bool { | |
| 101 c.mtx.RLock() | |
| 102 defer c.mtx.RUnlock() | |
| 103 _, ok := c.knownTaskNames[repo][name] | |
| 104 return ok | |
| 105 } | |
| 106 | |
| 107 // See documentation for TaskCache interface. | |
| 108 func (c *taskCache) GetTaskForCommit(repo, commit, name string) (*Task, error) { | |
| 109 c.mtx.RLock() | |
| 110 defer c.mtx.RUnlock() | |
| 111 | |
| 112 commitMap, ok := c.tasksByCommit[repo] | |
| 113 if !ok { | |
| 114 return nil, nil | |
| 115 } | |
| 116 if tasks, ok := commitMap[commit]; ok { | |
| 117 if t, ok := tasks[name]; ok { | |
| 118 return t.Copy(), nil | |
| 119 } | |
| 120 } | |
| 121 return nil, nil | |
| 122 } | |
| 123 | |
| 124 // See documentation for TaskCache interface. | |
| 125 func (c *taskCache) UnfinishedTasks() ([]*Task, error) { | |
| 126 c.mtx.RLock() | |
| 127 defer c.mtx.RUnlock() | |
| 128 | |
| 129 rv := make([]*Task, 0, len(c.unfinished)) | |
| 130 for _, t := range c.unfinished { | |
| 131 rv = append(rv, t.Copy()) | |
| 132 } | |
| 133 return rv, nil | |
| 134 } | |
| 135 | |
| 136 // update inserts the new/updated tasks into the cache. Assumes the caller | |
| 137 // holds a lock. | |
| 138 func (c *taskCache) update(tasks []*Task) error { | |
| 139 for _, t := range tasks { | |
| 140 repo := t.Repo | |
| 141 commitMap, ok := c.tasksByCommit[repo] | |
| 142 if !ok { | |
| 143 commitMap = map[string]map[string]*Task{} | |
| 144 c.tasksByCommit[repo] = commitMap | |
| 145 } | |
| 146 | |
| 147 // If we already know about this task, the blamelist might, | |
| 148 // have changed, so we need to remove it from tasksByCommit | |
| 149 // and re-insert where needed. | |
| 150 if old, ok := c.tasks[t.Id]; ok { | |
| 151 for _, commit := range old.Commits { | |
| 152 delete(commitMap[commit], t.Name) | |
| 153 } | |
| 154 } | |
| 155 | |
| 156 // Insert the new task into the main map. | |
| 157 cpy := t.Copy() | |
| 158 c.tasks[t.Id] = cpy | |
| 159 | |
| 160 // Insert the task into tasksByCommits. | |
| 161 for _, commit := range t.Commits { | |
| 162 if _, ok := commitMap[commit]; !ok { | |
| 163 commitMap[commit] = map[string]*Task{} | |
| 164 } | |
| 165 commitMap[commit][t.Name] = cpy | |
| 166 } | |
| 167 | |
| 168 // Unfinished tasks. | |
| 169 if _, ok := c.unfinished[t.Id]; ok { | |
| 170 delete(c.unfinished, t.Id) | |
| 171 } | |
| 172 if !t.Done() { | |
| 173 c.unfinished[t.Id] = cpy | |
| 174 } | |
| 175 | |
| 176 // Known task names. | |
| 177 if nameMap, ok := c.knownTaskNames[repo]; ok { | |
| 178 nameMap[t.Name] = true | |
| 179 } else { | |
| 180 c.knownTaskNames[repo] = map[string]bool{t.Name: true} | |
| 181 } | |
| 182 } | |
| 183 return nil | |
| 184 } | |
| 185 | |
| 186 // reset re-initializes c. Assumes the caller holds a lock. | |
| 187 func (c *taskCache) reset() error { | |
| 188 c.db.StopTrackingModifiedTasks(c.queryId) | |
| 189 queryId, err := c.db.StartTrackingModifiedTasks() | |
| 190 if err != nil { | |
| 191 return err | |
| 192 } | |
| 193 now := time.Now() | |
| 194 start := now.Add(-c.timePeriod) | |
| 195 glog.Infof("Reading Tasks from %s to %s.", start, now) | |
| 196 tasks, err := c.db.GetTasksFromDateRange(start, now) | |
| 197 if err != nil { | |
| 198 c.db.StopTrackingModifiedTasks(queryId) | |
| 199 return err | |
| 200 } | |
| 201 c.knownTaskNames = map[string]map[string]bool{} | |
| 202 c.queryId = queryId | |
| 203 c.tasks = map[string]*Task{} | |
| 204 c.tasksByCommit = map[string]map[string]map[string]*Task{} | |
| 205 c.unfinished = map[string]*Task{} | |
| 206 if err := c.update(tasks); err != nil { | |
| 207 return err | |
| 208 } | |
| 209 return nil | |
| 210 } | |
| 211 | |
| 212 // See documentation for TaskCache interface. | |
| 213 func (c *taskCache) Update() error { | |
| 214 newTasks, err := c.db.GetModifiedTasks(c.queryId) | |
| 215 c.mtx.Lock() | |
| 216 defer c.mtx.Unlock() | |
| 217 if IsUnknownId(err) { | |
| 218 glog.Warningf("Connection to db lost; re-initializing cache from
scratch.") | |
| 219 if err := c.reset(); err != nil { | |
| 220 return err | |
| 221 } | |
| 222 return nil | |
| 223 } else if err != nil { | |
| 224 return err | |
| 225 } | |
| 226 if err := c.update(newTasks); err == nil { | |
| 227 return nil | |
| 228 } else { | |
| 229 return err | |
| 230 } | |
| 231 } | |
| 232 | |
| 233 // NewTaskCache returns a local cache which provides more convenient views of | |
| 234 // task data than the database can provide. | |
| 235 func NewTaskCache(db DB, timePeriod time.Duration) (TaskCache, error) { | |
| 236 tc := &taskCache{ | |
| 237 db: db, | |
| 238 timePeriod: timePeriod, | |
| 239 } | |
| 240 if err := tc.reset(); err != nil { | |
| 241 return nil, err | |
| 242 } | |
| 243 return tc, nil | |
| 244 } | |
| OLD | NEW |