Chromium Code Reviews | Index: ct/go/ctfe/main.go |
| diff --git a/ct/go/ctfe/main.go b/ct/go/ctfe/main.go |
| index f6f620965b6df7d15ed56d0d293ff2570338f31e..1d4709ee1444fea27084e5edd4aeb7bce56e5fe3 100644 |
| --- a/ct/go/ctfe/main.go |
| +++ b/ct/go/ctfe/main.go |
| @@ -16,6 +16,7 @@ import ( |
| "github.com/gorilla/mux" |
| "github.com/skia-dev/glog" |
| + metrics "github.com/rcrowley/go-metrics" |
| "go.skia.org/infra/ct/go/ctfe/admin_tasks" |
| "go.skia.org/infra/ct/go/ctfe/capture_skps" |
| "go.skia.org/infra/ct/go/ctfe/chromium_builds" |
| @@ -23,6 +24,7 @@ import ( |
| "go.skia.org/infra/ct/go/ctfe/lua_scripts" |
| "go.skia.org/infra/ct/go/ctfe/pending_tasks" |
| "go.skia.org/infra/ct/go/ctfe/task_common" |
| + "go.skia.org/infra/ct/go/ctfe/task_types" |
| ctfeutil "go.skia.org/infra/ct/go/ctfe/util" |
| "go.skia.org/infra/ct/go/db" |
| ctutil "go.skia.org/infra/ct/go/util" |
| @@ -37,16 +39,6 @@ import ( |
| var ( |
| dbClient *influxdb.Client = nil |
| - |
| - // Slice of all tasks supported by CTFE. |
| - supportedTasks = []task_common.Task{ |
| - &chromium_perf.DBTask{}, |
| - &capture_skps.DBTask{}, |
| - &lua_scripts.DBTask{}, |
| - &chromium_builds.DBTask{}, |
| - &admin_tasks.RecreatePageSetsDBTask{}, |
| - &admin_tasks.RecreateWebpageArchivesDBTask{}, |
| - } |
| ) |
| // flags |
| @@ -120,6 +112,45 @@ func runServer(serverURL string) { |
| glog.Fatal(http.ListenAndServe(*port, nil)) |
| } |
| +// startCtfeMetrics registers gauges with the graphite server that indicate CT is running healthily |
| +// and starts a goroutine to update them periodically. |
| +func startCtfeMetrics() { |
| + pendingTasksGauge := metrics.GetOrRegisterGauge("num-pending-tasks", |
|
jcgregorio
2015/08/26 14:40:44
Don't wrap long lines. Here and below.
dogben
2015/08/26 15:24:01
Done.
|
| + metrics.DefaultRegistry) |
| + oldestPendingTaskAgeGauge := metrics.GetOrRegisterGaugeFloat64("oldest-pending-task-age", |
| + metrics.DefaultRegistry) |
| + // 0=no tasks pending; 1=started; 2=not started |
| + oldestPendingTaskStatusGauge := metrics.GetOrRegisterGauge("oldest-pending-task-status", |
| + metrics.DefaultRegistry) |
| + go func() { |
| + for _ = range time.Tick(common.SAMPLE_PERIOD) { |
| + pendingTaskCount, err := pending_tasks.GetPendingTaskCount() |
| + if err != nil { |
| + glog.Error(err) |
| + } else { |
| + pendingTasksGauge.Update(pendingTaskCount) |
| + } |
| + |
| + oldestPendingTask, err := pending_tasks.GetOldestPendingTask() |
| + if err != nil { |
| + glog.Error(err) |
| + } else if oldestPendingTask == nil { |
| + oldestPendingTaskAgeGauge.Update(0) |
| + oldestPendingTaskStatusGauge.Update(0) |
| + } else { |
| + addedTime := ctutil.GetTimeFromTs(strconv.FormatInt( |
| + oldestPendingTask.GetCommonCols().TsAdded.Int64, 10)) |
| + oldestPendingTaskAgeGauge.Update(time.Since(addedTime).Seconds()) |
| + if oldestPendingTask.GetCommonCols().TsStarted.Valid { |
| + oldestPendingTaskStatusGauge.Update(1) |
| + } else { |
| + oldestPendingTaskStatusGauge.Update(2) |
| + } |
| + } |
| + } |
| + }() |
| +} |
| + |
| // repeatedTasksScheduler looks for all tasks that contain repeat_after_days |
| // set to > 0 and schedules them when the specified time comes. |
| // The function does the following: |
| @@ -133,7 +164,7 @@ func repeatedTasksScheduler() { |
| for _ = range time.Tick(*tasksSchedulerWaitTime) { |
| // Loop over all tasks to find tasks which need to be scheduled. |
| - for _, prototype := range supportedTasks { |
| + for _, prototype := range task_types.Prototypes() { |
| query, args := task_common.DBTaskQuery(prototype, |
| task_common.QueryParams{ |
| @@ -235,6 +266,8 @@ func main() { |
| glog.Fatal(err) |
| } |
| + startCtfeMetrics() |
| + |
| // Start the repeated tasks scheduler. |
| go repeatedTasksScheduler() |