Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(102)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
80 if !exists { 80 if !exists {
81 log.Fields{ 81 log.Fields{
82 "subscription": sub, 82 "subscription": sub,
83 }.Errorf(c, "Subscription does not exist.") 83 }.Errorf(c, "Subscription does not exist.")
84 return errInvalidConfig 84 return errInvalidConfig
85 } 85 }
86 log.Fields{ 86 log.Fields{
87 "subscription": sub, 87 "subscription": sub,
88 }.Infof(c, "Successfully validated Pub/Sub subscription.") 88 }.Infof(c, "Successfully validated Pub/Sub subscription.")
89 89
90 » // Initialize our Storage. 90 » st, err := a.IntermediateStorage(c)
91 » s, err := a.IntermediateStorage(c)
92 if err != nil { 91 if err != nil {
93 log.WithError(err).Errorf(c, "Failed to get storage instance.")
94 return err 92 return err
95 } 93 }
96 » defer s.Close() 94 » defer st.Close()
97 95
98 // Application shutdown will now operate by cancelling the Collector's 96 // Application shutdown will now operate by cancelling the Collector's
99 // shutdown Context. 97 // shutdown Context.
100 shutdownCtx, shutdownFunc := context.WithCancel(c) 98 shutdownCtx, shutdownFunc := context.WithCancel(c)
101 a.SetShutdownFunc(shutdownFunc) 99 a.SetShutdownFunc(shutdownFunc)
102 100
103 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT use the 101 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT use the
104 // shutdown context here, as we want clean shutdowns to continue to ack any 102 // shutdown context here, as we want clean shutdowns to continue to ack any
105 // buffered messages. 103 // buffered messages.
106 ab := ackbuffer.New(c, ackbuffer.Config{ 104 ab := ackbuffer.New(c, ackbuffer.Config{
107 Ack: ackbuffer.NewACK(ps, sub, 0), 105 Ack: ackbuffer.NewACK(ps, sub, 0),
108 }) 106 })
109 defer ab.CloseAndFlush() 107 defer ab.CloseAndFlush()
110 108
111 // Initialize our Collector service object using a caching Coordinator 109 // Initialize our Collector service object using a caching Coordinator
112 // interface. 110 // interface.
113 coord := coordinator.NewCoordinator(a.Coordinator()) 111 coord := coordinator.NewCoordinator(a.Coordinator())
114 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.StateCacheExpiration.Duration()) 112 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.StateCacheExpiration.Duration())
115 113
116 coll := collector.Collector{ 114 coll := collector.Collector{
117 » » Coordinator: coord, 115 » » Coordinator: coord,
118 » » Storage: s, 116 » » Storage: st,
119 » » MaxParallelBundles: int(ccfg.Workers), 117 » » MaxMessageWorkers: int(ccfg.MaxMessageWorkers),
120 » » MaxIngestWorkers: int(ccfg.Workers),
121 } 118 }
122 defer coll.Close() 119 defer coll.Close()
123 120
124 // Execute our main Subscriber loop. It will run until the supplied Context 121 // Execute our main Subscriber loop. It will run until the supplied Context
125 // is cancelled. 122 // is cancelled.
126 clk := clock.Get(c) 123 clk := clock.Get(c)
127 engine := subscriber.Subscriber{ 124 engine := subscriber.Subscriber{
128 » » S: subscriber.NewSource(ps, sub, 0), 125 » » S: subscriber.NewSource(ps, sub),
129 » » A: ab, 126 » » A: ab,
130 127 » » Workers: int(ccfg.MaxConcurrentMessages),
131 » » PullWorkers: int(ccfg.TransportWorkers),
132 » » HandlerWorkers: int(ccfg.Workers),
133 } 128 }
134 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { 129 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
135 c := log.SetField(c, "messageID", msg.ID) 130 c := log.SetField(c, "messageID", msg.ID)
136 log.Fields{ 131 log.Fields{
137 "ackID": msg.AckID, 132 "ackID": msg.AckID,
138 "size": len(msg.Data), 133 "size": len(msg.Data),
139 }.Infof(c, "Received Pub/Sub Message.") 134 }.Infof(c, "Received Pub/Sub Message.")
140 135
141 startTime := clk.Now() 136 startTime := clk.Now()
142 err := coll.Process(c, msg.Data) 137 err := coll.Process(c, msg.Data)
(...skipping 29 matching lines...) Expand all
172 167
173 log.Debugf(c, "Collector finished.") 168 log.Debugf(c, "Collector finished.")
174 return nil 169 return nil
175 } 170 }
176 171
177 // Entry point. 172 // Entry point.
178 func main() { 173 func main() {
179 a := application{} 174 a := application{}
180 a.Run(context.Background(), a.runCollector) 175 a.Run(context.Background(), a.runCollector)
181 } 176 }
OLDNEW
« no previous file with comments | « common/proto/logdog/svcconfig/config.pb.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698