Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package collector | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "time" | |
| 10 | |
| 11 "github.com/golang/protobuf/proto" | |
| 12 "github.com/luci/luci-go/common/errors" | |
| 13 "github.com/luci/luci-go/common/logdog/butlerproto" | |
| 14 "github.com/luci/luci-go/common/logdog/types" | |
| 15 log "github.com/luci/luci-go/common/logging" | |
| 16 "github.com/luci/luci-go/common/parallel" | |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" | |
| 18 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" | |
| 19 "github.com/luci/luci-go/server/logdog/storage" | |
| 20 "golang.org/x/net/context" | |
| 21 ) | |
| 22 | |
| 23 const ( | |
| 24 // DefaultStreamStateCacheExpire is the default expiration value. | |
| 25 DefaultStreamStateCacheExpire = 10 * time.Minute | |
| 26 ) | |
| 27 | |
| 28 // Options is the set of options to use for collection. | |
| 29 type Options struct { | |
| 30 // Coordinator is the coordinator service. | |
| 31 Coordinator CoordinatorClient | |
| 32 // Storage is the backing store to use. | |
| 33 Storage storage.Storage | |
| 34 // Sem is a semaphore used to throttle the number of simultaneous ingest | |
| 35 // tasks that are executed. | |
| 36 Sem parallel.Semaphore | |
| 37 | |
| 38 // StreamStateCacheExpire is the maximum amount of time that a cahced st ream | |
|
martiniss
2016/01/27 22:19:43
typo
dnj
2016/01/29 20:46:51
Done.
| |
| 39 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be used. | |
| 40 StreamStateCacheExpire time.Duration | |
| 41 } | |
| 42 | |
| 43 // Collector is a stateful collection service. | |
| 44 type Collector interface { | |
| 45 // Process ingests an encoded ButlerLogBundle message. It is goroutine-s afe, | |
|
martiniss
2016/01/27 22:19:43
What does ingest mean? Maybe I just don't know the
dnj
2016/01/29 20:46:51
Ingest as in like, eating something.
| |
| 46 // but may throttle based on the configured semaphore. | |
| 47 // | |
| 48 // If a transient error occurs during ingest, Process will return an | |
| 49 // errors.Transient error. | |
| 50 Process(context.Context, []byte) error | |
| 51 } | |
| 52 | |
| 53 // collector is an implementation of Collector. | |
| 54 type collector struct { | |
| 55 *Options | |
| 56 | |
| 57 // streamState is the stream state lookup and caching. | |
| 58 streamState *streamStateCache | |
| 59 } | |
| 60 | |
| 61 // New instantiates a new Collector instance. | |
| 62 func New(o Options) Collector { | |
| 63 return &collector{ | |
| 64 Options: &o, | |
| 65 streamState: newStreamStateCache(streamStateCacheOptions{ | |
| 66 coordinator: o.Coordinator, | |
| 67 expiration: o.StreamStateCacheExpire, | |
| 68 }), | |
| 69 } | |
| 70 } | |
| 71 | |
| 72 // Process ingests an encoded ButlerLogBundle message. | |
|
martiniss
2016/01/27 22:19:43
Why not just say it implements the interface?
dnj
2016/01/29 20:46:52
Done.
| |
| 73 // | |
| 74 // If a transient error occurs during ingest, Process will return an error. | |
| 75 // If no error occurred, or if there was an error with the input data, no error | |
| 76 // will be returned. | |
| 77 func (c *collector) Process(ctx context.Context, msg []byte) error { | |
| 78 pr := butlerproto.Reader{} | |
| 79 if err := pr.Read(bytes.NewReader(msg)); err != nil { | |
| 80 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.") | |
| 81 return nil | |
| 82 } | |
| 83 if pr.Metadata.ProtoVersion != logpb.Version { | |
| 84 log.Fields{ | |
| 85 "messageProtoVersion": pr.Metadata.ProtoVersion, | |
| 86 "currentProtoVersion": logpb.Version, | |
| 87 }.Errorf(ctx, "Unknown protobuf version.") | |
| 88 return nil | |
| 89 } | |
| 90 if pr.Bundle == nil { | |
| 91 log.Errorf(ctx, "Protocol message did not contain a Butler bundl e.") | |
| 92 return nil | |
| 93 } | |
| 94 | |
| 95 // Handle each individual stream in parallel. | |
|
martiniss
2016/01/27 22:19:43
This doesn't seem related to the next line of code
iannucci
2016/01/28 01:15:35
duplicate comment w/ below?
dnj
2016/01/29 20:46:51
Done.
| |
| 96 if len(pr.Bundle.Entries) == 0 { | |
| 97 return nil | |
| 98 } | |
| 99 | |
| 100 lw := logWork{ | |
| 101 md: pr.Metadata, | |
| 102 b: pr.Bundle, | |
| 103 } | |
| 104 | |
| 105 // Handle each bundle entry in parallel. We will use a separate work poo l | |
| 106 // here so that top-level bundle dispatch can't deadlock the processing tasks. | |
| 107 // | |
| 108 // If we don't have a semaphore, this will also be unbounded, since cap( nil) | |
| 109 // is 0. | |
| 110 err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) { | |
| 111 for _, be := range pr.Bundle.Entries { | |
| 112 lw := lw | |
| 113 lw.be = be | |
| 114 taskC <- func() error { | |
| 115 return c.processLogStream(ctx, &lw) | |
| 116 } | |
| 117 } | |
| 118 }) | |
| 119 if err != nil { | |
| 120 if hasTransientError(err) { | |
| 121 log.Fields{ | |
| 122 log.ErrorKey: err, | |
| 123 }.Warningf(ctx, "Transient error encountered during proc essing.") | |
| 124 return err | |
| 125 } | |
| 126 | |
| 127 log.Fields{ | |
| 128 log.ErrorKey: err, | |
| 129 }.Errorf(ctx, "Non-transient error encountered during processing ; discarding.") | |
| 130 } | |
| 131 return nil | |
|
martiniss
2016/01/27 22:19:43
wait why don't we return an error if there was one
dnj
2016/01/29 20:46:52
I actually mixed transient and non-transient code
| |
| 132 } | |
| 133 | |
| 134 // logWork is a cumulative set of read-only state passed around by value for log | |
| 135 // processing. | |
| 136 type logWork struct { | |
| 137 // md is the metadata associated with the overall message. | |
| 138 md *logpb.ButlerMetadata | |
| 139 // b is the Butler bundle. | |
| 140 b *logpb.ButlerLogBundle | |
| 141 // be is the Bundle entry. | |
| 142 be *logpb.ButlerLogBundle_Entry | |
| 143 // path is the constructed path of the stream being processed. | |
| 144 path types.StreamPath | |
| 145 // le is the LogEntry in the bundle entry. | |
| 146 le *logpb.LogEntry | |
| 147 } | |
| 148 | |
| 149 // processLogStream processes an individual set of log messages belonging to the | |
| 150 // same log stream. | |
| 151 func (c *collector) processLogStream(ctx context.Context, lw *logWork) error { | |
| 152 if err := lw.be.Desc.Validate(true); err != nil { | |
| 153 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.") | |
| 154 return nil | |
| 155 } | |
| 156 lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.b e.Desc.Name)) | |
| 157 ctx = log.SetField(ctx, "path", lw.path) | |
| 158 | |
| 159 if len(lw.be.Secret) == 0 { | |
| 160 log.Errorf(ctx, "Missing secret.") | |
| 161 return nil | |
| 162 } | |
| 163 | |
| 164 // Fetch our cached/remote state. This will replace our state object wit h the | |
| 165 // fetched state, so any future calls will need to re-set the Secret val ue. | |
| 166 // TODO: Use timeout? | |
| 167 state, err := c.streamState.getOrRegister(ctx, &cc.State{ | |
| 168 Path: lw.path, | |
| 169 Secret: types.StreamSecret(lw.be.Secret), | |
| 170 ProtoVersion: lw.md.ProtoVersion, | |
| 171 Descriptor: lw.be.Desc, | |
| 172 }) | |
| 173 if err != nil { | |
| 174 log.WithError(err).Errorf(ctx, "Failed to get/register current s tream state.") | |
| 175 return err | |
| 176 } | |
| 177 | |
| 178 // Does the log stream's secret match the expected secret? | |
| 179 if !bytes.Equal(lw.be.Secret, []byte(state.secret)) { | |
| 180 log.Errorf(log.SetFields(ctx, log.Fields{ | |
| 181 "secret": lw.be.Secret, | |
| 182 "expectedSecret": state.secret, | |
| 183 }), "Log entry has incorrect secret.") | |
| 184 return nil | |
| 185 } | |
| 186 | |
| 187 if state.archived { | |
| 188 log.Infof(ctx, "Skipping message bundle for archived stream.") | |
| 189 return nil | |
| 190 } | |
| 191 if state.purged { | |
| 192 log.Infof(ctx, "Skipping message bundle for purged stream.") | |
| 193 return nil | |
| 194 } | |
| 195 | |
| 196 // Update our terminal index if we have one. | |
| 197 // | |
| 198 // Note that even if our cached value is marked terminal, we could have failed | |
| 199 // to push the terminal index to the Coordinator, so we will not refrain from | |
| 200 // pushing every terminal index encountered regardless of cache state. | |
| 201 if lw.be.Terminal { | |
| 202 tidx := types.MessageIndex(lw.be.TerminalIndex) | |
| 203 log.Fields{ | |
| 204 "value": tidx, | |
| 205 }.Debugf(ctx, "Bundle includes a terminal index.") | |
| 206 | |
| 207 if state.terminalIndex < 0 { | |
| 208 state.terminalIndex = tidx | |
| 209 } else if state.terminalIndex != tidx { | |
| 210 log.Fields{ | |
| 211 "cachedIndex": state.terminalIndex, | |
| 212 "bundleIndex": tidx, | |
| 213 }.Warningf(ctx, "Cached terminal index disagrees with st ate.") | |
| 214 } | |
| 215 } | |
| 216 | |
| 217 // In parallel, load the log entries into Storage. Throttle this with ou r | |
| 218 // ingest semaphore. | |
| 219 return parallel.Run(c.Sem, func(taskC chan<- func() error) { | |
| 220 for i, le := range lw.be.Logs { | |
| 221 i, le := i, le | |
| 222 | |
| 223 // Store this LogEntry | |
| 224 taskC <- func() error { | |
| 225 if err := le.Validate(lw.be.Desc); err != nil { | |
| 226 log.Fields{ | |
| 227 log.ErrorKey: err, | |
| 228 "index": i, | |
| 229 }.Warningf(ctx, "Discarding invalid log entry.") | |
| 230 return nil | |
| 231 } | |
| 232 | |
| 233 if state.terminalIndex >= 0 && types.MessageInde x(le.StreamIndex) > state.terminalIndex { | |
| 234 log.Fields{ | |
| 235 "index": le.StreamIndex, | |
| 236 "terminalIndex": state.terminalI ndex, | |
| 237 }.Warningf(ctx, "Stream is terminated be fore log entry; discarding.") | |
| 238 return nil | |
| 239 } | |
| 240 | |
| 241 lw := *lw | |
| 242 lw.le = le | |
| 243 return c.processLogEntry(ctx, &lw) | |
| 244 } | |
| 245 } | |
| 246 | |
| 247 // If our bundle entry is terminal, we have an additional task o f reporting | |
| 248 // this to the Coordinator. | |
| 249 if lw.be.Terminal { | |
| 250 taskC <- func() error { | |
| 251 // Sentinel task: Update the terminal bundle sta te. | |
| 252 state := *state | |
| 253 state.terminalIndex = types.MessageIndex(lw.be.T erminalIndex) | |
| 254 | |
| 255 log.Fields{ | |
| 256 "terminalIndex": state.terminalIndex, | |
| 257 }.Infof(ctx, "Received terminal log; updating Co ordinator state.") | |
| 258 state.secret = types.StreamSecret(lw.be.Secret) | |
| 259 | |
| 260 if err := c.streamState.setTerminalIndex(ctx, &s tate); err != nil { | |
| 261 log.WithError(err).Errorf(ctx, "Failed t o set stream terminal index.") | |
| 262 return err | |
| 263 } | |
| 264 return nil | |
| 265 } | |
| 266 } | |
| 267 }) | |
| 268 } | |
| 269 | |
| 270 func (c *collector) processLogEntry(ctx context.Context, lw *logWork) error { | |
| 271 data, err := proto.Marshal(lw.le) | |
| 272 if err != nil { | |
| 273 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.") | |
| 274 return err | |
| 275 } | |
| 276 | |
| 277 // Post the log to storage. | |
| 278 err = c.Storage.Put(&storage.PutRequest{ | |
| 279 Path: lw.path, | |
| 280 Index: types.MessageIndex(lw.le.StreamIndex), | |
| 281 Value: data, | |
| 282 }) | |
| 283 // If the log entry already exists, consider the "put" successful. | |
| 284 if err != nil && err != storage.ErrExists { | |
| 285 log.WithError(err).Errorf(ctx, "Failed to load log entry into St orage.") | |
| 286 return err | |
| 287 } | |
| 288 return nil | |
| 289 } | |
| 290 | |
| 291 // wrapMultiErrorTransient wraps an error in a TransientError wrapper. | |
| 292 // | |
| 293 // If the error is nil, it will return nil. If the error is already transient, | |
| 294 // it will be directly returned. If the error is a MultiError, its sub-errors | |
| 295 // will be evaluated and wrapped in a TransientError if any of its sub-errors | |
| 296 // are transient errors. | |
| 297 func hasTransientError(err error) bool { | |
| 298 if merr, ok := err.(errors.MultiError); ok { | |
| 299 for _, e := range merr { | |
| 300 if hasTransientError(e) { | |
| 301 return true | |
| 302 } | |
| 303 } | |
| 304 return false | |
| 305 } | |
| 306 | |
| 307 return errors.IsTransient(err) | |
| 308 } | |
| OLD | NEW |