Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(32)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 "errors"
10 9
11 ds "github.com/luci/gae/service/datastore" 10 ds "github.com/luci/gae/service/datastore"
11 "github.com/luci/gae/service/info"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 12 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
14 "github.com/luci/luci-go/common/clock" 14 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/grpcutil" 15 "github.com/luci/luci-go/common/grpcutil"
16 "github.com/luci/luci-go/common/logdog/types" 16 "github.com/luci/luci-go/common/logdog/types"
17 log "github.com/luci/luci-go/common/logging" 17 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/proto/google" 18 "github.com/luci/luci-go/common/proto/google"
19 "golang.org/x/net/context" 19 "golang.org/x/net/context"
20 "google.golang.org/grpc"
21 "google.golang.org/grpc/codes" 20 "google.golang.org/grpc/codes"
22 ) 21 )
23 22
24 var errAlreadyUpdated = errors.New("already updated") 23 // TerminateStream is an idempotent stream state terminate operation.
24 func (s *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*google.Empty, error) {
25 » svc := s.GetServices()
26 » if err := Auth(c, svc); err != nil {
27 » » return nil, err
28 » }
25 29
26 // TerminateStream is an idempotent stream state terminate operation. 30 » log.Fields{
27 func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*google.Empty, error) { 31 » » "path": req.Path,
28 » if err := Auth(c); err != nil { 32 » » "terminalIndex": req.TerminalIndex,
29 » » return nil, err 33 » }.Infof(c, "Request to terminate log stream.")
34
35 » if req.TerminalIndex < 0 {
36 » » return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi nal index.")
30 } 37 }
31 38
32 path := types.StreamPath(req.Path) 39 path := types.StreamPath(req.Path)
33 if err := path.Validate(); err != nil { 40 if err := path.Validate(); err != nil {
34 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path ( %s): %s", req.Path, err) 41 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path ( %s): %s", req.Path, err)
35 } 42 }
36 c = log.SetField(c, "path", req.Path)
37 43
38 » if req.TerminalIndex < 0 { 44 » _, cfg, err := svc.Config(c)
39 » » return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi nal index.") 45 » if err != nil {
46 » » log.WithError(err).Errorf(c, "Failed to load configuration.")
47 » » return nil, grpcutil.Internal
48 » }
49
50 » ap, err := svc.ArchivalPublisher(c)
51 » if err != nil {
52 » » log.WithError(err).Errorf(c, "Failed to get archival publisher i nstance.")
53 » » return nil, grpcutil.Internal
40 } 54 }
41 55
42 // Initialize our log stream. This cannot fail since we have already val idated 56 // Initialize our log stream. This cannot fail since we have already val idated
43 // req.Path. 57 // req.Path.
44 ls := coordinator.LogStreamFromPath(path) 58 ls := coordinator.LogStreamFromPath(path)
45 switch err := updateTerminalIndex(c, ls, req); err {
46 case errAlreadyUpdated:
47 return &google.Empty{}, nil
48 59
49 » // To be confirmed/resolved transactionally. 60 » // Initialize our archival parameters.
50 » case nil: 61 » params := coordinator.ArchivalParams{
51 » » break 62 » » RequestID: info.Get(c).RequestID(),
52 » default: 63 » » SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
53 » » // Because we're not in a transaction, forgive a "not found" sta tus. 64 » » CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(),
54 » » if grpc.Code(err) != codes.NotFound {
55 » » » log.WithError(err).Errorf(c, "Failed to check LogStream status.")
56 » » » return nil, err
57 » » }
58 } 65 }
59 66
60 » // Transactionally update. 67 » // Transactionally validate and update the terminal index.
61 » now := clock.Now(c).UTC() 68 » err = ds.Get(c).RunInTransaction(func(c context.Context) error {
62 » err := ds.Get(c).RunInTransaction(func(c context.Context) error { 69 » » if err := ds.Get(c).Get(ls); err != nil {
63 » » di := ds.Get(c) 70 » » » if err == ds.ErrNoSuchEntity {
71 » » » » log.Debugf(c, "LogEntry not found.")
72 » » » » return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
73 » » » }
64 74
65 » » // Load the log stream state. 75 » » » log.WithError(err).Errorf(c, "Failed to load LogEntry.")
66 » » switch err := updateTerminalIndex(c, ls, req); err { 76 » » » return grpcutil.Internal
67 » » case nil: 77 » » }
68 » » » ls.Updated = now
69 » » » ls.State = coordinator.LSTerminated
70 78
71 » » » if err := ls.Put(di); err != nil { 79 » » switch {
80 » » case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1:
81 » » » log.Errorf(c, "Secrets do not match.")
82 » » » return grpcutil.Errf(codes.InvalidArgument, "Request sec ret doesn't match the stream secret.")
83
84 » » case ls.State > coordinator.LSStreaming:
85 » » » // Succeed if this is non-conflicting (idempotent).
86 » » » if ls.TerminalIndex == req.TerminalIndex {
87 » » » » log.Fields{
88 » » » » » "state": ls.State.String(),
89 » » » » » "terminalIndex": ls.TerminalIndex,
90 » » » » }.Infof(c, "Log stream is already terminated.")
91 » » » » return nil
92 » » » }
93
94 » » » log.Fields{
95 » » » » "state": ls.State.String(),
96 » » » » "terminalIndex": ls.TerminalIndex,
97 » » » }.Warningf(c, "Log stream is not in streaming state.")
98 » » » return grpcutil.Errf(codes.FailedPrecondition, "Log stre am is not in streaming state.")
99
100 » » default:
101 » » » // Everything looks good, let's proceed...
102 » » » ls.TerminalIndex = req.TerminalIndex
103 » » » ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC())
104
105 » » » // Create an archival task.
106 » » » if err := params.PublishTask(c, ap, ls); err != nil {
107 » » » » if err == coordinator.ErrArchiveTasked {
108 » » » » » log.Warningf(c, "Archival has already be en tasked for this stream.")
109 » » » » » return nil
110 » » » » }
111
112 » » » » log.WithError(err).Errorf(c, "Failed to create a rchive task.")
113 » » » » return grpcutil.Internal
114 » » » }
115
116 » » » if err := ds.Get(c).Put(ls); err != nil {
72 log.Fields{ 117 log.Fields{
73 log.ErrorKey: err, 118 log.ErrorKey: err,
74 }.Errorf(c, "Failed to Put() LogStream.") 119 }.Errorf(c, "Failed to Put() LogStream.")
75 return grpcutil.Internal 120 return grpcutil.Internal
76 } 121 }
77 122
78 log.Fields{ 123 log.Fields{
79 "terminalIndex": ls.TerminalIndex, 124 "terminalIndex": ls.TerminalIndex,
80 » » » }.Infof(c, "Terminal index was set.") 125 » » » }.Infof(c, "Terminal index was set and archival was disp atched.")
81 return nil 126 return nil
82
83 case errAlreadyUpdated:
84 return nil
85
86 default:
87 return err
88 } 127 }
89 }, nil) 128 }, nil)
90 if err != nil { 129 if err != nil {
91 log.Fields{ 130 log.Fields{
92 log.ErrorKey: err, 131 log.ErrorKey: err,
93 }.Errorf(c, "Failed to update LogStream.") 132 }.Errorf(c, "Failed to update LogStream.")
94 return nil, err 133 return nil, err
95 } 134 }
96 135
97 return &google.Empty{}, nil 136 return &google.Empty{}, nil
98 } 137 }
99
100 func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logd og.TerminateStreamRequest) error {
101 if err := ds.Get(c).Get(ls); err != nil {
102 if err == ds.ErrNoSuchEntity {
103 log.Debugf(c, "LogEntry not found.")
104 return grpcutil.Errf(codes.NotFound, "Log stream [%s] is not registered", req.Path)
105 }
106
107 log.WithError(err).Errorf(c, "Failed to load LogEntry.")
108 return grpcutil.Internal
109 }
110
111 if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 {
112 log.Errorf(c, "Secrets do not match.")
113 return grpcutil.Errf(codes.InvalidArgument, "Request secret does n't match the stream secret.")
114 }
115
116 switch {
117 case ls.TerminalIndex == req.TerminalIndex:
118 // Idempotent: already updated to this value.
119 log.Debugf(c, "Log stream is already updated (idempotent).")
120 return errAlreadyUpdated
121
122 case ls.Terminated():
123 // Terminated, but with a different value.
124 log.Fields{
125 "current": ls.TerminalIndex,
126 "requested": req.TerminalIndex,
127 }.Warningf(c, "Refusing to change terminal index.")
128 return grpcutil.Errf(codes.AlreadyExists, "Terminal index is alr eady set.")
129
130 default:
131 ls.TerminalIndex = req.TerminalIndex
132 return nil
133 }
134 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698