| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package collector |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "time" |
| 10 |
| 11 "github.com/golang/protobuf/proto" |
| 12 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 14 "github.com/luci/luci-go/common/logdog/types" |
| 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 19 "github.com/luci/luci-go/server/logdog/storage" |
| 20 "golang.org/x/net/context" |
| 21 ) |
| 22 |
| 23 // Collector is a stateful object responsible for ingesting LogDog logs, |
| 24 // registering them with a Coordinator, and stowing them in short-term storage |
| 25 // for streaming and processing. |
| 26 type Collector struct { |
| 27 // Coordinator is used to interface with the Coordinator client. |
| 28 // |
| 29 // On production systems, this should wrapped with a caching client (see |
| 30 // the stateCache sub-package) to avoid overwhelming the server. |
| 31 Coordinator coordinator.Coordinator |
| 32 |
| 33 // Storage is the backing store to use. |
| 34 Storage storage.Storage |
| 35 // Sem is a semaphore used to throttle the number of simultaneous ingest |
| 36 // tasks that are executed. |
| 37 Sem parallel.Semaphore |
| 38 |
| 39 // StreamStateCacheExpire is the maximum amount of time that a cached st
ream |
| 40 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be
used. |
| 41 StreamStateCacheExpire time.Duration |
| 42 } |
| 43 |
| 44 // Process ingests an encoded ButlerLogBundle message, registering it with |
| 45 // the LogDog Coordinator and stowing it in a temporary Storage for streaming |
| 46 // retrieval. |
| 47 // |
| 48 // If a transient error occurs during ingest, Process will return an error. |
| 49 // If no error occurred, or if there was an error with the input data, no error |
| 50 // will be returned. |
| 51 func (c *Collector) Process(ctx context.Context, msg []byte) error { |
| 52 pr := butlerproto.Reader{} |
| 53 if err := pr.Read(bytes.NewReader(msg)); err != nil { |
| 54 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.") |
| 55 return nil |
| 56 } |
| 57 if pr.Metadata.ProtoVersion != logpb.Version { |
| 58 log.Fields{ |
| 59 "messageProtoVersion": pr.Metadata.ProtoVersion, |
| 60 "currentProtoVersion": logpb.Version, |
| 61 }.Errorf(ctx, "Unknown protobuf version.") |
| 62 return nil |
| 63 } |
| 64 if pr.Bundle == nil { |
| 65 log.Errorf(ctx, "Protocol message did not contain a Butler bundl
e.") |
| 66 return nil |
| 67 } |
| 68 |
| 69 // If there are no entries, there is nothing to do. |
| 70 if len(pr.Bundle.Entries) == 0 { |
| 71 return nil |
| 72 } |
| 73 |
| 74 lw := logWork{ |
| 75 md: pr.Metadata, |
| 76 b: pr.Bundle, |
| 77 } |
| 78 |
| 79 // Handle each bundle entry in parallel. We will use a separate work poo
l |
| 80 // here so that top-level bundle dispatch can't deadlock the processing
tasks. |
| 81 // |
| 82 // If we don't have a semaphore, this will also be unbounded, since cap(
nil) |
| 83 // is 0. |
| 84 err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) { |
| 85 for _, be := range pr.Bundle.Entries { |
| 86 lw := lw |
| 87 lw.be = be |
| 88 taskC <- func() error { |
| 89 return c.processLogStream(ctx, &lw) |
| 90 } |
| 91 } |
| 92 }) |
| 93 if err != nil { |
| 94 if hasTransientError(err) && !errors.IsTransient(err) { |
| 95 // err has a nested transient error; propagate that to t
op. |
| 96 err = errors.WrapTransient(err) |
| 97 } |
| 98 return err |
| 99 } |
| 100 return nil |
| 101 } |
| 102 |
| 103 // logWork is a cumulative set of read-only state passed around by value for log |
| 104 // processing. |
| 105 type logWork struct { |
| 106 // md is the metadata associated with the overall message. |
| 107 md *logpb.ButlerMetadata |
| 108 // b is the Butler bundle. |
| 109 b *logpb.ButlerLogBundle |
| 110 // be is the Bundle entry. |
| 111 be *logpb.ButlerLogBundle_Entry |
| 112 // path is the constructed path of the stream being processed. |
| 113 path types.StreamPath |
| 114 // le is the LogEntry in the bundle entry. |
| 115 le *logpb.LogEntry |
| 116 } |
| 117 |
| 118 // processLogStream processes an individual set of log messages belonging to the |
| 119 // same log stream. |
| 120 func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error { |
| 121 if err := lw.be.Desc.Validate(true); err != nil { |
| 122 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") |
| 123 return nil |
| 124 } |
| 125 lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.b
e.Desc.Name)) |
| 126 ctx = log.SetField(ctx, "path", lw.path) |
| 127 |
| 128 if len(lw.be.Secret) == 0 { |
| 129 log.Errorf(ctx, "Missing secret.") |
| 130 return nil |
| 131 } |
| 132 |
| 133 // Fetch our cached/remote state. This will replace our state object wit
h the |
| 134 // fetched state, so any future calls will need to re-set the Secret val
ue. |
| 135 // TODO: Use timeout? |
| 136 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ |
| 137 Path: lw.path, |
| 138 Secret: types.StreamSecret(lw.be.Secret), |
| 139 ProtoVersion: lw.md.ProtoVersion, |
| 140 }, lw.be.Desc) |
| 141 if err != nil { |
| 142 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") |
| 143 return err |
| 144 } |
| 145 |
| 146 // Does the log stream's secret match the expected secret? |
| 147 if !bytes.Equal(lw.be.Secret, []byte(state.Secret)) { |
| 148 log.Errorf(log.SetFields(ctx, log.Fields{ |
| 149 "secret": lw.be.Secret, |
| 150 "expectedSecret": state.Secret, |
| 151 }), "Log entry has incorrect secret.") |
| 152 return nil |
| 153 } |
| 154 |
| 155 if state.Archived { |
| 156 log.Infof(ctx, "Skipping message bundle for archived stream.") |
| 157 return nil |
| 158 } |
| 159 if state.Purged { |
| 160 log.Infof(ctx, "Skipping message bundle for purged stream.") |
| 161 return nil |
| 162 } |
| 163 |
| 164 // Update our terminal index if we have one. |
| 165 // |
| 166 // Note that even if our cached value is marked terminal, we could have
failed |
| 167 // to push the terminal index to the Coordinator, so we will not refrain
from |
| 168 // pushing every terminal index encountered regardless of cache state. |
| 169 if lw.be.Terminal { |
| 170 tidx := types.MessageIndex(lw.be.TerminalIndex) |
| 171 log.Fields{ |
| 172 "value": tidx, |
| 173 }.Debugf(ctx, "Bundle includes a terminal index.") |
| 174 |
| 175 if state.TerminalIndex < 0 { |
| 176 state.TerminalIndex = tidx |
| 177 } else if state.TerminalIndex != tidx { |
| 178 log.Fields{ |
| 179 "cachedIndex": state.TerminalIndex, |
| 180 "bundleIndex": tidx, |
| 181 }.Warningf(ctx, "Cached terminal index disagrees with st
ate.") |
| 182 } |
| 183 } |
| 184 |
| 185 // In parallel, load the log entries into Storage. Throttle this with ou
r |
| 186 // ingest semaphore. |
| 187 return parallel.Run(c.Sem, func(taskC chan<- func() error) { |
| 188 for i, le := range lw.be.Logs { |
| 189 i, le := i, le |
| 190 |
| 191 // Store this LogEntry |
| 192 taskC <- func() error { |
| 193 if err := le.Validate(lw.be.Desc); err != nil { |
| 194 log.Fields{ |
| 195 log.ErrorKey: err, |
| 196 "index": i, |
| 197 }.Warningf(ctx, "Discarding invalid log
entry.") |
| 198 return nil |
| 199 } |
| 200 |
| 201 if state.TerminalIndex >= 0 && types.MessageInde
x(le.StreamIndex) > state.TerminalIndex { |
| 202 log.Fields{ |
| 203 "index": le.StreamIndex, |
| 204 "terminalIndex": state.TerminalI
ndex, |
| 205 }.Warningf(ctx, "Stream is terminated be
fore log entry; discarding.") |
| 206 return nil |
| 207 } |
| 208 |
| 209 lw := *lw |
| 210 lw.le = le |
| 211 return c.processLogEntry(ctx, &lw) |
| 212 } |
| 213 } |
| 214 |
| 215 // If our bundle entry is terminal, we have an additional task o
f reporting |
| 216 // this to the Coordinator. |
| 217 if lw.be.Terminal { |
| 218 taskC <- func() error { |
| 219 // Sentinel task: Update the terminal bundle sta
te. |
| 220 state := *state |
| 221 state.TerminalIndex = types.MessageIndex(lw.be.T
erminalIndex) |
| 222 |
| 223 log.Fields{ |
| 224 "terminalIndex": state.TerminalIndex, |
| 225 }.Infof(ctx, "Received terminal log; updating Co
ordinator state.") |
| 226 |
| 227 if err := c.Coordinator.TerminateStream(ctx, &st
ate); err != nil { |
| 228 log.WithError(err).Errorf(ctx, "Failed t
o set stream terminal index.") |
| 229 return err |
| 230 } |
| 231 return nil |
| 232 } |
| 233 } |
| 234 }) |
| 235 } |
| 236 |
| 237 func (c *Collector) processLogEntry(ctx context.Context, lw *logWork) error { |
| 238 data, err := proto.Marshal(lw.le) |
| 239 if err != nil { |
| 240 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.") |
| 241 return err |
| 242 } |
| 243 |
| 244 // Post the log to storage. |
| 245 err = c.Storage.Put(&storage.PutRequest{ |
| 246 Path: lw.path, |
| 247 Index: types.MessageIndex(lw.le.StreamIndex), |
| 248 Value: data, |
| 249 }) |
| 250 |
| 251 // If the log entry already exists, consider the "put" successful. |
| 252 // |
| 253 // All Storage errors are considered transient, as they are safe and |
| 254 // data-agnostic. |
| 255 if err != nil && err != storage.ErrExists { |
| 256 log.WithError(err).Errorf(ctx, "Failed to load log entry into St
orage.") |
| 257 return errors.WrapTransient(err) |
| 258 } |
| 259 return nil |
| 260 } |
| 261 |
| 262 // wrapMultiErrorTransient wraps an error in a TransientError wrapper. |
| 263 // |
| 264 // If the error is nil, it will return nil. If the error is already transient, |
| 265 // it will be directly returned. If the error is a MultiError, its sub-errors |
| 266 // will be evaluated and wrapped in a TransientError if any of its sub-errors |
| 267 // are transient errors. |
| 268 func hasTransientError(err error) bool { |
| 269 if merr, ok := err.(errors.MultiError); ok { |
| 270 for _, e := range merr { |
| 271 if hasTransientError(e) { |
| 272 return true |
| 273 } |
| 274 } |
| 275 return false |
| 276 } |
| 277 |
| 278 return errors.IsTransient(err) |
| 279 } |
| OLD | NEW |