// busywork is an end-to-end test for local_db. It performs inserts and updates
// roughly mimicking what we might expect from build_scheduler. It also tracks
// performance for various operations.
package main

import (
	"container/heap"
	"flag"
	"fmt"
	"math"
	"math/rand"
	"path"
	"sort"
	"strconv"
	"sync"
	"time"

	"github.com/skia-dev/glog"

	"go.skia.org/infra/build_scheduler/go/db"
	"go.skia.org/infra/build_scheduler/go/db/local_db"
	"go.skia.org/infra/go/buildbot"
	"go.skia.org/infra/go/common"
	"go.skia.org/infra/go/influxdb"
)

var (
	// Flags.
	local   = flag.Bool("local", true, "Whether we're running on a dev machine vs in production.")
	workdir = flag.String("workdir", "workdir", "Working directory to use.")

	influxHost     = flag.String("influxdb_host", influxdb.DEFAULT_HOST, "The InfluxDB hostname.")
	influxUser     = flag.String("influxdb_name", influxdb.DEFAULT_USER, "The InfluxDB username.")
	influxPassword = flag.String("influxdb_password", influxdb.DEFAULT_PASSWORD, "The InfluxDB password.")
	influxDatabase = flag.String("influxdb_database", influxdb.DEFAULT_DATABASE, "The InfluxDB database.")

	// Counters.
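	// Each counter below is paired with the cumulative wall time spent on that
	// kind of operation. The worker goroutines update them and reportStats
	// reads them, so each pair is guarded by its own RWMutex.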
	inserts            = 0
	insertDur          = time.Duration(0)
	mInserts           = sync.RWMutex{}
	insertAndUpdates   = 0
	insertAndUpdateDur = time.Duration(0)
	mInsertAndUpdates  = sync.RWMutex{}
	updates            = 0
	updateDur          = time.Duration(0)
	mUpdates           = sync.RWMutex{}
	reads              = 0
	readDur            = time.Duration(0)
	mReads             = sync.RWMutex{}

	// epoch is a time before local_db was written.
	epoch = time.Date(2016, 8, 1, 0, 0, 0, 0, time.UTC)
)

const (
	// Parameters for creating random tasks.
	kNumTaskNames          = 50
	kNumRepos              = 3
	kRecentCommitRange     = 30
	kMedianBlamelistLength = 2

	// Parameters for randomly updating tasks.
	kMedianPendingDuration = 10 * time.Second
	kMedianRunningDuration = 10 * time.Minute
)

// itoh converts an integer to a commit hash. Task.Revision is always set to
// the result of itoh.
func itoh(i int) string {
	return strconv.Itoa(i)
}

// htoi converts a commit hash to an integer. A commit's parent is
// itoh(htoi(hash)-1).
func htoi(h string) int {
	i, err := strconv.Atoi(h)
	if err != nil {
		glog.Fatal(err)
	}
	return i
}

// makeTask generates a task with a random Name, Repo, and Revision. Revision is
// picked randomly from a range of kRecentCommitRange commits starting at
// recentCommitsBegin.
func makeTask(recentCommitsBegin int) *db.Task {
	return &db.Task{
		Name:     fmt.Sprintf("Task-%d", rand.Intn(kNumTaskNames)),
		Repo:     fmt.Sprintf("Repo-%d", rand.Intn(kNumRepos)),
		Revision: itoh(recentCommitsBegin + rand.Intn(kRecentCommitRange)),
	}
}

// updateBlamelists sets t's Commits based on t.Revision and previously-inserted
// tasks' Commits and returns t. If another task's Commits needs to change, also
// returns that task with its updated Commits.
func updateBlamelists(cache db.TaskCache, t *db.Task) ([]*db.Task, error) {
	if !cache.KnownTaskName(t.Repo, t.Name) {
		t.Commits = []string{t.Revision}
		return []*db.Task{t}, nil
	}
	stealFrom, err := cache.GetTaskForCommit(t.Repo, t.Revision, t.Name)
	if err != nil {
		return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, t.Revision, err)
	}

	lastCommit := htoi(t.Revision)
	firstCommit := lastCommit
	// Work backwards until prev changes.
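	// If the blamelist would grow past buildbot.MAX_BLAMELIST_COMMITS and there
	// is no previous task to steal commits from, give up and blame only
	// t.Revision.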
	for i := lastCommit - 1; i > 0; i-- {
		if lastCommit-firstCommit+1 > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
			t.Commits = []string{t.Revision}
			return []*db.Task{t}, nil
		}
		hash := itoh(i)
		prev, err := cache.GetTaskForCommit(t.Repo, hash, t.Name)
		if err != nil {
			return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, hash, err)
		}
		if stealFrom != prev {
			break
		}
		firstCommit = i
	}

	t.Commits = make([]string, lastCommit-firstCommit+1)
	for i := 0; i <= lastCommit-firstCommit; i++ {
		t.Commits[i] = itoh(i + firstCommit)
	}
	sort.Strings(t.Commits)

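	// Remove from stealFrom's blamelist any commits that are now covered by
	// t's blamelist, so that a commit is not blamed by both tasks.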
	if stealFrom != nil {
		newCommits := make([]string, 0, len(stealFrom.Commits)-len(t.Commits))
		for _, h := range stealFrom.Commits {
			idx := sort.SearchStrings(t.Commits, h)
			if idx == len(t.Commits) || t.Commits[idx] != h {
				newCommits = append(newCommits, h)
			}
		}
		stealFrom.Commits = newCommits
		return []*db.Task{t, stealFrom}, nil
	} else {
		return []*db.Task{t}, nil
	}
}

// findApproxLatestCommit scans the DB backwards and returns the commit # of the
// last-created task.
func findApproxLatestCommit(d db.DB) int {
	glog.Infof("findApproxLatestCommit begin")
	for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
		begin := t.Add(-24 * time.Hour)
		glog.Infof("findApproxLatestCommit loading %s to %s", begin, t)
		before := time.Now()
		tasks, err := d.GetTasksFromDateRange(begin, t)
		getTasksDur := time.Now().Sub(before)
		if err != nil {
			glog.Fatal(err)
		}
		mReads.Lock()
		if len(tasks) > 0 {
			reads += len(tasks)
		} else {
			reads++
		}
		readDur += getTasksDur
		mReads.Unlock()
		if len(tasks) > 0 {
			// Return revision of last task.
			lastTask := tasks[len(tasks)-1]
			i := htoi(lastTask.Revision)
			glog.Infof("findApproxLatestCommit returning %d from %s", i, lastTask.Id)
			return i
		}
	}
	glog.Infof("findApproxLatestCommit found empty DB")
	return 0
}

// putTasks inserts randomly-generated tasks into the DB. Does not return.
func putTasks(d db.DB) {
	glog.Infof("putTasks begin")
	cache, err := db.NewTaskCache(d, 4*24*time.Hour)
	if err != nil {
		glog.Fatal(err)
	}
	// If we're restarting, try to pick up where we left off.
	currentCommit := findApproxLatestCommit(d)
	meanTasksPerCommit := float64(kNumTaskNames * kNumRepos / kMedianBlamelistLength)
	maxTasksPerIter := float64(kNumTaskNames * kNumRepos * kRecentCommitRange)
	for {
		// Choose the number of tasks for this iteration from a normal
		// distribution centered on meanTasksPerCommit, clamped to
		// [0, maxTasksPerIter].
		iterTasks := int(math.Max(0, math.Min(maxTasksPerIter, (rand.NormFloat64()+1)*meanTasksPerCommit)))
		glog.Infof("Adding %d tasks with revisions %s - %s", iterTasks, itoh(currentCommit), itoh(currentCommit+kRecentCommitRange))
		for i := 0; i < iterTasks; i++ {
			t := makeTask(currentCommit)
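			// putTasksDur accumulates only the time spent inside the DB
			// (UpdateWithRetries overhead, AssignId, and the final write);
			// `before` is reset around the cache update and blamelist
			// computation in the callback so that work is not counted.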
			putTasksDur := time.Duration(0)
			before := time.Now()
			updatedTasks, err := db.UpdateWithRetries(d, func() ([]*db.Task, error) {
				putTasksDur += time.Now().Sub(before)
				t := t.Copy()
				if err := cache.Update(); err != nil {
					glog.Fatal(err)
				}
				tasksToUpdate, err := updateBlamelists(cache, t)
				if err != nil {
					glog.Fatal(err)
				}
				before = time.Now()
				if err := d.AssignId(t); err != nil {
					glog.Fatal(err)
				}
				putTasksDur += time.Now().Sub(before)
				t.Created = time.Now()
				t.SwarmingTaskId = fmt.Sprintf("%x", rand.Int31())
				before = time.Now()
				return tasksToUpdate, nil
			})
			putTasksDur += time.Now().Sub(before)
			if err != nil {
				glog.Fatal(err)
			}
			if len(updatedTasks) > 1 {
				mInsertAndUpdates.Lock()
				if err == nil {
					insertAndUpdates += len(updatedTasks)
				}
				insertAndUpdateDur += putTasksDur
				mInsertAndUpdates.Unlock()
			} else {
				mInserts.Lock()
				if err == nil {
					inserts++
				}
				insertDur += putTasksDur
				mInserts.Unlock()
			}
		}
		currentCommit++
	}
}

// updateEntry is an item in updateEntryHeap.
type updateEntry struct {
	task *db.Task
	// updateTime is the key for updateEntryHeap.
	updateTime time.Time
	// heapIndex is the index of this updateEntry in updateEntryHeap. It is kept
	// up-to-date by updateEntryHeap methods.
	heapIndex int
}

// updateEntryHeap implements a queue of updateEntry items ordered by updateTime.
// It implements heap.Interface.
type updateEntryHeap []*updateEntry

func (h updateEntryHeap) Len() int           { return len(h) }
func (h updateEntryHeap) Less(i, j int) bool { return h[i].updateTime.Before(h[j].updateTime) }
func (h updateEntryHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex = i
	h[j].heapIndex = j
}

func (h *updateEntryHeap) Push(x interface{}) {
	item := x.(*updateEntry)
	item.heapIndex = len(*h)
	*h = append(*h, item)
}

func (h *updateEntryHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	x.heapIndex = -1
	return x
}

// updateTasks makes random updates to pending and running tasks in the DB. Does
// not return.
func updateTasks(d db.DB) {
	glog.Infof("updateTasks begin")
	updateQueue := updateEntryHeap{}
	idMap := map[string]*updateEntry{}

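	// freshenQueue schedules the next status change for a pending or running
	// task: the delay is drawn from a normal distribution around the median
	// pending/running duration, and the entry is pushed onto (or re-positioned
	// in) updateQueue and indexed by Id in idMap. Tasks that have reached a
	// final status are removed from the queue.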
	freshenQueue := func(task *db.Task) {
		entry := idMap[task.Id]
		// Currently only updating pending and running tasks.
		if task.Status == db.TASK_STATUS_PENDING || task.Status == db.TASK_STATUS_RUNNING {
			meanUpdateDelay := kMedianPendingDuration
			if task.Status == db.TASK_STATUS_RUNNING {
				meanUpdateDelay = kMedianRunningDuration
			}
			updateDelayNanos := int64(math.Max(0, (rand.NormFloat64()+1)*float64(meanUpdateDelay)))
			updateTime := time.Now().Add(time.Duration(updateDelayNanos) * time.Nanosecond)
			if entry == nil {
				entry = &updateEntry{
					task:       task,
					updateTime: updateTime,
					heapIndex:  -1,
				}
				heap.Push(&updateQueue, entry)
			} else {
				entry.task = task
				entry.updateTime = updateTime
				heap.Fix(&updateQueue, entry.heapIndex)
			}
			if entry.heapIndex < 0 {
				glog.Fatalf("you lose %#v %#v", entry, updateQueue)
			}
			idMap[task.Id] = entry
		} else if entry != nil {
			heap.Remove(&updateQueue, entry.heapIndex)
			delete(idMap, task.Id)
		}
	}

	token, err := d.StartTrackingModifiedTasks()
	if err != nil {
		glog.Fatal(err)
	}
	// Initial read to find pending and running tasks.
	for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
		begin := t.Add(-24 * time.Hour)
		glog.Infof("updateTasks loading %s to %s", begin, t)
		before := time.Now()
		tasks, err := d.GetTasksFromDateRange(begin, t)
		getTasksDur := time.Now().Sub(before)
		if err != nil {
			glog.Fatal(err)
		}
		mReads.Lock()
		if len(tasks) > 0 {
			reads += len(tasks)
		} else {
			reads++
		}
		readDur += getTasksDur
		mReads.Unlock()
		for _, task := range tasks {
			freshenQueue(task)
		}
	}
	glog.Infof("updateTasks finished loading; %d pending and running", len(idMap))
	// Rate limit so we're not constantly taking locks for GetModifiedTasks.
	for range time.Tick(time.Millisecond) {
		now := time.Now()
		tasks, err := d.GetModifiedTasks(token)
		if err != nil {
			glog.Fatal(err)
		}
		for _, task := range tasks {
			freshenQueue(task)
		}
		glog.Infof("updateTasks performing updates; %d tasks on queue", len(updateQueue))
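		// Apply every queued update whose scheduled time has arrived, but stop
		// early when we approach db.MODIFIED_TASKS_TIMEOUT, presumably so the
		// next GetModifiedTasks call happens before the token expires.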
		for len(updateQueue) > 0 && updateQueue[0].updateTime.Before(now) {
			if time.Now().Sub(now) >= db.MODIFIED_TASKS_TIMEOUT-5*time.Second {
				break
			}
			entry := heap.Pop(&updateQueue).(*updateEntry)
			task := entry.task
			delete(idMap, task.Id)
			putTasksDur := time.Duration(0)
			before := time.Now()
			_, err := db.UpdateTaskWithRetries(d, task.Id, func(task *db.Task) error {
				putTasksDur += time.Now().Sub(before)
				switch task.Status {
				case db.TASK_STATUS_PENDING:
					task.Started = now
					// 1 in 100 pending tasks ends immediately as a mishap;
					// the rest start running.
					isMishap := rand.Intn(100) == 0
					if isMishap {
						task.Status = db.TASK_STATUS_MISHAP
						task.Finished = now
					} else {
						task.Status = db.TASK_STATUS_RUNNING
					}
				case db.TASK_STATUS_RUNNING:
					task.Finished = now
					// Outcome for running tasks: 1/25 mishap, 4/25 failure,
					// otherwise success.
					statusRand := rand.Intn(25)
					isMishap := statusRand == 0
					isFailure := statusRand < 5
					if isMishap {
						task.Status = db.TASK_STATUS_MISHAP
					} else if isFailure {
						task.Status = db.TASK_STATUS_FAILURE
					} else {
						task.Status = db.TASK_STATUS_SUCCESS
						task.IsolatedOutput = fmt.Sprintf("%x", rand.Int63())
					}
				default:
					glog.Fatalf("Task %s in update queue has status %s. %#v", task.Id, task.Status, task)
				}
				before = time.Now()
				return nil
			})
			putTasksDur += time.Now().Sub(before)
			if err != nil {
				glog.Fatal(err)
			}
			mUpdates.Lock()
			updates++
			updateDur += putTasksDur
			mUpdates.Unlock()
		}
	}
}

// readTasks reads the last hour of tasks every second. Does not return.
func readTasks(d db.DB) {
	glog.Infof("readTasks begin")
	var taskCount uint64 = 0
	var readCount uint64 = 0
	var totalDuration time.Duration = 0
	lastMessage := time.Now()
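	// Query the most recent hour of tasks once per second; once a minute, log
	// the average number of tasks returned per read and the read and task
	// rates, then reset the local counters.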
	for range time.Tick(time.Second) {
		now := time.Now()
		t, err := d.GetTasksFromDateRange(now.Add(-time.Hour), now)
		dur := time.Now().Sub(now)
		if err != nil {
			glog.Fatal(err)
		}
		taskCount += uint64(len(t))
		readCount++
		totalDuration += dur
		mReads.Lock()
		reads += len(t)
		readDur += dur
		mReads.Unlock()
		if now.Sub(lastMessage) > time.Minute {
			lastMessage = now
			if readCount > 0 && totalDuration > 0 {
				glog.Infof("readTasks %d tasks in last hour; %f reads/sec; %f tasks/sec", taskCount/readCount, float64(readCount)/totalDuration.Seconds(), float64(taskCount)/totalDuration.Seconds())
			} else {
				glog.Fatalf("readTasks 0 reads in last minute")
			}
			taskCount = 0
			readCount = 0
			totalDuration = 0
		}
	}
}

// reportStats logs the performance of the DB as seen by putTasks, updateTasks,
// and readTasks. Does not return.
func reportStats() {
	lastInserts := 0
	lastInsertDur := time.Duration(0)
	lastInsertAndUpdates := 0
	lastInsertAndUpdateDur := time.Duration(0)
	lastUpdates := 0
	lastUpdateDur := time.Duration(0)
	lastReads := 0
	lastReadDur := time.Duration(0)
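	// Every five seconds, snapshot the global counters under their read locks
	// and log both cumulative rates and the rates for the most recent interval.
	// The "last" variables above hold the previous snapshot so per-interval
	// deltas can be computed.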
	for range time.Tick(5 * time.Second) {
		mInserts.RLock()
		totalInserts := inserts
		totalInsertDur := insertDur
		mInserts.RUnlock()
		mInsertAndUpdates.RLock()
		totalInsertAndUpdates := insertAndUpdates
		totalInsertAndUpdateDur := insertAndUpdateDur
		mInsertAndUpdates.RUnlock()
		mUpdates.RLock()
		totalUpdates := updates
		totalUpdateDur := updateDur
		mUpdates.RUnlock()
		mReads.RLock()
		totalReads := reads
		totalReadDur := readDur
		mReads.RUnlock()
		curInserts := totalInserts - lastInserts
		lastInserts = totalInserts
		curInsertDur := totalInsertDur - lastInsertDur
		lastInsertDur = totalInsertDur
		curInsertAndUpdates := totalInsertAndUpdates - lastInsertAndUpdates
		lastInsertAndUpdates = totalInsertAndUpdates
		curInsertAndUpdateDur := totalInsertAndUpdateDur - lastInsertAndUpdateDur
		lastInsertAndUpdateDur = totalInsertAndUpdateDur
		curUpdates := totalUpdates - lastUpdates
		lastUpdates = totalUpdates
		curUpdateDur := totalUpdateDur - lastUpdateDur
		lastUpdateDur = totalUpdateDur
		curReads := totalReads - lastReads
		lastReads = totalReads
		curReadDur := totalReadDur - lastReadDur
		lastReadDur = totalReadDur
		glog.Infof("reportStats total; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", totalInserts, float64(totalInserts)/totalInsertDur.Seconds(), totalInsertAndUpdates, float64(totalInsertAndUpdates)/totalInsertAndUpdateDur.Seconds(), totalUpdates, float64(totalUpdates)/totalUpdateDur.Seconds(), totalReads, float64(totalReads)/totalReadDur.Seconds())
		// Avoid NaN/Inf rates for intervals with no activity.
		if curInsertDur.Nanoseconds() == 0 {
			curInsertDur += time.Nanosecond
		}
		if curInsertAndUpdateDur.Nanoseconds() == 0 {
			curInsertAndUpdateDur += time.Nanosecond
		}
		if curUpdateDur.Nanoseconds() == 0 {
			curUpdateDur += time.Nanosecond
		}
		if curReadDur.Nanoseconds() == 0 {
			curReadDur += time.Nanosecond
		}
		glog.Infof("reportStats current; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", curInserts, float64(curInserts)/curInsertDur.Seconds(), curInsertAndUpdates, float64(curInsertAndUpdates)/curInsertAndUpdateDur.Seconds(), curUpdates, float64(curUpdates)/curUpdateDur.Seconds(), curReads, float64(curReads)/curReadDur.Seconds())
	}
}

func main() {
	defer common.LogPanic()

	// Global init.
	common.InitWithMetrics2("busywork", influxHost, influxUser, influxPassword, influxDatabase, local)

	d, err := local_db.NewDB("busywork", path.Join(*workdir, "busywork.bdb"))
	if err != nil {
		glog.Fatal(err)
	}

	go reportStats()

	go putTasks(d)
	go updateTasks(d)
	go readTasks(d)

	// Block forever while goroutines do the work.
	select {}
}