Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(94)

Unified Diff: server/cmd/logdog_collector/main.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/proto/logdog/svcconfig/config.pb.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/cmd/logdog_collector/main.go
diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
index 0020d8a9aaaa00bbae8f29ef320df94a27216844..ffc2997769dab18317a4d5209fde8880aeaf29e2 100644
--- a/server/cmd/logdog_collector/main.go
+++ b/server/cmd/logdog_collector/main.go
@@ -87,13 +87,11 @@ func (a *application) runCollector(c context.Context) error {
"subscription": sub,
}.Infof(c, "Successfully validated Pub/Sub subscription.")
- // Initialize our Storage.
- s, err := a.IntermediateStorage(c)
+ st, err := a.IntermediateStorage(c)
if err != nil {
- log.WithError(err).Errorf(c, "Failed to get storage instance.")
return err
}
- defer s.Close()
+ defer st.Close()
// Application shutdown will now operate by cancelling the Collector's
// shutdown Context.
@@ -114,10 +112,9 @@ func (a *application) runCollector(c context.Context) error {
coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.StateCacheExpiration.Duration())
coll := collector.Collector{
- Coordinator: coord,
- Storage: s,
- MaxParallelBundles: int(ccfg.Workers),
- MaxIngestWorkers: int(ccfg.Workers),
+ Coordinator: coord,
+ Storage: st,
+ MaxMessageWorkers: int(ccfg.MaxMessageWorkers),
}
defer coll.Close()
@@ -125,11 +122,9 @@ func (a *application) runCollector(c context.Context) error {
// is cancelled.
clk := clock.Get(c)
engine := subscriber.Subscriber{
- S: subscriber.NewSource(ps, sub, 0),
- A: ab,
-
- PullWorkers: int(ccfg.TransportWorkers),
- HandlerWorkers: int(ccfg.Workers),
+ S: subscriber.NewSource(ps, sub),
+ A: ab,
+ Workers: int(ccfg.MaxConcurrentMessages),
}
engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
c := log.SetField(c, "messageID", msg.ID)
« no previous file with comments | « common/proto/logdog/svcconfig/config.pb.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698