Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(404)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 "errors"
10 9
11 ds "github.com/luci/gae/service/datastore" 10 ds "github.com/luci/gae/service/datastore"
11 tq "github.com/luci/gae/service/taskqueue"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 12 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
14 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/grpcutil" 16 "github.com/luci/luci-go/common/grpcutil"
16 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
17 log "github.com/luci/luci-go/common/logging" 18 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/proto/google" 19 "github.com/luci/luci-go/common/proto/google"
19 "golang.org/x/net/context" 20 "golang.org/x/net/context"
20 "google.golang.org/grpc"
21 "google.golang.org/grpc/codes" 21 "google.golang.org/grpc/codes"
22 ) 22 )
23 23
24 var errAlreadyUpdated = errors.New("already updated")
25
26 // TerminateStream is an idempotent stream state terminate operation. 24 // TerminateStream is an idempotent stream state terminate operation.
27 func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*google.Empty, error) { 25 func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*google.Empty, error) {
28 if err := Auth(c); err != nil { 26 if err := Auth(c); err != nil {
29 return nil, err 27 return nil, err
30 } 28 }
31 29
32 » path := types.StreamPath(req.Path) 30 » log.Fields{
33 » if err := path.Validate(); err != nil { 31 » » "path": req.Path,
34 » » return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path ( %s): %s", req.Path, err) 32 » » "terminalIndex": req.TerminalIndex,
33 » }.Infof(c, "Request to terminate log stream.")
34
35 » cfg, err := config.Load(c)
36 » if err != nil {
37 » » log.WithError(err).Errorf(c, "Failed to load configuration.")
38 » » return nil, grpcutil.Internal
35 } 39 }
36 » c = log.SetField(c, "path", req.Path) 40 » ccfg := cfg.GetCoordinator() // config.Load ensures this is not nil.
41 » if ccfg.ArchiveTaskQueue == "" {
42 » » log.Errorf(c, "No archive task queue defined.")
43 » » return nil, grpcutil.Internal
44 » }
37 45
38 if req.TerminalIndex < 0 { 46 if req.TerminalIndex < 0 {
39 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi nal index.") 47 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi nal index.")
40 } 48 }
41 49
50 path := types.StreamPath(req.Path)
51 if err := path.Validate(); err != nil {
52 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path ( %s): %s", req.Path, err)
53 }
54
42 // Initialize our log stream. This cannot fail since we have already val idated 55 // Initialize our log stream. This cannot fail since we have already val idated
43 // req.Path. 56 // req.Path.
44 ls := coordinator.LogStreamFromPath(path) 57 ls := coordinator.LogStreamFromPath(path)
45 switch err := updateTerminalIndex(c, ls, req); err {
46 case errAlreadyUpdated:
47 return &google.Empty{}, nil
48 58
49 » // To be confirmed/resolved transactionally. 59 » // Initialize our archival parameters.
50 » case nil: 60 » params := coordinator.ArchivalParams{
51 » » break 61 » » SettleDelay: ccfg.ArchiveSettleDelay.Duration(),
52 » default: 62 » » CompletePeriod: ccfg.ArchiveDelayMax.Duration(),
53 » » // Because we're not in a transaction, forgive a "not found" sta tus.
54 » » if grpc.Code(err) != codes.NotFound {
55 » » » log.WithError(err).Errorf(c, "Failed to check LogStream status.")
56 » » » return nil, err
57 » » }
58 } 63 }
59 64
60 » // Transactionally update. 65 » // Transactionally validate and update the terminal index.
61 » now := clock.Now(c).UTC() 66 » err = ds.Get(c).RunInTransaction(func(c context.Context) error {
62 » err := ds.Get(c).RunInTransaction(func(c context.Context) error { 67 » » if err := ds.Get(c).Get(ls); err != nil {
63 » » di := ds.Get(c) 68 » » » if err == ds.ErrNoSuchEntity {
69 » » » » log.Debugf(c, "LogEntry not found.")
70 » » » » return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
71 » » » }
64 72
65 » » // Load the log stream state. 73 » » » log.WithError(err).Errorf(c, "Failed to load LogEntry.")
66 » » switch err := updateTerminalIndex(c, ls, req); err { 74 » » » return grpcutil.Internal
67 » » case nil: 75 » » }
68 » » » ls.Updated = now
69 » » » ls.State = coordinator.LSTerminated
70 76
71 » » » if err := ls.Put(di); err != nil { 77 » » switch {
78 » » case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1:
79 » » » log.Errorf(c, "Secrets do not match.")
80 » » » return grpcutil.Errf(codes.InvalidArgument, "Request sec ret doesn't match the stream secret.")
81
82 » » case ls.State > coordinator.LSStreaming:
83 » » » // Succeed if this is non-conflicting (idempotent).
84 » » » if ls.TerminalIndex == req.TerminalIndex {
85 » » » » log.Fields{
86 » » » » » "state": ls.State.String(),
87 » » » » » "terminalIndex": ls.TerminalIndex,
88 » » » » }.Infof(c, "Log stream is already terminated.")
89 » » » » return nil
90 » » » }
91
92 » » » log.Fields{
93 » » » » "state": ls.State.String(),
94 » » » » "terminalIndex": ls.TerminalIndex,
95 » » » }.Warningf(c, "Log stream is not in streaming state.")
96 » » » return grpcutil.Errf(codes.FailedPrecondition, "Log stre am is not in streaming state.")
97
98 » » default:
99 » » » // Everything looks good, let's proceed...
100 » » » ls.TerminalIndex = req.TerminalIndex
101 » » » ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC())
102
103 » » » // Create an archival task.
104 » » » if _, err := params.CreateTask(tq.Get(c), ls, ccfg.Archi veTaskQueue); err != nil {
105 » » » » log.WithError(err).Errorf(c, "Failed to create a rchive task.")
106 » » » » return grpcutil.Internal
107 » » » }
108
109 » » » if err := ds.Get(c).Put(ls); err != nil {
72 log.Fields{ 110 log.Fields{
73 log.ErrorKey: err, 111 log.ErrorKey: err,
74 }.Errorf(c, "Failed to Put() LogStream.") 112 }.Errorf(c, "Failed to Put() LogStream.")
75 return grpcutil.Internal 113 return grpcutil.Internal
76 } 114 }
77 115
78 log.Fields{ 116 log.Fields{
79 "terminalIndex": ls.TerminalIndex, 117 "terminalIndex": ls.TerminalIndex,
80 » » » }.Infof(c, "Terminal index was set.") 118 » » » }.Infof(c, "Terminal index was set and archival was disp atched.")
81 return nil 119 return nil
82
83 case errAlreadyUpdated:
84 return nil
85
86 default:
87 return err
88 } 120 }
89 }, nil) 121 }, nil)
90 if err != nil { 122 if err != nil {
91 log.Fields{ 123 log.Fields{
92 log.ErrorKey: err, 124 log.ErrorKey: err,
93 }.Errorf(c, "Failed to update LogStream.") 125 }.Errorf(c, "Failed to update LogStream.")
94 return nil, err 126 return nil, err
95 } 127 }
96 128
97 return &google.Empty{}, nil 129 return &google.Empty{}, nil
98 } 130 }
99
100 func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logd og.TerminateStreamRequest) error {
101 if err := ds.Get(c).Get(ls); err != nil {
102 if err == ds.ErrNoSuchEntity {
103 log.Debugf(c, "LogEntry not found.")
104 return grpcutil.Errf(codes.NotFound, "Log stream [%s] is not registered", req.Path)
105 }
106
107 log.WithError(err).Errorf(c, "Failed to load LogEntry.")
108 return grpcutil.Internal
109 }
110
111 if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 {
112 log.Errorf(c, "Secrets do not match.")
113 return grpcutil.Errf(codes.InvalidArgument, "Request secret does n't match the stream secret.")
114 }
115
116 switch {
117 case ls.TerminalIndex == req.TerminalIndex:
118 // Idempotent: already updated to this value.
119 log.Debugf(c, "Log stream is already updated (idempotent).")
120 return errAlreadyUpdated
121
122 case ls.Terminated():
123 // Terminated, but with a different value.
124 log.Fields{
125 "current": ls.TerminalIndex,
126 "requested": req.TerminalIndex,
127 }.Warningf(c, "Refusing to change terminal index.")
128 return grpcutil.Errf(codes.AlreadyExists, "Terminal index is alr eady set.")
129
130 default:
131 ls.TerminalIndex = req.TerminalIndex
132 return nil
133 }
134 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698