Index: build_scheduler/go/db/local_db/busywork/main.go |
diff --git a/build_scheduler/go/db/local_db/busywork/main.go b/build_scheduler/go/db/local_db/busywork/main.go |
deleted file mode 100644 |
index 1d52f4faf2f7dcd9fe4220dbf91488deb8e930ff..0000000000000000000000000000000000000000 |
--- a/build_scheduler/go/db/local_db/busywork/main.go |
+++ /dev/null |
@@ -1,523 +0,0 @@ |
-// busywork is an end-to-end test for local_db. It performs inserts and updates |
-// roughly mimicking what we might expect from build_scheduler. It also tracks |
-// performance for various operations. |
-package main |
- |
-import ( |
- "container/heap" |
- "flag" |
- "fmt" |
- "math" |
- "math/rand" |
- "path" |
- "sort" |
- "strconv" |
- "sync" |
- "time" |
- |
- "github.com/skia-dev/glog" |
- |
- "go.skia.org/infra/build_scheduler/go/db" |
- "go.skia.org/infra/build_scheduler/go/db/local_db" |
- "go.skia.org/infra/go/buildbot" |
- "go.skia.org/infra/go/common" |
- "go.skia.org/infra/go/influxdb" |
-) |
- |
-// Package-level state: command-line flags, the shared performance counters, |
-// and the earliest date the DB scans look back to. |
-var ( |
-	// Flags. |
-	local   = flag.Bool("local", true, "Whether we're running on a dev machine vs in production.") |
-	workdir = flag.String("workdir", "workdir", "Working directory to use.") |
- |
-	influxHost     = flag.String("influxdb_host", influxdb.DEFAULT_HOST, "The InfluxDB hostname.") |
-	influxUser     = flag.String("influxdb_name", influxdb.DEFAULT_USER, "The InfluxDB username.") |
-	influxPassword = flag.String("influxdb_password", influxdb.DEFAULT_PASSWORD, "The InfluxDB password.") |
-	influxDatabase = flag.String("influxdb_database", influxdb.DEFAULT_DATABASE, "The InfluxDB database.") |
- |
-	// Counters. Each count/duration pair is guarded by the mutex declared |
-	// directly below it; all of them start at their zero value. |
- |
-	// Single-task inserts performed by putTasks. |
-	inserts   int |
-	insertDur time.Duration |
-	mInserts  sync.RWMutex |
- |
-	// putTasks writes that returned more than one task (an insert plus an |
-	// update of another task's blamelist). |
-	insertAndUpdates   int |
-	insertAndUpdateDur time.Duration |
-	mInsertAndUpdates  sync.RWMutex |
- |
-	// Status updates performed by updateTasks. |
-	updates   int |
-	updateDur time.Duration |
-	mUpdates  sync.RWMutex |
- |
-	// Date-range reads performed by findApproxLatestCommit, updateTasks and |
-	// readTasks. |
-	reads   int |
-	readDur time.Duration |
-	mReads  sync.RWMutex |
- |
-	// epoch is a time before local_db was written. |
-	epoch = time.Date(2016, 8, 1, 0, 0, 0, 0, time.UTC) |
-) |
- |
-// Parameters for creating random tasks. |
-const ( |
-	// kNumTaskNames is the number of distinct task names makeTask picks from. |
-	kNumTaskNames = 50 |
-	// kNumRepos is the number of distinct repos makeTask picks from. |
-	kNumRepos = 3 |
-	// kRecentCommitRange is the width of the window of commit numbers from |
-	// which makeTask picks a revision. |
-	kRecentCommitRange = 30 |
-	// kMedianBlamelistLength scales how many tasks putTasks adds per commit. |
-	kMedianBlamelistLength = 2 |
-) |
- |
-// Parameters for randomly updating tasks. |
-const ( |
-	// kMedianPendingDuration centers the random delay before a pending task |
-	// is updated. |
-	kMedianPendingDuration = 10 * time.Second |
-	// kMedianRunningDuration centers the random delay before a running task |
-	// is updated. |
-	kMedianRunningDuration = 10 * time.Minute |
-) |
- |
-// itoh renders the commit number i as the decimal string used as its commit |
-// hash. Every Task.Revision in this program is produced by itoh. |
-func itoh(i int) string { |
-	return strconv.FormatInt(int64(i), 10) |
-} |
- |
-// htoi is the inverse of itoh: it parses commit hash h back into its commit |
-// number, so the parent of a commit is itoh(htoi(hash)-1). A hash that is not |
-// a decimal integer is logged via glog.Fatal. |
-func htoi(h string) int { |
-	n, parseErr := strconv.Atoi(h) |
-	if parseErr == nil { |
-		return n |
-	} |
-	glog.Fatal(parseErr) |
-	return n |
-} |
- |
-// makeTask builds a new task whose Name, Repo and Revision are chosen at |
-// random. The revision is drawn from the kRecentCommitRange commit numbers |
-// starting at recentCommitsBegin; all other fields keep their zero value. |
-func makeTask(recentCommitsBegin int) *db.Task { |
-	// Draw the random values in a fixed order: name, repo, revision. |
-	name := fmt.Sprintf("Task-%d", rand.Intn(kNumTaskNames)) |
-	repo := fmt.Sprintf("Repo-%d", rand.Intn(kNumRepos)) |
-	revision := itoh(recentCommitsBegin + rand.Intn(kRecentCommitRange)) |
- |
-	task := new(db.Task) |
-	task.Name = name |
-	task.Repo = repo |
-	task.Revision = revision |
-	return task |
-} |
- |
-// updateBlamelists sets t's Commits based on t.Revision and previously-inserted |
-// tasks' Commits and returns t. If another task's Commits needs to change, also |
-// returns that task with its updated Commits. |
-// |
-// The returned slice always has t first. When t takes commits away from the |
-// task that the cache currently reports for t.Revision, that task's Commits |
-// field is modified in place and the task is returned as the second element. |
-// An error is returned if a cache lookup fails. |
-func updateBlamelists(cache db.TaskCache, t *db.Task) ([]*db.Task, error) { |
-	// First task with this name in this repo: it is blamed for its own |
-	// revision only. |
-	if !cache.KnownTaskName(t.Repo, t.Name) { |
-		t.Commits = []string{t.Revision} |
-		return []*db.Task{t}, nil |
-	} |
-	// stealFrom is whatever task the cache currently associates with |
-	// t.Revision; the code below treats nil as "no such task". |
-	stealFrom, err := cache.GetTaskForCommit(t.Repo, t.Revision, t.Name) |
-	if err != nil { |
-		return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, t.Revision, err) |
-	} |
- |
-	lastCommit := htoi(t.Revision) |
-	firstCommit := lastCommit |
-	// Work backwards until prev changes, i.e. until an earlier commit is |
-	// covered by a different task than stealFrom. The walk stops at commit 1. |
-	for i := lastCommit - 1; i > 0; i-- { |
-		// With no task to steal from, give up once the blamelist would grow |
-		// past the limit and blame only t's own revision. |
-		if lastCommit-firstCommit+1 > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil { |
-			t.Commits = []string{t.Revision} |
-			return []*db.Task{t}, nil |
-		} |
-		hash := itoh(i) |
-		prev, err := cache.GetTaskForCommit(t.Repo, hash, t.Name) |
-		if err != nil { |
-			return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, hash, err) |
-		} |
-		if stealFrom != prev { |
-			break |
-		} |
-		firstCommit = i |
-	} |
- |
-	// t is blamed for the contiguous range firstCommit..lastCommit. |
-	t.Commits = make([]string, lastCommit-firstCommit+1) |
-	for i := 0; i <= lastCommit-firstCommit; i++ { |
-		t.Commits[i] = itoh(i + firstCommit) |
-	} |
-	// Sorted as strings so that sort.SearchStrings below can be used. |
-	sort.Strings(t.Commits) |
- |
-	if stealFrom == nil { |
-		return []*db.Task{t}, nil |
-	} |
- |
-	// Capacity is only a hint. Clamp it, because make panics at run time on a |
-	// negative capacity, which would happen if stealFrom listed fewer commits |
-	// than t now claims. |
-	remaining := len(stealFrom.Commits) - len(t.Commits) |
-	if remaining < 0 { |
-		remaining = 0 |
-	} |
-	// Keep only the commits of stealFrom that t did not take. |
-	newCommits := make([]string, 0, remaining) |
-	for _, h := range stealFrom.Commits { |
-		idx := sort.SearchStrings(t.Commits, h) |
-		if idx == len(t.Commits) || t.Commits[idx] != h { |
-			newCommits = append(newCommits, h) |
-		} |
-	} |
-	stealFrom.Commits = newCommits |
-	return []*db.Task{t, stealFrom}, nil |
-} |
- |
-// findApproxLatestCommit walks the DB backwards from now to epoch in 24-hour |
-// windows. For the first window that contains any tasks it returns the commit |
-// number of the last task in that window's result; it returns 0 if every |
-// window is empty. Every window read is added to the shared read counters. |
-func findApproxLatestCommit(d db.DB) int { |
-	glog.Infof("findApproxLatestCommit begin") |
-	const window = 24 * time.Hour |
-	for end := time.Now(); end.After(epoch); end = end.Add(-window) { |
-		begin := end.Add(-window) |
-		glog.Infof("findApproxLatestCommit loading %s to %s", begin, end) |
-		start := time.Now() |
-		tasks, err := d.GetTasksFromDateRange(begin, end) |
-		elapsed := time.Since(start) |
-		if err != nil { |
-			glog.Fatal(err) |
-		} |
-		found := len(tasks) |
- |
-		// An empty result still counts as one read. |
-		mReads.Lock() |
-		if found == 0 { |
-			reads++ |
-		} else { |
-			reads += found |
-		} |
-		readDur += elapsed |
-		mReads.Unlock() |
- |
-		if found == 0 { |
-			continue |
-		} |
-		// Return revision of last task. |
-		newest := tasks[found-1] |
-		commit := htoi(newest.Revision) |
-		glog.Infof("findApproxLatestCommit returning %d from %s", commit, newest.Id) |
-		return commit |
-	} |
-	glog.Infof("findApproxLatestCommit found empty DB") |
-	return 0 |
-} |
- |
-// putTasks inserts randomly-generated tasks into the DB. Does not return. |
-// |
-// Each pass of the outer loop adds a random number of tasks whose revisions |
-// fall in the kRecentCommitRange window starting at currentCommit, then |
-// advances currentCommit by one. Each task is written with |
-// db.UpdateWithRetries together with any other task whose blamelist |
-// updateBlamelists changed. Any error is fatal. |
-// |
-// Timing: putTasksDur accumulates only the time spent outside the closure body |
-// (inside db.UpdateWithRetries itself) plus the d.AssignId call; cache.Update |
-// and updateBlamelists are excluded by resetting `before` around them. |
-func putTasks(d db.DB) { |
- glog.Infof("putTasks begin") |
- // NOTE(review): the second argument is presumably the time window the cache |
- // covers (4 days) -- confirm against db.NewTaskCache. |
- cache, err := db.NewTaskCache(d, 4*24*time.Hour) |
- if err != nil { |
- glog.Fatal(err) |
- } |
- // If we're restarting, try to pick up where we left off. |
- currentCommit := findApproxLatestCommit(d) |
- // The constant expression is evaluated with integer arithmetic before the |
- // conversion: 50*3/2 = 75. |
- meanTasksPerCommit := float64(kNumTaskNames * kNumRepos / kMedianBlamelistLength) |
- maxTasksPerIter := float64(kNumTaskNames * kNumRepos * kRecentCommitRange) |
- for { |
- // Normally distributed with mean and standard deviation both equal to |
- // meanTasksPerCommit, clamped to [0, maxTasksPerIter]. |
- iterTasks := int(math.Max(0, math.Min(maxTasksPerIter, (rand.NormFloat64()+1)*meanTasksPerCommit))) |
- glog.Infof("Adding %d tasks with revisions %s - %s", iterTasks, itoh(currentCommit), itoh(currentCommit+kRecentCommitRange)) |
- for i := 0; i < iterTasks; i++ { |
- t := makeTask(currentCommit) |
- putTasksDur := time.Duration(0) |
- before := time.Now() |
- updatedTasks, err := db.UpdateWithRetries(d, func() ([]*db.Task, error) { |
- // Account for the time UpdateWithRetries spent before calling us. |
- putTasksDur += time.Now().Sub(before) |
- // Work on a fresh copy on every call so the template task made above |
- // is never modified. |
- t := t.Copy() |
- if err := cache.Update(); err != nil { |
- glog.Fatal(err) |
- } |
- tasksToUpdate, err := updateBlamelists(cache, t) |
- if err != nil { |
- glog.Fatal(err) |
- } |
- // Only the AssignId call is timed here. |
- before = time.Now() |
- if err := d.AssignId(t); err != nil { |
- glog.Fatal(err) |
- } |
- putTasksDur += time.Now().Sub(before) |
- t.Created = time.Now() |
- t.SwarmingTaskId = fmt.Sprintf("%x", rand.Int31()) |
- // Restart the clock so the time after this closure returns is counted. |
- before = time.Now() |
- return tasksToUpdate, nil |
- }) |
- putTasksDur += time.Now().Sub(before) |
- if err != nil { |
- glog.Fatal(err) |
- } |
- // More than one returned task means another task's blamelist was updated |
- // as well. err was checked just above, so the `err == nil` tests below |
- // always hold. |
- if len(updatedTasks) > 1 { |
- mInsertAndUpdates.Lock() |
- if err == nil { |
- insertAndUpdates += len(updatedTasks) |
- } |
- insertAndUpdateDur += putTasksDur |
- mInsertAndUpdates.Unlock() |
- } else { |
- mInserts.Lock() |
- if err == nil { |
- inserts++ |
- } |
- insertDur += putTasksDur |
- mInserts.Unlock() |
- } |
- } |
- currentCommit++ |
- } |
-} |
- |
-// updateEntry is an item in updateEntryHeap. It pairs a task with the time at |
-// which updateTasks should next update it. |
-type updateEntry struct { |
- // task is the most recent version of the task known to updateTasks. |
- task *db.Task |
- // updateTime is the key for updateEntryHeap. |
- updateTime time.Time |
- // heapIndex is the index of this updateEntry in updateEntryHeap. It is kept |
- // up-to-date by updateEntryHeap methods. It is -1 while the entry is not on |
- // the heap. |
- heapIndex int |
-} |
- |
-// updateEntryHeap is a queue of *updateEntry ordered by ascending updateTime. |
-// Together with Push and Pop it satisfies heap.Interface, and it keeps each |
-// entry's heapIndex equal to the entry's position in the slice. |
-type updateEntryHeap []*updateEntry |
- |
-// Len reports the number of queued entries. |
-func (h updateEntryHeap) Len() int { |
-	return len(h) |
-} |
- |
-// Less reports whether entry i is due strictly before entry j. |
-func (h updateEntryHeap) Less(i, j int) bool { |
-	return h[j].updateTime.After(h[i].updateTime) |
-} |
- |
-// Swap exchanges entries i and j and records their new positions. |
-func (h updateEntryHeap) Swap(i, j int) { |
-	first, second := h[i], h[j] |
-	h[i], h[j] = second, first |
-	second.heapIndex = i |
-	first.heapIndex = j |
-} |
- |
-// Push appends x, which must be a *updateEntry, to the end of the slice and |
-// records its position in heapIndex. It is meant to be called through |
-// heap.Push, which then restores the heap ordering. |
-func (h *updateEntryHeap) Push(x interface{}) { |
-	entry := x.(*updateEntry) |
-	queue := *h |
-	entry.heapIndex = len(queue) |
-	*h = append(queue, entry) |
-} |
- |
-// Pop removes the last element of the slice and returns it as a *updateEntry |
-// with heapIndex set to -1. It is meant to be called through heap.Pop or |
-// heap.Remove, which first move the element to be removed to the end. |
-func (h *updateEntryHeap) Pop() interface{} { |
-	old := *h |
-	n := len(old) |
-	x := old[n-1] |
-	// Clear the vacated slot so the backing array no longer references the |
-	// popped entry (and its task) after the slice is shortened. |
-	old[n-1] = nil |
-	*h = old[0 : n-1] |
-	x.heapIndex = -1 |
-	return x |
-} |
- |
-// updateTasks makes random updates to pending and running tasks in the DB. Does |
-// not return. |
-// |
-// It keeps a heap of tasks ordered by the time each should next be updated |
-// (updateQueue) and an index from task ID to heap entry (idMap). After an |
-// initial scan of the whole DB it loops forever: fetch tasks modified since the |
-// last call, refresh their queue entries, then apply a status transition to |
-// every queued task whose update time has passed. Any DB error is fatal. |
-func updateTasks(d db.DB) { |
- glog.Infof("updateTasks begin") |
- updateQueue := updateEntryHeap{} |
- idMap := map[string]*updateEntry{} |
- |
- // freshenQueue brings the queue in line with the given version of a task: a |
- // pending or running task is (re)scheduled with a fresh random update time; |
- // a task in any other status is removed from the queue if it was on it. |
- freshenQueue := func(task *db.Task) { |
- entry := idMap[task.Id] |
- // Currently only updating pending and running tasks. |
- if task.Status == db.TASK_STATUS_PENDING || task.Status == db.TASK_STATUS_RUNNING { |
- meanUpdateDelay := kMedianPendingDuration |
- if task.Status == db.TASK_STATUS_RUNNING { |
- meanUpdateDelay = kMedianRunningDuration |
- } |
- // Normally distributed around meanUpdateDelay, clamped below at zero. |
- updateDelayNanos := int64(math.Max(0, (rand.NormFloat64()+1)*float64(meanUpdateDelay))) |
- updateTime := time.Now().Add(time.Duration(updateDelayNanos) * time.Nanosecond) |
- if entry == nil { |
- entry = &updateEntry{ |
- task: task, |
- updateTime: updateTime, |
- heapIndex: -1, |
- } |
- heap.Push(&updateQueue, entry) |
- } else { |
- // Already queued: replace the task and re-sort around the new time. |
- entry.task = task |
- entry.updateTime = updateTime |
- heap.Fix(&updateQueue, entry.heapIndex) |
- } |
- // Sanity check: an entry on the heap must have a non-negative index. |
- if entry.heapIndex < 0 { |
- glog.Fatalf("you lose %#v %#v", entry, updateQueue) |
- } |
- idMap[task.Id] = entry |
- } else if entry != nil { |
- heap.Remove(&updateQueue, entry.heapIndex) |
- delete(idMap, task.Id) |
- } |
- } |
- |
- // Start tracking before the initial scan; the token is passed to |
- // GetModifiedTasks in the main loop below. |
- token, err := d.StartTrackingModifiedTasks() |
- if err != nil { |
- glog.Fatal(err) |
- } |
- // Initial read to find pending and running tasks. |
- for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) { |
- begin := t.Add(-24 * time.Hour) |
- glog.Infof("updateTasks loading %s to %s", begin, t) |
- before := time.Now() |
- // Note: from here to the end of the loop body, t is the slice of tasks and |
- // shadows the loop's time variable. |
- t, err := d.GetTasksFromDateRange(begin, t) |
- getTasksDur := time.Now().Sub(before) |
- if err != nil { |
- glog.Fatal(err) |
- } |
- // An empty result still counts as one read. |
- mReads.Lock() |
- if len(t) > 0 { |
- reads += len(t) |
- } else { |
- reads++ |
- } |
- readDur += getTasksDur |
- mReads.Unlock() |
- for _, task := range t { |
- freshenQueue(task) |
- } |
- } |
- glog.Infof("updateTasks finished loading; %d pending and running", len(idMap)) |
- // Rate limit so we're not constantly taking locks for GetModifiedTasks. |
- for _ = range time.Tick(time.Millisecond) { |
- now := time.Now() |
- t, err := d.GetModifiedTasks(token) |
- if err != nil { |
- glog.Fatal(err) |
- } |
- for _, task := range t { |
- freshenQueue(task) |
- } |
- glog.Infof("updateTasks performing updates; %d tasks on queue", len(updateQueue)) |
- // Process every entry that was already due when this pass started. |
- for len(updateQueue) > 0 && updateQueue[0].updateTime.Before(now) { |
- // NOTE(review): presumably this leaves a 5-second margin so that |
- // GetModifiedTasks is called again before the token times out -- confirm |
- // against db.MODIFIED_TASKS_TIMEOUT. |
- if time.Now().Sub(now) >= db.MODIFIED_TASKS_TIMEOUT-5*time.Second { |
- break |
- } |
- entry := heap.Pop(&updateQueue).(*updateEntry) |
- task := entry.task |
- // The task is dropped from the queue here. NOTE(review): it presumably |
- // returns via GetModifiedTasks and freshenQueue if it is still pending or |
- // running after the update -- confirm. |
- delete(idMap, task.Id) |
- putTasksDur := time.Duration(0) |
- before := time.Now() |
- // The closure's task parameter shadows the outer task; it is the version |
- // supplied by UpdateTaskWithRetries. Time spent inside the closure body is |
- // excluded from putTasksDur. |
- _, err := db.UpdateTaskWithRetries(d, task.Id, func(task *db.Task) error { |
- putTasksDur += time.Now().Sub(before) |
- switch task.Status { |
- case db.TASK_STATUS_PENDING: |
- // Pending tasks start now; 1 in 100 ends immediately as a mishap, the |
- // rest become running. |
- task.Started = now |
- isMishap := rand.Intn(100) == 0 |
- if isMishap { |
- task.Status = db.TASK_STATUS_MISHAP |
- task.Finished = now |
- } else { |
- task.Status = db.TASK_STATUS_RUNNING |
- } |
- case db.TASK_STATUS_RUNNING: |
- // Running tasks finish now: 1 in 25 as a mishap, 4 in 25 as a failure, |
- // and the remaining 20 in 25 as a success. |
- task.Finished = now |
- statusRand := rand.Intn(25) |
- isMishap := statusRand == 0 |
- isFailure := statusRand < 5 |
- if isMishap { |
- task.Status = db.TASK_STATUS_MISHAP |
- } else if isFailure { |
- task.Status = db.TASK_STATUS_FAILURE |
- } else { |
- task.Status = db.TASK_STATUS_SUCCESS |
- task.IsolatedOutput = fmt.Sprintf("%x", rand.Int63()) |
- } |
- default: |
- glog.Fatalf("Task %s in update queue has status %s. %#v", task.Id, task.Status, task) |
- } |
- // Restart the clock so the time after this closure returns is counted. |
- before = time.Now() |
- return nil |
- }) |
- putTasksDur += time.Now().Sub(before) |
- if err != nil { |
- glog.Fatal(err) |
- } |
- mUpdates.Lock() |
- updates++ |
- updateDur += putTasksDur |
- mUpdates.Unlock() |
- } |
- } |
-} |
- |
-// readTasks loads the tasks of the preceding hour once per second and adds |
-// each read to the shared read counters. Once more than a minute has passed |
-// since the previous message it logs the average result size and the read and |
-// task rates relative to the time spent reading, then resets its local |
-// tallies. Does not return; a DB error is fatal. |
-func readTasks(d db.DB) { |
-	glog.Infof("readTasks begin") |
-	var ( |
-		tasksSeen uint64 |
-		numReads  uint64 |
-		timeSpent time.Duration |
-	) |
-	lastReport := time.Now() |
-	for range time.Tick(time.Second) { |
-		start := time.Now() |
-		tasks, err := d.GetTasksFromDateRange(start.Add(-time.Hour), start) |
-		elapsed := time.Since(start) |
-		if err != nil { |
-			glog.Fatal(err) |
-		} |
-		found := len(tasks) |
- |
-		// Local tallies for the periodic log message. |
-		tasksSeen += uint64(found) |
-		numReads++ |
-		timeSpent += elapsed |
- |
-		// Shared counters consumed by reportStats. |
-		mReads.Lock() |
-		reads += found |
-		readDur += elapsed |
-		mReads.Unlock() |
- |
-		if start.Sub(lastReport) <= time.Minute { |
-			continue |
-		} |
-		lastReport = start |
-		if numReads > 0 && timeSpent > 0 { |
-			secs := timeSpent.Seconds() |
-			glog.Infof("readTasks %d tasks in last hour; %f reads/sec; %f tasks/sec", tasksSeen/numReads, float64(numReads)/secs, float64(tasksSeen)/secs) |
-		} else { |
-			glog.Fatalf("readTasks 0 reads in last minute") |
-		} |
-		tasksSeen, numReads, timeSpent = 0, 0, 0 |
-	} |
-} |
- |
-// reportStats logs the performance of the DB as seen by putTasks, updateTasks, |
-// and readTasks. Does not return. |
-// |
-// Every five seconds it snapshots the shared counters under their read locks |
-// and logs two lines: the totals since startup and the change since the |
-// previous report. Each rate is an operation count divided by the time spent |
-// performing those operations. |
-func reportStats() { |
-	var ( |
-		lastInserts, lastInsertAndUpdates, lastUpdates, lastReads             int |
-		lastInsertDur, lastInsertAndUpdateDur, lastUpdateDur, lastReadDur time.Duration |
-	) |
-	for range time.Tick(5 * time.Second) { |
-		mInserts.RLock() |
-		totalInserts, totalInsertDur := inserts, insertDur |
-		mInserts.RUnlock() |
-		mInsertAndUpdates.RLock() |
-		totalInsertAndUpdates, totalInsertAndUpdateDur := insertAndUpdates, insertAndUpdateDur |
-		mInsertAndUpdates.RUnlock() |
-		mUpdates.RLock() |
-		totalUpdates, totalUpdateDur := updates, updateDur |
-		mUpdates.RUnlock() |
-		mReads.RLock() |
-		totalReads, totalReadDur := reads, readDur |
-		mReads.RUnlock() |
- |
-		// Change since the previous report. |
-		curInserts, curInsertDur := totalInserts-lastInserts, totalInsertDur-lastInsertDur |
-		curInsertAndUpdates, curInsertAndUpdateDur := totalInsertAndUpdates-lastInsertAndUpdates, totalInsertAndUpdateDur-lastInsertAndUpdateDur |
-		curUpdates, curUpdateDur := totalUpdates-lastUpdates, totalUpdateDur-lastUpdateDur |
-		curReads, curReadDur := totalReads-lastReads, totalReadDur-lastReadDur |
-		lastInserts, lastInsertDur = totalInserts, totalInsertDur |
-		lastInsertAndUpdates, lastInsertAndUpdateDur = totalInsertAndUpdates, totalInsertAndUpdateDur |
-		lastUpdates, lastUpdateDur = totalUpdates, totalUpdateDur |
-		lastReads, lastReadDur = totalReads, totalReadDur |
- |
-		// The totals use the same zero-duration guard as the per-interval |
-		// figures, so a counter that has not been touched yet is reported as |
-		// 0/s rather than NaN. |
-		glog.Infof("reportStats total; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", totalInserts, opsPerSecond(totalInserts, totalInsertDur), totalInsertAndUpdates, opsPerSecond(totalInsertAndUpdates, totalInsertAndUpdateDur), totalUpdates, opsPerSecond(totalUpdates, totalUpdateDur), totalReads, opsPerSecond(totalReads, totalReadDur)) |
-		glog.Infof("reportStats current; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", curInserts, opsPerSecond(curInserts, curInsertDur), curInsertAndUpdates, opsPerSecond(curInsertAndUpdates, curInsertAndUpdateDur), curUpdates, opsPerSecond(curUpdates, curUpdateDur), curReads, opsPerSecond(curReads, curReadDur)) |
-	} |
-} |
- |
-// opsPerSecond returns count divided by dur expressed in seconds. A zero dur |
-// is replaced by one nanosecond so the division never yields NaN or +Inf. |
-func opsPerSecond(count int, dur time.Duration) float64 { |
-	if dur == 0 { |
-		dur = time.Nanosecond |
-	} |
-	return float64(count) / dur.Seconds() |
-} |
- |
-// main initializes logging/metrics, opens the local DB under the working |
-// directory, and starts the stats reporter and the three workload goroutines. |
-// It then blocks forever. |
-func main() { |
-	defer common.LogPanic() |
- |
-	// Global init. |
-	common.InitWithMetrics2("busywork", influxHost, influxUser, influxPassword, influxDatabase, local) |
- |
-	dbFile := path.Join(*workdir, "busywork.bdb") |
-	d, err := local_db.NewDB("busywork", dbFile) |
-	if err != nil { |
-		glog.Fatal(err) |
-	} |
- |
-	go reportStats() |
- |
-	// Start the writers and the reader in the same order as before: inserts, |
-	// updates, reads. |
-	for _, worker := range []func(db.DB){putTasks, updateTasks, readTasks} { |
-		go worker(d) |
-	} |
- |
-	// Block forever while goroutines do the work. |
-	select {} |
-} |