Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1864)

Unified Diff: server/internal/logdog/collector/collector.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/collector.go
diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go
new file mode 100644
index 0000000000000000000000000000000000000000..77aab6a8d826ca155dd26bf6fad2628b24acd99f
--- /dev/null
+++ b/server/internal/logdog/collector/collector.go
@@ -0,0 +1,308 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
dnj (Google) 2016/01/21 04:36:25 Main file for review.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logdog/butlerproto"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/parallel"
+ cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
+ "github.com/luci/luci-go/server/logdog/storage"
+ "golang.org/x/net/context"
+)
+
const (
	// DefaultStreamStateCacheExpire is the default expiration value for cached
	// stream state entries, used when Options.StreamStateCacheExpire is zero.
	DefaultStreamStateCacheExpire = 10 * time.Minute
)
+
// Options is the set of options to use for collection.
type Options struct {
	// Coordinator is the Coordinator client used to register streams and to
	// push stream state updates.
	Coordinator CoordinatorClient
	// Storage is the backing store to use for ingested log entries.
	Storage storage.Storage
	// Sem is a semaphore used to throttle the number of simultaneous ingest
	// tasks that are executed. If nil, ingest parallelism is not throttled.
	Sem parallel.Semaphore

	// StreamStateCacheExpire is the maximum amount of time that a cached stream
	// state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
	StreamStateCacheExpire time.Duration
}
+
// Collector is a stateful collection service that ingests Butler log bundle
// messages into intermediate storage, registering and updating their stream
// state with the Coordinator as needed.
type Collector interface {
	// Process ingests an encoded ButlerLogBundle message. It is goroutine-safe,
	// but may throttle based on the configured semaphore.
	//
	// If a transient error occurs during ingest, Process will return an
	// errors.Transient error; callers may retry the message in that case.
	// Non-transient failures and malformed input are not returned as errors,
	// since retrying them cannot succeed.
	Process(context.Context, []byte) error
}
+
// collector is an implementation of Collector.
type collector struct {
	// Options is the configuration this collector was built with (embedded so
	// its fields are accessed directly, e.g. c.Storage, c.Sem).
	*Options

	// streamState performs stream state lookup/registration against the
	// Coordinator and caches the results.
	streamState *streamStateCache
}
+
+// New instantiates a new Collector instance.
+func New(o Options) Collector {
+ return &collector{
+ Options: &o,
+ streamState: newStreamStateCache(streamStateCacheOptions{
+ coordinator: o.Coordinator,
+ expiration: o.StreamStateCacheExpire,
+ }),
+ }
+}
+
+// Process ingests an encoded ButlerLogBundle message.
+//
+// If a transient error occurs during ingest, Process will return an error.
+// If no error occurred, or if there was an error with the input data, no error
+// will be returned.
+func (c *collector) Process(ctx context.Context, msg []byte) error {
+ pr := butlerproto.Reader{}
+ if err := pr.Read(bytes.NewReader(msg)); err != nil {
+ log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
+ return nil
+ }
+ if pr.Metadata.ProtoVersion != protocol.Version {
+ log.Fields{
+ "messageProtoVersion": pr.Metadata.ProtoVersion,
+ "currentProtoVersion": protocol.Version,
+ }.Errorf(ctx, "Unknown protobuf version.")
+ return nil
+ }
+ if pr.Bundle == nil {
+ log.Errorf(ctx, "Protocol message did not contain a Butler bundle.")
+ return nil
+ }
+
+ // Handle each individual stream in parallel.
+ if len(pr.Bundle.Entries) == 0 {
+ return nil
+ }
+
+ lw := logWork{
+ md: pr.Metadata,
+ b: pr.Bundle,
+ }
+
+ // Handle each bundle entry in parallel. We will use a separate work pool
+ // here so that top-level bundle dispatch can't deadlock the processing tasks.
+ //
+ // If we don't have a semaphore, this will also be unbounded, since cap(nil)
+ // is 0.
+ err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) {
+ for _, be := range pr.Bundle.Entries {
+ lw := lw
+ lw.be = be
+ taskC <- func() error {
+ return c.processLogStream(ctx, &lw)
+ }
+ }
+ })
+ if err != nil {
+ if hasTransientError(err) {
+ log.Fields{
+ log.ErrorKey: err,
+ }.Warningf(ctx, "Transient error encountered during processing.")
+ return err
+ }
+
+ log.Fields{
+ log.ErrorKey: err,
+ }.Errorf(ctx, "Non-transient error encountered during processing; discarding.")
+ }
+ return nil
+}
+
// logWork is a cumulative set of read-only state passed around by value for
// log processing. Each processing stage copies it and fills in the next field
// (be, then path, then le), so concurrent tasks never share a mutable value.
type logWork struct {
	// md is the metadata associated with the overall message.
	md *protocol.ButlerMetadata
	// b is the Butler bundle.
	b *protocol.ButlerLogBundle
	// be is the Bundle entry currently being processed.
	be *protocol.ButlerLogBundle_Entry
	// path is the constructed path of the stream being processed, derived from
	// the bundle entry's descriptor prefix and name.
	path types.StreamPath
	// le is the individual LogEntry in the bundle entry being processed.
	le *protocol.LogEntry
}
+
// processLogStream processes an individual set of log messages belonging to
// the same log stream.
//
// Malformed or unauthenticated bundle entries are logged and dropped (nil
// return), since retrying them cannot succeed. Errors from the Coordinator or
// Storage are returned so the caller can classify them as transient or not.
func (c *collector) processLogStream(ctx context.Context, lw *logWork) error {
	if err := lw.be.Desc.Validate(true); err != nil {
		log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
		return nil
	}
	// Derive the full stream path from the descriptor and attach it to the
	// Context so all subsequent log messages carry it.
	lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.be.Desc.Name))
	ctx = log.SetField(ctx, "path", lw.path)

	if len(lw.be.Secret) == 0 {
		log.Errorf(ctx, "Missing secret.")
		return nil
	}

	// Fetch our cached/remote state. This will replace our state object with the
	// fetched state, so any future calls will need to re-set the Secret value.
	// TODO: Use timeout?
	state, err := c.streamState.getOrRegister(ctx, &cc.State{
		Path:         lw.path,
		Secret:       types.StreamSecret(lw.be.Secret),
		ProtoVersion: lw.md.ProtoVersion,
		Descriptor:   lw.be.Desc,
	})
	if err != nil {
		log.WithError(err).Errorf(ctx, "Failed to get/register current stream state.")
		return err
	}

	// Does the log stream's secret match the expected secret?
	if !bytes.Equal(lw.be.Secret, []byte(state.secret)) {
		log.Errorf(log.SetFields(ctx, log.Fields{
			"secret":         lw.be.Secret,
			"expectedSecret": state.secret,
		}), "Log entry has incorrect secret.")
		return nil
	}

	// Archived and purged streams no longer accept log entries; drop the
	// bundle for them.
	if state.archived {
		log.Infof(ctx, "Skipping message bundle for archived stream.")
		return nil
	}
	if state.purged {
		log.Infof(ctx, "Skipping message bundle for purged stream.")
		return nil
	}

	// Update our terminal index if we have one.
	//
	// Note that even if our cached value is marked terminal, we could have failed
	// to push the terminal index to the Coordinator, so we will not refrain from
	// pushing every terminal index encountered regardless of cache state.
	if lw.be.Terminal {
		tidx := types.MessageIndex(lw.be.TerminalIndex)
		log.Fields{
			"value": tidx,
		}.Debugf(ctx, "Bundle includes a terminal index.")

		if state.terminalIndex < 0 {
			// No terminal index cached yet; adopt this bundle's value.
			state.terminalIndex = tidx
		} else if state.terminalIndex != tidx {
			log.Fields{
				"cachedIndex": state.terminalIndex,
				"bundleIndex": tidx,
			}.Warningf(ctx, "Cached terminal index disagrees with state.")
		}
	}

	// In parallel, load the log entries into Storage. Throttle this with our
	// ingest semaphore.
	return parallel.Run(c.Sem, func(taskC chan<- func() error) {
		for i, le := range lw.be.Logs {
			// Rebind the loop variables so each task closure captures its own
			// copies rather than the shared iteration variables.
			i, le := i, le

			// Store this LogEntry
			taskC <- func() error {
				if err := le.Validate(lw.be.Desc); err != nil {
					log.Fields{
						log.ErrorKey: err,
						"index":      i,
					}.Warningf(ctx, "Discarding invalid log entry.")
					return nil
				}

				// Entries indexed past a known terminal index cannot be part
				// of the stream; drop them.
				if state.terminalIndex >= 0 && types.MessageIndex(le.StreamIndex) > state.terminalIndex {
					log.Fields{
						"index":         le.StreamIndex,
						"terminalIndex": state.terminalIndex,
					}.Warningf(ctx, "Stream is terminated before log entry; discarding.")
					return nil
				}

				// Copy the work state so concurrent tasks don't share "le".
				lw := *lw
				lw.le = le
				return c.processLogEntry(ctx, &lw)
			}
		}

		// If our bundle entry is terminal, we have an additional task of reporting
		// this to the Coordinator.
		if lw.be.Terminal {
			taskC <- func() error {
				// Sentinel task: Update the terminal bundle state.
				// Operate on a local copy so the shared cached state read by
				// the concurrent storage tasks above is not mutated.
				state := *state
				state.terminalIndex = types.MessageIndex(lw.be.TerminalIndex)

				log.Fields{
					"terminalIndex": state.terminalIndex,
				}.Infof(ctx, "Received terminal log; updating Coordinator state.")
				// Re-set the secret, since the fetched state may not retain it
				// (see the getOrRegister note above).
				state.secret = types.StreamSecret(lw.be.Secret)

				if err := c.streamState.setTerminalIndex(ctx, &state); err != nil {
					log.WithError(err).Errorf(ctx, "Failed to set stream terminal index.")
					return err
				}
				return nil
			}
		}
	})
}
+
+func (c *collector) processLogEntry(ctx context.Context, lw *logWork) error {
+ data, err := proto.Marshal(lw.le)
+ if err != nil {
+ log.WithError(err).Errorf(ctx, "Failed to marshal log entry.")
+ return err
+ }
+
+ // Post the log to storage.
+ err = c.Storage.Put(&storage.PutRequest{
+ Path: lw.path,
+ Index: types.MessageIndex(lw.le.StreamIndex),
+ Value: data,
+ })
+ // If the log entry already exists, consider the "put" successful.
+ if err != nil && err != storage.ErrExists {
+ log.WithError(err).Errorf(ctx, "Failed to load log entry into Storage.")
+ return err
+ }
+ return nil
+}
+
// hasTransientError returns true if err is a transient error, or if it is a
// MultiError containing at least one transient error (evaluated recursively).
//
// MultiError aggregates do not themselves register as transient, so a direct
// errors.IsTransient check on an aggregate would miss transient sub-errors;
// this helper drills into the aggregate to find them.
func hasTransientError(err error) bool {
	if merr, ok := err.(errors.MultiError); ok {
		for _, e := range merr {
			if hasTransientError(e) {
				return true
			}
		}
		return false
	}

	return errors.IsTransient(err)
}

Powered by Google App Engine
This is Rietveld 408576698