Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(383)

Unified Diff: server/internal/logdog/collector/collector.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/proto/logdog/logpb/butler.pb.go ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/collector/collector.go
diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go
index 923531eeae2eb7af510b271826550d2121ec4aa0..28ecb8137f2123fc9978155f0dcbd330011ce26e 100644
--- a/server/internal/logdog/collector/collector.go
+++ b/server/internal/logdog/collector/collector.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logdog/butlerproto"
"github.com/luci/luci-go/common/logdog/types"
@@ -81,8 +82,9 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
if log.IsLogging(ctx, log.Info) {
for i, entry := range pr.Bundle.Entries {
fields := log.Fields{
- "index": i,
- "path": entry.GetDesc().Path(),
+ "index": i,
+ "project": pr.Bundle.Project,
+ "path": entry.GetDesc().Path(),
}
if entry.Terminal {
fields["terminalIndex"] = entry.TerminalIndex
@@ -96,17 +98,46 @@ func (c *Collector) Process(ctx context.Context, msg []byte) error {
}
}
- // If there are no entries, there is nothing to do.
- if len(pr.Bundle.Entries) == 0 {
- return nil
- }
-
lw := bundleHandler{
msg: msg,
md: pr.Metadata,
b: pr.Bundle,
}
+ if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "secretLength": len(lw.b.Secret),
+ }.Errorf(ctx, "Failed to validate prefix secret.")
+ return errors.New("invalid prefix secret")
+ }
+
+ // TODO(dnj): Make this actually an fatal error, once project becomes
+ // required.
+ if lw.b.Project != "" {
+ lw.project = config.ProjectName(lw.b.Project)
+ if err := lw.project.Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": lw.b.Project,
+ }.Errorf(ctx, "Failed to validate bundle project name.")
+ return errors.New("invalid bundle project name")
+ }
+ }
+
+ if err := types.StreamName(lw.b.Prefix).Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "prefix": lw.b.Prefix,
+ }.Errorf(ctx, "Failed to validate bundle prefix.")
+ return errors.New("invalid bundle prefix")
+ }
+
+ // If there are no entries, there is nothing to do.
+ if len(pr.Bundle.Entries) == 0 {
+ return nil
+ }
+
// Handle each bundle entry in parallel. We will use a separate work pool
// here so that top-level bundle dispatch can't deadlock the processing tasks.
workers := c.MaxMessageWorkers
@@ -149,6 +180,9 @@ type bundleHandler struct {
md *logpb.ButlerMetadata
// b is the Butler bundle.
b *logpb.ButlerLogBundle
+
+ // project is the validated project name.
+ project config.ProjectName
}
type bundleEntryHandler struct {
@@ -174,17 +208,29 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
return nil
}
+ // If the descriptor has a Prefix, it must match the bundle's Prefix.
+ if p := h.be.Desc.Prefix; p != "" {
+ if p != h.b.Prefix {
+ log.Fields{
+ "bundlePrefix": h.b.Prefix,
+ "bundleEntryPrefix": p,
+ }.Errorf(ctx, "Bundle prefix does not match entry prefix.")
+ return errors.New("mismatched bundle and entry prefixes")
+ }
+ } else {
+ // Fill in the bundle's Prefix.
+ h.be.Desc.Prefix = h.b.Prefix
+ }
+
if err := h.be.Desc.Validate(true); err != nil {
log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
return err
}
h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.Desc.Name))
- ctx = log.SetField(ctx, "path", h.path)
-
- if len(h.be.Secret) == 0 {
- log.Errorf(ctx, "Missing secret.")
- return errors.New("missing stream secret")
- }
+ ctx = log.SetFields(ctx, log.Fields{
+ "project": h.project,
+ "path": h.path,
+ })
// Confirm that the log entries are valid and contiguous. Serialize the log
// entries for ingest as we validate them.
@@ -230,8 +276,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
// fetched state, so any future calls will need to re-set the Secret value.
// TODO: Use timeout?
state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{
+ Project: h.project,
Path: h.path,
- Secret: types.PrefixSecret(h.be.Secret),
+ Secret: types.PrefixSecret(h.b.Secret),
ProtoVersion: h.md.ProtoVersion,
}, h.be.Desc)
if err != nil {
@@ -240,9 +287,9 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
}
// Does the log stream's secret match the expected secret?
- if !bytes.Equal(h.be.Secret, []byte(state.Secret)) {
+ if !bytes.Equal(h.b.Secret, []byte(state.Secret)) {
log.Errorf(log.SetFields(ctx, log.Fields{
- "secret": h.be.Secret,
+ "secret": h.b.Secret,
"expectedSecret": state.Secret,
}), "Log entry has incorrect secret.")
return nil
@@ -286,9 +333,10 @@ func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
taskC <- func() error {
// Post the log to storage.
err = c.Storage.Put(storage.PutRequest{
- Path: h.path,
- Index: types.MessageIndex(blockIndex),
- Values: logData,
+ Project: h.project,
+ Path: h.path,
+ Index: types.MessageIndex(blockIndex),
+ Values: logData,
})
// If the log entry already exists, consider the "put" successful.
« no previous file with comments | « common/proto/logdog/logpb/butler.pb.go ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698