Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(41)

Side by Side Diff: build_scheduler/go/db/local_db/busywork/main.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « build_scheduler/go/db/db.go ('k') | build_scheduler/go/db/local_db/local_db.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // busywork is an end-to-end test for local_db. It performs inserts and updates
2 // roughly mimicking what we might expect from build_scheduler. It also tracks
3 // performance for various operations.
4 package main
5
6 import (
7 "container/heap"
8 "flag"
9 "fmt"
10 "math"
11 "math/rand"
12 "path"
13 "sort"
14 "strconv"
15 "sync"
16 "time"
17
18 "github.com/skia-dev/glog"
19
20 "go.skia.org/infra/build_scheduler/go/db"
21 "go.skia.org/infra/build_scheduler/go/db/local_db"
22 "go.skia.org/infra/go/buildbot"
23 "go.skia.org/infra/go/common"
24 "go.skia.org/infra/go/influxdb"
25 )
26
27 var (
28 // Flags.
29 local = flag.Bool("local", true, "Whether we're running on a dev machi ne vs in production.")
30 workdir = flag.String("workdir", "workdir", "Working directory to use.")
31
32 influxHost = flag.String("influxdb_host", influxdb.DEFAULT_HOST, "Th e InfluxDB hostname.")
33 influxUser = flag.String("influxdb_name", influxdb.DEFAULT_USER, "Th e InfluxDB username.")
34 influxPassword = flag.String("influxdb_password", influxdb.DEFAULT_PASSW ORD, "The InfluxDB password.")
35 influxDatabase = flag.String("influxdb_database", influxdb.DEFAULT_DATAB ASE, "The InfluxDB database.")
36
37 // Counters.
38 inserts = 0
39 insertDur = time.Duration(0)
40 mInserts = sync.RWMutex{}
41 insertAndUpdates = 0
42 insertAndUpdateDur = time.Duration(0)
43 mInsertAndUpdates = sync.RWMutex{}
44 updates = 0
45 updateDur = time.Duration(0)
46 mUpdates = sync.RWMutex{}
47 reads = 0
48 readDur = time.Duration(0)
49 mReads = sync.RWMutex{}
50
51 // epoch is a time before local_db was written.
52 epoch = time.Date(2016, 8, 1, 0, 0, 0, 0, time.UTC)
53 )
54
55 const (
56 // Parameters for creating random tasks.
57 kNumTaskNames = 50
58 kNumRepos = 3
59 kRecentCommitRange = 30
60 kMedianBlamelistLength = 2
61
62 // Parameters for randomly updating tasks.
63 kMedianPendingDuration = 10 * time.Second
64 kMedianRunningDuration = 10 * time.Minute
65 )
66
67 // itoh converts an integer to a commit hash. Task.Revision is always set to
68 // the result of itoh.
69 func itoh(i int) string {
70 return strconv.Itoa(i)
71 }
72
73 // htoi converts a commit hash to an integer. A commit's parent is
74 // itoh(htoi(hash)-1).
75 func htoi(h string) int {
76 i, err := strconv.Atoi(h)
77 if err != nil {
78 glog.Fatal(err)
79 }
80 return i
81 }
82
83 // makeTask generates task with random Name, Repo, and Revision. Revision will
84 // be picked randomly from a range starting at recentCommitsBegin.
85 func makeTask(recentCommitsBegin int) *db.Task {
86 return &db.Task{
87 Name: fmt.Sprintf("Task-%d", rand.Intn(kNumTaskNames)),
88 Repo: fmt.Sprintf("Repo-%d", rand.Intn(kNumRepos)),
89 Revision: itoh(recentCommitsBegin + rand.Intn(kRecentCommitRange )),
90 }
91 }
92
93 // updateBlamelists sets t's Commits based on t.Revision and previously-inserted
94 // tasks' Commits and returns t. If another task's Commits needs to change, also
95 // returns that task with its updated Commits.
96 func updateBlamelists(cache db.TaskCache, t *db.Task) ([]*db.Task, error) {
97 if !cache.KnownTaskName(t.Repo, t.Name) {
98 t.Commits = []string{t.Revision}
99 return []*db.Task{t}, nil
100 }
101 stealFrom, err := cache.GetTaskForCommit(t.Repo, t.Revision, t.Name)
102 if err != nil {
103 return nil, fmt.Errorf("Could not find task %q for commit %q: %s ", t.Name, t.Revision, err)
104 }
105
106 lastCommit := htoi(t.Revision)
107 firstCommit := lastCommit
108 // Work backwards until prev changes.
109 for i := lastCommit - 1; i > 0; i-- {
110 if lastCommit-firstCommit+1 > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
111 t.Commits = []string{t.Revision}
112 return []*db.Task{t}, nil
113 }
114 hash := itoh(i)
115 prev, err := cache.GetTaskForCommit(t.Repo, hash, t.Name)
116 if err != nil {
117 return nil, fmt.Errorf("Could not find task %q for commi t %q: %s", t.Name, hash, err)
118 }
119 if stealFrom != prev {
120 break
121 }
122 firstCommit = i
123 }
124
125 t.Commits = make([]string, lastCommit-firstCommit+1)
126 for i := 0; i <= lastCommit-firstCommit; i++ {
127 t.Commits[i] = itoh(i + firstCommit)
128 }
129 sort.Strings(t.Commits)
130
131 if stealFrom != nil {
132 newCommits := make([]string, 0, len(stealFrom.Commits)-len(t.Com mits))
133 for _, h := range stealFrom.Commits {
134 idx := sort.SearchStrings(t.Commits, h)
135 if idx == len(t.Commits) || t.Commits[idx] != h {
136 newCommits = append(newCommits, h)
137 }
138 }
139 stealFrom.Commits = newCommits
140 return []*db.Task{t, stealFrom}, nil
141 } else {
142 return []*db.Task{t}, nil
143 }
144 }
145
146 // findApproxLatestCommit scans the DB backwards and returns the commit # of the
147 // last-created task.
148 func findApproxLatestCommit(d db.DB) int {
149 glog.Infof("findApproxLatestCommit begin")
150 for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
151 begin := t.Add(-24 * time.Hour)
152 glog.Infof("findApproxLatestCommit loading %s to %s", begin, t)
153 before := time.Now()
154 t, err := d.GetTasksFromDateRange(begin, t)
155 getTasksDur := time.Now().Sub(before)
156 if err != nil {
157 glog.Fatal(err)
158 }
159 mReads.Lock()
160 if len(t) > 0 {
161 reads += len(t)
162 } else {
163 reads++
164 }
165 readDur += getTasksDur
166 mReads.Unlock()
167 if len(t) > 0 {
168 // Return revision of last task.
169 lastTask := t[len(t)-1]
170 i := htoi(lastTask.Revision)
171 glog.Infof("findApproxLatestCommit returning %d from %s" , i, lastTask.Id)
172 return i
173 }
174
175 }
176 glog.Infof("findApproxLatestCommit found empty DB")
177 return 0
178 }
179
180 // putTasks inserts randomly-generated tasks into the DB. Does not return.
181 func putTasks(d db.DB) {
182 glog.Infof("putTasks begin")
183 cache, err := db.NewTaskCache(d, 4*24*time.Hour)
184 if err != nil {
185 glog.Fatal(err)
186 }
187 // If we're restarting, try to pick up where we left off.
188 currentCommit := findApproxLatestCommit(d)
189 meanTasksPerCommit := float64(kNumTaskNames * kNumRepos / kMedianBlameli stLength)
190 maxTasksPerIter := float64(kNumTaskNames * kNumRepos * kRecentCommitRang e)
191 for {
192 iterTasks := int(math.Max(0, math.Min(maxTasksPerIter, (rand.Nor mFloat64()+1)*meanTasksPerCommit)))
193 glog.Infof("Adding %d tasks with revisions %s - %s", iterTasks, itoh(currentCommit), itoh(currentCommit+kRecentCommitRange))
194 for i := 0; i < iterTasks; i++ {
195 t := makeTask(currentCommit)
196 putTasksDur := time.Duration(0)
197 before := time.Now()
198 updatedTasks, err := db.UpdateWithRetries(d, func() ([]* db.Task, error) {
199 putTasksDur += time.Now().Sub(before)
200 t := t.Copy()
201 if err := cache.Update(); err != nil {
202 glog.Fatal(err)
203 }
204 tasksToUpdate, err := updateBlamelists(cache, t)
205 if err != nil {
206 glog.Fatal(err)
207 }
208 before = time.Now()
209 if err := d.AssignId(t); err != nil {
210 glog.Fatal(err)
211 }
212 putTasksDur += time.Now().Sub(before)
213 t.Created = time.Now()
214 t.SwarmingTaskId = fmt.Sprintf("%x", rand.Int31( ))
215 before = time.Now()
216 return tasksToUpdate, nil
217 })
218 putTasksDur += time.Now().Sub(before)
219 if err != nil {
220 glog.Fatal(err)
221 }
222 if len(updatedTasks) > 1 {
223 mInsertAndUpdates.Lock()
224 if err == nil {
225 insertAndUpdates += len(updatedTasks)
226 }
227 insertAndUpdateDur += putTasksDur
228 mInsertAndUpdates.Unlock()
229 } else {
230 mInserts.Lock()
231 if err == nil {
232 inserts++
233 }
234 insertDur += putTasksDur
235 mInserts.Unlock()
236 }
237 }
238 currentCommit++
239 }
240 }
241
242 // updateEntry is an item in updateEntryHeap.
243 type updateEntry struct {
244 task *db.Task
245 // updateTime is the key for updateEntryHeap.
246 updateTime time.Time
247 // heapIndex is the index of this updateEntry in updateEntryHeap. It is kept
248 // up-to-date by updateEntryHeap methods.
249 heapIndex int
250 }
251
252 // updateEntryHeap implements a queue of updateEntry's ordered by updateTime. It
253 // implements heap.Interface.
254 type updateEntryHeap []*updateEntry
255
256 func (h updateEntryHeap) Len() int { return len(h) }
257 func (h updateEntryHeap) Less(i, j int) bool { return h[i].updateTime.Before(h[j ].updateTime) }
258 func (h updateEntryHeap) Swap(i, j int) {
259 h[i], h[j] = h[j], h[i]
260 h[i].heapIndex = i
261 h[j].heapIndex = j
262 }
263
264 func (h *updateEntryHeap) Push(x interface{}) {
265 item := x.(*updateEntry)
266 item.heapIndex = len(*h)
267 *h = append(*h, item)
268 }
269
270 func (h *updateEntryHeap) Pop() interface{} {
271 old := *h
272 n := len(old)
273 x := old[n-1]
274 *h = old[0 : n-1]
275 x.heapIndex = -1
276 return x
277 }
278
279 // updateTasks makes random updates to pending and running tasks in the DB. Does
280 // not return.
281 func updateTasks(d db.DB) {
282 glog.Infof("updateTasks begin")
283 updateQueue := updateEntryHeap{}
284 idMap := map[string]*updateEntry{}
285
286 freshenQueue := func(task *db.Task) {
287 entry := idMap[task.Id]
288 // Currently only updating pending and running tasks.
289 if task.Status == db.TASK_STATUS_PENDING || task.Status == db.TA SK_STATUS_RUNNING {
290 meanUpdateDelay := kMedianPendingDuration
291 if task.Status == db.TASK_STATUS_RUNNING {
292 meanUpdateDelay = kMedianRunningDuration
293 }
294 updateDelayNanos := int64(math.Max(0, (rand.NormFloat64( )+1)*float64(meanUpdateDelay)))
295 updateTime := time.Now().Add(time.Duration(updateDelayNa nos) * time.Nanosecond)
296 if entry == nil {
297 entry = &updateEntry{
298 task: task,
299 updateTime: updateTime,
300 heapIndex: -1,
301 }
302 heap.Push(&updateQueue, entry)
303 } else {
304 entry.task = task
305 entry.updateTime = updateTime
306 heap.Fix(&updateQueue, entry.heapIndex)
307 }
308 if entry.heapIndex < 0 {
309 glog.Fatalf("you lose %#v %#v", entry, updateQue ue)
310 }
311 idMap[task.Id] = entry
312 } else if entry != nil {
313 heap.Remove(&updateQueue, entry.heapIndex)
314 delete(idMap, task.Id)
315 }
316 }
317
318 token, err := d.StartTrackingModifiedTasks()
319 if err != nil {
320 glog.Fatal(err)
321 }
322 // Initial read to find pending and running tasks.
323 for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
324 begin := t.Add(-24 * time.Hour)
325 glog.Infof("updateTasks loading %s to %s", begin, t)
326 before := time.Now()
327 t, err := d.GetTasksFromDateRange(begin, t)
328 getTasksDur := time.Now().Sub(before)
329 if err != nil {
330 glog.Fatal(err)
331 }
332 mReads.Lock()
333 if len(t) > 0 {
334 reads += len(t)
335 } else {
336 reads++
337 }
338 readDur += getTasksDur
339 mReads.Unlock()
340 for _, task := range t {
341 freshenQueue(task)
342 }
343 }
344 glog.Infof("updateTasks finished loading; %d pending and running", len(i dMap))
345 // Rate limit so we're not constantly taking locks for GetModifiedTasks.
346 for _ = range time.Tick(time.Millisecond) {
347 now := time.Now()
348 t, err := d.GetModifiedTasks(token)
349 if err != nil {
350 glog.Fatal(err)
351 }
352 for _, task := range t {
353 freshenQueue(task)
354 }
355 glog.Infof("updateTasks performing updates; %d tasks on queue", len(updateQueue))
356 for len(updateQueue) > 0 && updateQueue[0].updateTime.Before(now ) {
357 if time.Now().Sub(now) >= db.MODIFIED_TASKS_TIMEOUT-5*ti me.Second {
358 break
359 }
360 entry := heap.Pop(&updateQueue).(*updateEntry)
361 task := entry.task
362 delete(idMap, task.Id)
363 putTasksDur := time.Duration(0)
364 before := time.Now()
365 _, err := db.UpdateTaskWithRetries(d, task.Id, func(task *db.Task) error {
366 putTasksDur += time.Now().Sub(before)
367 switch task.Status {
368 case db.TASK_STATUS_PENDING:
369 task.Started = now
370 isMishap := rand.Intn(100) == 0
371 if isMishap {
372 task.Status = db.TASK_STATUS_MIS HAP
373 task.Finished = now
374 } else {
375 task.Status = db.TASK_STATUS_RUN NING
376 }
377 case db.TASK_STATUS_RUNNING:
378 task.Finished = now
379 statusRand := rand.Intn(25)
380 isMishap := statusRand == 0
381 isFailure := statusRand < 5
382 if isMishap {
383 task.Status = db.TASK_STATUS_MIS HAP
384 } else if isFailure {
385 task.Status = db.TASK_STATUS_FAI LURE
386 } else {
387 task.Status = db.TASK_STATUS_SUC CESS
388 task.IsolatedOutput = fmt.Sprint f("%x", rand.Int63())
389 }
390 default:
391 glog.Fatalf("Task %s in update queue has status %s. %#v", task.Id, task.Status, task)
392 }
393 before = time.Now()
394 return nil
395 })
396 putTasksDur += time.Now().Sub(before)
397 if err != nil {
398 glog.Fatal(err)
399 }
400 mUpdates.Lock()
401 updates++
402 updateDur += putTasksDur
403 mUpdates.Unlock()
404 }
405 }
406 }
407
408 // readTasks reads the last hour of tasks every second. Does not return.
409 func readTasks(d db.DB) {
410 glog.Infof("readTasks begin")
411 var taskCount uint64 = 0
412 var readCount uint64 = 0
413 var totalDuration time.Duration = 0
414 lastMessage := time.Now()
415 for _ = range time.Tick(time.Second) {
416 now := time.Now()
417 t, err := d.GetTasksFromDateRange(now.Add(-time.Hour), now)
418 dur := time.Now().Sub(now)
419 if err != nil {
420 glog.Fatal(err)
421 }
422 taskCount += uint64(len(t))
423 readCount++
424 totalDuration += dur
425 mReads.Lock()
426 reads += len(t)
427 readDur += dur
428 mReads.Unlock()
429 if now.Sub(lastMessage) > time.Minute {
430 lastMessage = now
431 if readCount > 0 && totalDuration > 0 {
432 glog.Infof("readTasks %d tasks in last hour; %f reads/sec; %f tasks/sec", taskCount/readCount, float64(readCount)/totalDuration. Seconds(), float64(taskCount)/totalDuration.Seconds())
433 } else {
434 glog.Fatalf("readTasks 0 reads in last minute")
435 }
436 taskCount = 0
437 readCount = 0
438 totalDuration = 0
439 }
440 }
441 }
442
443 // reportStats logs the performance of the DB as seen by putTasks, updateTasks,
444 // and readTasks. Does not return.
445 func reportStats() {
446 lastInserts := 0
447 lastInsertDur := time.Duration(0)
448 lastInsertAndUpdates := 0
449 lastInsertAndUpdateDur := time.Duration(0)
450 lastUpdates := 0
451 lastUpdateDur := time.Duration(0)
452 lastReads := 0
453 lastReadDur := time.Duration(0)
454 for _ = range time.Tick(5 * time.Second) {
455 mInserts.RLock()
456 totalInserts := inserts
457 totalInsertDur := insertDur
458 mInserts.RUnlock()
459 mInsertAndUpdates.RLock()
460 totalInsertAndUpdates := insertAndUpdates
461 totalInsertAndUpdateDur := insertAndUpdateDur
462 mInsertAndUpdates.RUnlock()
463 mUpdates.RLock()
464 totalUpdates := updates
465 totalUpdateDur := updateDur
466 mUpdates.RUnlock()
467 mReads.RLock()
468 totalReads := reads
469 totalReadDur := readDur
470 mReads.RUnlock()
471 curInserts := totalInserts - lastInserts
472 lastInserts = totalInserts
473 curInsertDur := totalInsertDur - lastInsertDur
474 lastInsertDur = totalInsertDur
475 curInsertAndUpdates := totalInsertAndUpdates - lastInsertAndUpda tes
476 lastInsertAndUpdates = totalInsertAndUpdates
477 curInsertAndUpdateDur := totalInsertAndUpdateDur - lastInsertAnd UpdateDur
478 lastInsertAndUpdateDur = totalInsertAndUpdateDur
479 curUpdates := totalUpdates - lastUpdates
480 lastUpdates = totalUpdates
481 curUpdateDur := totalUpdateDur - lastUpdateDur
482 lastUpdateDur = totalUpdateDur
483 curReads := totalReads - lastReads
484 lastReads = totalReads
485 curReadDur := totalReadDur - lastReadDur
486 lastReadDur = totalReadDur
487 glog.Infof("reportStats total; %d inserts %f/s; %d insert-and-up dates %f/s; %d updates %f/s; %d reads %f/s", totalInserts, float64(totalInserts) /totalInsertDur.Seconds(), totalInsertAndUpdates, float64(totalInsertAndUpdates) /totalInsertAndUpdateDur.Seconds(), totalUpdates, float64(totalUpdates)/totalUpd ateDur.Seconds(), totalReads, float64(totalReads)/totalReadDur.Seconds())
488 if curInsertDur.Nanoseconds() == 0 {
489 curInsertDur += time.Nanosecond
490 }
491 if curInsertAndUpdateDur.Nanoseconds() == 0 {
492 curInsertAndUpdateDur += time.Nanosecond
493 }
494 if curUpdateDur.Nanoseconds() == 0 {
495 curUpdateDur += time.Nanosecond
496 }
497 if curReadDur.Nanoseconds() == 0 {
498 curReadDur += time.Nanosecond
499 }
500 glog.Infof("reportStats current; %d inserts %f/s; %d insert-and- updates %f/s; %d updates %f/s; %d reads %f/s", curInserts, float64(curInserts)/c urInsertDur.Seconds(), curInsertAndUpdates, float64(curInsertAndUpdates)/curInse rtAndUpdateDur.Seconds(), curUpdates, float64(curUpdates)/curUpdateDur.Seconds() , curReads, float64(curReads)/curReadDur.Seconds())
501 }
502 }
503
504 func main() {
505 defer common.LogPanic()
506
507 // Global init.
508 common.InitWithMetrics2("busywork", influxHost, influxUser, influxPasswo rd, influxDatabase, local)
509
510 d, err := local_db.NewDB("busywork", path.Join(*workdir, "busywork.bdb") )
511 if err != nil {
512 glog.Fatal(err)
513 }
514
515 go reportStats()
516
517 go putTasks(d)
518 go updateTasks(d)
519 go readTasks(d)
520
521 // Block forever while goroutines do the work.
522 select {}
523 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/db.go ('k') | build_scheduler/go/db/local_db/local_db.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698