OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package main | 5 package main |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
12 "github.com/luci/luci-go/common/data/rand/mathrand" | 12 "github.com/luci/luci-go/common/data/rand/mathrand" |
13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/common/proto/google" | 16 "github.com/luci/luci-go/common/proto/google" |
17 "github.com/luci/luci-go/common/retry" | 17 "github.com/luci/luci-go/common/retry" |
| 18 "github.com/luci/luci-go/common/retry/transient" |
18 "github.com/luci/luci-go/common/tsmon/distribution" | 19 "github.com/luci/luci-go/common/tsmon/distribution" |
19 "github.com/luci/luci-go/common/tsmon/field" | 20 "github.com/luci/luci-go/common/tsmon/field" |
20 "github.com/luci/luci-go/common/tsmon/metric" | 21 "github.com/luci/luci-go/common/tsmon/metric" |
21 "github.com/luci/luci-go/common/tsmon/types" | 22 "github.com/luci/luci-go/common/tsmon/types" |
22 "github.com/luci/luci-go/grpc/grpcutil" | 23 "github.com/luci/luci-go/grpc/grpcutil" |
23 "github.com/luci/luci-go/hardcoded/chromeinfra" | 24 "github.com/luci/luci-go/hardcoded/chromeinfra" |
24 "github.com/luci/luci-go/logdog/server/collector" | 25 "github.com/luci/luci-go/logdog/server/collector" |
25 "github.com/luci/luci-go/logdog/server/collector/coordinator" | 26 "github.com/luci/luci-go/logdog/server/collector/coordinator" |
26 "github.com/luci/luci-go/logdog/server/service" | 27 "github.com/luci/luci-go/logdog/server/service" |
27 | 28 |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 return &retry.ExponentialBackoff{ | 144 return &retry.ExponentialBackoff{ |
144 Limited: retry.Limited{ | 145 Limited: retry.Limited{ |
145 Delay: 200 * time.Millisecond, | 146 Delay: 200 * time.Millisecond, |
146 Retries: -1, // Unlimited. | 147 Retries: -1, // Unlimited. |
147 }, | 148 }, |
148 MaxDelay: 10 * time.Second, | 149 MaxDelay: 10 * time.Second, |
149 Multiplier: 2, | 150 Multiplier: 2, |
150 } | 151 } |
151 } | 152 } |
152 | 153 |
153 » err = retry.Retry(c, retry.TransientOnly(retryForever), func() error { | 154 » err = retry.Retry(c, transient.Only(retryForever), func() error { |
154 return grpcutil.WrapIfTransient(psSub.Receive(c, func(c context.
Context, msg *pubsub.Message) { | 155 return grpcutil.WrapIfTransient(psSub.Receive(c, func(c context.
Context, msg *pubsub.Message) { |
155 c = log.SetField(c, "messageID", msg.ID) | 156 c = log.SetField(c, "messageID", msg.ID) |
156 if a.processMessage(c, &coll, msg) { | 157 if a.processMessage(c, &coll, msg) { |
157 // ACK the message, removing it from Pub/Sub. | 158 // ACK the message, removing it from Pub/Sub. |
158 msg.Ack() | 159 msg.Ack() |
159 } else { | 160 } else { |
160 // NACK the message. It will be redelivered and
processed. | 161 // NACK the message. It will be redelivered and
processed. |
161 msg.Nack() | 162 msg.Nack() |
162 } | 163 } |
163 })) | 164 })) |
(...skipping 20 matching lines...) Expand all Loading... |
184 }.Infof(c, "Received Pub/Sub Message.") | 185 }.Infof(c, "Received Pub/Sub Message.") |
185 | 186 |
186 startTime := clock.Now(c) | 187 startTime := clock.Now(c) |
187 err := coll.Process(c, msg.Data) | 188 err := coll.Process(c, msg.Data) |
188 duration := clock.Now(c).Sub(startTime) | 189 duration := clock.Now(c).Sub(startTime) |
189 | 190 |
190 // We track processing time in milliseconds. | 191 // We track processing time in milliseconds. |
191 tsTaskProcessingTime.Add(c, duration.Seconds()*1000) | 192 tsTaskProcessingTime.Add(c, duration.Seconds()*1000) |
192 | 193 |
193 switch { | 194 switch { |
194 » case errors.IsTransient(err): | 195 » case transient.Tag.In(err): |
195 // Do not consume | 196 // Do not consume |
196 log.Fields{ | 197 log.Fields{ |
197 log.ErrorKey: err, | 198 log.ErrorKey: err, |
198 "duration": duration, | 199 "duration": duration, |
199 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.") | 200 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.") |
200 tsPubsubCount.Add(c, 1, "transient_failure") | 201 tsPubsubCount.Add(c, 1, "transient_failure") |
201 return false | 202 return false |
202 | 203 |
203 case err == nil: | 204 case err == nil: |
204 log.Fields{ | 205 log.Fields{ |
(...skipping 18 matching lines...) Expand all Loading... |
223 func main() { | 224 func main() { |
224 mathrand.SeedRandomly() | 225 mathrand.SeedRandomly() |
225 a := application{ | 226 a := application{ |
226 Service: service.Service{ | 227 Service: service.Service{ |
227 Name: "collector", | 228 Name: "collector", |
228 DefaultAuthOptions: chromeinfra.DefaultAuthOptions(), | 229 DefaultAuthOptions: chromeinfra.DefaultAuthOptions(), |
229 }, | 230 }, |
230 } | 231 } |
231 a.Run(context.Background(), a.runCollector) | 232 a.Run(context.Background(), a.runCollector) |
232 } | 233 } |
OLD | NEW |