Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(169)

Side by Side Diff: ct/go/ctfe/main.go

Issue 1307333002: Add alerting for CTFE V2. (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Rebase. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « alertserver/alerts.cfg ('k') | ct/go/ctfe/pending_tasks/pending_tasks.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 The Cluster Telemetry Frontend. 2 The Cluster Telemetry Frontend.
3 */ 3 */
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "flag" 8 "flag"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
11 "path/filepath" 11 "path/filepath"
12 "runtime" 12 "runtime"
13 "strconv" 13 "strconv"
14 "time" 14 "time"
15 15
16 "github.com/gorilla/mux" 16 "github.com/gorilla/mux"
17 "github.com/skia-dev/glog" 17 "github.com/skia-dev/glog"
18 18
19 metrics "github.com/rcrowley/go-metrics"
19 "go.skia.org/infra/ct/go/ctfe/admin_tasks" 20 "go.skia.org/infra/ct/go/ctfe/admin_tasks"
20 "go.skia.org/infra/ct/go/ctfe/capture_skps" 21 "go.skia.org/infra/ct/go/ctfe/capture_skps"
21 "go.skia.org/infra/ct/go/ctfe/chromium_builds" 22 "go.skia.org/infra/ct/go/ctfe/chromium_builds"
22 "go.skia.org/infra/ct/go/ctfe/chromium_perf" 23 "go.skia.org/infra/ct/go/ctfe/chromium_perf"
23 "go.skia.org/infra/ct/go/ctfe/lua_scripts" 24 "go.skia.org/infra/ct/go/ctfe/lua_scripts"
24 "go.skia.org/infra/ct/go/ctfe/pending_tasks" 25 "go.skia.org/infra/ct/go/ctfe/pending_tasks"
25 "go.skia.org/infra/ct/go/ctfe/task_common" 26 "go.skia.org/infra/ct/go/ctfe/task_common"
27 "go.skia.org/infra/ct/go/ctfe/task_types"
26 ctfeutil "go.skia.org/infra/ct/go/ctfe/util" 28 ctfeutil "go.skia.org/infra/ct/go/ctfe/util"
27 "go.skia.org/infra/ct/go/db" 29 "go.skia.org/infra/ct/go/db"
28 ctutil "go.skia.org/infra/ct/go/util" 30 ctutil "go.skia.org/infra/ct/go/util"
29 "go.skia.org/infra/go/common" 31 "go.skia.org/infra/go/common"
30 "go.skia.org/infra/go/influxdb" 32 "go.skia.org/infra/go/influxdb"
31 "go.skia.org/infra/go/login" 33 "go.skia.org/infra/go/login"
32 "go.skia.org/infra/go/metadata" 34 "go.skia.org/infra/go/metadata"
33 "go.skia.org/infra/go/skiaversion" 35 "go.skia.org/infra/go/skiaversion"
34 skutil "go.skia.org/infra/go/util" 36 skutil "go.skia.org/infra/go/util"
35 "go.skia.org/infra/go/webhook" 37 "go.skia.org/infra/go/webhook"
36 ) 38 )
37 39
38 var ( 40 var (
39 dbClient *influxdb.Client = nil 41 dbClient *influxdb.Client = nil
40
41 // Slice of all tasks supported by CTFE.
42 supportedTasks = []task_common.Task{
43 &chromium_perf.DBTask{},
44 &capture_skps.DBTask{},
45 &lua_scripts.DBTask{},
46 &chromium_builds.DBTask{},
47 &admin_tasks.RecreatePageSetsDBTask{},
48 &admin_tasks.RecreateWebpageArchivesDBTask{},
49 }
50 ) 42 )
51 43
52 // flags 44 // flags
53 var ( 45 var (
54 graphiteServer = flag.String("graphite_server", "localhost:2003" , "Where is Graphite metrics ingestion server running.") 46 graphiteServer = flag.String("graphite_server", "localhost:2003" , "Where is Graphite metrics ingestion server running.")
55 host = flag.String("host", "localhost", "HTTP service host") 47 host = flag.String("host", "localhost", "HTTP service host")
56 port = flag.String("port", ":8002", "HTTP service port (e.g., ':8002')") 48 port = flag.String("port", ":8002", "HTTP service port (e.g., ':8002')")
57 local = flag.Bool("local", false, "Running locally if t rue. As opposed to in production.") 49 local = flag.Bool("local", false, "Running locally if t rue. As opposed to in production.")
58 workdir = flag.String("workdir", ".", "Directory to use f or scratch work.") 50 workdir = flag.String("workdir", ".", "Directory to use f or scratch work.")
59 resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be us ed.") 51 resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be us ed.")
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 r.HandleFunc("/json/version", skiaversion.JsonHandler) 105 r.HandleFunc("/json/version", skiaversion.JsonHandler)
114 r.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler) 106 r.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler)
115 r.HandleFunc("/login/", loginHandler) 107 r.HandleFunc("/login/", loginHandler)
116 r.HandleFunc("/logout/", login.LogoutHandler) 108 r.HandleFunc("/logout/", login.LogoutHandler)
117 r.HandleFunc("/loginstatus/", login.StatusHandler) 109 r.HandleFunc("/loginstatus/", login.StatusHandler)
118 http.Handle("/", skutil.LoggingGzipRequestResponse(r)) 110 http.Handle("/", skutil.LoggingGzipRequestResponse(r))
119 glog.Infof("Ready to serve on %s", serverURL) 111 glog.Infof("Ready to serve on %s", serverURL)
120 glog.Fatal(http.ListenAndServe(*port, nil)) 112 glog.Fatal(http.ListenAndServe(*port, nil))
121 } 113 }
122 114
115 // startCtfeMetrics registers gauges with the graphite server that indicate CT i s running healthily
116 // and starts a goroutine to update them periodically.
117 func startCtfeMetrics() {
118 pendingTasksGauge := metrics.GetOrRegisterGauge("num-pending-tasks", met rics.DefaultRegistry)
119 oldestPendingTaskAgeGauge := metrics.GetOrRegisterGaugeFloat64("oldest-p ending-task-age", metrics.DefaultRegistry)
120 // 0=no tasks pending; 1=started; 2=not started
121 oldestPendingTaskStatusGauge := metrics.GetOrRegisterGauge("oldest-pendi ng-task-status", metrics.DefaultRegistry)
122 go func() {
123 for _ = range time.Tick(common.SAMPLE_PERIOD) {
124 pendingTaskCount, err := pending_tasks.GetPendingTaskCou nt()
125 if err != nil {
126 glog.Error(err)
127 } else {
128 pendingTasksGauge.Update(pendingTaskCount)
129 }
130
131 oldestPendingTask, err := pending_tasks.GetOldestPending Task()
132 if err != nil {
133 glog.Error(err)
134 } else if oldestPendingTask == nil {
135 oldestPendingTaskAgeGauge.Update(0)
136 oldestPendingTaskStatusGauge.Update(0)
137 } else {
138 addedTime := ctutil.GetTimeFromTs(strconv.Format Int(oldestPendingTask.GetCommonCols().TsAdded.Int64, 10))
139 oldestPendingTaskAgeGauge.Update(time.Since(adde dTime).Seconds())
140 if oldestPendingTask.GetCommonCols().TsStarted.V alid {
141 oldestPendingTaskStatusGauge.Update(1)
142 } else {
143 oldestPendingTaskStatusGauge.Update(2)
144 }
145 }
146 }
147 }()
148 }
149
123 // repeatedTasksScheduler looks for all tasks that contain repeat_after_days 150 // repeatedTasksScheduler looks for all tasks that contain repeat_after_days
124 // set to > 0 and schedules them when the specified time comes. 151 // set to > 0 and schedules them when the specified time comes.
125 // The function does the following: 152 // The function does the following:
126 // 1. Look for tasks that need to be scheduled in the next 5 minutes. 153 // 1. Look for tasks that need to be scheduled in the next 5 minutes.
127 // 2. Loop over these tasks. 154 // 2. Loop over these tasks.
128 // 2.1 Schedule the task again and set repeat_after_days to what it 155 // 2.1 Schedule the task again and set repeat_after_days to what it
129 // originally was. 156 // originally was.
130 // 2.2 Update the original task and set repeat_after_days to 0 since the 157 // 2.2 Update the original task and set repeat_after_days to 0 since the
131 // newly created task will now replace it. 158 // newly created task will now replace it.
132 func repeatedTasksScheduler() { 159 func repeatedTasksScheduler() {
133 160
134 for _ = range time.Tick(*tasksSchedulerWaitTime) { 161 for _ = range time.Tick(*tasksSchedulerWaitTime) {
135 // Loop over all tasks to find tasks which need to be scheduled. 162 // Loop over all tasks to find tasks which need to be scheduled.
136 » » for _, prototype := range supportedTasks { 163 » » for _, prototype := range task_types.Prototypes() {
137 164
138 query, args := task_common.DBTaskQuery(prototype, 165 query, args := task_common.DBTaskQuery(prototype,
139 task_common.QueryParams{ 166 task_common.QueryParams{
140 FutureRunsOnly: true, 167 FutureRunsOnly: true,
141 Offset: 0, 168 Offset: 0,
142 Size: task_common.MAX_PAGE_SIZ E, 169 Size: task_common.MAX_PAGE_SIZ E,
143 }) 170 })
144 glog.Infof("Running %s", query) 171 glog.Infof("Running %s", query)
145 data, err := prototype.Select(query, args...) 172 data, err := prototype.Select(query, args...)
146 if err != nil { 173 if err != nil {
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
228 // Initialize the ctfe database. 255 // Initialize the ctfe database.
229 if !*local { 256 if !*local {
230 if err := dbConf.GetPasswordFromMetadata(); err != nil { 257 if err := dbConf.GetPasswordFromMetadata(); err != nil {
231 glog.Fatal(err) 258 glog.Fatal(err)
232 } 259 }
233 } 260 }
234 if err := dbConf.InitDB(); err != nil { 261 if err := dbConf.InitDB(); err != nil {
235 glog.Fatal(err) 262 glog.Fatal(err)
236 } 263 }
237 264
265 startCtfeMetrics()
266
238 // Start the repeated tasks scheduler. 267 // Start the repeated tasks scheduler.
239 go repeatedTasksScheduler() 268 go repeatedTasksScheduler()
240 269
241 runServer(serverURL) 270 runServer(serverURL)
242 } 271 }
OLDNEW
« no previous file with comments | « alertserver/alerts.cfg ('k') | ct/go/ctfe/pending_tasks/pending_tasks.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698