| Index: server/internal/logdog/collector/collector.go
|
| diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go
|
| index a5dcb3f100a3d30419428e73a4c2122c50c42461..90d055278688ecebb61f36fd5f52fd5b9a23a7ad 100644
|
| --- a/server/internal/logdog/collector/collector.go
|
| +++ b/server/internal/logdog/collector/collector.go
|
| @@ -6,7 +6,6 @@ package collector
|
|
|
| import (
|
| "bytes"
|
| - "sync"
|
| "time"
|
|
|
| "github.com/golang/protobuf/proto"
|
| @@ -21,6 +20,12 @@ import (
|
| "golang.org/x/net/context"
|
| )
|
|
|
| +const (
|
| + // DefaultMaxMessageWorkers is the default number of concurrent worker
|
| + // goroutines to employ for a single message.
|
| + DefaultMaxMessageWorkers = 4
|
| +)
|
| +
|
| // Collector is a stateful object responsible for ingesting LogDog logs,
|
| // registering them with a Coordinator, and stowing them in short-term storage
|
| // for streaming and processing.
|
| @@ -34,40 +39,16 @@ type Collector struct {
|
| // the stateCache sub-package) to avoid overwhelming the server.
|
| Coordinator coordinator.Coordinator
|
|
|
| - // Storage is the backing store to use.
|
| + // Storage is the intermediate storage instance to use.
|
| Storage storage.Storage
|
|
|
| // StreamStateCacheExpire is the maximum amount of time that a cached stream
|
| // state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
|
| StreamStateCacheExpire time.Duration
|
|
|
| - // MaxParallelBundles is the maximum number of log entry bundles per message
|
| - // to handle in parallel. If <= 0, no maximum will be applied.
|
| - MaxParallelBundles int
|
| - // MaxIngestWorkers is the maximum number of ingest worker goroutines that
|
| - // will operate at a time. If <= 0, no maximum will be applied.
|
| - MaxIngestWorkers int
|
| -
|
| - // initOnce is used to ensure that the Collector's internal state is
|
| - // initialized at most once.
|
| - initOnce sync.Once
|
| - // runner is the Runner that will be used for ingest. It will be configured
|
| - // based on the supplied MaxIngestWorkers parameter.
|
| - //
|
| - // Internally, runner must not be used by tasks that themselves use the
|
| - // runner, else deadlock could occur.
|
| - runner *parallel.Runner
|
| -}
|
| -
|
| -// init initializes the operational state of the Collector. It must be called
|
| -// internally at the beginning of any exported method that uses that state.
|
| -func (c *Collector) init() {
|
| - c.initOnce.Do(func() {
|
| - c.runner = &parallel.Runner{
|
| - Sustained: c.MaxIngestWorkers,
|
| - Maximum: c.MaxIngestWorkers,
|
| - }
|
| - })
|
| + // MaxMessageWorkers is the maximum number of concurrent workers to employ
|
| + // for any given message. If <= 0, DefaultMaxMessageWorkers will be applied.
|
| + MaxMessageWorkers int
|
| }
|
|
|
| // Process ingests an encoded ButlerLogBundle message, registering it with
|
| @@ -78,8 +59,6 @@ func (c *Collector) init() {
|
| // If no error occurred, or if there was an error with the input data, no error
|
| // will be returned.
|
| func (c *Collector) Process(ctx context.Context, msg []byte) error {
|
| - c.init()
|
| -
|
| pr := butlerproto.Reader{}
|
| if err := pr.Read(bytes.NewReader(msg)); err != nil {
|
| log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
|
| @@ -122,26 +101,32 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
|
| return nil
|
| }
|
|
|
| - // Define our logWork template. This will be cloned for each ingested log
|
| - // stream.
|
| - lw := logWork{
|
| - md: pr.Metadata,
|
| - b: pr.Bundle,
|
| + lw := bundleHandler{
|
| + msg: msg,
|
| + md: pr.Metadata,
|
| + b: pr.Bundle,
|
| }
|
|
|
| // Handle each bundle entry in parallel. We will use a separate work pool
|
| // here so that top-level bundle dispatch can't deadlock the processing tasks.
|
| - err := parallel.WorkPool(c.MaxParallelBundles, func(taskC chan<- func() error) {
|
| + workers := c.MaxMessageWorkers
|
| + if workers <= 0 {
|
| + workers = DefaultMaxMessageWorkers
|
| + }
|
| + err := parallel.WorkPool(workers, func(taskC chan<- func() error) {
|
| for _, be := range pr.Bundle.Entries {
|
| - lw := lw
|
| - lw.be = be
|
| + be := be
|
| +
|
| taskC <- func() error {
|
| - return c.processLogStream(ctx, &lw)
|
| + return c.processLogStream(ctx, &bundleEntryHandler{
|
| + bundleHandler: &lw,
|
| + be: be,
|
| + })
|
| }
|
| }
|
| })
|
| if err != nil {
|
| - if hasTransientError(err) && !errors.IsTransient(err) {
|
| + if !errors.IsTransient(err) && hasTransientError(err) {
|
| // err has a nested transient error; propagate that to top.
|
| err = errors.WrapTransient(err)
|
| }
|
| @@ -153,58 +138,100 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
|
| // Close releases any internal resources and blocks pending the completion of
|
| // any outstanding operations. After Close, no new Process calls may be made.
|
| func (c *Collector) Close() {
|
| - c.init()
|
| -
|
| - c.runner.Close()
|
| }
|
|
|
| -// logWork is a cumulative set of read-only state passed around by value for log
|
| -// processing.
|
| -type logWork struct {
|
| +// bundleHandler is a cumulative set of read-only state shared by pointer
|
| +// among the entry handlers of a single message during log processing.
|
| +type bundleHandler struct {
|
| + // msg is the original message bytes.
|
| + msg []byte
|
| // md is the metadata associated with the overall message.
|
| md *logpb.ButlerMetadata
|
| // b is the Butler bundle.
|
| b *logpb.ButlerLogBundle
|
| +}
|
| +
|
| +type bundleEntryHandler struct {
|
| + *bundleHandler
|
| +
|
| // be is the Bundle entry.
|
| be *logpb.ButlerLogBundle_Entry
|
| // path is the constructed path of the stream being processed.
|
| path types.StreamPath
|
| - // le is the LogEntry in the bundle entry.
|
| - le *logpb.LogEntry
|
| }
|
|
|
| // processLogStream processes an individual set of log messages belonging to the
|
| // same log stream.
|
| -func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
|
| - if err := lw.be.Desc.Validate(true); err != nil {
|
| +func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error {
|
| + if err := h.be.Desc.Validate(true); err != nil {
|
| log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
|
| - return nil
|
| + return err
|
| }
|
| - lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.be.Desc.Name))
|
| - ctx = log.SetField(ctx, "path", lw.path)
|
| + h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.Desc.Name))
|
| + ctx = log.SetField(ctx, "path", h.path)
|
|
|
| - if len(lw.be.Secret) == 0 {
|
| + if len(h.be.Secret) == 0 {
|
| log.Errorf(ctx, "Missing secret.")
|
| - return nil
|
| + return errors.New("missing stream secret")
|
| + }
|
| +
|
| + // Confirm that the log entries are valid and contiguous. Serialize the log
|
| + // entries for ingest as we validate them.
|
| + var logData [][]byte
|
| + var blockIndex uint64
|
| + if logs := h.be.Logs; len(logs) > 0 {
|
| + logData = make([][]byte, len(logs))
|
| + blockIndex = logs[0].StreamIndex
|
| +
|
| + for i, le := range logs {
|
| + // Validate this log entry.
|
| + if err := le.Validate(h.be.Desc); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "index": le.StreamIndex,
|
| + }.Warningf(ctx, "Discarding invalid log entry.")
|
| + return errors.New("invalid log entry")
|
| + }
|
| +
|
| + // Validate that this entry is contiguous.
|
| + if le.StreamIndex != blockIndex+uint64(i) {
|
| + log.Fields{
|
| + "index": i,
|
| + "expected": (blockIndex + uint64(i)),
|
| + "actual": le.StreamIndex,
|
| + }.Errorf(ctx, "Non-contiguous log entry block in stream.")
|
| + return errors.New("non-contiguous log entry block")
|
| + }
|
| +
|
| + var err error
|
| + logData[i], err = proto.Marshal(le)
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "index": le.StreamIndex,
|
| + }.Errorf(ctx, "Failed to marshal log entry.")
|
| + return errors.New("failed to marshal log entries")
|
| + }
|
| + }
|
| }
|
|
|
| // Fetch our cached/remote state. This will replace our state object with the
|
| // fetched state, so any future calls will need to re-set the Secret value.
|
| // TODO: Use timeout?
|
| state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{
|
| - Path: lw.path,
|
| - Secret: types.StreamSecret(lw.be.Secret),
|
| - ProtoVersion: lw.md.ProtoVersion,
|
| - }, lw.be.Desc)
|
| + Path: h.path,
|
| + Secret: types.StreamSecret(h.be.Secret),
|
| + ProtoVersion: h.md.ProtoVersion,
|
| + }, h.be.Desc)
|
| if err != nil {
|
| log.WithError(err).Errorf(ctx, "Failed to get/register current stream state.")
|
| return err
|
| }
|
|
|
| // Does the log stream's secret match the expected secret?
|
| - if !bytes.Equal(lw.be.Secret, []byte(state.Secret)) {
|
| + if !bytes.Equal(h.be.Secret, []byte(state.Secret)) {
|
| log.Errorf(log.SetFields(ctx, log.Fields{
|
| - "secret": lw.be.Secret,
|
| + "secret": h.be.Secret,
|
| "expectedSecret": state.Secret,
|
| }), "Log entry has incorrect secret.")
|
| return nil
|
| @@ -224,8 +251,8 @@ func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
|
| // Note that even if our cached value is marked terminal, we could have failed
|
| // to push the terminal index to the Coordinator, so we will not refrain from
|
| // pushing every terminal index encountered regardless of cache state.
|
| - if lw.be.Terminal {
|
| - tidx := types.MessageIndex(lw.be.TerminalIndex)
|
| + if h.be.Terminal {
|
| + tidx := types.MessageIndex(h.be.TerminalIndex)
|
| log.Fields{
|
| "value": tidx,
|
| }.Debugf(ctx, "Bundle includes a terminal index.")
|
| @@ -240,43 +267,39 @@ func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
|
| }
|
| }
|
|
|
| - // In parallel, load the log entries into Storage. Throttle this with our
|
| - // ingest semaphore.
|
| - return errors.MultiErrorFromErrors(c.runner.Run(func(taskC chan<- func() error) {
|
| - for i, le := range lw.be.Logs {
|
| - i, le := i, le
|
| -
|
| - // Store this LogEntry
|
| + // Perform stream processing operations. We can do these operations in
|
| + // parallel.
|
| + return parallel.FanOutIn(func(taskC chan<- func() error) {
|
| + // Store log data, if any was provided. It has already been validated.
|
| + if len(logData) > 0 {
|
| taskC <- func() error {
|
| - if err := le.Validate(lw.be.Desc); err != nil {
|
| + // Post the log to storage.
|
| + err = c.Storage.Put(storage.PutRequest{
|
| + Path: h.path,
|
| + Index: types.MessageIndex(blockIndex),
|
| + Values: logData,
|
| + })
|
| +
|
| + // If the log entry already exists, consider the "put" successful.
|
| + // Storage will return a transient error if one occurred.
|
| + if err != nil && err != storage.ErrExists {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| - "index": i,
|
| - }.Warningf(ctx, "Discarding invalid log entry.")
|
| - return nil
|
| - }
|
| -
|
| - if state.TerminalIndex >= 0 && types.MessageIndex(le.StreamIndex) > state.TerminalIndex {
|
| - log.Fields{
|
| - "index": le.StreamIndex,
|
| - "terminalIndex": state.TerminalIndex,
|
| - }.Warningf(ctx, "Stream is terminated before log entry; discarding.")
|
| - return nil
|
| + "blockIndex": blockIndex,
|
| + }.Errorf(ctx, "Failed to load log entry into Storage.")
|
| + return err
|
| }
|
| -
|
| - lw := *lw
|
| - lw.le = le
|
| - return c.processLogEntry(ctx, &lw)
|
| + return nil
|
| }
|
| }
|
|
|
| // If our bundle entry is terminal, we have an additional task of reporting
|
| // this to the Coordinator.
|
| - if lw.be.Terminal {
|
| + if h.be.Terminal {
|
| taskC <- func() error {
|
| // Sentinel task: Update the terminal bundle state.
|
| state := *state
|
| - state.TerminalIndex = types.MessageIndex(lw.be.TerminalIndex)
|
| + state.TerminalIndex = types.MessageIndex(h.be.TerminalIndex)
|
|
|
| log.Fields{
|
| "terminalIndex": state.TerminalIndex,
|
| @@ -289,32 +312,7 @@ func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
|
| return nil
|
| }
|
| }
|
| - }))
|
| -}
|
| -
|
| -func (c *Collector) processLogEntry(ctx context.Context, lw *logWork) error {
|
| - data, err := proto.Marshal(lw.le)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(ctx, "Failed to marshal log entry.")
|
| - return err
|
| - }
|
| -
|
| - // Post the log to storage.
|
| - err = c.Storage.Put(&storage.PutRequest{
|
| - Path: lw.path,
|
| - Index: types.MessageIndex(lw.le.StreamIndex),
|
| - Value: data,
|
| })
|
| -
|
| - // If the log entry already exists, consider the "put" successful.
|
| - //
|
| - // All Storage errors are considered transient, as they are safe and
|
| - // data-agnostic.
|
| - if err != nil && err != storage.ErrExists {
|
| - log.WithError(err).Errorf(ctx, "Failed to load log entry into Storage.")
|
| - return errors.WrapTransient(err)
|
| - }
|
| - return nil
|
| }
|
|
|
| // wrapMultiErrorTransient wraps an error in a TransientError wrapper.
|
|
|