| Index: logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| diff --git a/logdog/appengine/coordinator/endpoints/services/registerStream.go b/logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| index 2cf99f75f4b42512dc04771c50316fe15c5cc212..58dcfa5d27a3d8dea16910b6c39074894241bc0c 100644
|
| --- a/logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| +++ b/logdog/appengine/coordinator/endpoints/services/registerStream.go
|
| @@ -12,11 +12,9 @@ import (
|
| "github.com/luci/luci-go/common/clock"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/grpc/grpcutil"
|
| - "github.com/luci/luci-go/logdog/api/config/svcconfig"
|
| "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator"
|
| - "github.com/luci/luci-go/logdog/appengine/coordinator/config"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
|
| @@ -162,14 +160,103 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| }
|
|
|
| // The stream does not exist. Proceed with transactional registration.
|
| - err = tumble.RunMutation(c, ®isterStreamMutation{
|
| - RegisterStreamRequest: req,
|
| - cfg: cfg,
|
| - pcfg: pcfg,
|
| - desc: &desc,
|
| - pfx: pfx,
|
| - lst: lst,
|
| - ls: ls,
|
| + lsKey := ds.KeyForObj(c, ls)
|
| + err = tumble.RunUnbuffered(c, lsKey, func(c context.Context) ([]tumble.Mutation, error) {
|
| + // Load our state and stream (transactional).
|
| + switch err := ds.Get(c, ls, lst); {
|
| + case err == nil:
|
| + // The stream is already registered.
|
| + return nil, nil
|
| +
|
| + case !anyNoSuchEntity(err):
|
| + log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
|
| + return nil, err
|
| + }
|
| +
|
| + // The stream is not yet registered.
|
| + log.Infof(c, "Registering new log stream.")
|
| +
|
| + // Construct our LogStreamState.
|
| + now := clock.Now(c).UTC()
|
| + lst.Created = now
|
| + lst.Updated = now
|
| + lst.Secret = pfx.Secret // Copy Prefix Secret to reduce datastore Gets.
|
| +
|
| + // Construct our LogStream.
|
| + ls.Created = now
|
| + ls.ProtoVersion = req.ProtoVersion
|
| +
|
| + if err := ls.LoadDescriptor(&desc); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + }.Errorf(c, "Failed to load descriptor into LogStream.")
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
|
| + }
|
| +
|
| + // If our registration request included a terminal index, terminate the
|
| + // log stream state as well.
|
| + if req.TerminalIndex >= 0 {
|
| + log.Fields{
|
| + "terminalIndex": req.TerminalIndex,
|
| + }.Debugf(c, "Registration request included terminal index.")
|
| +
|
| + lst.TerminalIndex = req.TerminalIndex
|
| + lst.TerminatedTime = now
|
| + } else {
|
| + lst.TerminalIndex = -1
|
| + }
|
| +
|
| + if err := ds.Put(c, ls, lst); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + }.Errorf(c, "Failed to Put LogStream.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| +
|
| + // Add a named delayed mutation to archive this stream if it's not archived
|
| + // yet.
|
| + //
|
| + // If the registration did not include a terminal index, this will be our
|
| + // pessimistic archival request, scheduled on registration to catch streams
|
| + // that don't expire. This mutation will be replaced by the optimistic
|
| + // archival mutation when/if the stream is terminated via TerminateStream.
|
| + //
|
| + // If the registration included a terminal index, apply our standard
|
| + // parameters to the archival. Since TerminateStream will not be called,
|
| + // this will be our formal optimistic archival task.
|
| + params := standardArchivalParams(cfg, pcfg)
|
| + cat := mutations.CreateArchiveTask{
|
| + ID: ls.ID,
|
| + }
|
| + if req.TerminalIndex < 0 {
|
| + // No terminal index, schedule pessimistic cleanup archival.
|
| + cat.Expiration = now.Add(params.CompletePeriod)
|
| +
|
| + log.Fields{
|
| + "deadline": cat.Expiration,
|
| + }.Debugf(c, "Scheduling cleanup archival mutation.")
|
| + } else {
|
| + // Terminal index, schedule optimistic archival (mirrors TerminateStream).
|
| + cat.SettleDelay = params.SettleDelay
|
| + cat.CompletePeriod = params.CompletePeriod
|
| +
|
| + // Schedule this mutation to execute after our settle delay.
|
| + cat.Expiration = now.Add(params.SettleDelay)
|
| +
|
| + log.Fields{
|
| + "settleDelay": cat.SettleDelay,
|
| + "completePeriod": cat.CompletePeriod,
|
| + "scheduledAt": cat.Expiration,
|
| + }.Debugf(c, "Scheduling archival mutation.")
|
| + }
|
| +
|
| + aeName := cat.TaskName(c)
|
| + if err := tumble.PutNamedMutations(c, lsKey, map[string]tumble.Mutation{aeName: &cat}); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to write named mutations.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| +
|
| + return nil, nil
|
| })
|
| if err != nil {
|
| log.Fields{
|
| @@ -184,117 +271,3 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| State: buildLogStreamState(ls, lst),
|
| }, nil
|
| }
|
| -
|
| -type registerStreamMutation struct {
|
| - *logdog.RegisterStreamRequest
|
| -
|
| - cfg *config.Config
|
| - pcfg *svcconfig.ProjectConfig
|
| -
|
| - desc *logpb.LogStreamDescriptor
|
| - pfx *coordinator.LogPrefix
|
| - ls *coordinator.LogStream
|
| - lst *coordinator.LogStreamState
|
| -}
|
| -
|
| -func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) {
|
| - // Load our state and stream (transactional).
|
| - switch err := ds.Get(c, m.ls, m.lst); {
|
| - case err == nil:
|
| - // The stream is already registered.
|
| - return nil, nil
|
| -
|
| - case !anyNoSuchEntity(err):
|
| - log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
|
| - return nil, err
|
| - }
|
| -
|
| - // The stream is not yet registered.
|
| - log.Infof(c, "Registering new log stream.")
|
| -
|
| - // Construct our LogStreamState.
|
| - now := clock.Now(c).UTC()
|
| - m.lst.Created = now
|
| - m.lst.Updated = now
|
| - m.lst.Secret = m.pfx.Secret // Copy Prefix Secret to reduce datastore Gets.
|
| -
|
| - // Construct our LogStream.
|
| - m.ls.Created = now
|
| - m.ls.ProtoVersion = m.ProtoVersion
|
| -
|
| - if err := m.ls.LoadDescriptor(m.desc); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - }.Errorf(c, "Failed to load descriptor into LogStream.")
|
| - return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
|
| - }
|
| -
|
| - // If our registration request included a terminal index, terminate the
|
| - // log stream state as well.
|
| - if m.TerminalIndex >= 0 {
|
| - log.Fields{
|
| - "terminalIndex": m.TerminalIndex,
|
| - }.Debugf(c, "Registration request included terminal index.")
|
| -
|
| - m.lst.TerminalIndex = m.TerminalIndex
|
| - m.lst.TerminatedTime = now
|
| - } else {
|
| - m.lst.TerminalIndex = -1
|
| - }
|
| -
|
| - if err := ds.Put(c, m.ls, m.lst); err != nil {
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - }.Errorf(c, "Failed to Put LogStream.")
|
| - return nil, grpcutil.Internal
|
| - }
|
| -
|
| - // Add a named delayed mutation to archive this stream if it's not archived
|
| - // yet.
|
| - //
|
| - // If the registration did not include a terminal index, this will be our
|
| - // pessimistic archival request, scheduled on registration to catch streams
|
| - // that don't expire. This mutation will be replaced by the optimistic
|
| - // archival mutation when/if the stream is terminated via TerminateStream.
|
| - //
|
| - // If the registration included a terminal index, apply our standard
|
| - // parameters to the archival. Since TerminateStream will not be called,
|
| - // this will be our formal optimistic archival task.
|
| - params := standardArchivalParams(m.cfg, m.pcfg)
|
| - cat := mutations.CreateArchiveTask{
|
| - ID: m.ls.ID,
|
| - }
|
| - if m.TerminalIndex < 0 {
|
| - // No terminal index, schedule pessimistic cleanup archival.
|
| - cat.Expiration = now.Add(params.CompletePeriod)
|
| -
|
| - log.Fields{
|
| - "deadline": cat.Expiration,
|
| - }.Debugf(c, "Scheduling cleanup archival mutation.")
|
| - } else {
|
| - // Terminal index, schedule optimistic archival (mirrors TerminateStream).
|
| - cat.SettleDelay = params.SettleDelay
|
| - cat.CompletePeriod = params.CompletePeriod
|
| -
|
| - // Schedule this mutation to execute after our settle delay.
|
| - cat.Expiration = now.Add(params.SettleDelay)
|
| -
|
| - log.Fields{
|
| - "settleDelay": cat.SettleDelay,
|
| - "completePeriod": cat.CompletePeriod,
|
| - "scheduledAt": cat.Expiration,
|
| - }.Debugf(c, "Scheduling archival mutation.")
|
| - }
|
| -
|
| - aeParent, aeName := cat.TaskName(c)
|
| - if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{aeName: &cat}); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to write named mutations.")
|
| - return nil, grpcutil.Internal
|
| - }
|
| -
|
| - return nil, nil
|
| -}
|
| -
|
| -func (m *registerStreamMutation) Root(c context.Context) *ds.Key {
|
| - return ds.KeyForObj(c, m.ls)
|
| -}
|
|
|