| Index: logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| diff --git a/logdog/appengine/coordinator/endpoints/services/registerStream.go b/logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| index 809b1496a10883ed9fe077e5b34dd274a2df4b89..a39987ddd3a8df8a25af76a2d5936c47bdad726f 100644
|
| --- a/logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| +++ b/logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| @@ -16,9 +16,8 @@ package services
|
|
|
| import (
|
| "crypto/subtle"
|
| + "time"
|
|
|
| - "github.com/golang/protobuf/proto"
|
| - ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/luci-go/common/clock"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/grpc/grpcutil"
|
| @@ -26,9 +25,12 @@ import (
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
|
| - "github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
|
| + "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
|
| "github.com/luci/luci-go/logdog/common/types"
|
| - "github.com/luci/luci-go/tumble"
|
| +
|
| + ds "github.com/luci/gae/service/datastore"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| "golang.org/x/net/context"
|
| "google.golang.org/grpc/codes"
|
| )
|
| @@ -153,17 +155,16 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| }
|
|
|
| // The stream does not exist. Proceed with transactional registration.
|
| - lstKey := ds.KeyForObj(c, lst)
|
| - err = tumble.RunUnbuffered(c, lstKey, func(c context.Context) ([]tumble.Mutation, error) {
|
| + err = ds.RunInTransaction(c, func(c context.Context) error {
|
| // Load our state and stream (transactional).
|
| switch err := ds.Get(c, ls, lst); {
|
| case err == nil:
|
| // The stream is already registered.
|
| - return nil, nil
|
| + return nil
|
|
|
| case !anyNoSuchEntity(err):
|
| log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
|
| - return nil, err
|
| + return err
|
| }
|
|
|
| // The stream is not yet registered.
|
| @@ -183,7 +184,7 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| log.Fields{
|
| log.ErrorKey: err,
|
| }.Errorf(c, "Failed to load descriptor into LogStream.")
|
| - return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
|
| + return grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
|
| }
|
|
|
| // If our registration request included a terminal index, terminate the
|
| @@ -203,54 +204,58 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| log.Fields{
|
| log.ErrorKey: err,
|
| }.Errorf(c, "Failed to Put LogStream.")
|
| - return nil, grpcutil.Internal
|
| + return grpcutil.Internal
|
| }
|
|
|
| - // Add a named delayed mutation to archive this stream if it's not archived
|
| - // yet.
|
| + // Add a named delayed task queue task to archive this stream if it's not
|
| + // archived yet.
|
| //
|
| // If the registration did not include a terminal index, this will be our
|
| - // pessimistic archival request, scheduled on registration to catch streams
|
| - // that don't expire. This mutation will be replaced by the optimistic
|
| - // archival mutation when/if the stream is terminated via TerminateStream.
|
| + // pessimistic archival request, scheduled on registration to catch
|
| + // streams that don't expire. This task will be replaced by the optimistic
|
| + // archival task when/if the stream is terminated via TerminateStream.
|
| //
|
| // If the registration included a terminal index, apply our standard
|
| // parameters to the archival. Since TerminateStream will not be called,
|
| // this will be our formal optimistic archival task.
|
| params := standardArchivalParams(cfg, pcfg)
|
| - cat := mutations.CreateArchiveTask{
|
| - ID: ls.ID,
|
| - }
|
| +
|
| + var (
|
| + delay time.Duration
|
| + archivalTag logdog.ArchiveDispatchTask_Tag
|
| + )
|
| if req.TerminalIndex < 0 {
|
| // No terminal index, schedule pessimistic cleanup archival.
|
| - cat.Expiration = now.Add(params.CompletePeriod)
|
| + delay = params.CompletePeriod
|
| + archivalTag = logdog.ArchiveDispatchTask_EXPIRED
|
| +
|
| + // For cleanup, we instruct the archival to not wait any longer or
|
| + // allow the stream time extra time to become complete (archive as-is).
|
| + params.SettleDelay = 0
|
| + params.CompletePeriod = 0
|
|
|
| log.Fields{
|
| - "deadline": cat.Expiration,
|
| - }.Debugf(c, "Scheduling cleanup archival mutation.")
|
| + "deadline": delay,
|
| + }.Debugf(c, "Scheduling cleanup archival task.")
|
| } else {
|
| - // Terminal index, schedule optimistic archival (mirrors TerminateStream).
|
| - cat.SettleDelay = params.SettleDelay
|
| - cat.CompletePeriod = params.CompletePeriod
|
| -
|
| - // Schedule this mutation to execute after our settle delay.
|
| - cat.Expiration = now.Add(params.SettleDelay)
|
| + // Schedule this task to execute after our settle delay.
|
| + delay = params.SettleDelay
|
| + archivalTag = logdog.ArchiveDispatchTask_TERMINATED
|
|
|
| log.Fields{
|
| - "settleDelay": cat.SettleDelay,
|
| - "completePeriod": cat.CompletePeriod,
|
| - "scheduledAt": cat.Expiration,
|
| - }.Debugf(c, "Scheduling archival mutation.")
|
| + "settleDelay": params.SettleDelay,
|
| + "completePeriod": params.CompletePeriod,
|
| + "scheduledAt": delay,
|
| + }.Debugf(c, "Scheduling archival task.")
|
| }
|
|
|
| - aeName := cat.TaskName(c)
|
| - if err := tumble.PutNamedMutations(c, lstKey, map[string]tumble.Mutation{aeName: &cat}); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to write named mutations.")
|
| - return nil, grpcutil.Internal
|
| + if err := tasks.CreateArchivalTask(c, logStreamID, archivalTag, delay, params); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create archival task.")
|
| + return grpcutil.Internal
|
| }
|
|
|
| - return nil, nil
|
| - })
|
| + return nil
|
| + }, nil)
|
| if err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
|
|