| Index: ct/go/ctfe/main.go
|
| diff --git a/ct/go/ctfe/main.go b/ct/go/ctfe/main.go
|
| index f6f620965b6df7d15ed56d0d293ff2570338f31e..651f77f84ccbdd5773a6e93829dcb7072e3eee61 100644
|
| --- a/ct/go/ctfe/main.go
|
| +++ b/ct/go/ctfe/main.go
|
| @@ -16,6 +16,7 @@ import (
|
| "github.com/gorilla/mux"
|
| "github.com/skia-dev/glog"
|
|
|
| + metrics "github.com/rcrowley/go-metrics"
|
| "go.skia.org/infra/ct/go/ctfe/admin_tasks"
|
| "go.skia.org/infra/ct/go/ctfe/capture_skps"
|
| "go.skia.org/infra/ct/go/ctfe/chromium_builds"
|
| @@ -23,6 +24,7 @@ import (
|
| "go.skia.org/infra/ct/go/ctfe/lua_scripts"
|
| "go.skia.org/infra/ct/go/ctfe/pending_tasks"
|
| "go.skia.org/infra/ct/go/ctfe/task_common"
|
| + "go.skia.org/infra/ct/go/ctfe/task_types"
|
| ctfeutil "go.skia.org/infra/ct/go/ctfe/util"
|
| "go.skia.org/infra/ct/go/db"
|
| ctutil "go.skia.org/infra/ct/go/util"
|
| @@ -37,16 +39,6 @@ import (
|
|
|
| var (
|
| dbClient *influxdb.Client = nil
|
| -
|
| - // Slice of all tasks supported by CTFE.
|
| - supportedTasks = []task_common.Task{
|
| - &chromium_perf.DBTask{},
|
| - &capture_skps.DBTask{},
|
| - &lua_scripts.DBTask{},
|
| - &chromium_builds.DBTask{},
|
| - &admin_tasks.RecreatePageSetsDBTask{},
|
| - &admin_tasks.RecreateWebpageArchivesDBTask{},
|
| - }
|
| )
|
|
|
| // flags
|
| @@ -120,6 +112,41 @@ func runServer(serverURL string) {
|
| glog.Fatal(http.ListenAndServe(*port, nil))
|
| }
|
|
|
| +// startCtfeMetrics registers gauges with the graphite server that indicate whether CT is running
|
| +// healthily, and starts a goroutine that updates them once every common.SAMPLE_PERIOD.
|
| +func startCtfeMetrics() {
|
| + pendingTasksGauge := metrics.GetOrRegisterGauge("num-pending-tasks", metrics.DefaultRegistry)
|
| + oldestPendingTaskAgeGauge := metrics.GetOrRegisterGaugeFloat64("oldest-pending-task-age", metrics.DefaultRegistry)
|
| + // 0 = no tasks pending; 1 = oldest pending task has started; 2 = oldest pending task has not started
|
| + oldestPendingTaskStatusGauge := metrics.GetOrRegisterGauge("oldest-pending-task-status", metrics.DefaultRegistry)
|
| + go func() {
|
| + for _ = range time.Tick(common.SAMPLE_PERIOD) {
|
| + pendingTaskCount, err := pending_tasks.GetPendingTaskCount()
|
| + if err != nil {
|
| + glog.Error(err)
|
| + } else {
|
| + pendingTasksGauge.Update(pendingTaskCount)
|
| + }
|
| +
|
| + oldestPendingTask, err := pending_tasks.GetOldestPendingTask()
|
| + if err != nil {
|
| + glog.Error(err)
|
| + } else if oldestPendingTask == nil {
|
| + oldestPendingTaskAgeGauge.Update(0)
|
| + oldestPendingTaskStatusGauge.Update(0)
|
| + } else {
|
| + addedTime := ctutil.GetTimeFromTs(strconv.FormatInt(oldestPendingTask.GetCommonCols().TsAdded.Int64, 10))
|
| + oldestPendingTaskAgeGauge.Update(time.Since(addedTime).Seconds())
|
| + if oldestPendingTask.GetCommonCols().TsStarted.Valid {
|
| + oldestPendingTaskStatusGauge.Update(1)
|
| + } else {
|
| + oldestPendingTaskStatusGauge.Update(2)
|
| + }
|
| + }
|
| + }
|
| + }()
|
| +}
|
| +
|
| // repeatedTasksScheduler looks for all tasks that contain repeat_after_days
|
| // set to > 0 and schedules them when the specified time comes.
|
| // The function does the following:
|
| @@ -133,7 +160,7 @@ func repeatedTasksScheduler() {
|
|
|
| for _ = range time.Tick(*tasksSchedulerWaitTime) {
|
| // Loop over all tasks to find tasks which need to be scheduled.
|
| - for _, prototype := range supportedTasks {
|
| + for _, prototype := range task_types.Prototypes() {
|
|
|
| query, args := task_common.DBTaskQuery(prototype,
|
| task_common.QueryParams{
|
| @@ -235,6 +262,8 @@ func main() {
|
| glog.Fatal(err)
|
| }
|
|
|
| + startCtfeMetrics()
|
| +
|
| // Start the repeated tasks scheduler.
|
| go repeatedTasksScheduler()
|
|
|
|
|