| Index: build_scheduler/go/db/local_db/busywork/main.go
|
| diff --git a/build_scheduler/go/db/local_db/busywork/main.go b/build_scheduler/go/db/local_db/busywork/main.go
|
| deleted file mode 100644
|
| index 1d52f4faf2f7dcd9fe4220dbf91488deb8e930ff..0000000000000000000000000000000000000000
|
| --- a/build_scheduler/go/db/local_db/busywork/main.go
|
| +++ /dev/null
|
| @@ -1,523 +0,0 @@
|
| -// busywork is an end-to-end test for local_db. It performs inserts and updates
|
| -// roughly mimicking what we might expect from build_scheduler. It also tracks
|
| -// performance for various operations.
|
| -package main
|
| -
|
| -import (
|
| - "container/heap"
|
| - "flag"
|
| - "fmt"
|
| - "math"
|
| - "math/rand"
|
| - "path"
|
| - "sort"
|
| - "strconv"
|
| - "sync"
|
| - "time"
|
| -
|
| - "github.com/skia-dev/glog"
|
| -
|
| - "go.skia.org/infra/build_scheduler/go/db"
|
| - "go.skia.org/infra/build_scheduler/go/db/local_db"
|
| - "go.skia.org/infra/go/buildbot"
|
| - "go.skia.org/infra/go/common"
|
| - "go.skia.org/infra/go/influxdb"
|
| -)
|
| -
|
| -var (
|
| - // Flags.
|
| - local = flag.Bool("local", true, "Whether we're running on a dev machine vs in production.")
|
| - workdir = flag.String("workdir", "workdir", "Working directory to use.")
|
| -
|
| - influxHost = flag.String("influxdb_host", influxdb.DEFAULT_HOST, "The InfluxDB hostname.")
|
| - influxUser = flag.String("influxdb_name", influxdb.DEFAULT_USER, "The InfluxDB username.")
|
| - influxPassword = flag.String("influxdb_password", influxdb.DEFAULT_PASSWORD, "The InfluxDB password.")
|
| - influxDatabase = flag.String("influxdb_database", influxdb.DEFAULT_DATABASE, "The InfluxDB database.")
|
| -
|
| - // Counters.
|
| - inserts = 0
|
| - insertDur = time.Duration(0)
|
| - mInserts = sync.RWMutex{}
|
| - insertAndUpdates = 0
|
| - insertAndUpdateDur = time.Duration(0)
|
| - mInsertAndUpdates = sync.RWMutex{}
|
| - updates = 0
|
| - updateDur = time.Duration(0)
|
| - mUpdates = sync.RWMutex{}
|
| - reads = 0
|
| - readDur = time.Duration(0)
|
| - mReads = sync.RWMutex{}
|
| -
|
| - // epoch is a time before local_db was written.
|
| - epoch = time.Date(2016, 8, 1, 0, 0, 0, 0, time.UTC)
|
| -)
|
| -
|
| -const (
|
| - // Parameters for creating random tasks.
|
| - kNumTaskNames = 50
|
| - kNumRepos = 3
|
| - kRecentCommitRange = 30
|
| - kMedianBlamelistLength = 2
|
| -
|
| - // Parameters for randomly updating tasks.
|
| - kMedianPendingDuration = 10 * time.Second
|
| - kMedianRunningDuration = 10 * time.Minute
|
| -)
|
| -
|
| -// itoh converts an integer to a commit hash. Task.Revision is always set to
|
| -// the result of itoh.
|
| -func itoh(i int) string {
|
| - return strconv.Itoa(i)
|
| -}
|
| -
|
| -// htoi converts a commit hash to an integer. A commit's parent is
|
| -// itoh(htoi(hash)-1).
|
| -func htoi(h string) int {
|
| - i, err := strconv.Atoi(h)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - return i
|
| -}
|
| -
|
| -// makeTask generates task with random Name, Repo, and Revision. Revision will
|
| -// be picked randomly from a range starting at recentCommitsBegin.
|
| -func makeTask(recentCommitsBegin int) *db.Task {
|
| - return &db.Task{
|
| - Name: fmt.Sprintf("Task-%d", rand.Intn(kNumTaskNames)),
|
| - Repo: fmt.Sprintf("Repo-%d", rand.Intn(kNumRepos)),
|
| - Revision: itoh(recentCommitsBegin + rand.Intn(kRecentCommitRange)),
|
| - }
|
| -}
|
| -
|
| -// updateBlamelists sets t's Commits based on t.Revision and previously-inserted
|
| -// tasks' Commits and returns t. If another task's Commits needs to change, also
|
| -// returns that task with its updated Commits.
|
| -func updateBlamelists(cache db.TaskCache, t *db.Task) ([]*db.Task, error) {
|
| - if !cache.KnownTaskName(t.Repo, t.Name) {
|
| - t.Commits = []string{t.Revision}
|
| - return []*db.Task{t}, nil
|
| - }
|
| - stealFrom, err := cache.GetTaskForCommit(t.Repo, t.Revision, t.Name)
|
| - if err != nil {
|
| - return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, t.Revision, err)
|
| - }
|
| -
|
| - lastCommit := htoi(t.Revision)
|
| - firstCommit := lastCommit
|
| - // Work backwards until prev changes.
|
| - for i := lastCommit - 1; i > 0; i-- {
|
| - if lastCommit-firstCommit+1 > buildbot.MAX_BLAMELIST_COMMITS && stealFrom == nil {
|
| - t.Commits = []string{t.Revision}
|
| - return []*db.Task{t}, nil
|
| - }
|
| - hash := itoh(i)
|
| - prev, err := cache.GetTaskForCommit(t.Repo, hash, t.Name)
|
| - if err != nil {
|
| - return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, hash, err)
|
| - }
|
| - if stealFrom != prev {
|
| - break
|
| - }
|
| - firstCommit = i
|
| - }
|
| -
|
| - t.Commits = make([]string, lastCommit-firstCommit+1)
|
| - for i := 0; i <= lastCommit-firstCommit; i++ {
|
| - t.Commits[i] = itoh(i + firstCommit)
|
| - }
|
| - sort.Strings(t.Commits)
|
| -
|
| - if stealFrom != nil {
|
| - newCommits := make([]string, 0, len(stealFrom.Commits)-len(t.Commits))
|
| - for _, h := range stealFrom.Commits {
|
| - idx := sort.SearchStrings(t.Commits, h)
|
| - if idx == len(t.Commits) || t.Commits[idx] != h {
|
| - newCommits = append(newCommits, h)
|
| - }
|
| - }
|
| - stealFrom.Commits = newCommits
|
| - return []*db.Task{t, stealFrom}, nil
|
| - } else {
|
| - return []*db.Task{t}, nil
|
| - }
|
| -}
|
| -
|
| -// findApproxLatestCommit scans the DB backwards and returns the commit # of the
|
| -// last-created task.
|
| -func findApproxLatestCommit(d db.DB) int {
|
| - glog.Infof("findApproxLatestCommit begin")
|
| - for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
|
| - begin := t.Add(-24 * time.Hour)
|
| - glog.Infof("findApproxLatestCommit loading %s to %s", begin, t)
|
| - before := time.Now()
|
| - t, err := d.GetTasksFromDateRange(begin, t)
|
| - getTasksDur := time.Now().Sub(before)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - mReads.Lock()
|
| - if len(t) > 0 {
|
| - reads += len(t)
|
| - } else {
|
| - reads++
|
| - }
|
| - readDur += getTasksDur
|
| - mReads.Unlock()
|
| - if len(t) > 0 {
|
| - // Return revision of last task.
|
| - lastTask := t[len(t)-1]
|
| - i := htoi(lastTask.Revision)
|
| - glog.Infof("findApproxLatestCommit returning %d from %s", i, lastTask.Id)
|
| - return i
|
| - }
|
| -
|
| - }
|
| - glog.Infof("findApproxLatestCommit found empty DB")
|
| - return 0
|
| -}
|
| -
|
| -// putTasks inserts randomly-generated tasks into the DB. Does not return.
|
| -func putTasks(d db.DB) {
|
| - glog.Infof("putTasks begin")
|
| - cache, err := db.NewTaskCache(d, 4*24*time.Hour)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - // If we're restarting, try to pick up where we left off.
|
| - currentCommit := findApproxLatestCommit(d)
|
| - meanTasksPerCommit := float64(kNumTaskNames * kNumRepos / kMedianBlamelistLength)
|
| - maxTasksPerIter := float64(kNumTaskNames * kNumRepos * kRecentCommitRange)
|
| - for {
|
| - iterTasks := int(math.Max(0, math.Min(maxTasksPerIter, (rand.NormFloat64()+1)*meanTasksPerCommit)))
|
| - glog.Infof("Adding %d tasks with revisions %s - %s", iterTasks, itoh(currentCommit), itoh(currentCommit+kRecentCommitRange))
|
| - for i := 0; i < iterTasks; i++ {
|
| - t := makeTask(currentCommit)
|
| - putTasksDur := time.Duration(0)
|
| - before := time.Now()
|
| - updatedTasks, err := db.UpdateWithRetries(d, func() ([]*db.Task, error) {
|
| - putTasksDur += time.Now().Sub(before)
|
| - t := t.Copy()
|
| - if err := cache.Update(); err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - tasksToUpdate, err := updateBlamelists(cache, t)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - before = time.Now()
|
| - if err := d.AssignId(t); err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - putTasksDur += time.Now().Sub(before)
|
| - t.Created = time.Now()
|
| - t.SwarmingTaskId = fmt.Sprintf("%x", rand.Int31())
|
| - before = time.Now()
|
| - return tasksToUpdate, nil
|
| - })
|
| - putTasksDur += time.Now().Sub(before)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - if len(updatedTasks) > 1 {
|
| - mInsertAndUpdates.Lock()
|
| - if err == nil {
|
| - insertAndUpdates += len(updatedTasks)
|
| - }
|
| - insertAndUpdateDur += putTasksDur
|
| - mInsertAndUpdates.Unlock()
|
| - } else {
|
| - mInserts.Lock()
|
| - if err == nil {
|
| - inserts++
|
| - }
|
| - insertDur += putTasksDur
|
| - mInserts.Unlock()
|
| - }
|
| - }
|
| - currentCommit++
|
| - }
|
| -}
|
| -
|
| -// updateEntry is an item in updateEntryHeap.
|
| -type updateEntry struct {
|
| - task *db.Task
|
| - // updateTime is the key for updateEntryHeap.
|
| - updateTime time.Time
|
| - // heapIndex is the index of this updateEntry in updateEntryHeap. It is kept
|
| - // up-to-date by updateEntryHeap methods.
|
| - heapIndex int
|
| -}
|
| -
|
| -// updateEntryHeap implements a queue of updateEntry's ordered by updateTime. It
|
| -// implements heap.Interface.
|
| -type updateEntryHeap []*updateEntry
|
| -
|
| -func (h updateEntryHeap) Len() int { return len(h) }
|
| -func (h updateEntryHeap) Less(i, j int) bool { return h[i].updateTime.Before(h[j].updateTime) }
|
| -func (h updateEntryHeap) Swap(i, j int) {
|
| - h[i], h[j] = h[j], h[i]
|
| - h[i].heapIndex = i
|
| - h[j].heapIndex = j
|
| -}
|
| -
|
| -func (h *updateEntryHeap) Push(x interface{}) {
|
| - item := x.(*updateEntry)
|
| - item.heapIndex = len(*h)
|
| - *h = append(*h, item)
|
| -}
|
| -
|
| -func (h *updateEntryHeap) Pop() interface{} {
|
| - old := *h
|
| - n := len(old)
|
| - x := old[n-1]
|
| - *h = old[0 : n-1]
|
| - x.heapIndex = -1
|
| - return x
|
| -}
|
| -
|
| -// updateTasks makes random updates to pending and running tasks in the DB. Does
|
| -// not return.
|
| -func updateTasks(d db.DB) {
|
| - glog.Infof("updateTasks begin")
|
| - updateQueue := updateEntryHeap{}
|
| - idMap := map[string]*updateEntry{}
|
| -
|
| - freshenQueue := func(task *db.Task) {
|
| - entry := idMap[task.Id]
|
| - // Currently only updating pending and running tasks.
|
| - if task.Status == db.TASK_STATUS_PENDING || task.Status == db.TASK_STATUS_RUNNING {
|
| - meanUpdateDelay := kMedianPendingDuration
|
| - if task.Status == db.TASK_STATUS_RUNNING {
|
| - meanUpdateDelay = kMedianRunningDuration
|
| - }
|
| - updateDelayNanos := int64(math.Max(0, (rand.NormFloat64()+1)*float64(meanUpdateDelay)))
|
| - updateTime := time.Now().Add(time.Duration(updateDelayNanos) * time.Nanosecond)
|
| - if entry == nil {
|
| - entry = &updateEntry{
|
| - task: task,
|
| - updateTime: updateTime,
|
| - heapIndex: -1,
|
| - }
|
| - heap.Push(&updateQueue, entry)
|
| - } else {
|
| - entry.task = task
|
| - entry.updateTime = updateTime
|
| - heap.Fix(&updateQueue, entry.heapIndex)
|
| - }
|
| - if entry.heapIndex < 0 {
|
| - glog.Fatalf("you lose %#v %#v", entry, updateQueue)
|
| - }
|
| - idMap[task.Id] = entry
|
| - } else if entry != nil {
|
| - heap.Remove(&updateQueue, entry.heapIndex)
|
| - delete(idMap, task.Id)
|
| - }
|
| - }
|
| -
|
| - token, err := d.StartTrackingModifiedTasks()
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - // Initial read to find pending and running tasks.
|
| - for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
|
| - begin := t.Add(-24 * time.Hour)
|
| - glog.Infof("updateTasks loading %s to %s", begin, t)
|
| - before := time.Now()
|
| - t, err := d.GetTasksFromDateRange(begin, t)
|
| - getTasksDur := time.Now().Sub(before)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - mReads.Lock()
|
| - if len(t) > 0 {
|
| - reads += len(t)
|
| - } else {
|
| - reads++
|
| - }
|
| - readDur += getTasksDur
|
| - mReads.Unlock()
|
| - for _, task := range t {
|
| - freshenQueue(task)
|
| - }
|
| - }
|
| - glog.Infof("updateTasks finished loading; %d pending and running", len(idMap))
|
| - // Rate limit so we're not constantly taking locks for GetModifiedTasks.
|
| - for _ = range time.Tick(time.Millisecond) {
|
| - now := time.Now()
|
| - t, err := d.GetModifiedTasks(token)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - for _, task := range t {
|
| - freshenQueue(task)
|
| - }
|
| - glog.Infof("updateTasks performing updates; %d tasks on queue", len(updateQueue))
|
| - for len(updateQueue) > 0 && updateQueue[0].updateTime.Before(now) {
|
| - if time.Now().Sub(now) >= db.MODIFIED_TASKS_TIMEOUT-5*time.Second {
|
| - break
|
| - }
|
| - entry := heap.Pop(&updateQueue).(*updateEntry)
|
| - task := entry.task
|
| - delete(idMap, task.Id)
|
| - putTasksDur := time.Duration(0)
|
| - before := time.Now()
|
| - _, err := db.UpdateTaskWithRetries(d, task.Id, func(task *db.Task) error {
|
| - putTasksDur += time.Now().Sub(before)
|
| - switch task.Status {
|
| - case db.TASK_STATUS_PENDING:
|
| - task.Started = now
|
| - isMishap := rand.Intn(100) == 0
|
| - if isMishap {
|
| - task.Status = db.TASK_STATUS_MISHAP
|
| - task.Finished = now
|
| - } else {
|
| - task.Status = db.TASK_STATUS_RUNNING
|
| - }
|
| - case db.TASK_STATUS_RUNNING:
|
| - task.Finished = now
|
| - statusRand := rand.Intn(25)
|
| - isMishap := statusRand == 0
|
| - isFailure := statusRand < 5
|
| - if isMishap {
|
| - task.Status = db.TASK_STATUS_MISHAP
|
| - } else if isFailure {
|
| - task.Status = db.TASK_STATUS_FAILURE
|
| - } else {
|
| - task.Status = db.TASK_STATUS_SUCCESS
|
| - task.IsolatedOutput = fmt.Sprintf("%x", rand.Int63())
|
| - }
|
| - default:
|
| - glog.Fatalf("Task %s in update queue has status %s. %#v", task.Id, task.Status, task)
|
| - }
|
| - before = time.Now()
|
| - return nil
|
| - })
|
| - putTasksDur += time.Now().Sub(before)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - mUpdates.Lock()
|
| - updates++
|
| - updateDur += putTasksDur
|
| - mUpdates.Unlock()
|
| - }
|
| - }
|
| -}
|
| -
|
| -// readTasks reads the last hour of tasks every second. Does not return.
|
| -func readTasks(d db.DB) {
|
| - glog.Infof("readTasks begin")
|
| - var taskCount uint64 = 0
|
| - var readCount uint64 = 0
|
| - var totalDuration time.Duration = 0
|
| - lastMessage := time.Now()
|
| - for _ = range time.Tick(time.Second) {
|
| - now := time.Now()
|
| - t, err := d.GetTasksFromDateRange(now.Add(-time.Hour), now)
|
| - dur := time.Now().Sub(now)
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| - taskCount += uint64(len(t))
|
| - readCount++
|
| - totalDuration += dur
|
| - mReads.Lock()
|
| - reads += len(t)
|
| - readDur += dur
|
| - mReads.Unlock()
|
| - if now.Sub(lastMessage) > time.Minute {
|
| - lastMessage = now
|
| - if readCount > 0 && totalDuration > 0 {
|
| - glog.Infof("readTasks %d tasks in last hour; %f reads/sec; %f tasks/sec", taskCount/readCount, float64(readCount)/totalDuration.Seconds(), float64(taskCount)/totalDuration.Seconds())
|
| - } else {
|
| - glog.Fatalf("readTasks 0 reads in last minute")
|
| - }
|
| - taskCount = 0
|
| - readCount = 0
|
| - totalDuration = 0
|
| - }
|
| - }
|
| -}
|
| -
|
| -// reportStats logs the performance of the DB as seen by putTasks, updateTasks,
|
| -// and readTasks. Does not return.
|
| -func reportStats() {
|
| - lastInserts := 0
|
| - lastInsertDur := time.Duration(0)
|
| - lastInsertAndUpdates := 0
|
| - lastInsertAndUpdateDur := time.Duration(0)
|
| - lastUpdates := 0
|
| - lastUpdateDur := time.Duration(0)
|
| - lastReads := 0
|
| - lastReadDur := time.Duration(0)
|
| - for _ = range time.Tick(5 * time.Second) {
|
| - mInserts.RLock()
|
| - totalInserts := inserts
|
| - totalInsertDur := insertDur
|
| - mInserts.RUnlock()
|
| - mInsertAndUpdates.RLock()
|
| - totalInsertAndUpdates := insertAndUpdates
|
| - totalInsertAndUpdateDur := insertAndUpdateDur
|
| - mInsertAndUpdates.RUnlock()
|
| - mUpdates.RLock()
|
| - totalUpdates := updates
|
| - totalUpdateDur := updateDur
|
| - mUpdates.RUnlock()
|
| - mReads.RLock()
|
| - totalReads := reads
|
| - totalReadDur := readDur
|
| - mReads.RUnlock()
|
| - curInserts := totalInserts - lastInserts
|
| - lastInserts = totalInserts
|
| - curInsertDur := totalInsertDur - lastInsertDur
|
| - lastInsertDur = totalInsertDur
|
| - curInsertAndUpdates := totalInsertAndUpdates - lastInsertAndUpdates
|
| - lastInsertAndUpdates = totalInsertAndUpdates
|
| - curInsertAndUpdateDur := totalInsertAndUpdateDur - lastInsertAndUpdateDur
|
| - lastInsertAndUpdateDur = totalInsertAndUpdateDur
|
| - curUpdates := totalUpdates - lastUpdates
|
| - lastUpdates = totalUpdates
|
| - curUpdateDur := totalUpdateDur - lastUpdateDur
|
| - lastUpdateDur = totalUpdateDur
|
| - curReads := totalReads - lastReads
|
| - lastReads = totalReads
|
| - curReadDur := totalReadDur - lastReadDur
|
| - lastReadDur = totalReadDur
|
| - glog.Infof("reportStats total; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", totalInserts, float64(totalInserts)/totalInsertDur.Seconds(), totalInsertAndUpdates, float64(totalInsertAndUpdates)/totalInsertAndUpdateDur.Seconds(), totalUpdates, float64(totalUpdates)/totalUpdateDur.Seconds(), totalReads, float64(totalReads)/totalReadDur.Seconds())
|
| - if curInsertDur.Nanoseconds() == 0 {
|
| - curInsertDur += time.Nanosecond
|
| - }
|
| - if curInsertAndUpdateDur.Nanoseconds() == 0 {
|
| - curInsertAndUpdateDur += time.Nanosecond
|
| - }
|
| - if curUpdateDur.Nanoseconds() == 0 {
|
| - curUpdateDur += time.Nanosecond
|
| - }
|
| - if curReadDur.Nanoseconds() == 0 {
|
| - curReadDur += time.Nanosecond
|
| - }
|
| - glog.Infof("reportStats current; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", curInserts, float64(curInserts)/curInsertDur.Seconds(), curInsertAndUpdates, float64(curInsertAndUpdates)/curInsertAndUpdateDur.Seconds(), curUpdates, float64(curUpdates)/curUpdateDur.Seconds(), curReads, float64(curReads)/curReadDur.Seconds())
|
| - }
|
| -}
|
| -
|
| -func main() {
|
| - defer common.LogPanic()
|
| -
|
| - // Global init.
|
| - common.InitWithMetrics2("busywork", influxHost, influxUser, influxPassword, influxDatabase, local)
|
| -
|
| - d, err := local_db.NewDB("busywork", path.Join(*workdir, "busywork.bdb"))
|
| - if err != nil {
|
| - glog.Fatal(err)
|
| - }
|
| -
|
| - go reportStats()
|
| -
|
| - go putTasks(d)
|
| - go updateTasks(d)
|
| - go readTasks(d)
|
| -
|
| - // Block forever while goroutines do the work.
|
| - select {}
|
| -}
|
|
|