Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Unified Diff: server/internal/logdog/collector/collector.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/collector.go
diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go
index a5dcb3f100a3d30419428e73a4c2122c50c42461..90d055278688ecebb61f36fd5f52fd5b9a23a7ad 100644
--- a/server/internal/logdog/collector/collector.go
+++ b/server/internal/logdog/collector/collector.go
@@ -6,7 +6,6 @@ package collector
import (
"bytes"
- "sync"
"time"
"github.com/golang/protobuf/proto"
@@ -21,6 +20,12 @@ import (
"golang.org/x/net/context"
)
+const (
+ // DefaultMaxMessageWorkers is the default number of concurrent worker
+ // goroutines to employ for a single message.
+ DefaultMaxMessageWorkers = 4
+)
+
// Collector is a stateful object responsible for ingesting LogDog logs,
// registering them with a Coordinator, and stowing them in short-term storage
// for streaming and processing.
@@ -34,40 +39,16 @@ type Collector struct {
// the stateCache sub-package) to avoid overwhelming the server.
Coordinator coordinator.Coordinator
- // Storage is the backing store to use.
+ // Storage is the intermediate storage instance to use.
Storage storage.Storage
// StreamStateCacheExpire is the maximum amount of time that a cached stream
// state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
StreamStateCacheExpire time.Duration
- // MaxParallelBundles is the maximum number of log entry bundles per message
- // to handle in parallel. If <= 0, no maximum will be applied.
- MaxParallelBundles int
- // MaxIngestWorkers is the maximum number of ingest worker goroutines that
- // will operate at a time. If <= 0, no maximum will be applied.
- MaxIngestWorkers int
-
- // initOnce is used to ensure that the Collector's internal state is
- // initialized at most once.
- initOnce sync.Once
- // runner is the Runner that will be used for ingest. It will be configured
- // based on the supplied MaxIngestWorkers parameter.
- //
- // Internally, runner must not be used by tasks that themselves use the
- // runner, else deadlock could occur.
- runner *parallel.Runner
-}
-
-// init initializes the operational state of the Collector. It must be called
-// internally at the beginning of any exported method that uses that state.
-func (c *Collector) init() {
- c.initOnce.Do(func() {
- c.runner = &parallel.Runner{
- Sustained: c.MaxIngestWorkers,
- Maximum: c.MaxIngestWorkers,
- }
- })
+ // MaxMessageWorkers is the maximum number of concurrent workers to employ
+ // for any given message. If <= 0, DefaultMaxMessageWorkers will be applied.
+ MaxMessageWorkers int
}
// Process ingests an encoded ButlerLogBundle message, registering it with
@@ -78,8 +59,6 @@ func (c *Collector) init() {
// If no error occurred, or if there was an error with the input data, no error
// will be returned.
func (c *Collector) Process(ctx context.Context, msg []byte) error {
- c.init()
-
pr := butlerproto.Reader{}
if err := pr.Read(bytes.NewReader(msg)); err != nil {
log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
@@ -122,26 +101,32 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
return nil
}
- // Define our logWork template. This will be cloned for each ingested log
- // stream.
- lw := logWork{
- md: pr.Metadata,
- b: pr.Bundle,
+ lw := bundleHandler{
+ msg: msg,
+ md: pr.Metadata,
+ b: pr.Bundle,
}
// Handle each bundle entry in parallel. We will use a separate work pool
// here so that top-level bundle dispatch can't deadlock the processing tasks.
- err := parallel.WorkPool(c.MaxParallelBundles, func(taskC chan<- func() error) {
+ workers := c.MaxMessageWorkers
+ if workers <= 0 {
+ workers = DefaultMaxMessageWorkers
+ }
+ err := parallel.WorkPool(workers, func(taskC chan<- func() error) {
for _, be := range pr.Bundle.Entries {
- lw := lw
- lw.be = be
+ be := be
+
taskC <- func() error {
- return c.processLogStream(ctx, &lw)
+ return c.processLogStream(ctx, &bundleEntryHandler{
+ bundleHandler: &lw,
+ be: be,
+ })
}
}
})
if err != nil {
- if hasTransientError(err) && !errors.IsTransient(err) {
+ if !errors.IsTransient(err) && hasTransientError(err) {
+ // err has a nested transient error; propagate that to the top.
err = errors.WrapTransient(err)
}
@@ -153,58 +138,100 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
// Close releases any internal resources and blocks pending the completion of
// any outstanding operations. After Close, no new Process calls may be made.
func (c *Collector) Close() {
- c.init()
-
- c.runner.Close()
}
-// logWork is a cumulative set of read-only state passed around by value for log
-// processing.
-type logWork struct {
+// bundleHandler is a cumulative set of read-only state passed around by
+// value for log processing.
+type bundleHandler struct {
+ // msg is the original message bytes.
+ msg []byte
// md is the metadata associated with the overall message.
md *logpb.ButlerMetadata
// b is the Butler bundle.
b *logpb.ButlerLogBundle
+}
+
+type bundleEntryHandler struct {
+ *bundleHandler
+
// be is the Bundle entry.
be *logpb.ButlerLogBundle_Entry
// path is the constructed path of the stream being processed.
path types.StreamPath
- // le is the LogEntry in the bundle entry.
- le *logpb.LogEntry
}
// processLogStream processes an individual set of log messages belonging to the
// same log stream.
-func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
- if err := lw.be.Desc.Validate(true); err != nil {
+func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error {
+ if err := h.be.Desc.Validate(true); err != nil {
log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
- return nil
+ return err
}
- lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.be.Desc.Name))
- ctx = log.SetField(ctx, "path", lw.path)
+ h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.Desc.Name))
+ ctx = log.SetField(ctx, "path", h.path)
- if len(lw.be.Secret) == 0 {
+ if len(h.be.Secret) == 0 {
log.Errorf(ctx, "Missing secret.")
- return nil
+ return errors.New("missing stream secret")
+ }
+
+ // Confirm that the log entries are valid and contiguous. Serialize the log
+ // entries for ingest as we validate them.
+ var logData [][]byte
+ var blockIndex uint64
+ if logs := h.be.Logs; len(logs) > 0 {
+ logData = make([][]byte, len(logs))
+ blockIndex = logs[0].StreamIndex
+
+ for i, le := range logs {
+ // Validate this log entry.
+ if err := le.Validate(h.be.Desc); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "index": le.StreamIndex,
+ }.Warningf(ctx, "Discarding invalid log entry.")
+ return errors.New("invalid log entry")
+ }
+
+ // Validate that this entry is contiguous.
+ if le.StreamIndex != blockIndex+uint64(i) {
+ log.Fields{
+ "index": i,
+ "expected": (blockIndex + uint64(i)),
+ "actual": le.StreamIndex,
+ }.Errorf(ctx, "Non-contiguous log entry block in stream.")
+ return errors.New("non-contiguous log entry block")
+ }
+
+ var err error
+ logData[i], err = proto.Marshal(le)
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "index": le.StreamIndex,
+ }.Errorf(ctx, "Failed to marshal log entry.")
+ return errors.New("failed to marshal log entries")
+ }
+ }
}
// Fetch our cached/remote state. This will replace our state object with the
// fetched state, so any future calls will need to re-set the Secret value.
// TODO: Use timeout?
state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{
- Path: lw.path,
- Secret: types.StreamSecret(lw.be.Secret),
- ProtoVersion: lw.md.ProtoVersion,
- }, lw.be.Desc)
+ Path: h.path,
+ Secret: types.StreamSecret(h.be.Secret),
+ ProtoVersion: h.md.ProtoVersion,
+ }, h.be.Desc)
if err != nil {
log.WithError(err).Errorf(ctx, "Failed to get/register current stream state.")
return err
}
// Does the log stream's secret match the expected secret?
- if !bytes.Equal(lw.be.Secret, []byte(state.Secret)) {
+ if !bytes.Equal(h.be.Secret, []byte(state.Secret)) {
log.Errorf(log.SetFields(ctx, log.Fields{
- "secret": lw.be.Secret,
+ "secret": h.be.Secret,
"expectedSecret": state.Secret,
}), "Log entry has incorrect secret.")
return nil
@@ -224,8 +251,8 @@ func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
// Note that even if our cached value is marked terminal, we could have failed
// to push the terminal index to the Coordinator, so we will not refrain from
// pushing every terminal index encountered regardless of cache state.
- if lw.be.Terminal {
- tidx := types.MessageIndex(lw.be.TerminalIndex)
+ if h.be.Terminal {
+ tidx := types.MessageIndex(h.be.TerminalIndex)
log.Fields{
"value": tidx,
}.Debugf(ctx, "Bundle includes a terminal index.")
@@ -240,43 +267,39 @@ func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
}
}
- // In parallel, load the log entries into Storage. Throttle this with our
- // ingest semaphore.
- return errors.MultiErrorFromErrors(c.runner.Run(func(taskC chan<- func() error) {
- for i, le := range lw.be.Logs {
- i, le := i, le
-
- // Store this LogEntry
+ // Perform stream processing operations. We can do these operations in
+ // parallel.
+ return parallel.FanOutIn(func(taskC chan<- func() error) {
+ // Store log data, if any was provided. It has already been validated.
+ if len(logData) > 0 {
taskC <- func() error {
- if err := le.Validate(lw.be.Desc); err != nil {
+ // Post the log to storage.
+ err = c.Storage.Put(storage.PutRequest{
+ Path: h.path,
+ Index: types.MessageIndex(blockIndex),
+ Values: logData,
+ })
+
+ // If the log entry already exists, consider the "put" successful.
+ // Storage will return a transient error if one occurred.
+ if err != nil && err != storage.ErrExists {
log.Fields{
log.ErrorKey: err,
- "index": i,
- }.Warningf(ctx, "Discarding invalid log entry.")
- return nil
- }
-
- if state.TerminalIndex >= 0 && types.MessageIndex(le.StreamIndex) > state.TerminalIndex {
- log.Fields{
- "index": le.StreamIndex,
- "terminalIndex": state.TerminalIndex,
- }.Warningf(ctx, "Stream is terminated before log entry; discarding.")
- return nil
+ "blockIndex": blockIndex,
+ }.Errorf(ctx, "Failed to load log entry into Storage.")
+ return err
}
-
- lw := *lw
- lw.le = le
- return c.processLogEntry(ctx, &lw)
+ return nil
}
}
// If our bundle entry is terminal, we have an additional task of reporting
// this to the Coordinator.
- if lw.be.Terminal {
+ if h.be.Terminal {
taskC <- func() error {
// Sentinel task: Update the terminal bundle state.
state := *state
- state.TerminalIndex = types.MessageIndex(lw.be.TerminalIndex)
+ state.TerminalIndex = types.MessageIndex(h.be.TerminalIndex)
log.Fields{
"terminalIndex": state.TerminalIndex,
@@ -289,32 +312,7 @@ func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
return nil
}
}
- }))
-}
-
-func (c *Collector) processLogEntry(ctx context.Context, lw *logWork) error {
- data, err := proto.Marshal(lw.le)
- if err != nil {
- log.WithError(err).Errorf(ctx, "Failed to marshal log entry.")
- return err
- }
-
- // Post the log to storage.
- err = c.Storage.Put(&storage.PutRequest{
- Path: lw.path,
- Index: types.MessageIndex(lw.le.StreamIndex),
- Value: data,
})
-
- // If the log entry already exists, consider the "put" successful.
- //
- // All Storage errors are considered transient, as they are safe and
- // data-agnostic.
- if err != nil && err != storage.ErrExists {
- log.WithError(err).Errorf(ctx, "Failed to load log entry into Storage.")
- return errors.WrapTransient(err)
- }
- return nil
}
// wrapMultiErrorTransient wraps an error in a TransientError wrapper.
« no previous file with comments | « server/internal/logdog/archivist/archivist_test.go ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698