Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(415)

Side by Side Diff: logdog/appengine/coordinator/service.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 17 matching lines...) Expand all
28 "github.com/luci/luci-go/logdog/api/config/svcconfig" 28 "github.com/luci/luci-go/logdog/api/config/svcconfig"
29 "github.com/luci/luci-go/logdog/appengine/coordinator/config" 29 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
30 "github.com/luci/luci-go/logdog/common/storage" 30 "github.com/luci/luci-go/logdog/common/storage"
31 "github.com/luci/luci-go/logdog/common/storage/archive" 31 "github.com/luci/luci-go/logdog/common/storage/archive"
32 "github.com/luci/luci-go/logdog/common/storage/bigtable" 32 "github.com/luci/luci-go/logdog/common/storage/bigtable"
33 "github.com/luci/luci-go/logdog/common/storage/caching" 33 "github.com/luci/luci-go/logdog/common/storage/caching"
34 "github.com/luci/luci-go/luci_config/common/cfgtypes" 34 "github.com/luci/luci-go/luci_config/common/cfgtypes"
35 "github.com/luci/luci-go/server/auth" 35 "github.com/luci/luci-go/server/auth"
36 "github.com/luci/luci-go/server/router" 36 "github.com/luci/luci-go/server/router"
37 37
38 » gcps "cloud.google.com/go/pubsub" 38 » vkit "cloud.google.com/go/pubsub/apiv1"
39 gcst "cloud.google.com/go/storage" 39 gcst "cloud.google.com/go/storage"
40 "golang.org/x/net/context"
41 "google.golang.org/api/option" 40 "google.golang.org/api/option"
42 "google.golang.org/appengine" 41 "google.golang.org/appengine"
43 "google.golang.org/grpc" 42 "google.golang.org/grpc"
44 "google.golang.org/grpc/metadata" 43 "google.golang.org/grpc/metadata"
44
45 "golang.org/x/net/context"
45 ) 46 )
46 47
47 const ( 48 const (
48 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. 49 // maxSignedURLLifetime is the maximum allowed signed URL lifetime.
49 maxSignedURLLifetime = 1 * time.Hour 50 maxSignedURLLifetime = 1 * time.Hour
50 51
51 // maxGSFetchSize is the maximum amount of data we can fetch from a single 52 // maxGSFetchSize is the maximum amount of data we can fetch from a single
52 // Google Storage RPC call. 53 // Google Storage RPC call.
53 // 54 //
54 // AppEngine's "urlfetch" has a limit of 32MB: 55 // AppEngine's "urlfetch" has a limit of 32MB:
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
118 119
119 // gcfg is the cached global configuration. 120 // gcfg is the cached global configuration.
120 gcfg *config.Config 121 gcfg *config.Config
121 projectConfigs map[cfgtypes.ProjectName]*cachedProjectConfig 122 projectConfigs map[cfgtypes.ProjectName]*cachedProjectConfig
122 123
123 // archivalIndex is the atomically-manipulated archival index for the 124 // archivalIndex is the atomically-manipulated archival index for the
124 // ArchivalPublisher. This is shared between all ArchivalPublisher instances 125 // ArchivalPublisher. This is shared between all ArchivalPublisher instances
125 // from this service. 126 // from this service.
126 archivalIndex int32 127 archivalIndex int32
127 128
128 » // pubSubClients is a map of Pub/Sub client singletons generated during this 129 » // pubSubClient is a Pub/Sub client generated during this request.
129 » // request. Each client is associated with its project, and will be
130 » // initialized the first time it is requested by getPubSubClient.
131 // 130 //
132 » // All clients will be closed on "close". 131 » // It will be closed on "close".
133 » pubSubClients map[string]*gcps.Client 132 » pubSubClient *vkit.PublisherClient
134
135 // signer is the signer instance to use. 133 // signer is the signer instance to use.
136 signer gaesigner.Signer 134 signer gaesigner.Signer
137 } 135 }
138 136
139 func (s *prodServicesInst) close(c context.Context) { 137 func (s *prodServicesInst) close(c context.Context) {
140 s.Lock() 138 s.Lock()
141 defer s.Unlock() 139 defer s.Unlock()
142 140
143 » for proj, client := range s.pubSubClients { 141 » if client := s.pubSubClient; client != nil {
144 if err := client.Close(); err != nil { 142 if err := client.Close(); err != nil {
145 » » » log.Fields{ 143 » » » log.WithError(err).Errorf(c, "Failed to close Pub/Sub client singleton.")
146 » » » » log.ErrorKey: err,
147 » » » » "project": proj,
148 » » » }.Errorf(c, "Failed to close Pub/Sub client singleton.")
149 } 144 }
145 s.pubSubClient = nil
150 } 146 }
151 } 147 }
152
153 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { 148 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) {
154 s.Lock() 149 s.Lock()
155 defer s.Unlock() 150 defer s.Unlock()
156 151
157 // Load/cache the global config. 152 // Load/cache the global config.
158 if s.gcfg == nil { 153 if s.gcfg == nil {
159 var err error 154 var err error
160 s.gcfg, err = config.Load(c) 155 s.gcfg, err = config.Load(c)
161 if err != nil { 156 if err != nil {
162 return nil, err 157 return nil, err
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after
356 } 351 }
357 352
358 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) 353 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic)
359 if err := fullTopic.Validate(); err != nil { 354 if err := fullTopic.Validate(); err != nil {
360 log.Fields{ 355 log.Fields{
361 log.ErrorKey: err, 356 log.ErrorKey: err,
362 "topic": fullTopic, 357 "topic": fullTopic,
363 }.Errorf(c, "Failed to validate archival topic.") 358 }.Errorf(c, "Failed to validate archival topic.")
364 return nil, errors.New("invalid archival topic") 359 return nil, errors.New("invalid archival topic")
365 } 360 }
366 project, topic := fullTopic.Split()
367 361
368 » // Get a Pub/Sub client (maybe re-use). 362 » psClient, err := s.getPubSubClient()
369 » psClient, err := s.getPubSubClient(project)
370 if err != nil { 363 if err != nil {
371 log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.")
372 return nil, err 364 return nil, err
373 } 365 }
374 366
375 // Create a Topic, and configure it to not bundle messages. 367 // Create a Topic, and configure it to not bundle messages.
376 psTopic := psClient.Topic(topic)
377 pubsub.DisableTopicBundling(psTopic)
378
379 return &pubsubArchivalPublisher{ 368 return &pubsubArchivalPublisher{
380 » » client: psClient, 369 » » publisher: &pubsub.UnbufferedPublisher{
381 » » topic: psTopic, 370 » » » Topic: fullTopic,
371 » » » Client: psClient,
372 » » },
382 publishIndexFunc: s.nextArchiveIndex, 373 publishIndexFunc: s.nextArchiveIndex,
383 }, nil 374 }, nil
384 } 375 }
385 376
386 func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) { 377 func (s *prodServicesInst) getPubSubClient() (*vkit.PublisherClient, error) {
378 » s.Lock()
379 » defer s.Unlock()
380
381 » if s.pubSubClient != nil {
382 » » return s.pubSubClient, nil
383 » }
384
387 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since 385 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, since
388 // we don't want any caller RPC to be forwarded to the backend service. 386 // we don't want any caller RPC to be forwarded to the backend service.
389 c := metadata.NewContext(s.aeCtx, nil) 387 c := metadata.NewContext(s.aeCtx, nil)
390 388
391 » s.Lock() 389 » // Create an authenticated unbuffered Pub/Sub Publisher.
392 » defer s.Unlock()
393
394 » client := s.pubSubClients[proj]
395 » if client != nil {
396 » » return client, nil
397 » }
398
399 » // Create an authenticated Pub/Sub client.
400 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...)) 390 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pubsub.PublisherScopes...))
401 if err != nil { 391 if err != nil {
402 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentials.") 392 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentials.")
403 return nil, errors.New("failed to create Pub/Sub credentials") 393 return nil, errors.New("failed to create Pub/Sub credentials")
404 } 394 }
405 395
406 » psClient, err := gcps.NewClient(c, proj, 396 » psClient, err := vkit.NewPublisherClient(c,
407 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) 397 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)))
408 if err != nil { 398 if err != nil {
409 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") 399 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
410 return nil, errors.New("failed to create Pub/Sub client") 400 return nil, errors.New("failed to create Pub/Sub client")
411 } 401 }
412 402
413 » // Retain this client for the duration of our request. 403 » s.pubSubClient = psClient
414 » if s.pubSubClients == nil {
415 » » s.pubSubClients = make(map[string]*gcps.Client)
416 » }
417 » s.pubSubClients[proj] = psClient
418 return psClient, nil 404 return psClient, nil
419 } 405 }
420 406
421 func (s *prodServicesInst) nextArchiveIndex() uint64 { 407 func (s *prodServicesInst) nextArchiveIndex() uint64 {
422 // We use a 32-bit value for this because it avoids atomic memory boundary 408 // We use a 32-bit value for this because it avoids atomic memory boundary
423 // issues. Furthermore, we constrain it to be positive, using a negative 409 // issues. Furthermore, we constrain it to be positive, using a negative
424 // value as a sentinel that the archival index has wrapped. 410 // value as a sentinel that the archival index has wrapped.
425 // 411 //
426 // This is reasonable, as it is very unlikely that a single request will issue 412 // This is reasonable, as it is very unlikely that a single request will issue
427 // more than MaxInt32 archival tasks. 413 // more than MaxInt32 archival tasks.
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after
562 548
563 // Sign index URL. 549 // Sign index URL.
564 if req.Index { 550 if req.Index {
565 if resp.Index, err = doSign(si.index); err != nil { 551 if resp.Index, err = doSign(si.index); err != nil {
566 return nil, errors.Annotate(err, "").InternalReason("failed to sign index URL").Err() 552 return nil, errors.Annotate(err, "").InternalReason("failed to sign index URL").Err()
567 } 553 }
568 } 554 }
569 555
570 return &resp, nil 556 return &resp, nil
571 } 557 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698