Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2631)

Unified Diff: appengine/logdog/coordinator/endpoints/services/registerStream.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Switch to Tumble delayed mutations. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/registerStream.go
diff --git a/appengine/logdog/coordinator/endpoints/services/registerStream.go b/appengine/logdog/coordinator/endpoints/services/registerStream.go
index 4256e861b43aa8e69274d1c5069722184511becc..bb32f5d992f48fba4aedf73d705e52f1621b3fbf 100644
--- a/appengine/logdog/coordinator/endpoints/services/registerStream.go
+++ b/appengine/logdog/coordinator/endpoints/services/registerStream.go
@@ -7,6 +7,7 @@ package services
import (
"crypto/subtle"
"errors"
+ "time"
ds "github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/appengine/logdog/coordinator"
@@ -97,6 +98,21 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
req.Desc.Name, name)
}
+ // Load our config and archive expiration.
+ _, cfg, err := coordinator.GetServices(c).Config(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to load configuration.")
+ return nil, grpcutil.Internal
+ }
+
+ archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
+ if archiveDelayMax < 0 {
+ log.Fields{
+ "archiveDelayMax": archiveDelayMax,
+ }.Errorf(c, "Must have positive maximum archive delay.")
+ return nil, grpcutil.Internal
+ }
+
// Already registered? (Non-transactional).
ls := coordinator.LogStreamFromPath(path)
switch err := ds.Get(c).Get(ls); err {
@@ -109,7 +125,12 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
case ds.ErrNoSuchEntity:
// The registration is valid, so retain it.
- if err := tumble.RunMutation(c, &registerStreamMutation{ls, req}); err != nil {
+ err = tumble.RunMutation(c, &registerStreamMutation{
+ LogStream: ls,
+ req: req,
+ archiveDelay: archiveDelayMax,
+ })
+ if err != nil {
log.Fields{
log.ErrorKey: err,
}.Errorf(c, "Failed to register LogStream.")
@@ -141,7 +162,8 @@ func filterError(err error) error {
type registerStreamMutation struct {
*coordinator.LogStream
- req *logdog.RegisterStreamRequest
+ req *logdog.RegisterStreamRequest
+ archiveDelay time.Duration
}
func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) {
@@ -189,6 +211,22 @@ func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio
return nil, grpcutil.Internal
}
+ // Add a named delayed mutation to archive this stream if it's not archived
+ // yet. We will cancel this in terminateStream once we dispatch an immediate
+ // archival task.
+ archiveExpiredMutation := mutations.CreateArchiveTask{
+ Path: m.Path(),
+ Expiration: clock.Now(c).Add(m.archiveDelay),
+ }
+ aeParent, aeName := archiveExpiredMutation.TaskName(di)
iannucci 2016/04/29 20:09:42 why not just .SetArchiveTaskDeadline(c, m)? In dm
dnj 2016/04/29 23:04:21 Discussed, this interface in general needs a face
+ err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{
+ aeName: &archiveExpiredMutation,
+ })
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to load named mutations.")
+ return nil, grpcutil.Internal
+ }
+
return []tumble.Mutation{
&mutations.PutHierarchyMutation{
Path: m.Path(),

Powered by Google App Engine
This is Rietveld 408576698