| Index: logdog/appengine/coordinator/endpoints/services/terminateStream.go
|
| diff --git a/logdog/appengine/coordinator/endpoints/services/terminateStream.go b/logdog/appengine/coordinator/endpoints/services/terminateStream.go
|
| index aea7a9acb9ec4bf7a1da3f2b142897c87913de82..c7cfec98f22b412e19963b8822c89cb625a67c5a 100644
|
| --- a/logdog/appengine/coordinator/endpoints/services/terminateStream.go
|
| +++ b/logdog/appengine/coordinator/endpoints/services/terminateStream.go
|
| @@ -18,7 +18,9 @@ import (
|
| "crypto/subtle"
|
|
|
| "github.com/golang/protobuf/ptypes/empty"
|
| +
|
| ds "github.com/luci/gae/service/datastore"
|
| +
|
| "github.com/luci/luci-go/common/clock"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/proto/google"
|
| @@ -29,7 +31,9 @@ import (
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/config"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
|
| + "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
|
| "github.com/luci/luci-go/tumble"
|
| +
|
| "golang.org/x/net/context"
|
| "google.golang.org/grpc/codes"
|
| )
|
| @@ -116,31 +120,35 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
| return grpcutil.Internal
|
| }
|
|
|
| - // Replace the pessimistic archive expiration mutation scheduled in
|
| - // RegisterStream with an optimistic archival mutation.
|
| - cat := mutations.CreateArchiveTask{
|
| - ID: id,
|
| + // Replace the pessimistic archive expiration task scheduled in
|
| + // RegisterStream with an optimistic archival task.
|
| + if err := tasks.CreateArchivalTask(c, id, logdog.ArchiveDispatchTask_TERMINATED,
|
| + params.SettleDelay, params); err != nil {
|
|
|
| - // Optimistic parameters.
|
| - SettleDelay: params.SettleDelay,
|
| - CompletePeriod: params.CompletePeriod,
|
| + log.WithError(err).Errorf(c, "Failed to create terminated archival task.")
|
| + return grpcutil.Internal
|
| + }
|
|
|
| - // Schedule this mutation to execute after our settle delay.
|
| - Expiration: now.Add(params.SettleDelay),
|
| + if err := tasks.DeleteArchiveStreamExpiredTask(c, id); err != nil {
|
| + // If we can't delete this task, it will just run, notice that the
|
| + // stream is archived, and quit. No big deal.
|
| + log.WithError(err).Warningf(c, "(Non-fatal) Failed to delete expired archival task.")
|
| }
|
|
|
| - aeParent, aeName := ds.KeyForObj(c, lst), cat.TaskName(c)
|
| - if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{aeName: &cat}); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to replace archive expiration mutation.")
|
| - return grpcutil.Internal
|
| + // In case the stream was *registered* with Tumble, but is now being
|
| + // processed with task queue code, clear the Tumble archival mutation.
|
| + //
|
| + // TODO(dnj): Remove this once Tumble is drained.
|
| + archiveMutation := mutations.CreateArchiveTask{ID: id}
|
| + if err := tumble.CancelNamedMutations(c, archiveMutation.Root(c), archiveMutation.TaskName(c)); err != nil {
|
| + log.WithError(err).Warningf(c, "(Non-fatal) Failed to cancel archive mutation.")
|
| }
|
|
|
| log.Fields{
|
| "terminalIndex": lst.TerminalIndex,
|
| - "settleDelay": cat.SettleDelay,
|
| - "completePeriod": cat.CompletePeriod,
|
| - "scheduledAt": cat.Expiration,
|
| - }.Debugf(c, "Terminal index was set, and archival mutation was scheduled.")
|
| + "settleDelay": params.SettleDelay,
|
| + "completePeriod": params.CompletePeriod,
|
| + }.Debugf(c, "Terminal index was set, and archival task was scheduled.")
|
| return nil
|
| }
|
| }, nil)
|
|
|