Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(218)

Unified Diff: logdog/appengine/coordinator/endpoints/services/terminateStream.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/endpoints/services/terminateStream.go
diff --git a/logdog/appengine/coordinator/endpoints/services/terminateStream.go b/logdog/appengine/coordinator/endpoints/services/terminateStream.go
index aea7a9acb9ec4bf7a1da3f2b142897c87913de82..c7cfec98f22b412e19963b8822c89cb625a67c5a 100644
--- a/logdog/appengine/coordinator/endpoints/services/terminateStream.go
+++ b/logdog/appengine/coordinator/endpoints/services/terminateStream.go
@@ -18,7 +18,9 @@ import (
"crypto/subtle"
"github.com/golang/protobuf/ptypes/empty"
+
ds "github.com/luci/gae/service/datastore"
+
"github.com/luci/luci-go/common/clock"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/google"
@@ -29,7 +31,9 @@ import (
"github.com/luci/luci-go/logdog/appengine/coordinator/config"
"github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
"github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
+ "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
"github.com/luci/luci-go/tumble"
+
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)
@@ -116,31 +120,35 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
return grpcutil.Internal
}
- // Replace the pessimistic archive expiration mutation scheduled in
- // RegisterStream with an optimistic archival mutation.
- cat := mutations.CreateArchiveTask{
- ID: id,
+ // Replace the pessimistic archive expiration task scheduled in
+ // RegisterStream with an optimistic archival task.
+ if err := tasks.CreateArchivalTask(c, id, logdog.ArchiveDispatchTask_TERMINATED,
+ params.SettleDelay, params); err != nil {
- // Optimistic parameters.
- SettleDelay: params.SettleDelay,
- CompletePeriod: params.CompletePeriod,
+ log.WithError(err).Errorf(c, "Failed to create terminated archival task.")
+ return grpcutil.Internal
+ }
- // Schedule this mutation to execute after our settle delay.
- Expiration: now.Add(params.SettleDelay),
+ if err := tasks.DeleteArchiveStreamExpiredTask(c, id); err != nil {
+ // If we can't delete this task, it will just run, notice that the
+ // stream is archived, and quit. No big deal.
+ log.WithError(err).Warningf(c, "(Non-fatal) Failed to delete expired archival task.")
}
- aeParent, aeName := ds.KeyForObj(c, lst), cat.TaskName(c)
- if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{aeName: &cat}); err != nil {
- log.WithError(err).Errorf(c, "Failed to replace archive expiration mutation.")
- return grpcutil.Internal
+ // In case the stream was *registered* with Tumble, but is now being
+ // processed with task queue code, clear the Tumble archival mutation.
+ //
+ // TODO(dnj): Remove this once Tumble is drained.
+ archiveMutation := mutations.CreateArchiveTask{ID: id}
+ if err := tumble.CancelNamedMutations(c, archiveMutation.Root(c), archiveMutation.TaskName(c)); err != nil {
+ log.WithError(err).Warningf(c, "(Non-fatal) Failed to cancel archive mutation.")
}
log.Fields{
"terminalIndex": lst.TerminalIndex,
- "settleDelay": cat.SettleDelay,
- "completePeriod": cat.CompletePeriod,
- "scheduledAt": cat.Expiration,
- }.Debugf(c, "Terminal index was set, and archival mutation was scheduled.")
+ "settleDelay": params.SettleDelay,
+ "completePeriod": params.CompletePeriod,
+ }.Debugf(c, "Terminal index was set, and archival task was scheduled.")
return nil
}
}, nil)

Powered by Google App Engine
This is Rietveld 408576698