| OLD | NEW |
| 1 /* | 1 /* |
| 2 The Cluster Telemetry Frontend. | 2 The Cluster Telemetry Frontend. |
| 3 */ | 3 */ |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "flag" | 8 "flag" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "path/filepath" | 11 "path/filepath" |
| 12 "runtime" | 12 "runtime" |
| 13 "strconv" | 13 "strconv" |
| 14 "time" | 14 "time" |
| 15 | 15 |
| 16 "github.com/gorilla/mux" | 16 "github.com/gorilla/mux" |
| 17 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
| 18 | 18 |
| 19 metrics "github.com/rcrowley/go-metrics" |
| 19 "go.skia.org/infra/ct/go/ctfe/admin_tasks" | 20 "go.skia.org/infra/ct/go/ctfe/admin_tasks" |
| 20 "go.skia.org/infra/ct/go/ctfe/capture_skps" | 21 "go.skia.org/infra/ct/go/ctfe/capture_skps" |
| 21 "go.skia.org/infra/ct/go/ctfe/chromium_builds" | 22 "go.skia.org/infra/ct/go/ctfe/chromium_builds" |
| 22 "go.skia.org/infra/ct/go/ctfe/chromium_perf" | 23 "go.skia.org/infra/ct/go/ctfe/chromium_perf" |
| 23 "go.skia.org/infra/ct/go/ctfe/lua_scripts" | 24 "go.skia.org/infra/ct/go/ctfe/lua_scripts" |
| 24 "go.skia.org/infra/ct/go/ctfe/pending_tasks" | 25 "go.skia.org/infra/ct/go/ctfe/pending_tasks" |
| 25 "go.skia.org/infra/ct/go/ctfe/task_common" | 26 "go.skia.org/infra/ct/go/ctfe/task_common" |
| 27 "go.skia.org/infra/ct/go/ctfe/task_types" |
| 26 ctfeutil "go.skia.org/infra/ct/go/ctfe/util" | 28 ctfeutil "go.skia.org/infra/ct/go/ctfe/util" |
| 27 "go.skia.org/infra/ct/go/db" | 29 "go.skia.org/infra/ct/go/db" |
| 28 ctutil "go.skia.org/infra/ct/go/util" | 30 ctutil "go.skia.org/infra/ct/go/util" |
| 29 "go.skia.org/infra/go/common" | 31 "go.skia.org/infra/go/common" |
| 30 "go.skia.org/infra/go/influxdb" | 32 "go.skia.org/infra/go/influxdb" |
| 31 "go.skia.org/infra/go/login" | 33 "go.skia.org/infra/go/login" |
| 32 "go.skia.org/infra/go/metadata" | 34 "go.skia.org/infra/go/metadata" |
| 33 "go.skia.org/infra/go/skiaversion" | 35 "go.skia.org/infra/go/skiaversion" |
| 34 skutil "go.skia.org/infra/go/util" | 36 skutil "go.skia.org/infra/go/util" |
| 35 "go.skia.org/infra/go/webhook" | 37 "go.skia.org/infra/go/webhook" |
| 36 ) | 38 ) |
| 37 | 39 |
| 38 var ( | 40 var ( |
| 39 dbClient *influxdb.Client = nil | 41 dbClient *influxdb.Client = nil |
| 40 | |
| 41 // Slice of all tasks supported by CTFE. | |
| 42 supportedTasks = []task_common.Task{ | |
| 43 &chromium_perf.DBTask{}, | |
| 44 &capture_skps.DBTask{}, | |
| 45 &lua_scripts.DBTask{}, | |
| 46 &chromium_builds.DBTask{}, | |
| 47 &admin_tasks.RecreatePageSetsDBTask{}, | |
| 48 &admin_tasks.RecreateWebpageArchivesDBTask{}, | |
| 49 } | |
| 50 ) | 42 ) |
| 51 | 43 |
| 52 // flags | 44 // flags |
| 53 var ( | 45 var ( |
| 54 graphiteServer = flag.String("graphite_server", "localhost:2003"
, "Where is Graphite metrics ingestion server running.") | 46 graphiteServer = flag.String("graphite_server", "localhost:2003"
, "Where is Graphite metrics ingestion server running.") |
| 55 host = flag.String("host", "localhost", "HTTP service
host") | 47 host = flag.String("host", "localhost", "HTTP service
host") |
| 56 port = flag.String("port", ":8002", "HTTP service port
(e.g., ':8002')") | 48 port = flag.String("port", ":8002", "HTTP service port
(e.g., ':8002')") |
| 57 local = flag.Bool("local", false, "Running locally if t
rue. As opposed to in production.") | 49 local = flag.Bool("local", false, "Running locally if t
rue. As opposed to in production.") |
| 58 workdir = flag.String("workdir", ".", "Directory to use f
or scratch work.") | 50 workdir = flag.String("workdir", ".", "Directory to use f
or scratch work.") |
| 59 resourcesDir = flag.String("resources_dir", "", "The directory
to find templates, JS, and CSS files. If blank the current directory will be us
ed.") | 51 resourcesDir = flag.String("resources_dir", "", "The directory
to find templates, JS, and CSS files. If blank the current directory will be us
ed.") |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 113 r.HandleFunc("/json/version", skiaversion.JsonHandler) | 105 r.HandleFunc("/json/version", skiaversion.JsonHandler) |
| 114 r.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler) | 106 r.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler) |
| 115 r.HandleFunc("/login/", loginHandler) | 107 r.HandleFunc("/login/", loginHandler) |
| 116 r.HandleFunc("/logout/", login.LogoutHandler) | 108 r.HandleFunc("/logout/", login.LogoutHandler) |
| 117 r.HandleFunc("/loginstatus/", login.StatusHandler) | 109 r.HandleFunc("/loginstatus/", login.StatusHandler) |
| 118 http.Handle("/", skutil.LoggingGzipRequestResponse(r)) | 110 http.Handle("/", skutil.LoggingGzipRequestResponse(r)) |
| 119 glog.Infof("Ready to serve on %s", serverURL) | 111 glog.Infof("Ready to serve on %s", serverURL) |
| 120 glog.Fatal(http.ListenAndServe(*port, nil)) | 112 glog.Fatal(http.ListenAndServe(*port, nil)) |
| 121 } | 113 } |
| 122 | 114 |
| 115 // startCtfeMetrics registers gauges with the graphite server that indicate CT i
s running healthily |
| 116 // and starts a goroutine to update them periodically. |
| 117 func startCtfeMetrics() { |
| 118 pendingTasksGauge := metrics.GetOrRegisterGauge("num-pending-tasks", met
rics.DefaultRegistry) |
| 119 oldestPendingTaskAgeGauge := metrics.GetOrRegisterGaugeFloat64("oldest-p
ending-task-age", metrics.DefaultRegistry) |
| 120 // 0=no tasks pending; 1=started; 2=not started |
| 121 oldestPendingTaskStatusGauge := metrics.GetOrRegisterGauge("oldest-pendi
ng-task-status", metrics.DefaultRegistry) |
| 122 go func() { |
| 123 for _ = range time.Tick(common.SAMPLE_PERIOD) { |
| 124 pendingTaskCount, err := pending_tasks.GetPendingTaskCou
nt() |
| 125 if err != nil { |
| 126 glog.Error(err) |
| 127 } else { |
| 128 pendingTasksGauge.Update(pendingTaskCount) |
| 129 } |
| 130 |
| 131 oldestPendingTask, err := pending_tasks.GetOldestPending
Task() |
| 132 if err != nil { |
| 133 glog.Error(err) |
| 134 } else if oldestPendingTask == nil { |
| 135 oldestPendingTaskAgeGauge.Update(0) |
| 136 oldestPendingTaskStatusGauge.Update(0) |
| 137 } else { |
| 138 addedTime := ctutil.GetTimeFromTs(strconv.Format
Int(oldestPendingTask.GetCommonCols().TsAdded.Int64, 10)) |
| 139 oldestPendingTaskAgeGauge.Update(time.Since(adde
dTime).Seconds()) |
| 140 if oldestPendingTask.GetCommonCols().TsStarted.V
alid { |
| 141 oldestPendingTaskStatusGauge.Update(1) |
| 142 } else { |
| 143 oldestPendingTaskStatusGauge.Update(2) |
| 144 } |
| 145 } |
| 146 } |
| 147 }() |
| 148 } |
| 149 |
| 123 // repeatedTasksScheduler looks for all tasks that contain repeat_after_days | 150 // repeatedTasksScheduler looks for all tasks that contain repeat_after_days |
| 124 // set to > 0 and schedules them when the specified time comes. | 151 // set to > 0 and schedules them when the specified time comes. |
| 125 // The function does the following: | 152 // The function does the following: |
| 126 // 1. Look for tasks that need to be scheduled in the next 5 minutes. | 153 // 1. Look for tasks that need to be scheduled in the next 5 minutes. |
| 127 // 2. Loop over these tasks. | 154 // 2. Loop over these tasks. |
| 128 // 2.1 Schedule the task again and set repeat_after_days to what it | 155 // 2.1 Schedule the task again and set repeat_after_days to what it |
| 129 // originally was. | 156 // originally was. |
| 130 // 2.2 Update the original task and set repeat_after_days to 0 since the | 157 // 2.2 Update the original task and set repeat_after_days to 0 since the |
| 131 // newly created task will now replace it. | 158 // newly created task will now replace it. |
| 132 func repeatedTasksScheduler() { | 159 func repeatedTasksScheduler() { |
| 133 | 160 |
| 134 for _ = range time.Tick(*tasksSchedulerWaitTime) { | 161 for _ = range time.Tick(*tasksSchedulerWaitTime) { |
| 135 // Loop over all tasks to find tasks which need to be scheduled. | 162 // Loop over all tasks to find tasks which need to be scheduled. |
| 136 » » for _, prototype := range supportedTasks { | 163 » » for _, prototype := range task_types.Prototypes() { |
| 137 | 164 |
| 138 query, args := task_common.DBTaskQuery(prototype, | 165 query, args := task_common.DBTaskQuery(prototype, |
| 139 task_common.QueryParams{ | 166 task_common.QueryParams{ |
| 140 FutureRunsOnly: true, | 167 FutureRunsOnly: true, |
| 141 Offset: 0, | 168 Offset: 0, |
| 142 Size: task_common.MAX_PAGE_SIZ
E, | 169 Size: task_common.MAX_PAGE_SIZ
E, |
| 143 }) | 170 }) |
| 144 glog.Infof("Running %s", query) | 171 glog.Infof("Running %s", query) |
| 145 data, err := prototype.Select(query, args...) | 172 data, err := prototype.Select(query, args...) |
| 146 if err != nil { | 173 if err != nil { |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 228 // Initialize the ctfe database. | 255 // Initialize the ctfe database. |
| 229 if !*local { | 256 if !*local { |
| 230 if err := dbConf.GetPasswordFromMetadata(); err != nil { | 257 if err := dbConf.GetPasswordFromMetadata(); err != nil { |
| 231 glog.Fatal(err) | 258 glog.Fatal(err) |
| 232 } | 259 } |
| 233 } | 260 } |
| 234 if err := dbConf.InitDB(); err != nil { | 261 if err := dbConf.InitDB(); err != nil { |
| 235 glog.Fatal(err) | 262 glog.Fatal(err) |
| 236 } | 263 } |
| 237 | 264 |
| 265 startCtfeMetrics() |
| 266 |
| 238 // Start the repeated tasks scheduler. | 267 // Start the repeated tasks scheduler. |
| 239 go repeatedTasksScheduler() | 268 go repeatedTasksScheduler() |
| 240 | 269 |
| 241 runServer(serverURL) | 270 runServer(serverURL) |
| 242 } | 271 } |
| OLD | NEW |