Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/cmd/logdog_collector/Dockerfile ('k') | server/cmd/logdog_collector/run.sh » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package main
6
7 import (
8 "flag"
9 "fmt"
10 "os"
11 "time"
12
13 "github.com/luci/luci-go/common/auth"
14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/pubsub"
16 "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer"
17 "github.com/luci/luci-go/common/gcloud/pubsub/subscriber"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/parallel"
20 "github.com/luci/luci-go/server/internal/logdog/collector"
21 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
22 "github.com/luci/luci-go/server/internal/logdog/service"
23 "golang.org/x/net/context"
24 )
25
26 var (
27 errInvalidConfig = errors.New("invalid configuration")
28 )
29
30 // application is the Collector application state.
31 type application struct {
32 *service.Service
33
34 // shutdownCtx is a Context that will be cancelled if our application
35 // receives a shutdown signal.
36 shutdownCtx context.Context
37 }
38
39 // run is the main execution function.
40 func (a *application) runCollector() error {
41 cfg := a.Config()
42 ccfg := cfg.GetCollector()
43 if ccfg == nil {
44 return errors.New("no collector configuration")
45 }
46
47 pscfg := cfg.GetTransport().GetPubsub()
48 if pscfg == nil {
49 return errors.New("missing Pub/Sub configuration")
50 }
51
52 // Our Subscription must be a valid one.
53 sub := pubsub.NewSubscription(pscfg.Project, pscfg.Subscription)
54 if err := sub.Validate(); err != nil {
55 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er r)
56 }
57
58 // New PubSub instance with the authenticated client.
59 psClient, err := a.AuthenticatedClient(func(o *auth.Options) {
60 o.Scopes = pubsub.SubscriberScopes
61 })
62 if err != nil {
63 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.")
64 return err
65 }
66
67 // Create a retrying Pub/Sub client.
68 ps := &pubsub.Retry{
69 Connection: pubsub.NewConnection(psClient),
70 Callback: func(err error, d time.Duration) {
71 log.Fields{
72 log.ErrorKey: err,
73 "delay": d,
74 }.Warningf(a, "Transient error encountered; retrying..." )
75 },
76 }
77
78 exists, err := ps.SubExists(a, sub)
79 if err != nil {
80 log.Fields{
81 log.ErrorKey: err,
82 "subscription": sub,
83 }.Errorf(a, "Could not confirm Pub/Sub subscription.")
84 return errInvalidConfig
85 }
86 if !exists {
87 log.Fields{
88 "subscription": sub,
89 }.Errorf(a, "Subscription does not exist.")
90 return errInvalidConfig
91 }
92 log.Fields{
93 "subscription": sub,
94 }.Infof(a, "Successfully validated Pub/Sub subscription.")
95
96 // Initialize our Storage.
97 s, err := a.Storage()
98 if err != nil {
99 log.WithError(err).Errorf(a, "Failed to get storage instance.")
100 return err
101 }
102 defer s.Close()
103
104 // Application shutdown will now operate by cancelling the Collector's
105 // shutdown Context.
106 shutdownCtx, shutdownFunc := context.WithCancel(a)
107 a.SetShutdownFunc(shutdownFunc)
108 defer a.SetShutdownFunc(nil)
109
110 // Start an ACK buffer so that we can batch ACKs.
111 ab := ackbuffer.New(a, ackbuffer.Config{
112 Ack: ackbuffer.NewACK(ps, sub, 0),
113 })
114 defer ab.CloseAndFlush()
115
116 // Initialize our Collector service object using a caching Coordinator
117 // interface.
118 coord := coordinator.NewCoordinator(a.Coordinator())
119 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State CacheExpiration.Duration())
120 coll := collector.Collector{
121 Coordinator: coord,
122 Storage: s,
123 Sem: make(parallel.Semaphore, int(ccfg.Workers)),
124 }
125
126 // Execute our main Subscriber loop. It will run until the supplied Cont ext
127 // is cancelled.
128 engine := subscriber.Subscriber{
129 S: subscriber.NewSource(ps, sub, 0),
130 A: ab,
131
132 PullWorkers: int(ccfg.TransportWorkers),
133 HandlerWorkers: int(ccfg.Workers),
134 }
135 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
136 ctx := log.SetFields(a, log.Fields{
137 "messageID": msg.ID,
138 "size": len(msg.Data),
139 "ackID": msg.AckID,
140 })
141
142 if err := coll.Process(ctx, msg.Data); err != nil {
143 if errors.IsTransient(err) {
144 // Do not consume
145 log.Fields{
146 log.ErrorKey: err,
147 "msgID": msg.ID,
148 "size": len(msg.Data),
149 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S ub message.")
150 return false
151 }
152
153 log.Fields{
154 log.ErrorKey: err,
155 "msgID": msg.ID,
156 "size": len(msg.Data),
157 }.Errorf(ctx, "Error ingesting Pub/Sub message.")
158 }
159 return true
160 })
161
162 log.Debugf(a, "Collector finished.")
163 return nil
164 }
165
166 // mainImpl is the Main implementaion, and returns the application return code
167 // as an integer.
168 func mainImpl() int {
169 a := application{
170 Service: service.New(context.Background()),
171 }
172
173 fs := flag.FlagSet{}
174 a.AddFlags(&fs)
175
176 if err := fs.Parse(os.Args[1:]); err != nil {
177 log.Errorf(log.SetError(a, err), "Failed to parse command-line." )
178 return 1
179 }
180
181 // Run our configured application instance.
182 var rc int
183 if err := a.Run(a.runCollector); err != nil {
184 log.Errorf(log.SetError(a, err), "Application execution failed." )
185 return 1
186 }
187 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.")
188 return 0
189 }
190
191 // Entry point.
192 func main() {
193 os.Exit(mainImpl())
194 }
OLDNEW
« no previous file with comments | « server/cmd/logdog_collector/Dockerfile ('k') | server/cmd/logdog_collector/run.sh » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698