| Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| index b2afa56a7a297fb18048f2501f721524975111c0..51c92e6f7839f1733e5e674f9107725a80f81002 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| @@ -6,10 +6,11 @@ package services
|
|
|
| import (
|
| "crypto/subtle"
|
| - "errors"
|
|
|
| ds "github.com/luci/gae/service/datastore"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| + "github.com/luci/luci-go/appengine/logdog/coordinator/config"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| @@ -17,58 +18,95 @@ import (
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/proto/google"
|
| "golang.org/x/net/context"
|
| - "google.golang.org/grpc"
|
| "google.golang.org/grpc/codes"
|
| )
|
|
|
| -var errAlreadyUpdated = errors.New("already updated")
|
| -
|
| // TerminateStream is an idempotent stream state terminate operation.
|
| func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) {
|
| if err := Auth(c); err != nil {
|
| return nil, err
|
| }
|
|
|
| - path := types.StreamPath(req.Path)
|
| - if err := path.Validate(); err != nil {
|
| - return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (%s): %s", req.Path, err)
|
| + log.Fields{
|
| + "path": req.Path,
|
| + "terminalIndex": req.TerminalIndex,
|
| + }.Infof(c, "Request to terminate log stream.")
|
| +
|
| + cfg, err := config.Load(c)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to load configuration.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| + ccfg := cfg.GetCoordinator() // config.Load ensures this is not nil.
|
| + if ccfg.ArchiveTaskQueue == "" {
|
| + log.Errorf(c, "No archive task queue defined.")
|
| + return nil, grpcutil.Internal
|
| }
|
| - c = log.SetField(c, "path", req.Path)
|
|
|
| if req.TerminalIndex < 0 {
|
| return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.")
|
| }
|
|
|
| + path := types.StreamPath(req.Path)
|
| + if err := path.Validate(); err != nil {
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (%s): %s", req.Path, err)
|
| + }
|
| +
|
| // Initialize our log stream. This cannot fail since we have already validated
|
| // req.Path.
|
| ls := coordinator.LogStreamFromPath(path)
|
| - switch err := updateTerminalIndex(c, ls, req); err {
|
| - case errAlreadyUpdated:
|
| - return &google.Empty{}, nil
|
| -
|
| - // To be confirmed/resolved transactionally.
|
| - case nil:
|
| - break
|
| - default:
|
| - // Because we're not in a transaction, forgive a "not found" status.
|
| - if grpc.Code(err) != codes.NotFound {
|
| - log.WithError(err).Errorf(c, "Failed to check LogStream status.")
|
| - return nil, err
|
| - }
|
| +
|
| + // Initialize our archival parameters.
|
| + params := coordinator.ArchivalParams{
|
| + SettleDelay: ccfg.ArchiveSettleDelay.Duration(),
|
| + CompletePeriod: ccfg.ArchiveDelayMax.Duration(),
|
| }
|
|
|
| - // Transactionally update.
|
| - now := clock.Now(c).UTC()
|
| - err := ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| - di := ds.Get(c)
|
| + // Transactionally validate and update the terminal index.
|
| + err = ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| + if err := ds.Get(c).Get(ls); err != nil {
|
| + if err == ds.ErrNoSuchEntity {
|
| + log.Debugf(c, "LogEntry not found.")
|
| + return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
|
| + }
|
| +
|
| + log.WithError(err).Errorf(c, "Failed to load LogEntry.")
|
| + return grpcutil.Internal
|
| + }
|
|
|
| - // Load the log stream state.
|
| - switch err := updateTerminalIndex(c, ls, req); err {
|
| - case nil:
|
| - ls.Updated = now
|
| - ls.State = coordinator.LSTerminated
|
| + switch {
|
| + case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1:
|
| + log.Errorf(c, "Secrets do not match.")
|
| + return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.")
|
|
|
| - if err := ls.Put(di); err != nil {
|
| + case ls.State > coordinator.LSStreaming:
|
| + // Succeed if this is non-conflicting (idempotent).
|
| + if ls.TerminalIndex == req.TerminalIndex {
|
| + log.Fields{
|
| + "state": ls.State.String(),
|
| + "terminalIndex": ls.TerminalIndex,
|
| + }.Infof(c, "Log stream is already terminated.")
|
| + return nil
|
| + }
|
| +
|
| + log.Fields{
|
| + "state": ls.State.String(),
|
| + "terminalIndex": ls.TerminalIndex,
|
| + }.Warningf(c, "Log stream is not in streaming state.")
|
| + return grpcutil.Errf(codes.FailedPrecondition, "Log stream is not in streaming state.")
|
| +
|
| + default:
|
| + // Everything looks good, let's proceed...
|
| + ls.TerminalIndex = req.TerminalIndex
|
| + ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC())
|
| +
|
| + // Create an archival task.
|
| + if _, err := params.CreateTask(tq.Get(c), ls, ccfg.ArchiveTaskQueue); err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create archive task.")
|
| + return grpcutil.Internal
|
| + }
|
| +
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| }.Errorf(c, "Failed to Put() LogStream.")
|
| @@ -77,14 +115,8 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
|
|
| log.Fields{
|
| "terminalIndex": ls.TerminalIndex,
|
| - }.Infof(c, "Terminal index was set.")
|
| + }.Infof(c, "Terminal index was set and archival was dispatched.")
|
| return nil
|
| -
|
| - case errAlreadyUpdated:
|
| - return nil
|
| -
|
| - default:
|
| - return err
|
| }
|
| }, nil)
|
| if err != nil {
|
| @@ -96,39 +128,3 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
|
|
| return &google.Empty{}, nil
|
| }
|
| -
|
| -func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logdog.TerminateStreamRequest) error {
|
| - if err := ds.Get(c).Get(ls); err != nil {
|
| - if err == ds.ErrNoSuchEntity {
|
| - log.Debugf(c, "LogEntry not found.")
|
| - return grpcutil.Errf(codes.NotFound, "Log stream [%s] is not registered", req.Path)
|
| - }
|
| -
|
| - log.WithError(err).Errorf(c, "Failed to load LogEntry.")
|
| - return grpcutil.Internal
|
| - }
|
| -
|
| - if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 {
|
| - log.Errorf(c, "Secrets do not match.")
|
| - return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.")
|
| - }
|
| -
|
| - switch {
|
| - case ls.TerminalIndex == req.TerminalIndex:
|
| - // Idempotent: already updated to this value.
|
| - log.Debugf(c, "Log stream is already updated (idempotent).")
|
| - return errAlreadyUpdated
|
| -
|
| - case ls.Terminated():
|
| - // Terminated, but with a different value.
|
| - log.Fields{
|
| - "current": ls.TerminalIndex,
|
| - "requested": req.TerminalIndex,
|
| - }.Warningf(c, "Refusing to change terminal index.")
|
| - return grpcutil.Errf(codes.AlreadyExists, "Terminal index is already set.")
|
| -
|
| - default:
|
| - ls.TerminalIndex = req.TerminalIndex
|
| - return nil
|
| - }
|
| -}
|
|
|