Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(577)

Side by Side Diff: server/internal/logdog/collector/collector.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package collector 5 package collector
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sync"
10 "time" 9 "time"
11 10
12 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
13 "github.com/luci/luci-go/common/errors" 12 "github.com/luci/luci-go/common/errors"
14 "github.com/luci/luci-go/common/logdog/butlerproto" 13 "github.com/luci/luci-go/common/logdog/butlerproto"
15 "github.com/luci/luci-go/common/logdog/types" 14 "github.com/luci/luci-go/common/logdog/types"
16 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/parallel" 16 "github.com/luci/luci-go/common/parallel"
18 "github.com/luci/luci-go/common/proto/logdog/logpb" 17 "github.com/luci/luci-go/common/proto/logdog/logpb"
19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
20 "github.com/luci/luci-go/server/logdog/storage" 19 "github.com/luci/luci-go/server/logdog/storage"
21 "golang.org/x/net/context" 20 "golang.org/x/net/context"
22 ) 21 )
23 22
23 const (
24 // DefaultMaxMessageWorkers is the default number of concurrent worker
 25 // goroutines to employ for a single message.
26 DefaultMaxMessageWorkers = 4
27 )
28
24 // Collector is a stateful object responsible for ingesting LogDog logs, 29 // Collector is a stateful object responsible for ingesting LogDog logs,
25 // registering them with a Coordinator, and stowing them in short-term storage 30 // registering them with a Coordinator, and stowing them in short-term storage
26 // for streaming and processing. 31 // for streaming and processing.
27 // 32 //
28 // A Collector's Close should be called when finished to release any internal 33 // A Collector's Close should be called when finished to release any internal
29 // resources. 34 // resources.
30 type Collector struct { 35 type Collector struct {
31 // Coordinator is used to interface with the Coordinator client. 36 // Coordinator is used to interface with the Coordinator client.
32 // 37 //
 33 // On production systems, this should be wrapped with a caching client (see 38 // the stateCache sub-package) to avoid overwhelming the server.
34 // the stateCache sub-package) to avoid overwhelming the server. 39 // the stateCache sub-package) to avoid overwhelming the server.
35 Coordinator coordinator.Coordinator 40 Coordinator coordinator.Coordinator
36 41
37 » // Storage is the backing store to use. 42 » // Storage is the intermediate storage instance to use.
38 Storage storage.Storage 43 Storage storage.Storage
39 44
40 // StreamStateCacheExpire is the maximum amount of time that a cached st ream 45 // StreamStateCacheExpire is the maximum amount of time that a cached st ream
41 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be used. 46 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
42 StreamStateCacheExpire time.Duration 47 StreamStateCacheExpire time.Duration
43 48
44 » // MaxParallelBundles is the maximum number of log entry bundles per mes sage 49 » // MaxMessageWorkers is the maximum number of concurrent workers to empl oy
45 » // to handle in parallel. If <= 0, no maximum will be applied. 50 » // for any given message. If <= 0, DefaultMaxMessageWorkers will be appl ied.
46 » MaxParallelBundles int 51 » MaxMessageWorkers int
47 » // MaxIngestWorkers is the maximum number of ingest worker goroutines th at
48 » // will operate at a time. If <= 0, no maximum will be applied.
49 » MaxIngestWorkers int
50
51 » // initOnce is used to ensure that the Collector's internal state is
52 » // initialized at most once.
53 » initOnce sync.Once
54 » // runner is the Runner that will be used for ingest. It will be configu red
55 » // based on the supplied MaxIngestWorkers parameter.
56 » //
57 » // Internally, runner must not be used by tasks that themselves use the
58 » // runner, else deadlock could occur.
59 » runner *parallel.Runner
60 }
61
62 // init initializes the operational state of the Collector. It must be called
63 // internally at the beginning of any exported method that uses that state.
64 func (c *Collector) init() {
65 » c.initOnce.Do(func() {
66 » » c.runner = &parallel.Runner{
67 » » » Sustained: c.MaxIngestWorkers,
68 » » » Maximum: c.MaxIngestWorkers,
69 » » }
70 » })
71 } 52 }
72 53
73 // Process ingests an encoded ButlerLogBundle message, registering it with 54 // Process ingests an encoded ButlerLogBundle message, registering it with
74 // the LogDog Coordinator and stowing it in a temporary Storage for streaming 55 // the LogDog Coordinator and stowing it in a temporary Storage for streaming
75 // retrieval. 56 // retrieval.
76 // 57 //
77 // If a transient error occurs during ingest, Process will return an error. 58 // If a transient error occurs during ingest, Process will return an error.
78 // If no error occurred, or if there was an error with the input data, no error 59 // If no error occurred, or if there was an error with the input data, no error
79 // will be returned. 60 // will be returned.
80 func (c *Collector) Process(ctx context.Context, msg []byte) error { 61 func (c *Collector) Process(ctx context.Context, msg []byte) error {
81 c.init()
82
83 pr := butlerproto.Reader{} 62 pr := butlerproto.Reader{}
84 if err := pr.Read(bytes.NewReader(msg)); err != nil { 63 if err := pr.Read(bytes.NewReader(msg)); err != nil {
85 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.") 64 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
86 return nil 65 return nil
87 } 66 }
88 if pr.Metadata.ProtoVersion != logpb.Version { 67 if pr.Metadata.ProtoVersion != logpb.Version {
89 log.Fields{ 68 log.Fields{
90 "messageProtoVersion": pr.Metadata.ProtoVersion, 69 "messageProtoVersion": pr.Metadata.ProtoVersion,
91 "currentProtoVersion": logpb.Version, 70 "currentProtoVersion": logpb.Version,
92 }.Errorf(ctx, "Unknown protobuf version.") 71 }.Errorf(ctx, "Unknown protobuf version.")
(...skipping 22 matching lines...) Expand all
115 94
116 fields.Infof(ctx, "Processing log bundle entry.") 95 fields.Infof(ctx, "Processing log bundle entry.")
117 } 96 }
118 } 97 }
119 98
120 // If there are no entries, there is nothing to do. 99 // If there are no entries, there is nothing to do.
121 if len(pr.Bundle.Entries) == 0 { 100 if len(pr.Bundle.Entries) == 0 {
122 return nil 101 return nil
123 } 102 }
124 103
125 » // Define our logWork template. This will be cloned for each ingested lo g 104 » lw := bundleHandler{
126 » // stream. 105 » » msg: msg,
127 » lw := logWork{ 106 » » md: pr.Metadata,
128 » » md: pr.Metadata, 107 » » b: pr.Bundle,
129 » » b: pr.Bundle,
130 } 108 }
131 109
132 // Handle each bundle entry in parallel. We will use a separate work poo l 110 // Handle each bundle entry in parallel. We will use a separate work poo l
133 // here so that top-level bundle dispatch can't deadlock the processing tasks. 111 // here so that top-level bundle dispatch can't deadlock the processing tasks.
134 » err := parallel.WorkPool(c.MaxParallelBundles, func(taskC chan<- func() error) { 112 » workers := c.MaxMessageWorkers
113 » if workers <= 0 {
114 » » workers = DefaultMaxMessageWorkers
115 » }
116 » err := parallel.WorkPool(workers, func(taskC chan<- func() error) {
135 for _, be := range pr.Bundle.Entries { 117 for _, be := range pr.Bundle.Entries {
136 » » » lw := lw 118 » » » be := be
137 » » » lw.be = be 119
138 taskC <- func() error { 120 taskC <- func() error {
139 » » » » return c.processLogStream(ctx, &lw) 121 » » » » return c.processLogStream(ctx, &bundleEntryHandl er{
122 » » » » » bundleHandler: &lw,
123 » » » » » be: be,
124 » » » » })
140 } 125 }
141 } 126 }
142 }) 127 })
143 if err != nil { 128 if err != nil {
144 » » if hasTransientError(err) && !errors.IsTransient(err) { 129 » » if !errors.IsTransient(err) && hasTransientError(err) {
145 // err has a nested transient error; propagate that to t op. 130 // err has a nested transient error; propagate that to t op.
146 err = errors.WrapTransient(err) 131 err = errors.WrapTransient(err)
147 } 132 }
148 return err 133 return err
149 } 134 }
150 return nil 135 return nil
151 } 136 }
152 137
153 // Close releases any internal resources and blocks pending the completion of 138 // Close releases any internal resources and blocks pending the completion of
154 // any outstanding operations. After Close, no new Process calls may be made. 139 // any outstanding operations. After Close, no new Process calls may be made.
155 func (c *Collector) Close() { 140 func (c *Collector) Close() {
156 c.init()
157
158 c.runner.Close()
159 } 141 }
160 142
161 // logWork is a cumulative set of read-only state passed around by value for log 143 // bundleHandler is a cumulative set of read-only state passed around by
162 // processing. 144 // value for log processing.
163 type logWork struct { 145 type bundleHandler struct {
146 » // msg is the original message bytes.
147 » msg []byte
164 // md is the metadata associated with the overall message. 148 // md is the metadata associated with the overall message.
165 md *logpb.ButlerMetadata 149 md *logpb.ButlerMetadata
166 // b is the Butler bundle. 150 // b is the Butler bundle.
167 b *logpb.ButlerLogBundle 151 b *logpb.ButlerLogBundle
152 }
153
154 type bundleEntryHandler struct {
155 *bundleHandler
156
168 // be is the Bundle entry. 157 // be is the Bundle entry.
169 be *logpb.ButlerLogBundle_Entry 158 be *logpb.ButlerLogBundle_Entry
170 // path is the constructed path of the stream being processed. 159 // path is the constructed path of the stream being processed.
171 path types.StreamPath 160 path types.StreamPath
172 // le is the LogEntry in the bundle entry.
173 le *logpb.LogEntry
174 } 161 }
175 162
176 // processLogStream processes an individual set of log messages belonging to the 163 // processLogStream processes an individual set of log messages belonging to the
177 // same log stream. 164 // same log stream.
178 func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error { 165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error {
179 » if err := lw.be.Desc.Validate(true); err != nil { 166 » if err := h.be.Desc.Validate(true); err != nil {
180 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.") 167 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.")
181 » » return nil 168 » » return err
182 } 169 }
183 » lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.b e.Desc.Name)) 170 » h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D esc.Name))
184 » ctx = log.SetField(ctx, "path", lw.path) 171 » ctx = log.SetField(ctx, "path", h.path)
185 172
186 » if len(lw.be.Secret) == 0 { 173 » if len(h.be.Secret) == 0 {
187 log.Errorf(ctx, "Missing secret.") 174 log.Errorf(ctx, "Missing secret.")
188 » » return nil 175 » » return errors.New("missing stream secret")
176 » }
177
178 » // Confirm that the log entries are valid and contiguous. Serialize the log
179 » // entries for ingest as we validate them.
180 » var logData [][]byte
181 » var blockIndex uint64
182 » if logs := h.be.Logs; len(logs) > 0 {
183 » » logData = make([][]byte, len(logs))
184 » » blockIndex = logs[0].StreamIndex
185
186 » » for i, le := range logs {
187 » » » // Validate this log entry.
188 » » » if err := le.Validate(h.be.Desc); err != nil {
189 » » » » log.Fields{
190 » » » » » log.ErrorKey: err,
191 » » » » » "index": le.StreamIndex,
192 » » » » }.Warningf(ctx, "Discarding invalid log entry.")
193 » » » » return errors.New("invalid log entry")
194 » » » }
195
196 » » » // Validate that this entry is contiguous.
197 » » » if le.StreamIndex != blockIndex+uint64(i) {
198 » » » » log.Fields{
199 » » » » » "index": i,
200 » » » » » "expected": (blockIndex + uint64(i)),
201 » » » » » "actual": le.StreamIndex,
202 » » » » }.Errorf(ctx, "Non-contiguous log entry block in stream.")
203 » » » » return errors.New("non-contiguous log entry bloc k")
204 » » » }
205
206 » » » var err error
207 » » » logData[i], err = proto.Marshal(le)
208 » » » if err != nil {
209 » » » » log.Fields{
210 » » » » » log.ErrorKey: err,
211 » » » » » "index": le.StreamIndex,
212 » » » » }.Errorf(ctx, "Failed to marshal log entry.")
213 » » » » return errors.New("failed to marshal log entries ")
214 » » » }
215 » » }
189 } 216 }
190 217
191 // Fetch our cached/remote state. This will replace our state object wit h the 218 // Fetch our cached/remote state. This will replace our state object wit h the
192 // fetched state, so any future calls will need to re-set the Secret val ue. 219 // fetched state, so any future calls will need to re-set the Secret val ue.
193 // TODO: Use timeout? 220 // TODO: Use timeout?
194 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt ate{ 221 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt ate{
195 » » Path: lw.path, 222 » » Path: h.path,
196 » » Secret: types.StreamSecret(lw.be.Secret), 223 » » Secret: types.StreamSecret(h.be.Secret),
197 » » ProtoVersion: lw.md.ProtoVersion, 224 » » ProtoVersion: h.md.ProtoVersion,
198 » }, lw.be.Desc) 225 » }, h.be.Desc)
199 if err != nil { 226 if err != nil {
200 log.WithError(err).Errorf(ctx, "Failed to get/register current s tream state.") 227 log.WithError(err).Errorf(ctx, "Failed to get/register current s tream state.")
201 return err 228 return err
202 } 229 }
203 230
204 // Does the log stream's secret match the expected secret? 231 // Does the log stream's secret match the expected secret?
205 » if !bytes.Equal(lw.be.Secret, []byte(state.Secret)) { 232 » if !bytes.Equal(h.be.Secret, []byte(state.Secret)) {
206 log.Errorf(log.SetFields(ctx, log.Fields{ 233 log.Errorf(log.SetFields(ctx, log.Fields{
207 » » » "secret": lw.be.Secret, 234 » » » "secret": h.be.Secret,
208 "expectedSecret": state.Secret, 235 "expectedSecret": state.Secret,
209 }), "Log entry has incorrect secret.") 236 }), "Log entry has incorrect secret.")
210 return nil 237 return nil
211 } 238 }
212 239
213 if state.Archived { 240 if state.Archived {
214 log.Infof(ctx, "Skipping message bundle for archived stream.") 241 log.Infof(ctx, "Skipping message bundle for archived stream.")
215 return nil 242 return nil
216 } 243 }
217 if state.Purged { 244 if state.Purged {
218 log.Infof(ctx, "Skipping message bundle for purged stream.") 245 log.Infof(ctx, "Skipping message bundle for purged stream.")
219 return nil 246 return nil
220 } 247 }
221 248
222 // Update our terminal index if we have one. 249 // Update our terminal index if we have one.
223 // 250 //
224 // Note that even if our cached value is marked terminal, we could have failed 251 // Note that even if our cached value is marked terminal, we could have failed
225 // to push the terminal index to the Coordinator, so we will not refrain from 252 // to push the terminal index to the Coordinator, so we will not refrain from
226 // pushing every terminal index encountered regardless of cache state. 253 // pushing every terminal index encountered regardless of cache state.
227 » if lw.be.Terminal { 254 » if h.be.Terminal {
228 » » tidx := types.MessageIndex(lw.be.TerminalIndex) 255 » » tidx := types.MessageIndex(h.be.TerminalIndex)
229 log.Fields{ 256 log.Fields{
230 "value": tidx, 257 "value": tidx,
231 }.Debugf(ctx, "Bundle includes a terminal index.") 258 }.Debugf(ctx, "Bundle includes a terminal index.")
232 259
233 if state.TerminalIndex < 0 { 260 if state.TerminalIndex < 0 {
234 state.TerminalIndex = tidx 261 state.TerminalIndex = tidx
235 } else if state.TerminalIndex != tidx { 262 } else if state.TerminalIndex != tidx {
236 log.Fields{ 263 log.Fields{
237 "cachedIndex": state.TerminalIndex, 264 "cachedIndex": state.TerminalIndex,
238 "bundleIndex": tidx, 265 "bundleIndex": tidx,
239 }.Warningf(ctx, "Cached terminal index disagrees with st ate.") 266 }.Warningf(ctx, "Cached terminal index disagrees with st ate.")
240 } 267 }
241 } 268 }
242 269
243 » // In parallel, load the log entries into Storage. Throttle this with ou r 270 » // Perform stream processing operations. We can do these operations in
244 » // ingest semaphore. 271 » // parallel.
245 » return errors.MultiErrorFromErrors(c.runner.Run(func(taskC chan<- func() error) { 272 » return parallel.FanOutIn(func(taskC chan<- func() error) {
246 » » for i, le := range lw.be.Logs { 273 » » // Store log data, if any was provided. It has already been vali dated.
247 » » » i, le := i, le 274 » » if len(logData) > 0 {
275 » » » taskC <- func() error {
276 » » » » // Post the log to storage.
277 » » » » err = c.Storage.Put(storage.PutRequest{
278 » » » » » Path: h.path,
279 » » » » » Index: types.MessageIndex(blockIndex),
280 » » » » » Values: logData,
281 » » » » })
248 282
249 » » » // Store this LogEntry 283 » » » » // If the log entry already exists, consider the "put" successful.
250 » » » taskC <- func() error { 284 » » » » // Storage will return a transient error if one occurred.
251 » » » » if err := le.Validate(lw.be.Desc); err != nil { 285 » » » » if err != nil && err != storage.ErrExists {
252 log.Fields{ 286 log.Fields{
253 log.ErrorKey: err, 287 log.ErrorKey: err,
254 » » » » » » "index": i, 288 » » » » » » "blockIndex": blockIndex,
255 » » » » » }.Warningf(ctx, "Discarding invalid log entry.") 289 » » » » » }.Errorf(ctx, "Failed to load log entry into Storage.")
256 » » » » » return nil 290 » » » » » return err
257 } 291 }
258 292 » » » » return nil
259 » » » » if state.TerminalIndex >= 0 && types.MessageInde x(le.StreamIndex) > state.TerminalIndex {
260 » » » » » log.Fields{
261 » » » » » » "index": le.StreamIndex,
262 » » » » » » "terminalIndex": state.TerminalI ndex,
263 » » » » » }.Warningf(ctx, "Stream is terminated be fore log entry; discarding.")
264 » » » » » return nil
265 » » » » }
266
267 » » » » lw := *lw
268 » » » » lw.le = le
269 » » » » return c.processLogEntry(ctx, &lw)
270 } 293 }
271 } 294 }
272 295
273 // If our bundle entry is terminal, we have an additional task o f reporting 296 // If our bundle entry is terminal, we have an additional task o f reporting
274 // this to the Coordinator. 297 // this to the Coordinator.
275 » » if lw.be.Terminal { 298 » » if h.be.Terminal {
276 taskC <- func() error { 299 taskC <- func() error {
277 // Sentinel task: Update the terminal bundle sta te. 300 // Sentinel task: Update the terminal bundle sta te.
278 state := *state 301 state := *state
279 » » » » state.TerminalIndex = types.MessageIndex(lw.be.T erminalIndex) 302 » » » » state.TerminalIndex = types.MessageIndex(h.be.Te rminalIndex)
280 303
281 log.Fields{ 304 log.Fields{
282 "terminalIndex": state.TerminalIndex, 305 "terminalIndex": state.TerminalIndex,
283 }.Infof(ctx, "Received terminal log; updating Co ordinator state.") 306 }.Infof(ctx, "Received terminal log; updating Co ordinator state.")
284 307
285 if err := c.Coordinator.TerminateStream(ctx, &st ate); err != nil { 308 if err := c.Coordinator.TerminateStream(ctx, &st ate); err != nil {
286 log.WithError(err).Errorf(ctx, "Failed t o set stream terminal index.") 309 log.WithError(err).Errorf(ctx, "Failed t o set stream terminal index.")
287 return err 310 return err
288 } 311 }
289 return nil 312 return nil
290 } 313 }
291 } 314 }
292 }))
293 }
294
295 func (c *Collector) processLogEntry(ctx context.Context, lw *logWork) error {
296 data, err := proto.Marshal(lw.le)
297 if err != nil {
298 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.")
299 return err
300 }
301
302 // Post the log to storage.
303 err = c.Storage.Put(&storage.PutRequest{
304 Path: lw.path,
305 Index: types.MessageIndex(lw.le.StreamIndex),
306 Value: data,
307 }) 315 })
308
309 // If the log entry already exists, consider the "put" successful.
310 //
311 // All Storage errors are considered transient, as they are safe and
312 // data-agnostic.
313 if err != nil && err != storage.ErrExists {
314 log.WithError(err).Errorf(ctx, "Failed to load log entry into St orage.")
315 return errors.WrapTransient(err)
316 }
317 return nil
318 } 316 }
319 317
 320 // hasTransientError reports whether err is, or contains, a transient 318 // hasTransientError reports whether err is, or contains, a transient
 321 // error. 319 // error.
 322 // 320 //
 323 // If err is a MultiError, its sub-errors are evaluated recursively, and 321 // If err is a MultiError, its sub-errors are evaluated recursively, and
 324 // hasTransientError returns true if any of them is a transient error. 322 // hasTransientError returns true if any of them is a transient error.
326 func hasTransientError(err error) bool { 324 func hasTransientError(err error) bool {
327 if merr, ok := err.(errors.MultiError); ok { 325 if merr, ok := err.(errors.MultiError); ok {
328 for _, e := range merr { 326 for _, e := range merr {
329 if hasTransientError(e) { 327 if hasTransientError(e) {
330 return true 328 return true
331 } 329 }
332 } 330 }
333 return false 331 return false
334 } 332 }
335 333
336 return errors.IsTransient(err) 334 return errors.IsTransient(err)
337 } 335 }
OLDNEW
« no previous file with comments | « server/internal/logdog/archivist/archivist_test.go ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698