Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(628)

Unified Diff: logdog/appengine/coordinator/endpoints/services/registerStream.go

Issue 2592753002: Create unbuffered Tumble entry point for LogDog. (Closed)
Patch Set: Add bench, update. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | logdog/appengine/coordinator/endpoints/services/registerStream_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/appengine/coordinator/endpoints/services/registerStream.go
diff --git a/logdog/appengine/coordinator/endpoints/services/registerStream.go b/logdog/appengine/coordinator/endpoints/services/registerStream.go
index 2cf99f75f4b42512dc04771c50316fe15c5cc212..58dcfa5d27a3d8dea16910b6c39074894241bc0c 100644
--- a/logdog/appengine/coordinator/endpoints/services/registerStream.go
+++ b/logdog/appengine/coordinator/endpoints/services/registerStream.go
@@ -12,11 +12,9 @@ import (
"github.com/luci/luci-go/common/clock"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/grpc/grpcutil"
- "github.com/luci/luci-go/logdog/api/config/svcconfig"
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/appengine/coordinator"
- "github.com/luci/luci-go/logdog/appengine/coordinator/config"
"github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
"github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy"
"github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
@@ -162,14 +160,103 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
}
// The stream does not exist. Proceed with transactional registration.
- err = tumble.RunMutation(c, &registerStreamMutation{
- RegisterStreamRequest: req,
- cfg: cfg,
- pcfg: pcfg,
- desc: &desc,
- pfx: pfx,
- lst: lst,
- ls: ls,
+ lsKey := ds.KeyForObj(c, ls)
+ err = tumble.RunUnbuffered(c, lsKey, func(c context.Context) ([]tumble.Mutation, error) {
+ // Load our state and stream (transactional).
+ switch err := ds.Get(c, ls, lst); {
+ case err == nil:
+ // The stream is already registered.
+ return nil, nil
+
+ case !anyNoSuchEntity(err):
+ log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
+ return nil, err
+ }
+
+ // The stream is not yet registered.
+ log.Infof(c, "Registering new log stream.")
+
+ // Construct our LogStreamState.
+ now := clock.Now(c).UTC()
+ lst.Created = now
+ lst.Updated = now
+ lst.Secret = pfx.Secret // Copy Prefix Secret to reduce datastore Gets.
+
+ // Construct our LogStream.
+ ls.Created = now
+ ls.ProtoVersion = req.ProtoVersion
+
+ if err := ls.LoadDescriptor(&desc); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ }.Errorf(c, "Failed to load descriptor into LogStream.")
+ return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
+ }
+
+ // If our registration request included a terminal index, terminate the
+ // log stream state as well.
+ if req.TerminalIndex >= 0 {
+ log.Fields{
+ "terminalIndex": req.TerminalIndex,
+ }.Debugf(c, "Registration request included terminal index.")
+
+ lst.TerminalIndex = req.TerminalIndex
+ lst.TerminatedTime = now
+ } else {
+ lst.TerminalIndex = -1
+ }
+
+ if err := ds.Put(c, ls, lst); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ }.Errorf(c, "Failed to Put LogStream.")
+ return nil, grpcutil.Internal
+ }
+
+ // Add a named delayed mutation to archive this stream if it's not archived
+ // yet.
+ //
+ // If the registration did not include a terminal index, this will be our
+ // pessimistic archival request, scheduled on registration to catch streams
+ // that don't expire. This mutation will be replaced by the optimistic
+ // archival mutation when/if the stream is terminated via TerminateStream.
+ //
+ // If the registration included a terminal index, apply our standard
+ // parameters to the archival. Since TerminateStream will not be called,
+ // this will be our formal optimistic archival task.
+ params := standardArchivalParams(cfg, pcfg)
+ cat := mutations.CreateArchiveTask{
+ ID: ls.ID,
+ }
+ if req.TerminalIndex < 0 {
+ // No terminal index, schedule pessimistic cleanup archival.
+ cat.Expiration = now.Add(params.CompletePeriod)
+
+ log.Fields{
+ "deadline": cat.Expiration,
+ }.Debugf(c, "Scheduling cleanup archival mutation.")
+ } else {
+ // Terminal index, schedule optimistic archival (mirrors TerminateStream).
+ cat.SettleDelay = params.SettleDelay
+ cat.CompletePeriod = params.CompletePeriod
+
+ // Schedule this mutation to execute after our settle delay.
+ cat.Expiration = now.Add(params.SettleDelay)
+
+ log.Fields{
+ "settleDelay": cat.SettleDelay,
+ "completePeriod": cat.CompletePeriod,
+ "scheduledAt": cat.Expiration,
+ }.Debugf(c, "Scheduling archival mutation.")
+ }
+
+ aeName := cat.TaskName(c)
+ if err := tumble.PutNamedMutations(c, lsKey, map[string]tumble.Mutation{aeName: &cat}); err != nil {
+ log.WithError(err).Errorf(c, "Failed to write named mutations.")
+ return nil, grpcutil.Internal
+ }
+
+ return nil, nil
})
if err != nil {
log.Fields{
@@ -184,117 +271,3 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
State: buildLogStreamState(ls, lst),
}, nil
}
-
-type registerStreamMutation struct {
- *logdog.RegisterStreamRequest
-
- cfg *config.Config
- pcfg *svcconfig.ProjectConfig
-
- desc *logpb.LogStreamDescriptor
- pfx *coordinator.LogPrefix
- ls *coordinator.LogStream
- lst *coordinator.LogStreamState
-}
-
-func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) {
- // Load our state and stream (transactional).
- switch err := ds.Get(c, m.ls, m.lst); {
- case err == nil:
- // The stream is already registered.
- return nil, nil
-
- case !anyNoSuchEntity(err):
- log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
- return nil, err
- }
-
- // The stream is not yet registered.
- log.Infof(c, "Registering new log stream.")
-
- // Construct our LogStreamState.
- now := clock.Now(c).UTC()
- m.lst.Created = now
- m.lst.Updated = now
- m.lst.Secret = m.pfx.Secret // Copy Prefix Secret to reduce datastore Gets.
-
- // Construct our LogStream.
- m.ls.Created = now
- m.ls.ProtoVersion = m.ProtoVersion
-
- if err := m.ls.LoadDescriptor(m.desc); err != nil {
- log.Fields{
- log.ErrorKey: err,
- }.Errorf(c, "Failed to load descriptor into LogStream.")
- return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
- }
-
- // If our registration request included a terminal index, terminate the
- // log stream state as well.
- if m.TerminalIndex >= 0 {
- log.Fields{
- "terminalIndex": m.TerminalIndex,
- }.Debugf(c, "Registration request included terminal index.")
-
- m.lst.TerminalIndex = m.TerminalIndex
- m.lst.TerminatedTime = now
- } else {
- m.lst.TerminalIndex = -1
- }
-
- if err := ds.Put(c, m.ls, m.lst); err != nil {
- log.Fields{
- log.ErrorKey: err,
- }.Errorf(c, "Failed to Put LogStream.")
- return nil, grpcutil.Internal
- }
-
- // Add a named delayed mutation to archive this stream if it's not archived
- // yet.
- //
- // If the registration did not include a terminal index, this will be our
- // pessimistic archival request, scheduled on registration to catch streams
- // that don't expire. This mutation will be replaced by the optimistic
- // archival mutation when/if the stream is terminated via TerminateStream.
- //
- // If the registration included a terminal index, apply our standard
- // parameters to the archival. Since TerminateStream will not be called,
- // this will be our formal optimistic archival task.
- params := standardArchivalParams(m.cfg, m.pcfg)
- cat := mutations.CreateArchiveTask{
- ID: m.ls.ID,
- }
- if m.TerminalIndex < 0 {
- // No terminal index, schedule pessimistic cleanup archival.
- cat.Expiration = now.Add(params.CompletePeriod)
-
- log.Fields{
- "deadline": cat.Expiration,
- }.Debugf(c, "Scheduling cleanup archival mutation.")
- } else {
- // Terminal index, schedule optimistic archival (mirrors TerminateStream).
- cat.SettleDelay = params.SettleDelay
- cat.CompletePeriod = params.CompletePeriod
-
- // Schedule this mutation to execute after our settle delay.
- cat.Expiration = now.Add(params.SettleDelay)
-
- log.Fields{
- "settleDelay": cat.SettleDelay,
- "completePeriod": cat.CompletePeriod,
- "scheduledAt": cat.Expiration,
- }.Debugf(c, "Scheduling archival mutation.")
- }
-
- aeParent, aeName := cat.TaskName(c)
- if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{aeName: &cat}); err != nil {
- log.WithError(err).Errorf(c, "Failed to write named mutations.")
- return nil, grpcutil.Internal
- }
-
- return nil, nil
-}
-
-func (m *registerStreamMutation) Root(c context.Context) *ds.Key {
- return ds.KeyForObj(c, m.ls)
-}
« no previous file with comments | « no previous file | logdog/appengine/coordinator/endpoints/services/registerStream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698