Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1692)

Unified Diff: appengine/logdog/coordinator/endpoints/services/terminateStream.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go
diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
index 14f016ba564882499822cda8c6e1974118799c75..23bf5f81f22920641baca315ecca3dbbdb07f330 100644
--- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go
+++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
@@ -10,6 +10,8 @@ import (
ds "github.com/luci/gae/service/datastore"
"github.com/luci/gae/service/info"
"github.com/luci/luci-go/appengine/logdog/coordinator"
+ "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
+ "github.com/luci/luci-go/appengine/tumble"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/grpcutil"
@@ -62,7 +64,9 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
// Transactionally validate and update the terminal index.
err = ds.Get(c).RunInTransaction(func(c context.Context) error {
- if err := ds.Get(c).Get(ls); err != nil {
+ di := ds.Get(c)
+
+ if err := di.Get(ls); err != nil {
if err == ds.ErrNoSuchEntity {
log.Debugf(c, "LogEntry not found.")
return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
@@ -109,13 +113,21 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
return grpcutil.Internal
}
- if err := ds.Get(c).Put(ls); err != nil {
+ if err := di.Put(ls); err != nil {
log.Fields{
log.ErrorKey: err,
}.Errorf(c, "Failed to Put() LogStream.")
return grpcutil.Internal
}
+ // Delete the archive expiration mutation, since we have just dispatched
+ // an archive request.
+ aeParent, aeName := (&mutations.CreateArchiveTask{Path: path}).TaskName(di)
+ if err := tumble.CancelNamedMutations(c, aeParent, aeName); err != nil {
+ log.WithError(err).Errorf(c, "Failed to cancel archive expiration mutation.")
+ return grpcutil.Internal
+ }
+
log.Fields{
"terminalIndex": ls.TerminalIndex,
}.Infof(c, "Terminal index was set and archival was dispatched.")

Powered by Google App Engine
This is Rietveld 408576698