Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(397)

Unified Diff: logdog/appengine/coordinator/endpoints/services/registerStream.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/appengine/coordinator/endpoints/services/registerStream.go
diff --git a/logdog/appengine/coordinator/endpoints/services/registerStream.go b/logdog/appengine/coordinator/endpoints/services/registerStream.go
index 809b1496a10883ed9fe077e5b34dd274a2df4b89..a39987ddd3a8df8a25af76a2d5936c47bdad726f 100644
--- a/logdog/appengine/coordinator/endpoints/services/registerStream.go
+++ b/logdog/appengine/coordinator/endpoints/services/registerStream.go
@@ -16,9 +16,8 @@ package services
import (
"crypto/subtle"
+ "time"
- "github.com/golang/protobuf/proto"
- ds "github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/common/clock"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/grpc/grpcutil"
@@ -26,9 +25,12 @@ import (
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/appengine/coordinator"
"github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
- "github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
+ "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
"github.com/luci/luci-go/logdog/common/types"
- "github.com/luci/luci-go/tumble"
+
+ ds "github.com/luci/gae/service/datastore"
+
+ "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)
@@ -153,17 +155,16 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
}
// The stream does not exist. Proceed with transactional registration.
- lstKey := ds.KeyForObj(c, lst)
- err = tumble.RunUnbuffered(c, lstKey, func(c context.Context) ([]tumble.Mutation, error) {
+ err = ds.RunInTransaction(c, func(c context.Context) error {
// Load our state and stream (transactional).
switch err := ds.Get(c, ls, lst); {
case err == nil:
// The stream is already registered.
- return nil, nil
+ return nil
case !anyNoSuchEntity(err):
log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
- return nil, err
+ return err
}
// The stream is not yet registered.
@@ -183,7 +184,7 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
log.Fields{
log.ErrorKey: err,
}.Errorf(c, "Failed to load descriptor into LogStream.")
- return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
+ return grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
}
// If our registration request included a terminal index, terminate the
@@ -203,54 +204,58 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
log.Fields{
log.ErrorKey: err,
}.Errorf(c, "Failed to Put LogStream.")
- return nil, grpcutil.Internal
+ return grpcutil.Internal
}
- // Add a named delayed mutation to archive this stream if it's not archived
- // yet.
+ // Add a named delayed task queue task to archive this stream if it's not
+ // archived yet.
//
// If the registration did not include a terminal index, this will be our
- // pessimistic archival request, scheduled on registration to catch streams
- // that don't expire. This mutation will be replaced by the optimistic
- // archival mutation when/if the stream is terminated via TerminateStream.
+ // pessimistic archival request, scheduled on registration to catch
+ // streams that don't expire. This task will be replaced by the optimistic
+ // archival task when/if the stream is terminated via TerminateStream.
//
// If the registration included a terminal index, apply our standard
// parameters to the archival. Since TerminateStream will not be called,
// this will be our formal optimistic archival task.
params := standardArchivalParams(cfg, pcfg)
- cat := mutations.CreateArchiveTask{
- ID: ls.ID,
- }
+
+ var (
+ delay time.Duration
+ archivalTag logdog.ArchiveDispatchTask_Tag
+ )
if req.TerminalIndex < 0 {
// No terminal index, schedule pessimistic cleanup archival.
- cat.Expiration = now.Add(params.CompletePeriod)
+ delay = params.CompletePeriod
+ archivalTag = logdog.ArchiveDispatchTask_EXPIRED
+
+ // For cleanup, we instruct the archival to not wait any longer or
+ // allow the stream time extra time to become complete (archive as-is).
+ params.SettleDelay = 0
+ params.CompletePeriod = 0
log.Fields{
- "deadline": cat.Expiration,
- }.Debugf(c, "Scheduling cleanup archival mutation.")
+ "deadline": delay,
+ }.Debugf(c, "Scheduling cleanup archival task.")
} else {
- // Terminal index, schedule optimistic archival (mirrors TerminateStream).
- cat.SettleDelay = params.SettleDelay
- cat.CompletePeriod = params.CompletePeriod
-
- // Schedule this mutation to execute after our settle delay.
- cat.Expiration = now.Add(params.SettleDelay)
+ // Schedule this task to execute after our settle delay.
+ delay = params.SettleDelay
+ archivalTag = logdog.ArchiveDispatchTask_TERMINATED
log.Fields{
- "settleDelay": cat.SettleDelay,
- "completePeriod": cat.CompletePeriod,
- "scheduledAt": cat.Expiration,
- }.Debugf(c, "Scheduling archival mutation.")
+ "settleDelay": params.SettleDelay,
+ "completePeriod": params.CompletePeriod,
+ "scheduledAt": delay,
+ }.Debugf(c, "Scheduling archival task.")
}
- aeName := cat.TaskName(c)
- if err := tumble.PutNamedMutations(c, lstKey, map[string]tumble.Mutation{aeName: &cat}); err != nil {
- log.WithError(err).Errorf(c, "Failed to write named mutations.")
- return nil, grpcutil.Internal
+ if err := tasks.CreateArchivalTask(c, logStreamID, archivalTag, delay, params); err != nil {
+ log.WithError(err).Errorf(c, "Failed to create archival task.")
+ return grpcutil.Internal
}
- return nil, nil
- })
+ return nil
+ }, nil)
if err != nil {
log.Fields{
log.ErrorKey: err,

Powered by Google App Engine
This is Rietveld 408576698