Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(444)

Side by Side Diff: logdog/server/cmd/logdog_collector/main.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/data/rand/mathrand" 12 "github.com/luci/luci-go/common/data/rand/mathrand"
13 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/proto/google" 16 "github.com/luci/luci-go/common/proto/google"
17 "github.com/luci/luci-go/common/retry" 17 "github.com/luci/luci-go/common/retry"
18 "github.com/luci/luci-go/common/retry/transient"
18 "github.com/luci/luci-go/common/tsmon/distribution" 19 "github.com/luci/luci-go/common/tsmon/distribution"
19 "github.com/luci/luci-go/common/tsmon/field" 20 "github.com/luci/luci-go/common/tsmon/field"
20 "github.com/luci/luci-go/common/tsmon/metric" 21 "github.com/luci/luci-go/common/tsmon/metric"
21 "github.com/luci/luci-go/common/tsmon/types" 22 "github.com/luci/luci-go/common/tsmon/types"
22 "github.com/luci/luci-go/grpc/grpcutil" 23 "github.com/luci/luci-go/grpc/grpcutil"
23 "github.com/luci/luci-go/hardcoded/chromeinfra" 24 "github.com/luci/luci-go/hardcoded/chromeinfra"
24 "github.com/luci/luci-go/logdog/server/collector" 25 "github.com/luci/luci-go/logdog/server/collector"
25 "github.com/luci/luci-go/logdog/server/collector/coordinator" 26 "github.com/luci/luci-go/logdog/server/collector/coordinator"
26 "github.com/luci/luci-go/logdog/server/service" 27 "github.com/luci/luci-go/logdog/server/service"
27 28
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 return &retry.ExponentialBackoff{ 144 return &retry.ExponentialBackoff{
144 Limited: retry.Limited{ 145 Limited: retry.Limited{
145 Delay: 200 * time.Millisecond, 146 Delay: 200 * time.Millisecond,
146 Retries: -1, // Unlimited. 147 Retries: -1, // Unlimited.
147 }, 148 },
148 MaxDelay: 10 * time.Second, 149 MaxDelay: 10 * time.Second,
149 Multiplier: 2, 150 Multiplier: 2,
150 } 151 }
151 } 152 }
152 153
153 » err = retry.Retry(c, retry.TransientOnly(retryForever), func() error { 154 » err = retry.Retry(c, transient.Only(retryForever), func() error {
154 return grpcutil.WrapIfTransient(psSub.Receive(c, func(c context. Context, msg *pubsub.Message) { 155 return grpcutil.WrapIfTransient(psSub.Receive(c, func(c context. Context, msg *pubsub.Message) {
155 c = log.SetField(c, "messageID", msg.ID) 156 c = log.SetField(c, "messageID", msg.ID)
156 if a.processMessage(c, &coll, msg) { 157 if a.processMessage(c, &coll, msg) {
157 // ACK the message, removing it from Pub/Sub. 158 // ACK the message, removing it from Pub/Sub.
158 msg.Ack() 159 msg.Ack()
159 } else { 160 } else {
160 // NACK the message. It will be redelivered and processed. 161 // NACK the message. It will be redelivered and processed.
161 msg.Nack() 162 msg.Nack()
162 } 163 }
163 })) 164 }))
(...skipping 20 matching lines...) Expand all
184 }.Infof(c, "Received Pub/Sub Message.") 185 }.Infof(c, "Received Pub/Sub Message.")
185 186
186 startTime := clock.Now(c) 187 startTime := clock.Now(c)
187 err := coll.Process(c, msg.Data) 188 err := coll.Process(c, msg.Data)
188 duration := clock.Now(c).Sub(startTime) 189 duration := clock.Now(c).Sub(startTime)
189 190
190 // We track processing time in milliseconds. 191 // We track processing time in milliseconds.
191 tsTaskProcessingTime.Add(c, duration.Seconds()*1000) 192 tsTaskProcessingTime.Add(c, duration.Seconds()*1000)
192 193
193 switch { 194 switch {
194 » case errors.IsTransient(err): 195 » case transient.Tag.In(err):
195 // Do not consume 196 // Do not consume
196 log.Fields{ 197 log.Fields{
197 log.ErrorKey: err, 198 log.ErrorKey: err,
198 "duration": duration, 199 "duration": duration,
199 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.") 200 }.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
200 tsPubsubCount.Add(c, 1, "transient_failure") 201 tsPubsubCount.Add(c, 1, "transient_failure")
201 return false 202 return false
202 203
203 case err == nil: 204 case err == nil:
204 log.Fields{ 205 log.Fields{
(...skipping 18 matching lines...) Expand all
223 func main() { 224 func main() {
224 mathrand.SeedRandomly() 225 mathrand.SeedRandomly()
225 a := application{ 226 a := application{
226 Service: service.Service{ 227 Service: service.Service{
227 Name: "collector", 228 Name: "collector",
228 DefaultAuthOptions: chromeinfra.DefaultAuthOptions(), 229 DefaultAuthOptions: chromeinfra.DefaultAuthOptions(),
229 }, 230 },
230 } 231 }
231 a.Run(context.Background(), a.runCollector) 232 a.Run(context.Background(), a.runCollector)
232 } 233 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698