Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go |
diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go |
index 923fd0548237159cae23b71911f519123ecba74d..3237f04cfa1e2dced044647cdc7fd382ecb4984c 100644 |
--- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go |
+++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go |
@@ -10,6 +10,7 @@ import ( |
ds "github.com/luci/gae/service/datastore" |
"github.com/luci/gae/service/info" |
"github.com/luci/luci-go/appengine/logdog/coordinator" |
+ "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints" |
"github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
"github.com/luci/luci-go/appengine/tumble" |
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
@@ -38,6 +39,7 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR |
return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s): %s", id, err) |
} |
+ // Load our service and project configs. |
svc := coordinator.GetServices(c) |
cfg, err := svc.Config(c) |
if err != nil { |
@@ -45,23 +47,29 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR |
return nil, grpcutil.Internal |
} |
- ap, err := svc.ArchivalPublisher(c) |
+ pcfg, err := coordinator.CurrentProjectConfig(c) |
if err != nil { |
- log.WithError(err).Errorf(c, "Failed to get archival publisher instance.") |
+ log.WithError(err).Errorf(c, "Failed to load current project configuration.") |
return nil, grpcutil.Internal |
} |
- // Initialize our log stream state. |
- di := ds.Get(c) |
- lst := coordinator.NewLogStreamState(di, id) |
- |
// Initialize our archival parameters. |
params := coordinator.ArchivalParams{ |
RequestID: info.Get(c).RequestID(), |
SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
- CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), |
+ CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDelayMax, pcfg.MaxStreamAge), |
+ } |
+ |
+ ap, err := svc.ArchivalPublisher(c) |
+ if err != nil { |
+ log.WithError(err).Errorf(c, "Failed to get archival publisher instance.") |
+ return nil, grpcutil.Internal |
} |
+ // Initialize our log stream state. |
+ di := ds.Get(c) |
+ lst := coordinator.NewLogStreamState(di, id) |
+ |
// Transactionally validate and update the terminal index. |
err = di.RunInTransaction(func(c context.Context) error { |
di := ds.Get(c) |