| Index: server/internal/logdog/archivist/archivist.go
|
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
|
| index 33e837df9b4862dfbc92e3b4e8660ae7663a2e98..bb20602b0413f001051c19135f29ec7be10c2e89 100644
|
| --- a/server/internal/logdog/archivist/archivist.go
|
| +++ b/server/internal/logdog/archivist/archivist.go
|
| @@ -12,6 +12,7 @@ import (
|
|
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| @@ -86,6 +87,7 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| "delete": delete,
|
| + "project": task.Task().Project,
|
| "path": task.Task().Path,
|
| }.Infof(c, "Finished archive task.")
|
| return delete
|
| @@ -97,14 +99,27 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
|
| func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
|
| at := task.Task()
|
| log.Fields{
|
| - "path": at.Path,
|
| + "project": at.Project,
|
| + "path": at.Path,
|
| }.Debugf(c, "Received archival task.")
|
|
|
| + if err := types.StreamPath(at.Path).Validate(); err != nil {
|
| + return true, fmt.Errorf("invalid path %q: %s", at.Path, err)
|
| + }
|
| +
|
| + // TODO(dnj): Remove empty project exemption, make empty project invalid.
|
| + if at.Project != "" {
|
| + if err := config.ProjectName(at.Project).Validate(); err != nil {
|
| + return true, fmt.Errorf("invalid project name %q: %s", at.Project, err)
|
| + }
|
| + }
|
| +
|
| // Load the log stream's current state. If it is already archived, we will
|
| // return an immediate success.
|
| ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
|
| - Path: at.Path,
|
| - Desc: true,
|
| + Project: at.Project,
|
| + Path: at.Path,
|
| + Desc: true,
|
| })
|
| switch {
|
| case err != nil:
|
| @@ -183,13 +198,14 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
|
| // If we're requiring completeness, perform a keys-only scan of intermediate
|
| // storage to ensure that we have all of the records before we bother
|
| // streaming to storage only to find that we are missing data.
|
| - if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
|
| + if err := a.checkComplete(config.ProjectName(at.Project), types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
|
| return false, err
|
| }
|
| }
|
|
|
| ar := logdog.ArchiveStreamRequest{
|
| - Path: at.Path,
|
| + Project: at.Project,
|
| + Path: at.Path,
|
| }
|
|
|
| // Archive to staging.
|
| @@ -199,7 +215,7 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
|
| //
|
| // We will handle error creating the plan and executing the plan in the same
|
| // switch statement below.
|
| - staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID())
|
| + staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), types.StreamPath(at.Path), ls, task.UniqueID())
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
|
| } else {
|
| @@ -271,8 +287,9 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
|
|
|
| // checkComplete performs a quick scan of intermediate storage to ensure that
|
| // all of the log stream's records are available.
|
| -func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error {
|
| +func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamPath, tidx types.MessageIndex) error {
|
| sreq := storage.GetRequest{
|
| + Project: project,
|
| Path: path,
|
| KeysOnly: true,
|
| }
|
| @@ -303,10 +320,11 @@ func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex
|
| return nil
|
| }
|
|
|
| -func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
|
| - *stagedArchival, error) {
|
| +func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, path types.StreamPath,
|
| + ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
|
| sa := stagedArchival{
|
| Archivist: a,
|
| + project: project,
|
| path: path,
|
|
|
| terminalIndex: ls.State.TerminalIndex,
|
| @@ -328,27 +346,33 @@ func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath,
|
| }
|
|
|
| // Construct our staged archival paths.
|
| - sa.stream = a.makeStagingPaths(path, "logstream.entries", uid)
|
| - sa.index = a.makeStagingPaths(path, "logstream.index", uid)
|
| - sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid)
|
| + sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid)
|
| + sa.index = a.makeStagingPaths(project, path, "logstream.index", uid)
|
| + sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext), uid)
|
| return &sa, nil
|
| }
|
|
|
| // makeStagingPaths returns a stagingPaths instance for the given path and
|
| // file name. It incorporates a unique ID into the staging name to differentiate
|
| // it from other staging paths for the same path/name.
|
| -func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths {
|
| +func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.StreamPath, name, uid string) stagingPaths {
|
| + // TODO(dnj): This won't be necessary when empty project is invalid.
|
| + if project == "" {
|
| + project = "_"
|
| + }
|
| +
|
| return stagingPaths{
|
| - staged: a.GSStagingBase.Concat(string(path), uid, name),
|
| - final: a.GSBase.Concat(string(path), name),
|
| + staged: a.GSStagingBase.Concat(string(project), string(path), uid, name),
|
| + final: a.GSBase.Concat(string(project), string(path), name),
|
| }
|
| }
|
|
|
| type stagedArchival struct {
|
| *Archivist
|
|
|
| - path types.StreamPath
|
| - desc logpb.LogStreamDescriptor
|
| + project config.ProjectName
|
| + path types.StreamPath
|
| + desc logpb.LogStreamDescriptor
|
|
|
| stream stagingPaths
|
| index stagingPaths
|
| @@ -434,6 +458,7 @@ func (sa *stagedArchival) stage(c context.Context) (err error) {
|
| ss := storageSource{
|
| Context: c,
|
| st: sa.Storage,
|
| + project: sa.project,
|
| path: sa.path,
|
| terminalIndex: types.MessageIndex(sa.terminalIndex),
|
| lastIndex: -1,
|
|
|