| Index: server/internal/logdog/collector/collector.go
|
| diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go
|
| index 923531eeae2eb7af510b271826550d2121ec4aa0..28ecb8137f2123fc9978155f0dcbd330011ce26e 100644
|
| --- a/server/internal/logdog/collector/collector.go
|
| +++ b/server/internal/logdog/collector/collector.go
|
| @@ -9,6 +9,7 @@ import (
|
| "time"
|
|
|
| "github.com/golang/protobuf/proto"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logdog/butlerproto"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| @@ -81,8 +82,9 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
|
| if log.IsLogging(ctx, log.Info) {
|
| for i, entry := range pr.Bundle.Entries {
|
| fields := log.Fields{
|
| - "index": i,
|
| - "path": entry.GetDesc().Path(),
|
| + "index": i,
|
| + "project": pr.Bundle.Project,
|
| + "path": entry.GetDesc().Path(),
|
| }
|
| if entry.Terminal {
|
| fields["terminalIndex"] = entry.TerminalIndex
|
| @@ -96,17 +98,46 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
|
| }
|
| }
|
|
|
| - // If there are no entries, there is nothing to do.
|
| - if len(pr.Bundle.Entries) == 0 {
|
| - return nil
|
| - }
|
| -
|
| lw := bundleHandler{
|
| msg: msg,
|
| md: pr.Metadata,
|
| b: pr.Bundle,
|
| }
|
|
|
| + if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "secretLength": len(lw.b.Secret),
|
| + }.Errorf(ctx, "Failed to validate prefix secret.")
|
| + return errors.New("invalid prefix secret")
|
| + }
|
| +
|
| + // TODO(dnj): Make this actually an fatal error, once project becomes
|
| + // required.
|
| + if lw.b.Project != "" {
|
| + lw.project = config.ProjectName(lw.b.Project)
|
| + if err := lw.project.Validate(); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "project": lw.b.Project,
|
| + }.Errorf(ctx, "Failed to validate bundle project name.")
|
| + return errors.New("invalid bundle project name")
|
| + }
|
| + }
|
| +
|
| + if err := types.StreamName(lw.b.Prefix).Validate(); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "prefix": lw.b.Prefix,
|
| + }.Errorf(ctx, "Failed to validate bundle prefix.")
|
| + return errors.New("invalid bundle prefix")
|
| + }
|
| +
|
| + // If there are no entries, there is nothing to do.
|
| + if len(pr.Bundle.Entries) == 0 {
|
| + return nil
|
| + }
|
| +
|
| // Handle each bundle entry in parallel. We will use a separate work pool
|
| // here so that top-level bundle dispatch can't deadlock the processing tasks.
|
| workers := c.MaxMessageWorkers
|
| @@ -149,6 +180,9 @@ type bundleHandler struct {
|
| md *logpb.ButlerMetadata
|
| // b is the Butler bundle.
|
| b *logpb.ButlerLogBundle
|
| +
|
| + // project is the validated project name.
|
| + project config.ProjectName
|
| }
|
|
|
| type bundleEntryHandler struct {
|
| @@ -174,17 +208,29 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
|
| return nil
|
| }
|
|
|
| + // If the descriptor has a Prefix, it must match the bundle's Prefix.
|
| + if p := h.be.Desc.Prefix; p != "" {
|
| + if p != h.b.Prefix {
|
| + log.Fields{
|
| + "bundlePrefix": h.b.Prefix,
|
| + "bundleEntryPrefix": p,
|
| + }.Errorf(ctx, "Bundle prefix does not match entry prefix.")
|
| + return errors.New("mismatched bundle and entry prefixes")
|
| + }
|
| + } else {
|
| + // Fill in the bundle's Prefix.
|
| + h.be.Desc.Prefix = h.b.Prefix
|
| + }
|
| +
|
| if err := h.be.Desc.Validate(true); err != nil {
|
| log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
|
| return err
|
| }
|
| h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.Desc.Name))
|
| - ctx = log.SetField(ctx, "path", h.path)
|
| -
|
| - if len(h.be.Secret) == 0 {
|
| - log.Errorf(ctx, "Missing secret.")
|
| - return errors.New("missing stream secret")
|
| - }
|
| + ctx = log.SetFields(ctx, log.Fields{
|
| + "project": h.project,
|
| + "path": h.path,
|
| + })
|
|
|
| // Confirm that the log entries are valid and contiguous. Serialize the log
|
| // entries for ingest as we validate them.
|
| @@ -230,8 +276,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
|
| // fetched state, so any future calls will need to re-set the Secret value.
|
| // TODO: Use timeout?
|
| state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{
|
| + Project: h.project,
|
| Path: h.path,
|
| - Secret: types.PrefixSecret(h.be.Secret),
|
| + Secret: types.PrefixSecret(h.b.Secret),
|
| ProtoVersion: h.md.ProtoVersion,
|
| }, h.be.Desc)
|
| if err != nil {
|
| @@ -240,9 +287,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
|
| }
|
|
|
| // Does the log stream's secret match the expected secret?
|
| - if !bytes.Equal(h.be.Secret, []byte(state.Secret)) {
|
| + if !bytes.Equal(h.b.Secret, []byte(state.Secret)) {
|
| log.Errorf(log.SetFields(ctx, log.Fields{
|
| - "secret": h.be.Secret,
|
| + "secret": h.b.Secret,
|
| "expectedSecret": state.Secret,
|
| }), "Log entry has incorrect secret.")
|
| return nil
|
| @@ -286,9 +333,10 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
|
| taskC <- func() error {
|
| // Post the log to storage.
|
| err = c.Storage.Put(storage.PutRequest{
|
| - Path: h.path,
|
| - Index: types.MessageIndex(blockIndex),
|
| - Values: logData,
|
| + Project: h.project,
|
| + Path: h.path,
|
| + Index: types.MessageIndex(blockIndex),
|
| + Values: logData,
|
| })
|
|
|
| // If the log entry already exists, consider the "put" successful.
|
|
|