| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 "errors" | |
| 10 | 9 |
| 11 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
| 11 "github.com/luci/gae/service/info" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 14 "github.com/luci/luci-go/common/clock" | 14 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/grpcutil" | 15 "github.com/luci/luci-go/common/grpcutil" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/common/proto/google" | 18 "github.com/luci/luci-go/common/proto/google" |
| 19 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
| 20 "google.golang.org/grpc" | |
| 21 "google.golang.org/grpc/codes" | 20 "google.golang.org/grpc/codes" |
| 22 ) | 21 ) |
| 23 | 22 |
| 24 var errAlreadyUpdated = errors.New("already updated") | 23 // TerminateStream is an idempotent stream state terminate operation. |
| 24 func (s *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*google.Empty, error) { |
| 25 » svc := s.GetServices() |
| 26 » if err := Auth(c, svc); err != nil { |
| 27 » » return nil, err |
| 28 » } |
| 25 | 29 |
| 26 // TerminateStream is an idempotent stream state terminate operation. | 30 » log.Fields{ |
| 27 func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*google.Empty, error) { | 31 » » "path": req.Path, |
| 28 » if err := Auth(c); err != nil { | 32 » » "terminalIndex": req.TerminalIndex, |
| 29 » » return nil, err | 33 » }.Infof(c, "Request to terminate log stream.") |
| 34 |
| 35 » if req.TerminalIndex < 0 { |
| 36 » » return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi
nal index.") |
| 30 } | 37 } |
| 31 | 38 |
| 32 path := types.StreamPath(req.Path) | 39 path := types.StreamPath(req.Path) |
| 33 if err := path.Validate(); err != nil { | 40 if err := path.Validate(); err != nil { |
| 34 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (
%s): %s", req.Path, err) | 41 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (
%s): %s", req.Path, err) |
| 35 } | 42 } |
| 36 c = log.SetField(c, "path", req.Path) | |
| 37 | 43 |
| 38 » if req.TerminalIndex < 0 { | 44 » _, cfg, err := svc.Config(c) |
| 39 » » return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi
nal index.") | 45 » if err != nil { |
| 46 » » log.WithError(err).Errorf(c, "Failed to load configuration.") |
| 47 » » return nil, grpcutil.Internal |
| 48 » } |
| 49 |
| 50 » ap, err := svc.ArchivalPublisher(c) |
| 51 » if err != nil { |
| 52 » » log.WithError(err).Errorf(c, "Failed to get archival publisher i
nstance.") |
| 53 » » return nil, grpcutil.Internal |
| 40 } | 54 } |
| 41 | 55 |
| 42 // Initialize our log stream. This cannot fail since we have already val
idated | 56 // Initialize our log stream. This cannot fail since we have already val
idated |
| 43 // req.Path. | 57 // req.Path. |
| 44 ls := coordinator.LogStreamFromPath(path) | 58 ls := coordinator.LogStreamFromPath(path) |
| 45 switch err := updateTerminalIndex(c, ls, req); err { | |
| 46 case errAlreadyUpdated: | |
| 47 return &google.Empty{}, nil | |
| 48 | 59 |
| 49 » // To be confirmed/resolved transactionally. | 60 » // Initialize our archival parameters. |
| 50 » case nil: | 61 » params := coordinator.ArchivalParams{ |
| 51 » » break | 62 » » RequestID: info.Get(c).RequestID(), |
| 52 » default: | 63 » » SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
| 53 » » // Because we're not in a transaction, forgive a "not found" sta
tus. | 64 » » CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), |
| 54 » » if grpc.Code(err) != codes.NotFound { | |
| 55 » » » log.WithError(err).Errorf(c, "Failed to check LogStream
status.") | |
| 56 » » » return nil, err | |
| 57 » » } | |
| 58 } | 65 } |
| 59 | 66 |
| 60 » // Transactionally update. | 67 » // Transactionally validate and update the terminal index. |
| 61 » now := clock.Now(c).UTC() | 68 » err = ds.Get(c).RunInTransaction(func(c context.Context) error { |
| 62 » err := ds.Get(c).RunInTransaction(func(c context.Context) error { | 69 » » if err := ds.Get(c).Get(ls); err != nil { |
| 63 » » di := ds.Get(c) | 70 » » » if err == ds.ErrNoSuchEntity { |
| 71 » » » » log.Debugf(c, "LogEntry not found.") |
| 72 » » » » return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", req.Path) |
| 73 » » » } |
| 64 | 74 |
| 65 » » // Load the log stream state. | 75 » » » log.WithError(err).Errorf(c, "Failed to load LogEntry.") |
| 66 » » switch err := updateTerminalIndex(c, ls, req); err { | 76 » » » return grpcutil.Internal |
| 67 » » case nil: | 77 » » } |
| 68 » » » ls.Updated = now | |
| 69 » » » ls.State = coordinator.LSTerminated | |
| 70 | 78 |
| 71 » » » if err := ls.Put(di); err != nil { | 79 » » switch { |
| 80 » » case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1: |
| 81 » » » log.Errorf(c, "Secrets do not match.") |
| 82 » » » return grpcutil.Errf(codes.InvalidArgument, "Request sec
ret doesn't match the stream secret.") |
| 83 |
| 84 » » case ls.State > coordinator.LSStreaming: |
| 85 » » » // Succeed if this is non-conflicting (idempotent). |
| 86 » » » if ls.TerminalIndex == req.TerminalIndex { |
| 87 » » » » log.Fields{ |
| 88 » » » » » "state": ls.State.String(), |
| 89 » » » » » "terminalIndex": ls.TerminalIndex, |
| 90 » » » » }.Infof(c, "Log stream is already terminated.") |
| 91 » » » » return nil |
| 92 » » » } |
| 93 |
| 94 » » » log.Fields{ |
| 95 » » » » "state": ls.State.String(), |
| 96 » » » » "terminalIndex": ls.TerminalIndex, |
| 97 » » » }.Warningf(c, "Log stream is not in streaming state.") |
| 98 » » » return grpcutil.Errf(codes.FailedPrecondition, "Log stre
am is not in streaming state.") |
| 99 |
| 100 » » default: |
| 101 » » » // Everything looks good, let's proceed... |
| 102 » » » ls.TerminalIndex = req.TerminalIndex |
| 103 » » » ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC()) |
| 104 |
| 105 » » » // Create an archival task. |
| 106 » » » if err := params.PublishTask(c, ap, ls); err != nil { |
| 107 » » » » if err == coordinator.ErrArchiveTasked { |
| 108 » » » » » log.Warningf(c, "Archival has already be
en tasked for this stream.") |
| 109 » » » » » return nil |
| 110 » » » » } |
| 111 |
| 112 » » » » log.WithError(err).Errorf(c, "Failed to create a
rchive task.") |
| 113 » » » » return grpcutil.Internal |
| 114 » » » } |
| 115 |
| 116 » » » if err := ds.Get(c).Put(ls); err != nil { |
| 72 log.Fields{ | 117 log.Fields{ |
| 73 log.ErrorKey: err, | 118 log.ErrorKey: err, |
| 74 }.Errorf(c, "Failed to Put() LogStream.") | 119 }.Errorf(c, "Failed to Put() LogStream.") |
| 75 return grpcutil.Internal | 120 return grpcutil.Internal |
| 76 } | 121 } |
| 77 | 122 |
| 78 log.Fields{ | 123 log.Fields{ |
| 79 "terminalIndex": ls.TerminalIndex, | 124 "terminalIndex": ls.TerminalIndex, |
| 80 » » » }.Infof(c, "Terminal index was set.") | 125 » » » }.Infof(c, "Terminal index was set and archival was disp
atched.") |
| 81 return nil | 126 return nil |
| 82 | |
| 83 case errAlreadyUpdated: | |
| 84 return nil | |
| 85 | |
| 86 default: | |
| 87 return err | |
| 88 } | 127 } |
| 89 }, nil) | 128 }, nil) |
| 90 if err != nil { | 129 if err != nil { |
| 91 log.Fields{ | 130 log.Fields{ |
| 92 log.ErrorKey: err, | 131 log.ErrorKey: err, |
| 93 }.Errorf(c, "Failed to update LogStream.") | 132 }.Errorf(c, "Failed to update LogStream.") |
| 94 return nil, err | 133 return nil, err |
| 95 } | 134 } |
| 96 | 135 |
| 97 return &google.Empty{}, nil | 136 return &google.Empty{}, nil |
| 98 } | 137 } |
| 99 | |
| 100 func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logd
og.TerminateStreamRequest) error { | |
| 101 if err := ds.Get(c).Get(ls); err != nil { | |
| 102 if err == ds.ErrNoSuchEntity { | |
| 103 log.Debugf(c, "LogEntry not found.") | |
| 104 return grpcutil.Errf(codes.NotFound, "Log stream [%s] is
not registered", req.Path) | |
| 105 } | |
| 106 | |
| 107 log.WithError(err).Errorf(c, "Failed to load LogEntry.") | |
| 108 return grpcutil.Internal | |
| 109 } | |
| 110 | |
| 111 if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 { | |
| 112 log.Errorf(c, "Secrets do not match.") | |
| 113 return grpcutil.Errf(codes.InvalidArgument, "Request secret does
n't match the stream secret.") | |
| 114 } | |
| 115 | |
| 116 switch { | |
| 117 case ls.TerminalIndex == req.TerminalIndex: | |
| 118 // Idempotent: already updated to this value. | |
| 119 log.Debugf(c, "Log stream is already updated (idempotent).") | |
| 120 return errAlreadyUpdated | |
| 121 | |
| 122 case ls.Terminated(): | |
| 123 // Terminated, but with a different value. | |
| 124 log.Fields{ | |
| 125 "current": ls.TerminalIndex, | |
| 126 "requested": req.TerminalIndex, | |
| 127 }.Warningf(c, "Refusing to change terminal index.") | |
| 128 return grpcutil.Errf(codes.AlreadyExists, "Terminal index is alr
eady set.") | |
| 129 | |
| 130 default: | |
| 131 ls.TerminalIndex = req.TerminalIndex | |
| 132 return nil | |
| 133 } | |
| 134 } | |
| OLD | NEW |