| Index: server/internal/logdog/collector/collector.go | 
| diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go | 
| index 923531eeae2eb7af510b271826550d2121ec4aa0..28ecb8137f2123fc9978155f0dcbd330011ce26e 100644 | 
| --- a/server/internal/logdog/collector/collector.go | 
| +++ b/server/internal/logdog/collector/collector.go | 
| @@ -9,6 +9,7 @@ import ( | 
| "time" | 
|  | 
| "github.com/golang/protobuf/proto" | 
| +	"github.com/luci/luci-go/common/config" | 
| "github.com/luci/luci-go/common/errors" | 
| "github.com/luci/luci-go/common/logdog/butlerproto" | 
| "github.com/luci/luci-go/common/logdog/types" | 
| @@ -81,8 +82,9 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error { | 
| if log.IsLogging(ctx, log.Info) { | 
| for i, entry := range pr.Bundle.Entries { | 
| fields := log.Fields{ | 
| -				"index": i, | 
| -				"path":  entry.GetDesc().Path(), | 
| +				"index":   i, | 
| +				"project": pr.Bundle.Project, | 
| +				"path":    entry.GetDesc().Path(), | 
| } | 
| if entry.Terminal { | 
| fields["terminalIndex"] = entry.TerminalIndex | 
| @@ -96,17 +98,46 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error { | 
| } | 
| } | 
|  | 
| -	// If there are no entries, there is nothing to do. | 
| -	if len(pr.Bundle.Entries) == 0 { | 
| -		return nil | 
| -	} | 
| - | 
| lw := bundleHandler{ | 
| msg: msg, | 
| md:  pr.Metadata, | 
| b:   pr.Bundle, | 
| } | 
|  | 
| +	if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil { | 
| +		log.Fields{ | 
| +			log.ErrorKey:   err, | 
| +			"secretLength": len(lw.b.Secret), | 
| +		}.Errorf(ctx, "Failed to validate prefix secret.") | 
| +		return errors.New("invalid prefix secret") | 
| +	} | 
| + | 
| +	// TODO(dnj): Make this actually an fatal error, once project becomes | 
| +	// required. | 
| +	if lw.b.Project != "" { | 
| +		lw.project = config.ProjectName(lw.b.Project) | 
| +		if err := lw.project.Validate(); err != nil { | 
| +			log.Fields{ | 
| +				log.ErrorKey: err, | 
| +				"project":    lw.b.Project, | 
| +			}.Errorf(ctx, "Failed to validate bundle project name.") | 
| +			return errors.New("invalid bundle project name") | 
| +		} | 
| +	} | 
| + | 
| +	if err := types.StreamName(lw.b.Prefix).Validate(); err != nil { | 
| +		log.Fields{ | 
| +			log.ErrorKey: err, | 
| +			"prefix":     lw.b.Prefix, | 
| +		}.Errorf(ctx, "Failed to validate bundle prefix.") | 
| +		return errors.New("invalid bundle prefix") | 
| +	} | 
| + | 
| +	// If there are no entries, there is nothing to do. | 
| +	if len(pr.Bundle.Entries) == 0 { | 
| +		return nil | 
| +	} | 
| + | 
| // Handle each bundle entry in parallel. We will use a separate work pool | 
| // here so that top-level bundle dispatch can't deadlock the processing tasks. | 
| workers := c.MaxMessageWorkers | 
| @@ -149,6 +180,9 @@ type bundleHandler struct { | 
| md *logpb.ButlerMetadata | 
| // b is the Butler bundle. | 
| b *logpb.ButlerLogBundle | 
| + | 
| +	// project is the validated project name. | 
| +	project config.ProjectName | 
| } | 
|  | 
| type bundleEntryHandler struct { | 
| @@ -174,17 +208,29 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) | 
| return nil | 
| } | 
|  | 
| +	// If the descriptor has a Prefix, it must match the bundle's Prefix. | 
| +	if p := h.be.Desc.Prefix; p != "" { | 
| +		if p != h.b.Prefix { | 
| +			log.Fields{ | 
| +				"bundlePrefix":      h.b.Prefix, | 
| +				"bundleEntryPrefix": p, | 
| +			}.Errorf(ctx, "Bundle prefix does not match entry prefix.") | 
| +			return errors.New("mismatched bundle and entry prefixes") | 
| +		} | 
| +	} else { | 
| +		// Fill in the bundle's Prefix. | 
| +		h.be.Desc.Prefix = h.b.Prefix | 
| +	} | 
| + | 
| if err := h.be.Desc.Validate(true); err != nil { | 
| log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.") | 
| return err | 
| } | 
| h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.Desc.Name)) | 
| -	ctx = log.SetField(ctx, "path", h.path) | 
| - | 
| -	if len(h.be.Secret) == 0 { | 
| -		log.Errorf(ctx, "Missing secret.") | 
| -		return errors.New("missing stream secret") | 
| -	} | 
| +	ctx = log.SetFields(ctx, log.Fields{ | 
| +		"project": h.project, | 
| +		"path":    h.path, | 
| +	}) | 
|  | 
| // Confirm that the log entries are valid and contiguous. Serialize the log | 
| // entries for ingest as we validate them. | 
| @@ -230,8 +276,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) | 
| // fetched state, so any future calls will need to re-set the Secret value. | 
| // TODO: Use timeout? | 
| state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{ | 
| +		Project:      h.project, | 
| Path:         h.path, | 
| -		Secret:       types.PrefixSecret(h.be.Secret), | 
| +		Secret:       types.PrefixSecret(h.b.Secret), | 
| ProtoVersion: h.md.ProtoVersion, | 
| }, h.be.Desc) | 
| if err != nil { | 
| @@ -240,9 +287,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) | 
| } | 
|  | 
| // Does the log stream's secret match the expected secret? | 
| -	if !bytes.Equal(h.be.Secret, []byte(state.Secret)) { | 
| +	if !bytes.Equal(h.b.Secret, []byte(state.Secret)) { | 
| log.Errorf(log.SetFields(ctx, log.Fields{ | 
| -			"secret":         h.be.Secret, | 
| +			"secret":         h.b.Secret, | 
| "expectedSecret": state.Secret, | 
| }), "Log entry has incorrect secret.") | 
| return nil | 
| @@ -286,9 +333,10 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) | 
| taskC <- func() error { | 
| // Post the log to storage. | 
| err = c.Storage.Put(storage.PutRequest{ | 
| -					Path:   h.path, | 
| -					Index:  types.MessageIndex(blockIndex), | 
| -					Values: logData, | 
| +					Project: h.project, | 
| +					Path:    h.path, | 
| +					Index:   types.MessageIndex(blockIndex), | 
| +					Values:  logData, | 
| }) | 
|  | 
| // If the log entry already exists, consider the "put" successful. | 
|  |