Chromium Code Reviews| Index: server/internal/logdog/archivist/archivist.go |
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go |
| index 33e837df9b4862dfbc92e3b4e8660ae7663a2e98..bb20602b0413f001051c19135f29ec7be10c2e89 100644 |
| --- a/server/internal/logdog/archivist/archivist.go |
| +++ b/server/internal/logdog/archivist/archivist.go |
| @@ -12,6 +12,7 @@ import ( |
| "github.com/golang/protobuf/proto" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| + "github.com/luci/luci-go/common/config" |
| "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/gcloud/gs" |
| "github.com/luci/luci-go/common/logdog/types" |
| @@ -86,6 +87,7 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
| log.Fields{ |
| log.ErrorKey: err, |
| "delete": delete, |
| + "project": task.Task().Project, |
| "path": task.Task().Path, |
| }.Infof(c, "Finished archive task.") |
| return delete |
| @@ -97,14 +99,27 @@ func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
| func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { |
| at := task.Task() |
| log.Fields{ |
| - "path": at.Path, |
| + "project": at.Project, |
| + "path": at.Path, |
| }.Debugf(c, "Received archival task.") |
| + if err := types.StreamPath(at.Path).Validate(); err != nil { |
| + return true, fmt.Errorf("invalid path %q: %s", at.Path, err) |
| + } |
| + |
| + // TODO(dnj): Remove empty project exemption, make empty project invalid. |
| + if at.Project != "" { |
| + if err := config.ProjectName(at.Project).Validate(); err != nil { |
| + return true, fmt.Errorf("invalid project name %q: %s", at.Project, err) |
| + } |
| + } |
| + |
| // Load the log stream's current state. If it is already archived, we will |
| // return an immediate success. |
| ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| - Path: at.Path, |
| - Desc: true, |
| + Project: at.Project, |
| + Path: at.Path, |
| + Desc: true, |
| }) |
| switch { |
| case err != nil: |
| @@ -183,13 +198,14 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) |
| // If we're requiring completeness, perform a keys-only scan of intermediate |
| // storage to ensure that we have all of the records before we bother |
| // streaming to storage only to find that we are missing data. |
| - if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { |
| + if err := a.checkComplete(config.ProjectName(at.Project), types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { |
| return false, err |
| } |
| } |
| ar := logdog.ArchiveStreamRequest{ |
| - Path: at.Path, |
| + Project: at.Project, |
| + Path: at.Path, |
| } |
| // Archive to staging. |
| @@ -199,7 +215,7 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) |
| // |
| // We will handle error creating the plan and executing the plan in the same |
| // switch statement below. |
| - staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID()) |
| + staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), types.StreamPath(at.Path), ls, task.UniqueID()) |
| if err != nil { |
| log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
| } else { |
| @@ -271,8 +287,9 @@ func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) |
| // checkComplete performs a quick scan of intermediate storage to ensure that |
| // all of the log stream's records are available. |
| -func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error { |
| +func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamPath, tidx types.MessageIndex) error { |
| sreq := storage.GetRequest{ |
| + Project: project, |
| Path: path, |
| KeysOnly: true, |
| } |
| @@ -303,10 +320,11 @@ func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex |
| return nil |
| } |
| -func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( |
| - *stagedArchival, error) { |
| +func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, path types.StreamPath, |
| + ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
| sa := stagedArchival{ |
| Archivist: a, |
| + project: project, |
| path: path, |
| terminalIndex: ls.State.TerminalIndex, |
| @@ -328,27 +346,33 @@ func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, |
| } |
| // Construct our staged archival paths. |
| - sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) |
| - sa.index = a.makeStagingPaths(path, "logstream.index", uid) |
| - sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) |
| + sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid) |
| + sa.index = a.makeStagingPaths(project, path, "logstream.index", uid) |
| + sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext), uid) |
| return &sa, nil |
| } |
| // makeStagingPaths returns a stagingPaths instance for the given path and |
| // file name. It incorporates a unique ID into the staging name to differentiate |
| // it from other staging paths for the same path/name. |
| -func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths { |
| +func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.StreamPath, name, uid string) stagingPaths { |
| + // TODO(dnj): This won't be necessary when empty project is invalid. |
| + if project == "" { |
| + project = "_" |
| + } |
|
Ryan Tseng
2016/04/28 19:57:34
elif project == "_", freak out? Or is that allowe
dnj
2016/04/30 03:03:13
"_" is not a valid project name character :D
|
| + |
| return stagingPaths{ |
| - staged: a.GSStagingBase.Concat(string(path), uid, name), |
| - final: a.GSBase.Concat(string(path), name), |
| + staged: a.GSStagingBase.Concat(string(project), string(path), uid, name), |
| + final: a.GSBase.Concat(string(project), string(path), name), |
| } |
| } |
| type stagedArchival struct { |
| *Archivist |
| - path types.StreamPath |
| - desc logpb.LogStreamDescriptor |
| + project config.ProjectName |
| + path types.StreamPath |
| + desc logpb.LogStreamDescriptor |
| stream stagingPaths |
| index stagingPaths |
| @@ -434,6 +458,7 @@ func (sa *stagedArchival) stage(c context.Context) (err error) { |
| ss := storageSource{ |
| Context: c, |
| st: sa.Storage, |
| + project: sa.project, |
| path: sa.path, |
| terminalIndex: types.MessageIndex(sa.terminalIndex), |
| lastIndex: -1, |