| Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| index 14f016ba564882499822cda8c6e1974118799c75..23bf5f81f22920641baca315ecca3dbbdb07f330 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| @@ -10,6 +10,8 @@ import (
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/gae/service/info"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| + "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
|
| + "github.com/luci/luci-go/appengine/tumble"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| @@ -62,7 +64,9 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
|
|
| // Transactionally validate and update the terminal index.
|
| err = ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| - if err := ds.Get(c).Get(ls); err != nil {
|
| + di := ds.Get(c)
|
| +
|
| + if err := di.Get(ls); err != nil {
|
| if err == ds.ErrNoSuchEntity {
|
| log.Debugf(c, "LogEntry not found.")
|
| return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
|
| @@ -109,13 +113,21 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
| return grpcutil.Internal
|
| }
|
|
|
| - if err := ds.Get(c).Put(ls); err != nil {
|
| + if err := di.Put(ls); err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| }.Errorf(c, "Failed to Put() LogStream.")
|
| return grpcutil.Internal
|
| }
|
|
|
| + // Delete the archive expiration mutation, since we have just dispatched
|
| + // an archive request.
|
| + aeParent, aeName := (&mutations.CreateArchiveTask{Path: path}).TaskName(di)
|
| + if err := tumble.CancelNamedMutations(c, aeParent, aeName); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to cancel archive expiration mutation.")
|
| + return grpcutil.Internal
|
| + }
|
| +
|
| log.Fields{
|
| "terminalIndex": ls.TerminalIndex,
|
| }.Infof(c, "Terminal index was set and archival was dispatched.")
|
|
|