Index: server/internal/logdog/archivist/archivist.go |
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go |
index 33e837df9b4862dfbc92e3b4e8660ae7663a2e98..bb20602b0413f001051c19135f29ec7be10c2e89 100644 |
--- a/server/internal/logdog/archivist/archivist.go |
+++ b/server/internal/logdog/archivist/archivist.go |
@@ -12,6 +12,7 @@ import ( |
"github.com/golang/protobuf/proto" |
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/errors" |
"github.com/luci/luci-go/common/gcloud/gs" |
"github.com/luci/luci-go/common/logdog/types" |
@@ -86,6 +87,7 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
log.Fields{ |
log.ErrorKey: err, |
"delete": delete, |
+ "project": task.Task().Project, |
"path": task.Task().Path, |
}.Infof(c, "Finished archive task.") |
return delete |
@@ -97,14 +99,27 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { |
at := task.Task() |
log.Fields{ |
- "path": at.Path, |
+ "project": at.Project, |
+ "path": at.Path, |
}.Debugf(c, "Received archival task.") |
+ if err := types.StreamPath(at.Path).Validate(); err != nil { |
+ return true, fmt.Errorf("invalid path %q: %s", at.Path, err) |
+ } |
+ |
+ // TODO(dnj): Remove empty project exemption, make empty project invalid. |
+ if at.Project != "" { |
+ if err := config.ProjectName(at.Project).Validate(); err != nil { |
+ return true, fmt.Errorf("invalid project name %q: %s", at.Project, err) |
+ } |
+ } |
+ |
// Load the log stream's current state. If it is already archived, we will |
// return an immediate success. |
ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
- Path: at.Path, |
- Desc: true, |
+ Project: at.Project, |
+ Path: at.Path, |
+ Desc: true, |
}) |
switch { |
case err != nil: |
@@ -183,13 +198,14 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) |
// If we're requiring completeness, perform a keys-only scan of intermediate |
// storage to ensure that we have all of the records before we bother |
// streaming to storage only to find that we are missing data. |
- if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { |
+ if err := a.checkComplete(config.ProjectName(at.Project), types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { |
return false, err |
} |
} |
ar := logdog.ArchiveStreamRequest{ |
- Path: at.Path, |
+ Project: at.Project, |
+ Path: at.Path, |
} |
// Archive to staging. |
@@ -199,7 +215,7 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) |
// |
// We will handle error creating the plan and executing the plan in the same |
// switch statement below. |
- staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID()) |
+ staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), types.StreamPath(at.Path), ls, task.UniqueID()) |
if err != nil { |
log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
} else { |
@@ -271,8 +287,9 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) |
// checkComplete performs a quick scan of intermediate storage to ensure that |
// all of the log stream's records are available. |
-func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error { |
+func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamPath, tidx types.MessageIndex) error { |
sreq := storage.GetRequest{ |
+ Project: project, |
Path: path, |
KeysOnly: true, |
} |
@@ -303,10 +320,11 @@ func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex |
return nil |
} |
-func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( |
- *stagedArchival, error) { |
+func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, path types.StreamPath, |
+ ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
sa := stagedArchival{ |
Archivist: a, |
+ project: project, |
path: path, |
terminalIndex: ls.State.TerminalIndex, |
@@ -328,27 +346,33 @@ func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, |
} |
// Construct our staged archival paths. |
- sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) |
- sa.index = a.makeStagingPaths(path, "logstream.index", uid) |
- sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) |
+ sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid) |
+ sa.index = a.makeStagingPaths(project, path, "logstream.index", uid) |
+ sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext), uid) |
return &sa, nil |
} |
// makeStagingPaths returns a stagingPaths instance for the given path and |
// file name. It incorporates a unique ID into the staging name to differentiate |
// it from other staging paths for the same path/name. |
-func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths { |
+func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.StreamPath, name, uid string) stagingPaths { |
+ // TODO(dnj): This won't be necessary when empty project is invalid. |
+ if project == "" { |
+ project = "_" |
+ } |
+ |
return stagingPaths{ |
- staged: a.GSStagingBase.Concat(string(path), uid, name), |
- final: a.GSBase.Concat(string(path), name), |
+ staged: a.GSStagingBase.Concat(string(project), string(path), uid, name), |
+ final: a.GSBase.Concat(string(project), string(path), name), |
} |
} |
type stagedArchival struct { |
*Archivist |
- path types.StreamPath |
- desc logpb.LogStreamDescriptor |
+ project config.ProjectName |
+ path types.StreamPath |
+ desc logpb.LogStreamDescriptor |
stream stagingPaths |
index stagingPaths |
@@ -434,6 +458,7 @@ func (sa *stagedArchival) stage(c context.Context) (err error) { |
ss := storageSource{ |
Context: c, |
st: sa.Storage, |
+ project: sa.project, |
path: sa.path, |
terminalIndex: types.MessageIndex(sa.terminalIndex), |
lastIndex: -1, |