Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(621)

Side by Side Diff: build_scheduler/go/db/cache.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | build_scheduler/go/db/cache_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package db
2
3 import (
4 "fmt"
5 "sync"
6 "time"
7
8 "github.com/skia-dev/glog"
9 )
10
11 type TaskCache interface {
12
13 // GetTask returns the task with the given ID, or an error if no such ta sk exists.
14 GetTask(string) (*Task, error)
15
16 // GetTaskForCommit retrieves the task with the given name which ran at the
17 // given commit, or nil if no such task exists.
18 GetTaskForCommit(string, string, string) (*Task, error)
19
20 // GetTasksForCommits retrieves all tasks which included[1] each of the
21 // given commits. Returns a map whose keys are commit hashes and values are
22 // sub-maps whose keys are task spec names and values are tasks.
23 //
24 // 1) Blamelist calculation is outside the scope of the taskCache, but t he
25 // implied assumption here is that there is at most one task for each
26 // task spec which has a given commit in its blamelist. The user is
27 // responsible for inserting tasks into the database so that this inv ariant
28 // is maintained. Generally, a more recent task will "steal" commits from an
29 // earlier task's blamelist, if the blamelists overlap. There are thr ee
30 // cases to consider:
31 // 1. The newer task ran at a newer commit than the older task. It s
32 // blamelist consists of all commits not covered by the previou s task,
33 // and therefore does not overlap with the older task's blameli st.
34 // 2. The newer task ran at the same commit as the older task. Its
35 // blamelist is the same as the previous task's blamelist, and
36 // therefore it "steals" all commits from the previous task, wh ose
37 // blamelist becomes empty.
38 // 3. The newer task ran at a commit which was in the previous tas k's
39 // blamelist. Its blamelist consists of the commits in the prev ious
40 // task's blamelist which it also covered. Those commits move o ut of
41 // the previous task's blamelist and into the newer task's blam elist.
42 GetTasksForCommits(string, []string) (map[string]map[string]*Task, error )
43
44 // KnownTaskName returns true iff the given task name has been seen befo re.
45 KnownTaskName(string, string) bool
46
47 // UnfinishedTasks returns a list of tasks which were not finished at
48 // the time of the last cache update.
49 UnfinishedTasks() ([]*Task, error)
50
51 // Update loads new tasks from the database.
52 Update() error
53 }
54
55 type taskCache struct {
56 db DB
57 // map[repo_name][task_spec_name]bool
58 knownTaskNames map[string]map[string]bool
59 mtx sync.RWMutex
60 queryId string
61 tasks map[string]*Task
62 // map[repo_name][commit_hash][task_spec_name]*Task
63 tasksByCommit map[string]map[string]map[string]*Task
64 timePeriod time.Duration
65 unfinished map[string]*Task
66 }
67
68 // See documentation for TaskCache interface.
69 func (c *taskCache) GetTask(id string) (*Task, error) {
70 c.mtx.RLock()
71 defer c.mtx.RUnlock()
72
73 if t, ok := c.tasks[id]; ok {
74 return t.Copy(), nil
75 }
76 return nil, fmt.Errorf("No such task!")
77 }
78
79 // See documentation for TaskCache interface.
80 func (c *taskCache) GetTasksForCommits(repo string, commits []string) (map[strin g]map[string]*Task, error) {
81 c.mtx.RLock()
82 defer c.mtx.RUnlock()
83
84 rv := make(map[string]map[string]*Task, len(commits))
85 commitMap := c.tasksByCommit[repo]
86 for _, commit := range commits {
87 if tasks, ok := commitMap[commit]; ok {
88 rv[commit] = make(map[string]*Task, len(tasks))
89 for k, v := range tasks {
90 rv[commit][k] = v.Copy()
91 }
92 } else {
93 rv[commit] = map[string]*Task{}
94 }
95 }
96 return rv, nil
97 }
98
99 // See documentation for TaskCache interface.
100 func (c *taskCache) KnownTaskName(repo, name string) bool {
101 c.mtx.RLock()
102 defer c.mtx.RUnlock()
103 _, ok := c.knownTaskNames[repo][name]
104 return ok
105 }
106
107 // See documentation for TaskCache interface.
108 func (c *taskCache) GetTaskForCommit(repo, commit, name string) (*Task, error) {
109 c.mtx.RLock()
110 defer c.mtx.RUnlock()
111
112 commitMap, ok := c.tasksByCommit[repo]
113 if !ok {
114 return nil, nil
115 }
116 if tasks, ok := commitMap[commit]; ok {
117 if t, ok := tasks[name]; ok {
118 return t.Copy(), nil
119 }
120 }
121 return nil, nil
122 }
123
124 // See documentation for TaskCache interface.
125 func (c *taskCache) UnfinishedTasks() ([]*Task, error) {
126 c.mtx.RLock()
127 defer c.mtx.RUnlock()
128
129 rv := make([]*Task, 0, len(c.unfinished))
130 for _, t := range c.unfinished {
131 rv = append(rv, t.Copy())
132 }
133 return rv, nil
134 }
135
136 // update inserts the new/updated tasks into the cache. Assumes the caller
137 // holds a lock.
138 func (c *taskCache) update(tasks []*Task) error {
139 for _, t := range tasks {
140 repo := t.Repo
141 commitMap, ok := c.tasksByCommit[repo]
142 if !ok {
143 commitMap = map[string]map[string]*Task{}
144 c.tasksByCommit[repo] = commitMap
145 }
146
147 // If we already know about this task, the blamelist might,
148 // have changed, so we need to remove it from tasksByCommit
149 // and re-insert where needed.
150 if old, ok := c.tasks[t.Id]; ok {
151 for _, commit := range old.Commits {
152 delete(commitMap[commit], t.Name)
153 }
154 }
155
156 // Insert the new task into the main map.
157 cpy := t.Copy()
158 c.tasks[t.Id] = cpy
159
160 // Insert the task into tasksByCommits.
161 for _, commit := range t.Commits {
162 if _, ok := commitMap[commit]; !ok {
163 commitMap[commit] = map[string]*Task{}
164 }
165 commitMap[commit][t.Name] = cpy
166 }
167
168 // Unfinished tasks.
169 if _, ok := c.unfinished[t.Id]; ok {
170 delete(c.unfinished, t.Id)
171 }
172 if !t.Done() {
173 c.unfinished[t.Id] = cpy
174 }
175
176 // Known task names.
177 if nameMap, ok := c.knownTaskNames[repo]; ok {
178 nameMap[t.Name] = true
179 } else {
180 c.knownTaskNames[repo] = map[string]bool{t.Name: true}
181 }
182 }
183 return nil
184 }
185
186 // reset re-initializes c. Assumes the caller holds a lock.
187 func (c *taskCache) reset() error {
188 c.db.StopTrackingModifiedTasks(c.queryId)
189 queryId, err := c.db.StartTrackingModifiedTasks()
190 if err != nil {
191 return err
192 }
193 now := time.Now()
194 start := now.Add(-c.timePeriod)
195 glog.Infof("Reading Tasks from %s to %s.", start, now)
196 tasks, err := c.db.GetTasksFromDateRange(start, now)
197 if err != nil {
198 c.db.StopTrackingModifiedTasks(queryId)
199 return err
200 }
201 c.knownTaskNames = map[string]map[string]bool{}
202 c.queryId = queryId
203 c.tasks = map[string]*Task{}
204 c.tasksByCommit = map[string]map[string]map[string]*Task{}
205 c.unfinished = map[string]*Task{}
206 if err := c.update(tasks); err != nil {
207 return err
208 }
209 return nil
210 }
211
212 // See documentation for TaskCache interface.
213 func (c *taskCache) Update() error {
214 newTasks, err := c.db.GetModifiedTasks(c.queryId)
215 c.mtx.Lock()
216 defer c.mtx.Unlock()
217 if IsUnknownId(err) {
218 glog.Warningf("Connection to db lost; re-initializing cache from scratch.")
219 if err := c.reset(); err != nil {
220 return err
221 }
222 return nil
223 } else if err != nil {
224 return err
225 }
226 if err := c.update(newTasks); err == nil {
227 return nil
228 } else {
229 return err
230 }
231 }
232
233 // NewTaskCache returns a local cache which provides more convenient views of
234 // task data than the database can provide.
235 func NewTaskCache(db DB, timePeriod time.Duration) (TaskCache, error) {
236 tc := &taskCache{
237 db: db,
238 timePeriod: timePeriod,
239 }
240 if err := tc.reset(); err != nil {
241 return nil, err
242 }
243 return tc, nil
244 }
OLDNEW
« no previous file with comments | « no previous file | build_scheduler/go/db/cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698