Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "io"
9 "time" 10 "time"
10 11
11 "github.com/luci/luci-go/common/auth" 12 "github.com/luci/luci-go/common/auth"
12 "github.com/luci/luci-go/common/clock" 13 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel" 17 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/server/internal/logdog/collector" 18 "github.com/luci/luci-go/server/internal/logdog/collector"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 110
110 coll := collector.Collector{ 111 coll := collector.Collector{
111 Coordinator: coord, 112 Coordinator: coord,
112 Storage: st, 113 Storage: st,
113 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), 114 MaxMessageWorkers: int(ccfg.MaxMessageWorkers),
114 } 115 }
115 defer coll.Close() 116 defer coll.Close()
116 117
117 // Execute our main subscription pull loop. It will run until the suppli ed 118 // Execute our main subscription pull loop. It will run until the suppli ed
118 // Context is cancelled. 119 // Context is cancelled.
119 » psIterator, err := psSub.Pull(c) 120 » psIterator, err := psSub.Pull(shutdownCtx)
120 if err != nil { 121 if err != nil {
121 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ") 122 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ")
122 return err 123 return err
123 } 124 }
124 » defer func() { 125 » defer psIterator.Stop()
125 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop ...")
126 » » psIterator.Stop()
127 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
128 » }()
129 126
130 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) { 127 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
131 // Loop until shut down. 128 // Loop until shut down.
132 » » for shutdownCtx.Err() == nil { 129 » » for {
133 msg, err := psIterator.Next() 130 msg, err := psIterator.Next()
134 » » » if err != nil { 131 » » » switch err {
dnj 2016/04/11 17:20:04 This was a bugfix ported from Archivist in respons
132 » » » case nil:
133 » » » » taskC <- func() error {
134 » » » » » c := log.SetField(c, "messageID", msg.ID )
135 » » » » » msg.Done(a.processMessage(c, &coll, msg) )
136 » » » » » return nil
137 » » » » }
138
139 » » » case io.EOF, context.Canceled, context.DeadlineExceeded:
140 » » » » return
141
142 » » » default:
135 log.Fields{ 143 log.Fields{
136 log.ErrorKey: err, 144 log.ErrorKey: err,
137 "delay": pubsubPullErrorDelay, 145 "delay": pubsubPullErrorDelay,
138 }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...") 146 }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...")
139 clock.Sleep(c, pubsubPullErrorDelay) 147 clock.Sleep(c, pubsubPullErrorDelay)
140 continue
141 }
142
143 taskC <- func() error {
144 c := log.SetField(c, "messageID", msg.ID)
145 msg.Done(a.processMessage(c, &coll, msg))
146 return nil
147 } 148 }
148 } 149 }
149 })) 150 }))
150 151
151 log.Debugf(c, "Collector finished.") 152 log.Debugf(c, "Collector finished.")
152 return nil 153 return nil
153 } 154 }
154 155
155 // processMessage returns true if the message should be ACK'd (deleted from 156 // processMessage returns true if the message should be ACK'd (deleted from
156 // Pub/Sub) or false if the message should not be ACK'd. 157 // Pub/Sub) or false if the message should not be ACK'd.
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.") 191 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.")
191 return true 192 return true
192 } 193 }
193 } 194 }
194 195
195 // Entry point. 196 // Entry point.
196 func main() { 197 func main() {
197 a := application{} 198 a := application{}
198 a.Run(context.Background(), a.runCollector) 199 a.Run(context.Background(), a.runCollector)
199 } 200 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698