| Index: server/cmd/logdog_collector/main.go
|
| diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
|
| index 0020d8a9aaaa00bbae8f29ef320df94a27216844..ffc2997769dab18317a4d5209fde8880aeaf29e2 100644
|
| --- a/server/cmd/logdog_collector/main.go
|
| +++ b/server/cmd/logdog_collector/main.go
|
| @@ -87,13 +87,11 @@ func (a *application) runCollector(c context.Context) error {
|
| "subscription": sub,
|
| }.Infof(c, "Successfully validated Pub/Sub subscription.")
|
|
|
| - // Initialize our Storage.
|
| - s, err := a.IntermediateStorage(c)
|
| + st, err := a.IntermediateStorage(c)
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to get storage instance.")
|
| return err
|
| }
|
| - defer s.Close()
|
| + defer st.Close()
|
|
|
| // Application shutdown will now operate by cancelling the Collector's
|
| // shutdown Context.
|
| @@ -114,10 +112,9 @@ func (a *application) runCollector(c context.Context) error {
|
| coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.StateCacheExpiration.Duration())
|
|
|
| coll := collector.Collector{
|
| - Coordinator: coord,
|
| - Storage: s,
|
| - MaxParallelBundles: int(ccfg.Workers),
|
| - MaxIngestWorkers: int(ccfg.Workers),
|
| + Coordinator: coord,
|
| + Storage: st,
|
| + MaxMessageWorkers: int(ccfg.MaxMessageWorkers),
|
| }
|
| defer coll.Close()
|
|
|
| @@ -125,11 +122,9 @@ func (a *application) runCollector(c context.Context) error {
|
| // is cancelled.
|
| clk := clock.Get(c)
|
| engine := subscriber.Subscriber{
|
| - S: subscriber.NewSource(ps, sub, 0),
|
| - A: ab,
|
| -
|
| - PullWorkers: int(ccfg.TransportWorkers),
|
| - HandlerWorkers: int(ccfg.Workers),
|
| + S: subscriber.NewSource(ps, sub),
|
| + A: ab,
|
| + Workers: int(ccfg.MaxConcurrentMessages),
|
| }
|
| engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
|
| c := log.SetField(c, "messageID", msg.ID)
|
|
|