OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package main | 5 package main |
6 | 6 |
7 import ( | 7 import ( |
8 "time" | 8 "time" |
9 | 9 |
10 "github.com/luci/luci-go/common/clock" | 10 "github.com/luci/luci-go/common/clock" |
11 "github.com/luci/luci-go/common/data/rand/mathrand" | 11 "github.com/luci/luci-go/common/data/rand/mathrand" |
12 "github.com/luci/luci-go/common/errors" | 12 "github.com/luci/luci-go/common/errors" |
13 "github.com/luci/luci-go/common/gcloud/gs" | 13 "github.com/luci/luci-go/common/gcloud/gs" |
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
15 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/common/retry" | 16 "github.com/luci/luci-go/common/retry" |
| 17 "github.com/luci/luci-go/common/retry/transient" |
17 "github.com/luci/luci-go/common/tsmon/distribution" | 18 "github.com/luci/luci-go/common/tsmon/distribution" |
18 "github.com/luci/luci-go/common/tsmon/field" | 19 "github.com/luci/luci-go/common/tsmon/field" |
19 "github.com/luci/luci-go/common/tsmon/metric" | 20 "github.com/luci/luci-go/common/tsmon/metric" |
20 "github.com/luci/luci-go/common/tsmon/types" | 21 "github.com/luci/luci-go/common/tsmon/types" |
21 "github.com/luci/luci-go/grpc/grpcutil" | 22 "github.com/luci/luci-go/grpc/grpcutil" |
22 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 23 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
23 "github.com/luci/luci-go/logdog/server/archivist" | 24 "github.com/luci/luci-go/logdog/server/archivist" |
24 "github.com/luci/luci-go/logdog/server/service" | 25 "github.com/luci/luci-go/logdog/server/service" |
25 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 26 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
26 | 27 |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
138 return &retry.ExponentialBackoff{ | 139 return &retry.ExponentialBackoff{ |
139 Limited: retry.Limited{ | 140 Limited: retry.Limited{ |
140 Delay: 200 * time.Millisecond, | 141 Delay: 200 * time.Millisecond, |
141 Retries: -1, // Unlimited. | 142 Retries: -1, // Unlimited. |
142 }, | 143 }, |
143 MaxDelay: 10 * time.Second, | 144 MaxDelay: 10 * time.Second, |
144 Multiplier: 2, | 145 Multiplier: 2, |
145 } | 146 } |
146 } | 147 } |
147 | 148 |
148 » err = retry.Retry(c, retry.TransientOnly(retryForever), func() error { | 149 » err = retry.Retry(c, transient.Only(retryForever), func() error { |
149 return grpcutil.WrapIfTransient(sub.Receive(c, func(c context.Co
ntext, msg *pubsub.Message) { | 150 return grpcutil.WrapIfTransient(sub.Receive(c, func(c context.Co
ntext, msg *pubsub.Message) { |
150 c = log.SetFields(c, log.Fields{ | 151 c = log.SetFields(c, log.Fields{ |
151 "messageID": msg.ID, | 152 "messageID": msg.ID, |
152 }) | 153 }) |
153 | 154 |
154 // ACK (or not) the message based on whether our task wa
s consumed. | 155 // ACK (or not) the message based on whether our task wa
s consumed. |
155 deleteTask := false | 156 deleteTask := false |
156 defer func() { | 157 defer func() { |
157 // ACK the message if it is completed. If not, w
e do not NACK it, as we | 158 // ACK the message if it is completed. If not, w
e do not NACK it, as we |
158 // want to let Pub/Sub redelivery delay occur as
a form of backoff. | 159 // want to let Pub/Sub redelivery delay occur as
a form of backoff. |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
264 func main() { | 265 func main() { |
265 mathrand.SeedRandomly() | 266 mathrand.SeedRandomly() |
266 a := application{ | 267 a := application{ |
267 Service: service.Service{ | 268 Service: service.Service{ |
268 Name: "archivist", | 269 Name: "archivist", |
269 DefaultAuthOptions: chromeinfra.DefaultAuthOptions(), | 270 DefaultAuthOptions: chromeinfra.DefaultAuthOptions(), |
270 }, | 271 }, |
271 } | 272 } |
272 a.Run(context.Background(), a.runArchivist) | 273 a.Run(context.Background(), a.runArchivist) |
273 } | 274 } |
OLD | NEW |