Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Unified Diff: server/internal/logdog/archivist/archivist.go

Issue 1909053003: LogDog: Add project namespacing to Archivist. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-logs
Patch Set: Rebase? Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/archivist/archivist.go
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
index 33e837df9b4862dfbc92e3b4e8660ae7663a2e98..bb20602b0413f001051c19135f29ec7be10c2e89 100644
--- a/server/internal/logdog/archivist/archivist.go
+++ b/server/internal/logdog/archivist/archivist.go
@@ -12,6 +12,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
"github.com/luci/luci-go/common/logdog/types"
@@ -86,6 +87,7 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
log.Fields{
log.ErrorKey: err,
"delete": delete,
+ "project": task.Task().Project,
"path": task.Task().Path,
}.Infof(c, "Finished archive task.")
return delete
@@ -97,14 +99,27 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
at := task.Task()
log.Fields{
- "path": at.Path,
+ "project": at.Project,
+ "path": at.Path,
}.Debugf(c, "Received archival task.")
+ if err := types.StreamPath(at.Path).Validate(); err != nil {
+ return true, fmt.Errorf("invalid path %q: %s", at.Path, err)
+ }
+
+ // TODO(dnj): Remove empty project exemption, make empty project invalid.
+ if at.Project != "" {
+ if err := config.ProjectName(at.Project).Validate(); err != nil {
+ return true, fmt.Errorf("invalid project name %q: %s", at.Project, err)
+ }
+ }
+
// Load the log stream's current state. If it is already archived, we will
// return an immediate success.
ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
- Path: at.Path,
- Desc: true,
+ Project: at.Project,
+ Path: at.Path,
+ Desc: true,
})
switch {
case err != nil:
@@ -183,13 +198,14 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
// If we're requiring completeness, perform a keys-only scan of intermediate
// storage to ensure that we have all of the records before we bother
// streaming to storage only to find that we are missing data.
- if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
+ if err := a.checkComplete(config.ProjectName(at.Project), types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
return false, err
}
}
ar := logdog.ArchiveStreamRequest{
- Path: at.Path,
+ Project: at.Project,
+ Path: at.Path,
}
// Archive to staging.
@@ -199,7 +215,7 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
//
// We will handle error creating the plan and executing the plan in the same
// switch statement below.
- staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID())
+ staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), types.StreamPath(at.Path), ls, task.UniqueID())
if err != nil {
log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
} else {
@@ -271,8 +287,9 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
// checkComplete performs a quick scan of intermediate storage to ensure that
// all of the log stream's records are available.
-func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error {
+func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamPath, tidx types.MessageIndex) error {
sreq := storage.GetRequest{
+ Project: project,
Path: path,
KeysOnly: true,
}
@@ -303,10 +320,11 @@ func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex
return nil
}
-func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
- *stagedArchival, error) {
+func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, path types.StreamPath,
+ ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
sa := stagedArchival{
Archivist: a,
+ project: project,
path: path,
terminalIndex: ls.State.TerminalIndex,
@@ -328,27 +346,33 @@ func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath,
}
// Construct our staged archival paths.
- sa.stream = a.makeStagingPaths(path, "logstream.entries", uid)
- sa.index = a.makeStagingPaths(path, "logstream.index", uid)
- sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid)
+ sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid)
+ sa.index = a.makeStagingPaths(project, path, "logstream.index", uid)
+ sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext), uid)
return &sa, nil
}
// makeStagingPaths returns a stagingPaths instance for the given path and
// file name. It incorporates a unique ID into the staging name to differentiate
// it from other staging paths for the same path/name.
-func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths {
+func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.StreamPath, name, uid string) stagingPaths {
+ // TODO(dnj): This won't be necessary when empty project is invalid.
+ if project == "" {
+ project = "_"
+ }
+
return stagingPaths{
- staged: a.GSStagingBase.Concat(string(path), uid, name),
- final: a.GSBase.Concat(string(path), name),
+ staged: a.GSStagingBase.Concat(string(project), string(path), uid, name),
+ final: a.GSBase.Concat(string(project), string(path), name),
}
}
type stagedArchival struct {
*Archivist
- path types.StreamPath
- desc logpb.LogStreamDescriptor
+ project config.ProjectName
+ path types.StreamPath
+ desc logpb.LogStreamDescriptor
stream stagingPaths
index stagingPaths
@@ -434,6 +458,7 @@ func (sa *stagedArchival) stage(c context.Context) (err error) {
ss := storageSource{
Context: c,
st: sa.Storage,
+ project: sa.project,
path: sa.path,
terminalIndex: types.MessageIndex(sa.terminalIndex),
lastIndex: -1,
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698