| Index: appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/registerStream.go b/appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| index 4256e861b43aa8e69274d1c5069722184511becc..bb32f5d992f48fba4aedf73d705e52f1621b3fbf 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| @@ -7,6 +7,7 @@ package services
|
| import (
|
| "crypto/subtle"
|
| "errors"
|
| + "time"
|
|
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| @@ -97,6 +98,21 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| req.Desc.Name, name)
|
| }
|
|
|
| + // Load our config and archive expiration.
|
| + _, cfg, err := coordinator.GetServices(c).Config(c)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to load configuration.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| +
|
| + archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
|
| + if archiveDelayMax < 0 {
|
| + log.Fields{
|
| + "archiveDelayMax": archiveDelayMax,
|
| + }.Errorf(c, "Must have positive maximum archive delay.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| +
|
| // Already registered? (Non-transactional).
|
| ls := coordinator.LogStreamFromPath(path)
|
| switch err := ds.Get(c).Get(ls); err {
|
| @@ -109,7 +125,12 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
|
|
| case ds.ErrNoSuchEntity:
|
| // The registration is valid, so retain it.
|
| - if err := tumble.RunMutation(c, ®isterStreamMutation{ls, req}); err != nil {
|
| + err = tumble.RunMutation(c, ®isterStreamMutation{
|
| + LogStream: ls,
|
| + req: req,
|
| + archiveDelay: archiveDelayMax,
|
| + })
|
| + if err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| }.Errorf(c, "Failed to register LogStream.")
|
| @@ -141,7 +162,8 @@ func filterError(err error) error {
|
| type registerStreamMutation struct {
|
| *coordinator.LogStream
|
|
|
| - req *logdog.RegisterStreamRequest
|
| + req *logdog.RegisterStreamRequest
|
| + archiveDelay time.Duration
|
| }
|
|
|
| func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) {
|
| @@ -189,6 +211,22 @@ func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio
|
| return nil, grpcutil.Internal
|
| }
|
|
|
| + // Add a named delayed mutation to archive this stream if it's not archived
|
| + // yet. We will cancel this in terminateStream once we dispatch an immediate
|
| + // archival task.
|
| + archiveExpiredMutation := mutations.CreateArchiveTask{
|
| + Path: m.Path(),
|
| + Expiration: clock.Now(c).Add(m.archiveDelay),
|
| + }
|
| + aeParent, aeName := archiveExpiredMutation.TaskName(di)
|
| + err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{
|
| + aeName: &archiveExpiredMutation,
|
| + })
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to load named mutations.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| +
|
| return []tumble.Mutation{
|
| &mutations.PutHierarchyMutation{
|
| Path: m.Path(),
|
|
|