Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(12)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
dnj (Google) 2016/01/21 04:36:24 One of the main files to review.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package main
6
7 import (
8 "flag"
9 "fmt"
10 "os"
11 "time"
12
13 "github.com/luci/luci-go/common/auth"
14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/gcps"
16 "github.com/luci/luci-go/common/gcloud/gcps/ackbuffer"
17 "github.com/luci/luci-go/common/gcloud/gcps/subscriber"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/parallel"
20 "github.com/luci/luci-go/server/internal/logdog/collector"
21 "github.com/luci/luci-go/server/internal/logdog/service"
22 "golang.org/x/net/context"
23 "google.golang.org/cloud/pubsub"
24 )
25
26 var (
27 errInvalidConfig = errors.New("invalid configuration")
28 )
29
30 // application is the Collector application state.
31 type application struct {
32 *service.Service
33
34 // shutdownCtx is a Context that will be cancelled if our application
35 // receives a shutdown signal.
36 shutdownCtx context.Context
37 }
38
39 // run is the main execution function.
40 func (a *application) runCollector() error {
41 cfg := a.Config()
42 ccfg := cfg.GetCollector()
43 if ccfg == nil {
44 return errors.New("no collector configuration")
45 }
46
47 pscfg := cfg.GetTransport().GetPubsub()
48 switch {
49 case pscfg == nil:
50 return errors.New("missing Pub/Sub configuration")
51 case pscfg.Project == "":
52 return errors.New("missing required Pub/Sub project")
53 case pscfg.Subscription == "":
54 return errors.New("missing required subscription name")
55 }
56
57 // Our Subscription must be a valid one.
58 sub := gcps.Subscription(pscfg.Subscription)
59 if err := sub.Validate(); err != nil {
60 return fmt.Errorf("invalid subscription name: %v", err)
61 }
62
63 // New PubSub instance with the authenticated client.
64 psClient, err := a.AuthenticatedClient(func(o *auth.Options) {
65 o.Scopes = gcps.SubscriberScopes
66 })
67 if err != nil {
68 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.")
69 return err
70 }
71
72 // Create a retrying Pub/Sub client.
73 ps := gcps.New(psClient, pscfg.Project)
74 if err != nil {
75 log.Errorf(log.SetError(a, err), "Failed to create Pub/Sub insta nce.")
76 return errInvalidConfig
77 }
78 ps = &gcps.Retry{
79 PS: ps,
80 C: func(err error, d time.Duration) {
81 log.Fields{
82 log.ErrorKey: err,
83 "delay": d,
84 }.Warningf(a, "Transient error encountered; retrying..." )
85 },
86 }
87
88 exists, err := ps.SubExists(a, sub)
89 if err != nil {
90 log.Fields{
91 log.ErrorKey: err,
92 "subscription": pscfg.Subscription,
93 }.Errorf(a, "Could not confirm Pub/Sub subscription.")
94 return errInvalidConfig
95 }
96 if !exists {
97 log.Fields{
98 "subscription": pscfg.Subscription,
99 }.Errorf(a, "Subscription does not exist.")
100 return errInvalidConfig
101 }
102 log.Fields{
103 "subscription": sub,
104 }.Infof(a, "Successfully validated Pub/Sub subscription.")
105
106 // Initialize our Storage.
107 s, err := a.Storage()
108 if err != nil {
109 log.WithError(err).Errorf(a, "Failed to get storage instance.")
110 return err
111 }
112 defer s.Close()
113
114 // Application shutdown will now operate by cancelling the Collector's
115 // shutdown Context.
116 shutdownCtx, shutdownFunc := context.WithCancel(a)
117 a.SetShutdownFunc(shutdownFunc)
118 defer a.SetShutdownFunc(nil)
119
120 // Start an ACK buffer so that we can batch ACKs.
121 ab := ackbuffer.New(a, ackbuffer.Config{
122 Ack: ackbuffer.NewACK(ps, sub, 0),
123 })
124 defer ab.CloseAndFlush()
125
126 // Initialize our Collector service object.
127 coll := collector.New(collector.Options{
128 Coordinator: a.Coordinator(),
129 Storage: s,
130 StreamStateCacheExpire: ccfg.StateCacheExpiration.Duration(),
131 Sem: make(parallel.Semaphore, int(ccfg.Workers)),
132 })
133
134 // Execute our main Subscriber loop. It will run until the supplied Cont ext
135 // is cancelled.
136 engine := subscriber.Subscriber{
137 S: subscriber.NewSource(ps, sub, 0),
138 A: ab,
139
140 Workers: int(ccfg.TransportWorkers),
141 HandlerSem: make(parallel.Semaphore, int(ccfg.Workers)),
142 }
143 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
144 ctx := log.SetFields(a, log.Fields{
145 "messageID": msg.ID,
146 "size": len(msg.Data),
147 "ackID": msg.AckID,
148 })
149
150 if err := coll.Process(ctx, msg.Data); err != nil {
151 if errors.IsTransient(err) {
152 // Do not consume
153 log.Fields{
154 log.ErrorKey: err,
155 "msgID": msg.ID,
156 "size": len(msg.Data),
157 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S ub message.")
158 return false
159 }
160
161 log.Fields{
162 log.ErrorKey: err,
163 "msgID": msg.ID,
164 "size": len(msg.Data),
165 }.Errorf(ctx, "Error ingesting Pub/Sub message.")
166 }
167 return true
168 })
169
170 log.Debugf(a, "Collector finished.")
171 return nil
172 }
173
174 // mainImpl is the Main implementaion, and returns the application return code
175 // as an integer.
176 func mainImpl() int {
177 a := application{
178 Service: service.New(context.Background()),
179 }
180
181 fs := flag.FlagSet{}
182 a.AddFlags(&fs)
183
184 if err := fs.Parse(os.Args[1:]); err != nil {
185 log.Errorf(log.SetError(a, err), "Failed to parse command-line." )
186 return 1
187 }
188
189 // Run our configured application instance.
190 var rc int
191 if err := a.Run(a.runCollector); err != nil {
192 log.Errorf(log.SetError(a, err), "Application execution failed." )
193 return 1
194 }
195 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.")
196 return 0
197 }
198
199 // Entry point.
200 func main() {
201 os.Exit(mainImpl())
202 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698