Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(312)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/registerStream.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Switch to Tumble delayed mutations. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 "errors" 9 "errors"
10 "time"
10 11
11 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 13 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
14 "github.com/luci/luci-go/appengine/tumble" 15 "github.com/luci/luci-go/appengine/tumble"
15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
16 "github.com/luci/luci-go/common/clock" 17 "github.com/luci/luci-go/common/clock"
17 "github.com/luci/luci-go/common/grpcutil" 18 "github.com/luci/luci-go/common/grpcutil"
18 "github.com/luci/luci-go/common/logdog/types" 19 "github.com/luci/luci-go/common/logdog/types"
19 log "github.com/luci/luci-go/common/logging" 20 log "github.com/luci/luci-go/common/logging"
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 } 91 }
91 switch { 92 switch {
92 case req.Desc.Prefix != string(prefix): 93 case req.Desc.Prefix != string(prefix):
93 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor pre fix does not match path (%s != %s)", 94 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor pre fix does not match path (%s != %s)",
94 req.Desc.Prefix, prefix) 95 req.Desc.Prefix, prefix)
95 case req.Desc.Name != string(name): 96 case req.Desc.Name != string(name):
96 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor nam e does not match path (%s != %s)", 97 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor nam e does not match path (%s != %s)",
97 req.Desc.Name, name) 98 req.Desc.Name, name)
98 } 99 }
99 100
101 // Load our config and archive expiration.
102 _, cfg, err := coordinator.GetServices(c).Config(c)
103 if err != nil {
104 log.WithError(err).Errorf(c, "Failed to load configuration.")
105 return nil, grpcutil.Internal
106 }
107
108 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
109 if archiveDelayMax < 0 {
110 log.Fields{
111 "archiveDelayMax": archiveDelayMax,
112 }.Errorf(c, "Must have positive maximum archive delay.")
113 return nil, grpcutil.Internal
114 }
115
100 // Already registered? (Non-transactional). 116 // Already registered? (Non-transactional).
101 ls := coordinator.LogStreamFromPath(path) 117 ls := coordinator.LogStreamFromPath(path)
102 switch err := ds.Get(c).Get(ls); err { 118 switch err := ds.Get(c).Get(ls); err {
103 case nil: 119 case nil:
104 // We want this to be idempotent, so validate that it matches th e current 120 // We want this to be idempotent, so validate that it matches th e current
105 // configuration and return accordingly. 121 // configuration and return accordingly.
106 if err := matchesLogStream(req, ls); err != nil { 122 if err := matchesLogStream(req, ls); err != nil {
107 return nil, grpcutil.Errf(codes.AlreadyExists, "Log stre am is already incompatibly registered: %v", err) 123 return nil, grpcutil.Errf(codes.AlreadyExists, "Log stre am is already incompatibly registered: %v", err)
108 } 124 }
109 125
110 case ds.ErrNoSuchEntity: 126 case ds.ErrNoSuchEntity:
111 // The registration is valid, so retain it. 127 // The registration is valid, so retain it.
112 » » if err := tumble.RunMutation(c, &registerStreamMutation{ls, req} ); err != nil { 128 » » err = tumble.RunMutation(c, &registerStreamMutation{
129 » » » LogStream: ls,
130 » » » req: req,
131 » » » archiveDelay: archiveDelayMax,
132 » » })
133 » » if err != nil {
113 log.Fields{ 134 log.Fields{
114 log.ErrorKey: err, 135 log.ErrorKey: err,
115 }.Errorf(c, "Failed to register LogStream.") 136 }.Errorf(c, "Failed to register LogStream.")
116 return nil, filterError(err) 137 return nil, filterError(err)
117 } 138 }
118 139
119 default: 140 default:
120 log.WithError(err).Errorf(c, "Failed to check for log stream.") 141 log.WithError(err).Errorf(c, "Failed to check for log stream.")
121 return nil, grpcutil.Internal 142 return nil, grpcutil.Internal
122 } 143 }
(...skipping 11 matching lines...) Expand all
134 case grpc.Code(err) == codes.Unknown: 155 case grpc.Code(err) == codes.Unknown:
135 return grpcutil.Internal 156 return grpcutil.Internal
136 default: 157 default:
137 return err 158 return err
138 } 159 }
139 } 160 }
140 161
141 type registerStreamMutation struct { 162 type registerStreamMutation struct {
142 *coordinator.LogStream 163 *coordinator.LogStream
143 164
144 » req *logdog.RegisterStreamRequest 165 » req *logdog.RegisterStreamRequest
166 » archiveDelay time.Duration
145 } 167 }
146 168
147 func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio n, error) { 169 func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio n, error) {
148 di := ds.Get(c) 170 di := ds.Get(c)
149 171
150 // Already registered? (Transactional). 172 // Already registered? (Transactional).
151 switch err := di.Get(m.LogStream); err { 173 switch err := di.Get(m.LogStream); err {
152 case ds.ErrNoSuchEntity: 174 case ds.ErrNoSuchEntity:
153 break 175 break
154 176
(...skipping 27 matching lines...) Expand all
182 m.Created = ds.RoundTime(clock.Now(c).UTC()) 204 m.Created = ds.RoundTime(clock.Now(c).UTC())
183 m.TerminalIndex = -1 205 m.TerminalIndex = -1
184 206
185 if err := di.Put(m.LogStream); err != nil { 207 if err := di.Put(m.LogStream); err != nil {
186 log.Fields{ 208 log.Fields{
187 log.ErrorKey: err, 209 log.ErrorKey: err,
188 }.Errorf(c, "Failed to Put() LogStream.") 210 }.Errorf(c, "Failed to Put() LogStream.")
189 return nil, grpcutil.Internal 211 return nil, grpcutil.Internal
190 } 212 }
191 213
214 // Add a named delayed mutation to archive this stream if it's not archi ved
215 // yet. We will cancel this in terminateStream once we dispatch an immed iate
216 // archival task.
217 archiveExpiredMutation := mutations.CreateArchiveTask{
218 Path: m.Path(),
219 Expiration: clock.Now(c).Add(m.archiveDelay),
220 }
221 aeParent, aeName := archiveExpiredMutation.TaskName(di)
iannucci 2016/04/29 20:09:42 why not just .SetArchiveTaskDeadline(c, m)? In dm
dnj 2016/04/29 23:04:21 Discussed, this interface in general needs a face
222 err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{
223 aeName: &archiveExpiredMutation,
224 })
225 if err != nil {
226 log.WithError(err).Errorf(c, "Failed to load named mutations.")
227 return nil, grpcutil.Internal
228 }
229
192 return []tumble.Mutation{ 230 return []tumble.Mutation{
193 &mutations.PutHierarchyMutation{ 231 &mutations.PutHierarchyMutation{
194 Path: m.Path(), 232 Path: m.Path(),
195 }, 233 },
196 }, nil 234 }, nil
197 } 235 }
198 236
199 func (m registerStreamMutation) Root(c context.Context) *ds.Key { 237 func (m registerStreamMutation) Root(c context.Context) *ds.Key {
200 return ds.Get(c).KeyForObj(m.LogStream) 238 return ds.Get(c).KeyForObj(m.LogStream)
201 } 239 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698