| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package collector |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "time" |
| 10 |
| 11 "github.com/golang/protobuf/proto" |
| 12 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 14 "github.com/luci/luci-go/common/logdog/types" |
| 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| 19 "github.com/luci/luci-go/server/logdog/storage" |
| 20 "golang.org/x/net/context" |
| 21 ) |
| 22 |
| 23 const ( |
| 24 // DefaultStreamStateCacheExpire is the default expiration value. |
| 25 DefaultStreamStateCacheExpire = 10 * time.Minute |
| 26 ) |
| 27 |
| 28 // Options is the set of options to use for collection. |
| 29 type Options struct { |
| 30 // Coordinator is the coordinator service. |
| 31 Coordinator CoordinatorClient |
| 32 // Storage is the backing store to use. |
| 33 Storage storage.Storage |
| 34 // Sem is a semaphore used to throttle the number of simultaneous ingest |
| 35 // tasks that are executed. |
| 36 Sem parallel.Semaphore |
| 37 |
| 38 // StreamStateCacheExpire is the maximum amount of time that a cahced st
ream |
| 39 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be
used. |
| 40 StreamStateCacheExpire time.Duration |
| 41 } |
| 42 |
| 43 // Collector is a stateful collection service. |
| 44 type Collector interface { |
| 45 // Process ingests an encoded ButlerLogBundle message. It is goroutine-s
afe, |
| 46 // but may throttle based on the configured semaphore. |
| 47 // |
| 48 // If a transient error occurs during ingest, Process will return an |
| 49 // errors.Transient error. |
| 50 Process(context.Context, []byte) error |
| 51 } |
| 52 |
| 53 // collector is an implementation of Collector. |
| 54 type collector struct { |
| 55 *Options |
| 56 |
| 57 // streamState is the stream state lookup and caching. |
| 58 streamState *streamStateCache |
| 59 } |
| 60 |
| 61 // New instantiates a new Collector instance. |
| 62 func New(o Options) Collector { |
| 63 return &collector{ |
| 64 Options: &o, |
| 65 streamState: newStreamStateCache(streamStateCacheOptions{ |
| 66 coordinator: o.Coordinator, |
| 67 expiration: o.StreamStateCacheExpire, |
| 68 }), |
| 69 } |
| 70 } |
| 71 |
| 72 // Process ingests an encoded ButlerLogBundle message. |
| 73 // |
| 74 // If a transient error occurs during ingest, Process will return an error. |
| 75 // If no error occurred, or if there was an error with the input data, no error |
| 76 // will be returned. |
| 77 func (c *collector) Process(ctx context.Context, msg []byte) error { |
| 78 pr := butlerproto.Reader{} |
| 79 if err := pr.Read(bytes.NewReader(msg)); err != nil { |
| 80 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.") |
| 81 return nil |
| 82 } |
| 83 if pr.Metadata.ProtoVersion != logpb.Version { |
| 84 log.Fields{ |
| 85 "messageProtoVersion": pr.Metadata.ProtoVersion, |
| 86 "currentProtoVersion": logpb.Version, |
| 87 }.Errorf(ctx, "Unknown protobuf version.") |
| 88 return nil |
| 89 } |
| 90 if pr.Bundle == nil { |
| 91 log.Errorf(ctx, "Protocol message did not contain a Butler bundl
e.") |
| 92 return nil |
| 93 } |
| 94 |
| 95 // Handle each individual stream in parallel. |
| 96 if len(pr.Bundle.Entries) == 0 { |
| 97 return nil |
| 98 } |
| 99 |
| 100 lw := logWork{ |
| 101 md: pr.Metadata, |
| 102 b: pr.Bundle, |
| 103 } |
| 104 |
| 105 // Handle each bundle entry in parallel. We will use a separate work poo
l |
| 106 // here so that top-level bundle dispatch can't deadlock the processing
tasks. |
| 107 // |
| 108 // If we don't have a semaphore, this will also be unbounded, since cap(
nil) |
| 109 // is 0. |
| 110 err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) { |
| 111 for _, be := range pr.Bundle.Entries { |
| 112 lw := lw |
| 113 lw.be = be |
| 114 taskC <- func() error { |
| 115 return c.processLogStream(ctx, &lw) |
| 116 } |
| 117 } |
| 118 }) |
| 119 if err != nil { |
| 120 if hasTransientError(err) { |
| 121 log.Fields{ |
| 122 log.ErrorKey: err, |
| 123 }.Warningf(ctx, "Transient error encountered during proc
essing.") |
| 124 return err |
| 125 } |
| 126 |
| 127 log.Fields{ |
| 128 log.ErrorKey: err, |
| 129 }.Errorf(ctx, "Non-transient error encountered during processing
; discarding.") |
| 130 } |
| 131 return nil |
| 132 } |
| 133 |
| 134 // logWork is a cumulative set of read-only state passed around by value for log |
| 135 // processing. |
| 136 type logWork struct { |
| 137 // md is the metadata associated with the overall message. |
| 138 md *logpb.ButlerMetadata |
| 139 // b is the Butler bundle. |
| 140 b *logpb.ButlerLogBundle |
| 141 // be is the Bundle entry. |
| 142 be *logpb.ButlerLogBundle_Entry |
| 143 // path is the constructed path of the stream being processed. |
| 144 path types.StreamPath |
| 145 // le is the LogEntry in the bundle entry. |
| 146 le *logpb.LogEntry |
| 147 } |
| 148 |
| 149 // processLogStream processes an individual set of log messages belonging to the |
| 150 // same log stream. |
| 151 func (c *collector) processLogStream(ctx context.Context, lw *logWork) error { |
| 152 if err := lw.be.Desc.Validate(true); err != nil { |
| 153 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") |
| 154 return nil |
| 155 } |
| 156 lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.b
e.Desc.Name)) |
| 157 ctx = log.SetField(ctx, "path", lw.path) |
| 158 |
| 159 if len(lw.be.Secret) == 0 { |
| 160 log.Errorf(ctx, "Missing secret.") |
| 161 return nil |
| 162 } |
| 163 |
| 164 // Fetch our cached/remote state. This will replace our state object wit
h the |
| 165 // fetched state, so any future calls will need to re-set the Secret val
ue. |
| 166 // TODO: Use timeout? |
| 167 state, err := c.streamState.getOrRegister(ctx, &cc.State{ |
| 168 Path: lw.path, |
| 169 Secret: types.StreamSecret(lw.be.Secret), |
| 170 ProtoVersion: lw.md.ProtoVersion, |
| 171 Descriptor: lw.be.Desc, |
| 172 }) |
| 173 if err != nil { |
| 174 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") |
| 175 return err |
| 176 } |
| 177 |
| 178 // Does the log stream's secret match the expected secret? |
| 179 if !bytes.Equal(lw.be.Secret, []byte(state.secret)) { |
| 180 log.Errorf(log.SetFields(ctx, log.Fields{ |
| 181 "secret": lw.be.Secret, |
| 182 "expectedSecret": state.secret, |
| 183 }), "Log entry has incorrect secret.") |
| 184 return nil |
| 185 } |
| 186 |
| 187 if state.archived { |
| 188 log.Infof(ctx, "Skipping message bundle for archived stream.") |
| 189 return nil |
| 190 } |
| 191 if state.purged { |
| 192 log.Infof(ctx, "Skipping message bundle for purged stream.") |
| 193 return nil |
| 194 } |
| 195 |
| 196 // Update our terminal index if we have one. |
| 197 // |
| 198 // Note that even if our cached value is marked terminal, we could have
failed |
| 199 // to push the terminal index to the Coordinator, so we will not refrain
from |
| 200 // pushing every terminal index encountered regardless of cache state. |
| 201 if lw.be.Terminal { |
| 202 tidx := types.MessageIndex(lw.be.TerminalIndex) |
| 203 log.Fields{ |
| 204 "value": tidx, |
| 205 }.Debugf(ctx, "Bundle includes a terminal index.") |
| 206 |
| 207 if state.terminalIndex < 0 { |
| 208 state.terminalIndex = tidx |
| 209 } else if state.terminalIndex != tidx { |
| 210 log.Fields{ |
| 211 "cachedIndex": state.terminalIndex, |
| 212 "bundleIndex": tidx, |
| 213 }.Warningf(ctx, "Cached terminal index disagrees with st
ate.") |
| 214 } |
| 215 } |
| 216 |
| 217 // In parallel, load the log entries into Storage. Throttle this with ou
r |
| 218 // ingest semaphore. |
| 219 return parallel.Run(c.Sem, func(taskC chan<- func() error) { |
| 220 for i, le := range lw.be.Logs { |
| 221 i, le := i, le |
| 222 |
| 223 // Store this LogEntry |
| 224 taskC <- func() error { |
| 225 if err := le.Validate(lw.be.Desc); err != nil { |
| 226 log.Fields{ |
| 227 log.ErrorKey: err, |
| 228 "index": i, |
| 229 }.Warningf(ctx, "Discarding invalid log
entry.") |
| 230 return nil |
| 231 } |
| 232 |
| 233 if state.terminalIndex >= 0 && types.MessageInde
x(le.StreamIndex) > state.terminalIndex { |
| 234 log.Fields{ |
| 235 "index": le.StreamIndex, |
| 236 "terminalIndex": state.terminalI
ndex, |
| 237 }.Warningf(ctx, "Stream is terminated be
fore log entry; discarding.") |
| 238 return nil |
| 239 } |
| 240 |
| 241 lw := *lw |
| 242 lw.le = le |
| 243 return c.processLogEntry(ctx, &lw) |
| 244 } |
| 245 } |
| 246 |
| 247 // If our bundle entry is terminal, we have an additional task o
f reporting |
| 248 // this to the Coordinator. |
| 249 if lw.be.Terminal { |
| 250 taskC <- func() error { |
| 251 // Sentinel task: Update the terminal bundle sta
te. |
| 252 state := *state |
| 253 state.terminalIndex = types.MessageIndex(lw.be.T
erminalIndex) |
| 254 |
| 255 log.Fields{ |
| 256 "terminalIndex": state.terminalIndex, |
| 257 }.Infof(ctx, "Received terminal log; updating Co
ordinator state.") |
| 258 state.secret = types.StreamSecret(lw.be.Secret) |
| 259 |
| 260 if err := c.streamState.setTerminalIndex(ctx, &s
tate); err != nil { |
| 261 log.WithError(err).Errorf(ctx, "Failed t
o set stream terminal index.") |
| 262 return err |
| 263 } |
| 264 return nil |
| 265 } |
| 266 } |
| 267 }) |
| 268 } |
| 269 |
| 270 func (c *collector) processLogEntry(ctx context.Context, lw *logWork) error { |
| 271 data, err := proto.Marshal(lw.le) |
| 272 if err != nil { |
| 273 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.") |
| 274 return err |
| 275 } |
| 276 |
| 277 // Post the log to storage. |
| 278 err = c.Storage.Put(&storage.PutRequest{ |
| 279 Path: lw.path, |
| 280 Index: types.MessageIndex(lw.le.StreamIndex), |
| 281 Value: data, |
| 282 }) |
| 283 // If the log entry already exists, consider the "put" successful. |
| 284 if err != nil && err != storage.ErrExists { |
| 285 log.WithError(err).Errorf(ctx, "Failed to load log entry into St
orage.") |
| 286 return err |
| 287 } |
| 288 return nil |
| 289 } |
| 290 |
| 291 // wrapMultiErrorTransient wraps an error in a TransientError wrapper. |
| 292 // |
| 293 // If the error is nil, it will return nil. If the error is already transient, |
| 294 // it will be directly returned. If the error is a MultiError, its sub-errors |
| 295 // will be evaluated and wrapped in a TransientError if any of its sub-errors |
| 296 // are transient errors. |
| 297 func hasTransientError(err error) bool { |
| 298 if merr, ok := err.(errors.MultiError); ok { |
| 299 for _, e := range merr { |
| 300 if hasTransientError(e) { |
| 301 return true |
| 302 } |
| 303 } |
| 304 return false |
| 305 } |
| 306 |
| 307 return errors.IsTransient(err) |
| 308 } |
| OLD | NEW |