| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "io" |
| 9 "time" | 10 "time" |
| 10 | 11 |
| 11 "github.com/luci/luci-go/common/auth" | 12 "github.com/luci/luci-go/common/auth" |
| 12 "github.com/luci/luci-go/common/clock" | 13 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" | 17 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/server/internal/logdog/collector" | 18 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| (...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 90 log.Fields{ | 91 log.Fields{ |
| 91 "subscription": sub, | 92 "subscription": sub, |
| 92 }.Infof(c, "Successfully validated Pub/Sub subscription.") | 93 }.Infof(c, "Successfully validated Pub/Sub subscription.") |
| 93 | 94 |
| 94 st, err := a.IntermediateStorage(c) | 95 st, err := a.IntermediateStorage(c) |
| 95 if err != nil { | 96 if err != nil { |
| 96 return err | 97 return err |
| 97 } | 98 } |
| 98 defer st.Close() | 99 defer st.Close() |
| 99 | 100 |
| 100 // Application shutdown will now operate by cancelling the Collector's | |
| 101 // shutdown Context. | |
| 102 shutdownCtx, shutdownFunc := context.WithCancel(c) | |
| 103 a.SetShutdownFunc(shutdownFunc) | |
| 104 | |
| 105 // Initialize our Collector service object using a caching Coordinator | 101 // Initialize our Collector service object using a caching Coordinator |
| 106 // interface. | 102 // interface. |
| 107 coord := coordinator.NewCoordinator(a.Coordinator()) | 103 coord := coordinator.NewCoordinator(a.Coordinator()) |
| 108 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) | 104 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) |
| 109 | 105 |
| 110 coll := collector.Collector{ | 106 coll := collector.Collector{ |
| 111 Coordinator: coord, | 107 Coordinator: coord, |
| 112 Storage: st, | 108 Storage: st, |
| 113 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), | 109 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), |
| 114 } | 110 } |
| 115 defer coll.Close() | 111 defer coll.Close() |
| 116 | 112 |
| 117 // Execute our main subscription pull loop. It will run until the suppli
ed | 113 // Execute our main subscription pull loop. It will run until the suppli
ed |
| 118 // Context is cancelled. | 114 // Context is cancelled. |
| 119 psIterator, err := psSub.Pull(c) | 115 psIterator, err := psSub.Pull(c) |
| 120 if err != nil { | 116 if err != nil { |
| 121 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.
") | 117 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.
") |
| 122 return err | 118 return err |
| 123 } | 119 } |
| 124 » defer func() { | 120 » defer psIterator.Stop() |
| 125 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop
...") | 121 |
| 126 » » psIterator.Stop() | 122 » // Application shutdown will now operate by cancelling the Collector's |
| 127 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.") | 123 » // shutdown Context. |
| 128 » }() | 124 » a.SetShutdownFunc(psIterator.Stop) |
| 129 | 125 |
| 130 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC
chan<- func() error) { | 126 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC
chan<- func() error) { |
| 131 // Loop until shut down. | 127 // Loop until shut down. |
| 132 » » for shutdownCtx.Err() == nil { | 128 » » for { |
| 133 msg, err := psIterator.Next() | 129 msg, err := psIterator.Next() |
| 134 » » » if err != nil { | 130 » » » switch err { |
| 131 » » » case nil: |
| 132 » » » » taskC <- func() error { |
| 133 » » » » » c := log.SetField(c, "messageID", msg.ID
) |
| 134 » » » » » msg.Done(a.processMessage(c, &coll, msg)
) |
| 135 » » » » » return nil |
| 136 » » » » } |
| 137 |
| 138 » » » case io.EOF, context.Canceled, context.DeadlineExceeded: |
| 139 » » » » return |
| 140 |
| 141 » » » default: |
| 135 log.Fields{ | 142 log.Fields{ |
| 136 log.ErrorKey: err, | 143 log.ErrorKey: err, |
| 137 "delay": pubsubPullErrorDelay, | 144 "delay": pubsubPullErrorDelay, |
| 138 }.Errorf(c, "Failed to fetch Pub/Sub message, re
try after delay...") | 145 }.Errorf(c, "Failed to fetch Pub/Sub message, re
try after delay...") |
| 139 clock.Sleep(c, pubsubPullErrorDelay) | 146 clock.Sleep(c, pubsubPullErrorDelay) |
| 140 continue | |
| 141 } | |
| 142 | |
| 143 taskC <- func() error { | |
| 144 c := log.SetField(c, "messageID", msg.ID) | |
| 145 msg.Done(a.processMessage(c, &coll, msg)) | |
| 146 return nil | |
| 147 } | 147 } |
| 148 } | 148 } |
| 149 })) | 149 })) |
| 150 | 150 |
| 151 log.Debugf(c, "Collector finished.") | 151 log.Debugf(c, "Collector finished.") |
| 152 return nil | 152 return nil |
| 153 } | 153 } |
| 154 | 154 |
| 155 // processMessage returns true if the message should be ACK'd (deleted from | 155 // processMessage returns true if the message should be ACK'd (deleted from |
| 156 // Pub/Sub) or false if the message should not be ACK'd. | 156 // Pub/Sub) or false if the message should not be ACK'd. |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi
ng.") | 190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi
ng.") |
| 191 return true | 191 return true |
| 192 } | 192 } |
| 193 } | 193 } |
| 194 | 194 |
| 195 // Entry point. | 195 // Entry point. |
| 196 func main() { | 196 func main() { |
| 197 a := application{} | 197 a := application{} |
| 198 a.Run(context.Background(), a.runCollector) | 198 a.Run(context.Background(), a.runCollector) |
| 199 } | 199 } |
| OLD | NEW |