Index: server/internal/logdog/collector/collector.go |
diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go |
index 923531eeae2eb7af510b271826550d2121ec4aa0..28ecb8137f2123fc9978155f0dcbd330011ce26e 100644 |
--- a/server/internal/logdog/collector/collector.go |
+++ b/server/internal/logdog/collector/collector.go |
@@ -9,6 +9,7 @@ import ( |
"time" |
"github.com/golang/protobuf/proto" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/errors" |
"github.com/luci/luci-go/common/logdog/butlerproto" |
"github.com/luci/luci-go/common/logdog/types" |
@@ -81,8 +82,9 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error { |
if log.IsLogging(ctx, log.Info) { |
for i, entry := range pr.Bundle.Entries { |
fields := log.Fields{ |
- "index": i, |
- "path": entry.GetDesc().Path(), |
+ "index": i, |
+ "project": pr.Bundle.Project, |
+ "path": entry.GetDesc().Path(), |
} |
if entry.Terminal { |
fields["terminalIndex"] = entry.TerminalIndex |
@@ -96,17 +98,46 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error { |
} |
} |
- // If there are no entries, there is nothing to do. |
- if len(pr.Bundle.Entries) == 0 { |
- return nil |
- } |
- |
lw := bundleHandler{ |
msg: msg, |
md: pr.Metadata, |
b: pr.Bundle, |
} |
+ if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "secretLength": len(lw.b.Secret), |
+ }.Errorf(ctx, "Failed to validate prefix secret.") |
+ return errors.New("invalid prefix secret") |
+ } |
+ |
+ // TODO(dnj): Make this actually an fatal error, once project becomes |
+ // required. |
+ if lw.b.Project != "" { |
+ lw.project = config.ProjectName(lw.b.Project) |
+ if err := lw.project.Validate(); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "project": lw.b.Project, |
+ }.Errorf(ctx, "Failed to validate bundle project name.") |
+ return errors.New("invalid bundle project name") |
+ } |
+ } |
+ |
+ if err := types.StreamName(lw.b.Prefix).Validate(); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ "prefix": lw.b.Prefix, |
+ }.Errorf(ctx, "Failed to validate bundle prefix.") |
+ return errors.New("invalid bundle prefix") |
+ } |
+ |
+ // If there are no entries, there is nothing to do. |
+ if len(pr.Bundle.Entries) == 0 { |
+ return nil |
+ } |
+ |
// Handle each bundle entry in parallel. We will use a separate work pool |
// here so that top-level bundle dispatch can't deadlock the processing tasks. |
workers := c.MaxMessageWorkers |
@@ -149,6 +180,9 @@ type bundleHandler struct { |
md *logpb.ButlerMetadata |
// b is the Butler bundle. |
b *logpb.ButlerLogBundle |
+ |
+ // project is the validated project name. |
+ project config.ProjectName |
} |
type bundleEntryHandler struct { |
@@ -174,17 +208,29 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) |
return nil |
} |
+ // If the descriptor has a Prefix, it must match the bundle's Prefix. |
+ if p := h.be.Desc.Prefix; p != "" { |
+ if p != h.b.Prefix { |
+ log.Fields{ |
+ "bundlePrefix": h.b.Prefix, |
+ "bundleEntryPrefix": p, |
+ }.Errorf(ctx, "Bundle prefix does not match entry prefix.") |
+ return errors.New("mismatched bundle and entry prefixes") |
+ } |
+ } else { |
+ // Fill in the bundle's Prefix. |
+ h.be.Desc.Prefix = h.b.Prefix |
+ } |
+ |
if err := h.be.Desc.Validate(true); err != nil { |
log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.") |
return err |
} |
h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.Desc.Name)) |
- ctx = log.SetField(ctx, "path", h.path) |
- |
- if len(h.be.Secret) == 0 { |
- log.Errorf(ctx, "Missing secret.") |
- return errors.New("missing stream secret") |
- } |
+ ctx = log.SetFields(ctx, log.Fields{ |
+ "project": h.project, |
+ "path": h.path, |
+ }) |
// Confirm that the log entries are valid and contiguous. Serialize the log |
// entries for ingest as we validate them. |
@@ -230,8 +276,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) |
// fetched state, so any future calls will need to re-set the Secret value. |
// TODO: Use timeout? |
state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{ |
+ Project: h.project, |
Path: h.path, |
- Secret: types.PrefixSecret(h.be.Secret), |
+ Secret: types.PrefixSecret(h.b.Secret), |
ProtoVersion: h.md.ProtoVersion, |
}, h.be.Desc) |
if err != nil { |
@@ -240,9 +287,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) |
} |
// Does the log stream's secret match the expected secret? |
- if !bytes.Equal(h.be.Secret, []byte(state.Secret)) { |
+ if !bytes.Equal(h.b.Secret, []byte(state.Secret)) { |
log.Errorf(log.SetFields(ctx, log.Fields{ |
- "secret": h.be.Secret, |
+ "secret": h.b.Secret, |
"expectedSecret": state.Secret, |
}), "Log entry has incorrect secret.") |
return nil |
@@ -286,9 +333,10 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) |
taskC <- func() error { |
// Post the log to storage. |
err = c.Storage.Put(storage.PutRequest{ |
- Path: h.path, |
- Index: types.MessageIndex(blockIndex), |
- Values: logData, |
+ Project: h.project, |
+ Path: h.path, |
+ Index: types.MessageIndex(blockIndex), |
+ Values: logData, |
}) |
// If the log entry already exists, consider the "put" successful. |