Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 9
10 ds "github.com/luci/gae/service/datastore" 10 ds "github.com/luci/gae/service/datastore"
11 "github.com/luci/gae/service/info" 11 "github.com/luci/gae/service/info"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 12 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
14 "github.com/luci/luci-go/appengine/tumble"
13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
14 "github.com/luci/luci-go/common/clock" 16 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/grpcutil" 17 "github.com/luci/luci-go/common/grpcutil"
16 "github.com/luci/luci-go/common/logdog/types" 18 "github.com/luci/luci-go/common/logdog/types"
17 log "github.com/luci/luci-go/common/logging" 19 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
19 "golang.org/x/net/context" 21 "golang.org/x/net/context"
20 "google.golang.org/grpc/codes" 22 "google.golang.org/grpc/codes"
21 ) 23 )
22 24
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 57
56 // Initialize our archival parameters. 58 // Initialize our archival parameters.
57 params := coordinator.ArchivalParams{ 59 params := coordinator.ArchivalParams{
58 RequestID: info.Get(c).RequestID(), 60 RequestID: info.Get(c).RequestID(),
59 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), 61 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
60 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), 62 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(),
61 } 63 }
62 64
63 // Transactionally validate and update the terminal index. 65 // Transactionally validate and update the terminal index.
64 err = ds.Get(c).RunInTransaction(func(c context.Context) error { 66 err = ds.Get(c).RunInTransaction(func(c context.Context) error {
65 » » if err := ds.Get(c).Get(ls); err != nil { 67 » » di := ds.Get(c)
68
69 » » if err := di.Get(ls); err != nil {
66 if err == ds.ErrNoSuchEntity { 70 if err == ds.ErrNoSuchEntity {
67 log.Debugf(c, "LogEntry not found.") 71 log.Debugf(c, "LogEntry not found.")
68 return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path) 72 return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
69 } 73 }
70 74
71 log.WithError(err).Errorf(c, "Failed to load LogEntry.") 75 log.WithError(err).Errorf(c, "Failed to load LogEntry.")
72 return grpcutil.Internal 76 return grpcutil.Internal
73 } 77 }
74 78
75 switch { 79 switch {
(...skipping 26 matching lines...) Expand all
102 if err := params.PublishTask(c, ap, ls); err != nil { 106 if err := params.PublishTask(c, ap, ls); err != nil {
103 if err == coordinator.ErrArchiveTasked { 107 if err == coordinator.ErrArchiveTasked {
104 log.Warningf(c, "Archival has already be en tasked for this stream.") 108 log.Warningf(c, "Archival has already be en tasked for this stream.")
105 return nil 109 return nil
106 } 110 }
107 111
108 log.WithError(err).Errorf(c, "Failed to create a rchive task.") 112 log.WithError(err).Errorf(c, "Failed to create a rchive task.")
109 return grpcutil.Internal 113 return grpcutil.Internal
110 } 114 }
111 115
112 » » » if err := ds.Get(c).Put(ls); err != nil { 116 » » » if err := di.Put(ls); err != nil {
113 log.Fields{ 117 log.Fields{
114 log.ErrorKey: err, 118 log.ErrorKey: err,
115 }.Errorf(c, "Failed to Put() LogStream.") 119 }.Errorf(c, "Failed to Put() LogStream.")
116 return grpcutil.Internal 120 return grpcutil.Internal
117 } 121 }
118 122
123 // Delete the archive expiration mutation, since we have just dispatched
124 // an archive request.
125 aeParent, aeName := (&mutations.CreateArchiveTask{Path: path}).TaskName(di)
126 if err := tumble.CancelNamedMutations(c, aeParent, aeNam e); err != nil {
127 log.WithError(err).Errorf(c, "Failed to cancel a rchive expiration mutation.")
128 return grpcutil.Internal
129 }
130
119 log.Fields{ 131 log.Fields{
120 "terminalIndex": ls.TerminalIndex, 132 "terminalIndex": ls.TerminalIndex,
121 }.Infof(c, "Terminal index was set and archival was disp atched.") 133 }.Infof(c, "Terminal index was set and archival was disp atched.")
122 return nil 134 return nil
123 } 135 }
124 }, nil) 136 }, nil)
125 if err != nil { 137 if err != nil {
126 log.Fields{ 138 log.Fields{
127 log.ErrorKey: err, 139 log.ErrorKey: err,
128 }.Errorf(c, "Failed to update LogStream.") 140 }.Errorf(c, "Failed to update LogStream.")
129 return nil, err 141 return nil, err
130 } 142 }
131 143
132 return &google.Empty{}, nil 144 return &google.Empty{}, nil
133 } 145 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698