Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/services/registerStream.go |
| diff --git a/appengine/logdog/coordinator/endpoints/services/registerStream.go b/appengine/logdog/coordinator/endpoints/services/registerStream.go |
| index 4256e861b43aa8e69274d1c5069722184511becc..bb32f5d992f48fba4aedf73d705e52f1621b3fbf 100644 |
| --- a/appengine/logdog/coordinator/endpoints/services/registerStream.go |
| +++ b/appengine/logdog/coordinator/endpoints/services/registerStream.go |
| @@ -7,6 +7,7 @@ package services |
| import ( |
| "crypto/subtle" |
| "errors" |
| + "time" |
| ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/luci-go/appengine/logdog/coordinator" |
| @@ -97,6 +98,21 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
| req.Desc.Name, name) |
| } |
| + // Load our config and archive expiration. |
| + _, cfg, err := coordinator.GetServices(c).Config(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() |
| + if archiveDelayMax < 0 { |
| + log.Fields{ |
| + "archiveDelayMax": archiveDelayMax, |
| + }.Errorf(c, "Must have positive maximum archive delay.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| // Already registered? (Non-transactional). |
| ls := coordinator.LogStreamFromPath(path) |
| switch err := ds.Get(c).Get(ls); err { |
| @@ -109,7 +125,12 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
| case ds.ErrNoSuchEntity: |
| // The registration is valid, so retain it. |
| - if err := tumble.RunMutation(c, ®isterStreamMutation{ls, req}); err != nil { |
| + err = tumble.RunMutation(c, ®isterStreamMutation{ |
| + LogStream: ls, |
| + req: req, |
| + archiveDelay: archiveDelayMax, |
| + }) |
| + if err != nil { |
| log.Fields{ |
| log.ErrorKey: err, |
| }.Errorf(c, "Failed to register LogStream.") |
| @@ -141,7 +162,8 @@ func filterError(err error) error { |
| type registerStreamMutation struct { |
| *coordinator.LogStream |
| - req *logdog.RegisterStreamRequest |
| + req *logdog.RegisterStreamRequest |
| + archiveDelay time.Duration |
| } |
| func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) { |
| @@ -189,6 +211,22 @@ func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio |
| return nil, grpcutil.Internal |
| } |
| + // Add a named delayed mutation to archive this stream if it's not archived |
| + // yet. We will cancel this in terminateStream once we dispatch an immediate |
| + // archival task. |
| + archiveExpiredMutation := mutations.CreateArchiveTask{ |
| + Path: m.Path(), |
| + Expiration: clock.Now(c).Add(m.archiveDelay), |
| + } |
| + aeParent, aeName := archiveExpiredMutation.TaskName(di) |
|
iannucci
2016/04/29 20:09:42
why not just .SetArchiveTaskDeadline(c, m)? In dm
dnj
2016/04/29 23:04:21
Discussed, this interface in general needs a face
|
| + err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{ |
| + aeName: &archiveExpiredMutation, |
| + }) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load named mutations.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| return []tumble.Mutation{ |
| &mutations.PutHierarchyMutation{ |
| Path: m.Path(), |