Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(126)

Side by Side Diff: logdog/server/cmd/logdog_archivist/main.go

Issue 2219023003: Update APIs to use new Google cloud paths. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 "github.com/luci/luci-go/common/auth" 10 "github.com/luci/luci-go/common/auth"
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/config" 12 "github.com/luci/luci-go/common/config"
13 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
14 "github.com/luci/luci-go/common/gcloud/gs" 14 "github.com/luci/luci-go/common/gcloud/gs"
15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
16 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/sync/parallel" 17 "github.com/luci/luci-go/common/sync/parallel"
18 "github.com/luci/luci-go/common/tsmon/distribution" 18 "github.com/luci/luci-go/common/tsmon/distribution"
19 "github.com/luci/luci-go/common/tsmon/field" 19 "github.com/luci/luci-go/common/tsmon/field"
20 "github.com/luci/luci-go/common/tsmon/metric" 20 "github.com/luci/luci-go/common/tsmon/metric"
21 "github.com/luci/luci-go/common/tsmon/types" 21 "github.com/luci/luci-go/common/tsmon/types"
22 "github.com/luci/luci-go/logdog/api/config/svcconfig" 22 "github.com/luci/luci-go/logdog/api/config/svcconfig"
23 "github.com/luci/luci-go/logdog/server/archivist" 23 "github.com/luci/luci-go/logdog/server/archivist"
24 "github.com/luci/luci-go/logdog/server/service" 24 "github.com/luci/luci-go/logdog/server/service"
25
26 "cloud.google.com/go/pubsub"
25 "golang.org/x/net/context" 27 "golang.org/x/net/context"
26 » "google.golang.org/cloud" 28 » "google.golang.org/api/option"
27 » "google.golang.org/cloud/pubsub"
28 ) 29 )
29 30
30 var ( 31 var (
31 errInvalidConfig = errors.New("invalid configuration") 32 errInvalidConfig = errors.New("invalid configuration")
32 33
33 // tsTaskProcessingTime measures the amount of time spent processing a s ingle 34 // tsTaskProcessingTime measures the amount of time spent processing a s ingle
34 // task. 35 // task.
35 // 36 //
36 // The "consumed" field is true if the underlying task was consumed and 37 // The "consumed" field is true if the underlying task was consumed and
37 // false if it was not. 38 // false if it was not.
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
84 psProject, psSubscriptionName := taskSub.Split() 85 psProject, psSubscriptionName := taskSub.Split()
85 86
86 psAuth, err := a.Authenticator(c, func(o *auth.Options) { 87 psAuth, err := a.Authenticator(c, func(o *auth.Options) {
87 o.Scopes = gcps.SubscriberScopes 88 o.Scopes = gcps.SubscriberScopes
88 }) 89 })
89 if err != nil { 90 if err != nil {
90 log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato r.") 91 log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato r.")
91 return err 92 return err
92 } 93 }
93 94
94 // Pub/Sub: HTTP Client => Context
95 psHTTPClient, err := psAuth.Client()
96 if err != nil {
97 log.WithError(err).Errorf(c, "Failed to create authenticated Pub /Sub transport.")
98 return err
99 }
100 psContext := cloud.WithContext(c, psProject, psHTTPClient)
101
102 // Pub/Sub: TokenSource => Client 95 // Pub/Sub: TokenSource => Client
103 » psClient, err := pubsub.NewClient(c, psProject, cloud.WithTokenSource(ps Auth.TokenSource())) 96 » psClient, err := pubsub.NewClient(c, psProject, option.WithTokenSource(p sAuth.TokenSource()))
104 if err != nil { 97 if err != nil {
105 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 98 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
106 return err 99 return err
107 } 100 }
108 sub := psClient.Subscription(psSubscriptionName) 101 sub := psClient.Subscription(psSubscriptionName)
109 102
110 // Initialize our Storage. 103 // Initialize our Storage.
111 st, err := a.IntermediateStorage(c) 104 st, err := a.IntermediateStorage(c)
112 if err != nil { 105 if err != nil {
113 log.WithError(err).Errorf(c, "Failed to get storage instance.") 106 log.WithError(err).Errorf(c, "Failed to get storage instance.")
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
183 } else { 176 } else {
184 log.Fields{ 177 log.Fields{
185 "duration": dura tion, 178 "duration": dura tion,
186 }.Infof(c, "Task process ing incomplete. Not deleting.") 179 }.Infof(c, "Task process ing incomplete. Not deleting.")
187 } 180 }
188 181
189 // Add to our processing time me tric. 182 // Add to our processing time me tric.
190 tsTaskProcessingTime.Add(c, dura tion.Seconds()*1000, deleteTask) 183 tsTaskProcessingTime.Add(c, dura tion.Seconds()*1000, deleteTask)
191 }() 184 }()
192 185
193 » » » » » task, err := makePubSubArchivistTask(psC ontext, psSubscriptionName, msg) 186 » » » » » task, err := makePubSubArchivistTask(psS ubscriptionName, msg)
194 if err != nil { 187 if err != nil {
195 log.WithError(err).Errorf(c, "Fa iled to unmarshal archive task from message.") 188 log.WithError(err).Errorf(c, "Fa iled to unmarshal archive task from message.")
196 deleteTask = true 189 deleteTask = true
197 return nil 190 return nil
198 } 191 }
199 192
200 ar.ArchiveTask(c, task) 193 ar.ArchiveTask(c, task)
201 deleteTask = task.consumed 194 deleteTask = task.consumed
202 195
203 return nil 196 return nil
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
275 268
276 // Entry point. 269 // Entry point.
277 func main() { 270 func main() {
278 a := application{ 271 a := application{
279 Service: service.Service{ 272 Service: service.Service{
280 Name: "archivist", 273 Name: "archivist",
281 }, 274 },
282 } 275 }
283 a.Run(context.Background(), a.runArchivist) 276 a.Run(context.Background(), a.runArchivist)
284 } 277 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698