Index: logdog/appengine/coordinator/endpoints/services/registerStream.go |
diff --git a/logdog/appengine/coordinator/endpoints/services/registerStream.go b/logdog/appengine/coordinator/endpoints/services/registerStream.go |
index 2cf99f75f4b42512dc04771c50316fe15c5cc212..58dcfa5d27a3d8dea16910b6c39074894241bc0c 100644 |
--- a/logdog/appengine/coordinator/endpoints/services/registerStream.go |
+++ b/logdog/appengine/coordinator/endpoints/services/registerStream.go |
@@ -12,11 +12,9 @@ import ( |
"github.com/luci/luci-go/common/clock" |
log "github.com/luci/luci-go/common/logging" |
"github.com/luci/luci-go/grpc/grpcutil" |
- "github.com/luci/luci-go/logdog/api/config/svcconfig" |
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
"github.com/luci/luci-go/logdog/api/logpb" |
"github.com/luci/luci-go/logdog/appengine/coordinator" |
- "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
"github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" |
"github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" |
"github.com/luci/luci-go/logdog/appengine/coordinator/mutations" |
@@ -162,14 +160,103 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
} |
// The stream does not exist. Proceed with transactional registration. |
- err = tumble.RunMutation(c, ®isterStreamMutation{ |
- RegisterStreamRequest: req, |
- cfg: cfg, |
- pcfg: pcfg, |
- desc: &desc, |
- pfx: pfx, |
- lst: lst, |
- ls: ls, |
+ lsKey := ds.KeyForObj(c, ls) |
+ err = tumble.RunUnbuffered(c, lsKey, func(c context.Context) ([]tumble.Mutation, error) { |
+ // Load our state and stream (transactional). |
+ switch err := ds.Get(c, ls, lst); { |
+ case err == nil: |
+ // The stream is already registered. |
+ return nil, nil |
+ |
+ case !anyNoSuchEntity(err): |
+ log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") |
+ return nil, err |
+ } |
+ |
+ // The stream is not yet registered. |
+ log.Infof(c, "Registering new log stream.") |
+ |
+ // Construct our LogStreamState. |
+ now := clock.Now(c).UTC() |
+ lst.Created = now |
+ lst.Updated = now |
+ lst.Secret = pfx.Secret // Copy Prefix Secret to reduce datastore Gets. |
+ |
+ // Construct our LogStream. |
+ ls.Created = now |
+ ls.ProtoVersion = req.ProtoVersion |
+ |
+ if err := ls.LoadDescriptor(&desc); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ }.Errorf(c, "Failed to load descriptor into LogStream.") |
+ return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.") |
+ } |
+ |
+ // If our registration request included a terminal index, terminate the |
+ // log stream state as well. |
+ if req.TerminalIndex >= 0 { |
+ log.Fields{ |
+ "terminalIndex": req.TerminalIndex, |
+ }.Debugf(c, "Registration request included terminal index.") |
+ |
+ lst.TerminalIndex = req.TerminalIndex |
+ lst.TerminatedTime = now |
+ } else { |
+ lst.TerminalIndex = -1 |
+ } |
+ |
+ if err := ds.Put(c, ls, lst); err != nil { |
+ log.Fields{ |
+ log.ErrorKey: err, |
+ }.Errorf(c, "Failed to Put LogStream.") |
+ return nil, grpcutil.Internal |
+ } |
+ |
+ // Add a named delayed mutation to archive this stream if it's not archived |
+ // yet. |
+ // |
+ // If the registration did not include a terminal index, this will be our |
+ // pessimistic archival request, scheduled on registration to catch streams |
+ // that don't expire. This mutation will be replaced by the optimistic |
+ // archival mutation when/if the stream is terminated via TerminateStream. |
+ // |
+ // If the registration included a terminal index, apply our standard |
+ // parameters to the archival. Since TerminateStream will not be called, |
+ // this will be our formal optimistic archival task. |
+ params := standardArchivalParams(cfg, pcfg) |
+ cat := mutations.CreateArchiveTask{ |
+ ID: ls.ID, |
+ } |
+ if req.TerminalIndex < 0 { |
+ // No terminal index, schedule pessimistic cleanup archival. |
+ cat.Expiration = now.Add(params.CompletePeriod) |
+ |
+ log.Fields{ |
+ "deadline": cat.Expiration, |
+ }.Debugf(c, "Scheduling cleanup archival mutation.") |
+ } else { |
+ // Terminal index, schedule optimistic archival (mirrors TerminateStream). |
+ cat.SettleDelay = params.SettleDelay |
+ cat.CompletePeriod = params.CompletePeriod |
+ |
+ // Schedule this mutation to execute after our settle delay. |
+ cat.Expiration = now.Add(params.SettleDelay) |
+ |
+ log.Fields{ |
+ "settleDelay": cat.SettleDelay, |
+ "completePeriod": cat.CompletePeriod, |
+ "scheduledAt": cat.Expiration, |
+ }.Debugf(c, "Scheduling archival mutation.") |
+ } |
+ |
+ aeName := cat.TaskName(c) |
+ if err := tumble.PutNamedMutations(c, lsKey, map[string]tumble.Mutation{aeName: &cat}); err != nil { |
+ log.WithError(err).Errorf(c, "Failed to write named mutations.") |
+ return nil, grpcutil.Internal |
+ } |
+ |
+ return nil, nil |
}) |
if err != nil { |
log.Fields{ |
@@ -184,117 +271,3 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
State: buildLogStreamState(ls, lst), |
}, nil |
} |
- |
-type registerStreamMutation struct { |
- *logdog.RegisterStreamRequest |
- |
- cfg *config.Config |
- pcfg *svcconfig.ProjectConfig |
- |
- desc *logpb.LogStreamDescriptor |
- pfx *coordinator.LogPrefix |
- ls *coordinator.LogStream |
- lst *coordinator.LogStreamState |
-} |
- |
-func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) { |
- // Load our state and stream (transactional). |
- switch err := ds.Get(c, m.ls, m.lst); { |
- case err == nil: |
- // The stream is already registered. |
- return nil, nil |
- |
- case !anyNoSuchEntity(err): |
- log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") |
- return nil, err |
- } |
- |
- // The stream is not yet registered. |
- log.Infof(c, "Registering new log stream.") |
- |
- // Construct our LogStreamState. |
- now := clock.Now(c).UTC() |
- m.lst.Created = now |
- m.lst.Updated = now |
- m.lst.Secret = m.pfx.Secret // Copy Prefix Secret to reduce datastore Gets. |
- |
- // Construct our LogStream. |
- m.ls.Created = now |
- m.ls.ProtoVersion = m.ProtoVersion |
- |
- if err := m.ls.LoadDescriptor(m.desc); err != nil { |
- log.Fields{ |
- log.ErrorKey: err, |
- }.Errorf(c, "Failed to load descriptor into LogStream.") |
- return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.") |
- } |
- |
- // If our registration request included a terminal index, terminate the |
- // log stream state as well. |
- if m.TerminalIndex >= 0 { |
- log.Fields{ |
- "terminalIndex": m.TerminalIndex, |
- }.Debugf(c, "Registration request included terminal index.") |
- |
- m.lst.TerminalIndex = m.TerminalIndex |
- m.lst.TerminatedTime = now |
- } else { |
- m.lst.TerminalIndex = -1 |
- } |
- |
- if err := ds.Put(c, m.ls, m.lst); err != nil { |
- log.Fields{ |
- log.ErrorKey: err, |
- }.Errorf(c, "Failed to Put LogStream.") |
- return nil, grpcutil.Internal |
- } |
- |
- // Add a named delayed mutation to archive this stream if it's not archived |
- // yet. |
- // |
- // If the registration did not include a terminal index, this will be our |
- // pessimistic archival request, scheduled on registration to catch streams |
- // that don't expire. This mutation will be replaced by the optimistic |
- // archival mutation when/if the stream is terminated via TerminateStream. |
- // |
- // If the registration included a terminal index, apply our standard |
- // parameters to the archival. Since TerminateStream will not be called, |
- // this will be our formal optimistic archival task. |
- params := standardArchivalParams(m.cfg, m.pcfg) |
- cat := mutations.CreateArchiveTask{ |
- ID: m.ls.ID, |
- } |
- if m.TerminalIndex < 0 { |
- // No terminal index, schedule pessimistic cleanup archival. |
- cat.Expiration = now.Add(params.CompletePeriod) |
- |
- log.Fields{ |
- "deadline": cat.Expiration, |
- }.Debugf(c, "Scheduling cleanup archival mutation.") |
- } else { |
- // Terminal index, schedule optimistic archival (mirrors TerminateStream). |
- cat.SettleDelay = params.SettleDelay |
- cat.CompletePeriod = params.CompletePeriod |
- |
- // Schedule this mutation to execute after our settle delay. |
- cat.Expiration = now.Add(params.SettleDelay) |
- |
- log.Fields{ |
- "settleDelay": cat.SettleDelay, |
- "completePeriod": cat.CompletePeriod, |
- "scheduledAt": cat.Expiration, |
- }.Debugf(c, "Scheduling archival mutation.") |
- } |
- |
- aeParent, aeName := cat.TaskName(c) |
- if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{aeName: &cat}); err != nil { |
- log.WithError(err).Errorf(c, "Failed to write named mutations.") |
- return nil, grpcutil.Internal |
- } |
- |
- return nil, nil |
-} |
- |
-func (m *registerStreamMutation) Root(c context.Context) *ds.Key { |
- return ds.KeyForObj(c, m.ls) |
-} |