OLD | NEW |
---|---|
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package main | 5 package main |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/luci/luci-go/common/auth" | 11 "github.com/luci/luci-go/common/auth" |
12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/common/parallel" | 16 "github.com/luci/luci-go/common/parallel" |
17 "github.com/luci/luci-go/common/tsmon/distribution" | 17 "github.com/luci/luci-go/common/tsmon/distribution" |
18 "github.com/luci/luci-go/common/tsmon/field" | 18 "github.com/luci/luci-go/common/tsmon/field" |
19 "github.com/luci/luci-go/common/tsmon/metric" | 19 "github.com/luci/luci-go/common/tsmon/metric" |
20 "github.com/luci/luci-go/common/tsmon/types" | |
20 "github.com/luci/luci-go/server/internal/logdog/collector" | 21 "github.com/luci/luci-go/server/internal/logdog/collector" |
21 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 22 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
22 "github.com/luci/luci-go/server/internal/logdog/service" | 23 "github.com/luci/luci-go/server/internal/logdog/service" |
23 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
24 "google.golang.org/cloud" | 25 "google.golang.org/cloud" |
25 "google.golang.org/cloud/pubsub" | 26 "google.golang.org/cloud/pubsub" |
26 ) | 27 ) |
27 | 28 |
28 var ( | 29 var ( |
29 errInvalidConfig = errors.New("invalid configuration") | 30 errInvalidConfig = errors.New("invalid configuration") |
30 ) | 31 ) |
31 | 32 |
32 const ( | 33 const ( |
33 pubsubPullErrorDelay = 10 * time.Second | 34 pubsubPullErrorDelay = 10 * time.Second |
34 ) | 35 ) |
35 | 36 |
36 // Metrics. | 37 // Metrics. |
37 var ( | 38 var ( |
38 // tsPubsubCount counts the number of Pub/Sub messages processed by the | 39 // tsPubsubCount counts the number of Pub/Sub messages processed by the |
39 // Archivist. | 40 // Archivist. |
40 // | 41 // |
41 // Result tracks the outcome of each message, either "success", "failure ", or | 42 // Result tracks the outcome of each message, either "success", "failure ", or |
42 // "transient_failure". | 43 // "transient_failure". |
43 tsPubsubCount = metric.NewCounter("logdog/collector/subscription/count", | 44 tsPubsubCount = metric.NewCounter("logdog/collector/subscription/count", |
44 "The number of Pub/Sub messages pulled.", | 45 "The number of Pub/Sub messages pulled.", |
46 types.MetricMetadata{}, | |
45 field.String("result")) | 47 field.String("result")) |
46 | 48 |
47 // tsTaskProcessingTime tracks the amount of time a single subscription | 49 // tsTaskProcessingTime tracks the amount of time a single subscription |
48 // message takes to process, in milliseconds. | 50 // message takes to process, in milliseconds. |
49 tsTaskProcessingTime = metric.NewCumulativeDistribution("logdog/collecto r/subscription/processing_time_ms", | 51 tsTaskProcessingTime = metric.NewCumulativeDistribution("logdog/collecto r/subscription/processing_time_ms", |
50 "Amount of time in milliseconds that a single Pub/Sub message ta kes to process.", | 52 "Amount of time in milliseconds that a single Pub/Sub message ta kes to process.", |
53 types.MetricMetadata{}, | |
Sergey Berezin
2016/07/06 21:39:12
Should this be Units: types.Milliseconds ?
ddoman
2016/07/07 04:47:57
Good catch!
Fixed.
| |
51 distribution.DefaultBucketer) | 54 distribution.DefaultBucketer) |
52 ) | 55 ) |
53 | 56 |
54 // application is the Collector application state. | 57 // application is the Collector application state. |
55 type application struct { | 58 type application struct { |
56 service.Service | 59 service.Service |
57 } | 60 } |
58 | 61 |
59 // run is the main execution function. | 62 // run is the main execution function. |
60 func (a *application) runCollector(c context.Context) error { | 63 func (a *application) runCollector(c context.Context) error { |
(...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
220 | 223 |
221 // Entry point. | 224 // Entry point. |
222 func main() { | 225 func main() { |
223 a := application{ | 226 a := application{ |
224 Service: service.Service{ | 227 Service: service.Service{ |
225 Name: "collector", | 228 Name: "collector", |
226 }, | 229 }, |
227 } | 230 } |
228 a.Run(context.Background(), a.runCollector) | 231 a.Run(context.Background(), a.runCollector) |
229 } | 232 } |
OLD | NEW |