| Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| index 923fd0548237159cae23b71911f519123ecba74d..e7a4c9bc4c5864a69d7eff76e5b8481c79e0ecf4 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| @@ -38,6 +38,7 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
| return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s): %s", id, err)
|
| }
|
|
|
| + // Load our service and project configs.
|
| svc := coordinator.GetServices(c)
|
| cfg, err := svc.Config(c)
|
| if err != nil {
|
| @@ -45,23 +46,29 @@ func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
| return nil, grpcutil.Internal
|
| }
|
|
|
| - ap, err := svc.ArchivalPublisher(c)
|
| + pcfg, err := coordinator.CurrentProjectConfig(c)
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to get archival publisher instance.")
|
| + log.WithError(err).Errorf(c, "Failed to load current project configuration.")
|
| return nil, grpcutil.Internal
|
| }
|
|
|
| - // Initialize our log stream state.
|
| - di := ds.Get(c)
|
| - lst := coordinator.NewLogStreamState(di, id)
|
| -
|
| // Initialize our archival parameters.
|
| params := coordinator.ArchivalParams{
|
| RequestID: info.Get(c).RequestID(),
|
| SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
|
| - CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(),
|
| + CompletePeriod: resolveArchiveDelay(cfg.Coordinator, pcfg),
|
| + }
|
| +
|
| + ap, err := svc.ArchivalPublisher(c)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to get archival publisher instance.")
|
| + return nil, grpcutil.Internal
|
| }
|
|
|
| + // Initialize our log stream state.
|
| + di := ds.Get(c)
|
| + lst := coordinator.NewLogStreamState(di, id)
|
| +
|
| // Transactionally validate and update the terminal index.
|
| err = di.RunInTransaction(func(c context.Context) error {
|
| di := ds.Get(c)
|
|
|