Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(513)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2219023003: Update APIs to use new Google cloud paths. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "net/http" 8 "net/http"
9 "sync" 9 "sync"
10 "sync/atomic" 10 "sync/atomic"
11 11
12 "github.com/luci/luci-go/appengine/gaemiddleware" 12 "github.com/luci/luci-go/appengine/gaemiddleware"
13 luciConfig "github.com/luci/luci-go/common/config" 13 luciConfig "github.com/luci/luci-go/common/config"
14 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/gs" 15 "github.com/luci/luci-go/common/gcloud/gs"
16 "github.com/luci/luci-go/common/gcloud/pubsub" 16 "github.com/luci/luci-go/common/gcloud/pubsub"
17 log "github.com/luci/luci-go/common/logging" 17 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/logdog/api/config/svcconfig" 18 "github.com/luci/luci-go/logdog/api/config/svcconfig"
19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
20 "github.com/luci/luci-go/logdog/common/storage" 20 "github.com/luci/luci-go/logdog/common/storage"
21 "github.com/luci/luci-go/logdog/common/storage/bigtable" 21 "github.com/luci/luci-go/logdog/common/storage/bigtable"
22 "github.com/luci/luci-go/server/auth" 22 "github.com/luci/luci-go/server/auth"
23 "github.com/luci/luci-go/server/router" 23 "github.com/luci/luci-go/server/router"
24
25 gcps "cloud.google.com/go/pubsub"
24 "golang.org/x/net/context" 26 "golang.org/x/net/context"
25 » "google.golang.org/cloud" 27 » "google.golang.org/api/option"
26 » gcps "google.golang.org/cloud/pubsub"
27 "google.golang.org/grpc/metadata" 28 "google.golang.org/grpc/metadata"
28 ) 29 )
29 30
30 // Services is a set of support services used by Coordinator. 31 // Services is a set of support services used by Coordinator.
31 // 32 //
32 // Each Services instance is valid for a singel request, but can be re-used 33 // Each Services instance is valid for a singel request, but can be re-used
33 // throughout that request. This is advised, as the Services instance may 34 // throughout that request. This is advised, as the Services instance may
34 // optionally cache values. 35 // optionally cache values.
35 // 36 //
36 // Services methods are goroutine-safe. 37 // Services methods are goroutine-safe.
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after
194 195
195 // Explicitly clear gRPC metadata from the Context. It is forwarded to 196 // Explicitly clear gRPC metadata from the Context. It is forwarded to
196 // delegate calls by default, and standard request metadata can break Bi gTable 197 // delegate calls by default, and standard request metadata can break Bi gTable
197 // calls. 198 // calls.
198 c = metadata.NewContext(c, nil) 199 c = metadata.NewContext(c, nil)
199 200
200 st, err := bigtable.New(c, bigtable.Options{ 201 st, err := bigtable.New(c, bigtable.Options{
201 Project: bt.Project, 202 Project: bt.Project,
202 Instance: bt.Instance, 203 Instance: bt.Instance,
203 LogTable: bt.LogTableName, 204 LogTable: bt.LogTableName,
204 » » ClientOptions: []cloud.ClientOption{ 205 » » ClientOptions: []option.ClientOption{
205 » » » cloud.WithBaseHTTP(&http.Client{Transport: transport}), 206 » » » option.WithHTTPClient(&http.Client{Transport: transport} ),
206 }, 207 },
207 }) 208 })
208 if err != nil { 209 if err != nil {
209 log.WithError(err).Errorf(c, "Failed to create BigTable instance .") 210 log.WithError(err).Errorf(c, "Failed to create BigTable instance .")
210 return nil, err 211 return nil, err
211 } 212 }
212 return st, nil 213 return st, nil
213 } 214 }
214 215
215 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { 216 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) {
(...skipping 23 matching lines...) Expand all
239 } 240 }
240 project, topic := fullTopic.Split() 241 project, topic := fullTopic.Split()
241 242
242 // Create an authenticated Pub/Sub client. 243 // Create an authenticated Pub/Sub client.
243 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p ubsub.PublisherScopes...)) 244 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p ubsub.PublisherScopes...))
244 if err != nil { 245 if err != nil {
245 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic ator.") 246 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic ator.")
246 return nil, errors.New("failed to create Pub/Sub authenticator") 247 return nil, errors.New("failed to create Pub/Sub authenticator")
247 } 248 }
248 client := &http.Client{Transport: transport} 249 client := &http.Client{Transport: transport}
249 » psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client)) 250 » psClient, err := gcps.NewClient(c, project, option.WithHTTPClient(client ))
250 if err != nil { 251 if err != nil {
251 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 252 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
252 return nil, errors.New("failed to create Pub/Sub client") 253 return nil, errors.New("failed to create Pub/Sub client")
253 } 254 }
254 255
255 return &pubsubArchivalPublisher{ 256 return &pubsubArchivalPublisher{
256 topic: psClient.Topic(topic), 257 topic: psClient.Topic(topic),
257 publishIndexFunc: s.nextArchiveIndex, 258 publishIndexFunc: s.nextArchiveIndex,
258 }, nil 259 }, nil
259 } 260 }
260 261
261 func (s *prodServicesInst) nextArchiveIndex() uint64 { 262 func (s *prodServicesInst) nextArchiveIndex() uint64 {
262 // We use a 32-bit value for this because it avoids atomic memory bounar y 263 // We use a 32-bit value for this because it avoids atomic memory bounar y
263 // issues. Furthermore, we constrain it to be positive, using a negative 264 // issues. Furthermore, we constrain it to be positive, using a negative
264 // value as a sentinel that the archival index has wrapped. 265 // value as a sentinel that the archival index has wrapped.
265 // 266 //
266 // This is reasonable, as it is very unlikely that a single request will issue 267 // This is reasonable, as it is very unlikely that a single request will issue
267 // more than MaxInt32 archival tasks. 268 // more than MaxInt32 archival tasks.
268 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 269 v := atomic.AddInt32(&s.archivalIndex, 1) - 1
269 if v < 0 { 270 if v < 0 {
270 panic("archival index has wrapped") 271 panic("archival index has wrapped")
271 } 272 }
272 return uint64(v) 273 return uint64(v)
273 } 274 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698