| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 | 9 |
| 10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
| 11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/grpc/grpcutil" | 14 "github.com/luci/luci-go/grpc/grpcutil" |
| 15 "github.com/luci/luci-go/logdog/api/config/svcconfig" | |
| 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 17 "github.com/luci/luci-go/logdog/api/logpb" | 16 "github.com/luci/luci-go/logdog/api/logpb" |
| 18 "github.com/luci/luci-go/logdog/appengine/coordinator" | 17 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | |
| 20 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" | 18 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" |
| 21 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" |
| 22 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" | 20 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" |
| 23 "github.com/luci/luci-go/logdog/common/types" | 21 "github.com/luci/luci-go/logdog/common/types" |
| 24 "github.com/luci/luci-go/tumble" | 22 "github.com/luci/luci-go/tumble" |
| 25 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 26 "google.golang.org/grpc/codes" | 24 "google.golang.org/grpc/codes" |
| 27 ) | 25 ) |
| 28 | 26 |
| 29 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { | 27 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 155 } | 153 } |
| 156 | 154 |
| 157 // Before we go into transaction, try and put these entries. Thi
s should not | 155 // Before we go into transaction, try and put these entries. Thi
s should not |
| 158 // be contested, since components don't share an entity root. | 156 // be contested, since components don't share an entity root. |
| 159 if err := hierarchy.PutMulti(c, comps); err != nil { | 157 if err := hierarchy.PutMulti(c, comps); err != nil { |
| 160 log.WithError(err).Errorf(c, "Failed to add missing hier
archy components.") | 158 log.WithError(err).Errorf(c, "Failed to add missing hier
archy components.") |
| 161 return nil, grpcutil.Internal | 159 return nil, grpcutil.Internal |
| 162 } | 160 } |
| 163 | 161 |
| 164 // The stream does not exist. Proceed with transactional registr
ation. | 162 // The stream does not exist. Proceed with transactional registr
ation. |
| 165 » » err = tumble.RunMutation(c, ®isterStreamMutation{ | 163 » » lsKey := ds.KeyForObj(c, ls) |
| 166 » » » RegisterStreamRequest: req, | 164 » » err = tumble.RegisterInTransaction(c, lsKey, func(c context.Cont
ext) ([]tumble.Mutation, error) { |
| 167 » » » cfg: cfg, | 165 » » » // Load our state and stream (transactional). |
| 168 » » » pcfg: pcfg, | 166 » » » switch err := ds.Get(c, ls, lst); { |
| 169 » » » desc: &desc, | 167 » » » case err == nil: |
| 170 » » » pfx: pfx, | 168 » » » » // The stream is already registered. |
| 171 » » » lst: lst, | 169 » » » » return nil, nil |
| 172 » » » ls: ls, | 170 |
| 171 » » » case !anyNoSuchEntity(err): |
| 172 » » » » log.WithError(err).Errorf(c, "Failed to check fo
r stream registration (transactional).") |
| 173 » » » » return nil, err |
| 174 » » » } |
| 175 |
| 176 » » » // The stream is not yet registered. |
| 177 » » » log.Infof(c, "Registering new log stream.") |
| 178 |
| 179 » » » // Construct our LogStreamState. |
| 180 » » » now := clock.Now(c).UTC() |
| 181 » » » lst.Created = now |
| 182 » » » lst.Updated = now |
| 183 » » » lst.Secret = pfx.Secret // Copy Prefix Secret to reduce
datastore Gets. |
| 184 |
| 185 » » » // Construct our LogStream. |
| 186 » » » ls.Created = now |
| 187 » » » ls.ProtoVersion = req.ProtoVersion |
| 188 |
| 189 » » » if err := ls.LoadDescriptor(&desc); err != nil { |
| 190 » » » » log.Fields{ |
| 191 » » » » » log.ErrorKey: err, |
| 192 » » » » }.Errorf(c, "Failed to load descriptor into LogS
tream.") |
| 193 » » » » return nil, grpcutil.Errf(codes.InvalidArgument,
"Failed to load descriptor.") |
| 194 » » » } |
| 195 |
| 196 » » » // If our registration request included a terminal index
, terminate the |
| 197 » » » // log stream state as well. |
| 198 » » » if req.TerminalIndex >= 0 { |
| 199 » » » » log.Fields{ |
| 200 » » » » » "terminalIndex": req.TerminalIndex, |
| 201 » » » » }.Debugf(c, "Registration request included termi
nal index.") |
| 202 |
| 203 » » » » lst.TerminalIndex = req.TerminalIndex |
| 204 » » » » lst.TerminatedTime = now |
| 205 » » » } else { |
| 206 » » » » lst.TerminalIndex = -1 |
| 207 » » » } |
| 208 |
| 209 » » » if err := ds.Put(c, ls, lst); err != nil { |
| 210 » » » » log.Fields{ |
| 211 » » » » » log.ErrorKey: err, |
| 212 » » » » }.Errorf(c, "Failed to Put LogStream.") |
| 213 » » » » return nil, grpcutil.Internal |
| 214 » » » } |
| 215 |
| 216 » » » // Add a named delayed mutation to archive this stream i
f it's not archived |
| 217 » » » // yet. |
| 218 » » » // |
| 219 » » » // If the registration did not include a terminal index,
this will be our |
| 220 » » » // pessimistic archival request, scheduled on registrati
on to catch streams |
| 221 » » » // that don't expire. This mutation will be replaced by
the optimistic |
| 222 » » » // archival mutation when/if the stream is terminated vi
a TerminateStream. |
| 223 » » » // |
| 224 » » » // If the registration included a terminal index, apply
our standard |
| 225 » » » // parameters to the archival. Since TerminateStream wil
l not be called, |
| 226 » » » // this will be our formal optimistic archival task. |
| 227 » » » params := standardArchivalParams(cfg, pcfg) |
| 228 » » » cat := mutations.CreateArchiveTask{ |
| 229 » » » » ID: ls.ID, |
| 230 » » » } |
| 231 » » » if req.TerminalIndex < 0 { |
| 232 » » » » // No terminal index, schedule pessimistic clean
up archival. |
| 233 » » » » cat.Expiration = now.Add(params.CompletePeriod) |
| 234 |
| 235 » » » » log.Fields{ |
| 236 » » » » » "deadline": cat.Expiration, |
| 237 » » » » }.Debugf(c, "Scheduling cleanup archival mutatio
n.") |
| 238 » » » } else { |
| 239 » » » » // Terminal index, schedule optimistic archival
(mirrors TerminateStream). |
| 240 » » » » cat.SettleDelay = params.SettleDelay |
| 241 » » » » cat.CompletePeriod = params.CompletePeriod |
| 242 |
| 243 » » » » // Schedule this mutation to execute after our s
ettle delay. |
| 244 » » » » cat.Expiration = now.Add(params.SettleDelay) |
| 245 |
| 246 » » » » log.Fields{ |
| 247 » » » » » "settleDelay": cat.SettleDelay, |
| 248 » » » » » "completePeriod": cat.CompletePeriod, |
| 249 » » » » » "scheduledAt": cat.Expiration, |
| 250 » » » » }.Debugf(c, "Scheduling archival mutation.") |
| 251 » » » } |
| 252 |
| 253 » » » aeName := cat.TaskName(c) |
| 254 » » » if err := tumble.PutNamedMutations(c, lsKey, map[string]
tumble.Mutation{aeName: &cat}); err != nil { |
| 255 » » » » log.WithError(err).Errorf(c, "Failed to write na
med mutations.") |
| 256 » » » » return nil, grpcutil.Internal |
| 257 » » » } |
| 258 |
| 259 » » » return nil, nil |
| 173 }) | 260 }) |
| 174 if err != nil { | 261 if err != nil { |
| 175 log.Fields{ | 262 log.Fields{ |
| 176 log.ErrorKey: err, | 263 log.ErrorKey: err, |
| 177 }.Errorf(c, "Failed to register LogStream.") | 264 }.Errorf(c, "Failed to register LogStream.") |
| 178 return nil, err | 265 return nil, err |
| 179 } | 266 } |
| 180 } | 267 } |
| 181 | 268 |
| 182 return &logdog.RegisterStreamResponse{ | 269 return &logdog.RegisterStreamResponse{ |
| 183 Id: string(ls.ID), | 270 Id: string(ls.ID), |
| 184 State: buildLogStreamState(ls, lst), | 271 State: buildLogStreamState(ls, lst), |
| 185 }, nil | 272 }, nil |
| 186 } | 273 } |
| 187 | |
| 188 type registerStreamMutation struct { | |
| 189 *logdog.RegisterStreamRequest | |
| 190 | |
| 191 cfg *config.Config | |
| 192 pcfg *svcconfig.ProjectConfig | |
| 193 | |
| 194 desc *logpb.LogStreamDescriptor | |
| 195 pfx *coordinator.LogPrefix | |
| 196 ls *coordinator.LogStream | |
| 197 lst *coordinator.LogStreamState | |
| 198 } | |
| 199 | |
| 200 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati
on, error) { | |
| 201 // Load our state and stream (transactional). | |
| 202 switch err := ds.Get(c, m.ls, m.lst); { | |
| 203 case err == nil: | |
| 204 // The stream is already registered. | |
| 205 return nil, nil | |
| 206 | |
| 207 case !anyNoSuchEntity(err): | |
| 208 log.WithError(err).Errorf(c, "Failed to check for stream registr
ation (transactional).") | |
| 209 return nil, err | |
| 210 } | |
| 211 | |
| 212 // The stream is not yet registered. | |
| 213 log.Infof(c, "Registering new log stream.") | |
| 214 | |
| 215 // Construct our LogStreamState. | |
| 216 now := clock.Now(c).UTC() | |
| 217 m.lst.Created = now | |
| 218 m.lst.Updated = now | |
| 219 m.lst.Secret = m.pfx.Secret // Copy Prefix Secret to reduce datastore Ge
ts. | |
| 220 | |
| 221 // Construct our LogStream. | |
| 222 m.ls.Created = now | |
| 223 m.ls.ProtoVersion = m.ProtoVersion | |
| 224 | |
| 225 if err := m.ls.LoadDescriptor(m.desc); err != nil { | |
| 226 log.Fields{ | |
| 227 log.ErrorKey: err, | |
| 228 }.Errorf(c, "Failed to load descriptor into LogStream.") | |
| 229 return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load
descriptor.") | |
| 230 } | |
| 231 | |
| 232 // If our registration request included a terminal index, terminate the | |
| 233 // log stream state as well. | |
| 234 if m.TerminalIndex >= 0 { | |
| 235 log.Fields{ | |
| 236 "terminalIndex": m.TerminalIndex, | |
| 237 }.Debugf(c, "Registration request included terminal index.") | |
| 238 | |
| 239 m.lst.TerminalIndex = m.TerminalIndex | |
| 240 m.lst.TerminatedTime = now | |
| 241 } else { | |
| 242 m.lst.TerminalIndex = -1 | |
| 243 } | |
| 244 | |
| 245 if err := ds.Put(c, m.ls, m.lst); err != nil { | |
| 246 log.Fields{ | |
| 247 log.ErrorKey: err, | |
| 248 }.Errorf(c, "Failed to Put LogStream.") | |
| 249 return nil, grpcutil.Internal | |
| 250 } | |
| 251 | |
| 252 // Add a named delayed mutation to archive this stream if it's not archi
ved | |
| 253 // yet. | |
| 254 // | |
| 255 // If the registration did not include a terminal index, this will be ou
r | |
| 256 // pessimistic archival request, scheduled on registration to catch stre
ams | |
| 257 // that don't expire. This mutation will be replaced by the optimistic | |
| 258 // archival mutation when/if the stream is terminated via TerminateStrea
m. | |
| 259 // | |
| 260 // If the registration included a terminal index, apply our standard | |
| 261 // parameters to the archival. Since TerminateStream will not be called, | |
| 262 // this will be our formal optimistic archival task. | |
| 263 params := standardArchivalParams(m.cfg, m.pcfg) | |
| 264 cat := mutations.CreateArchiveTask{ | |
| 265 ID: m.ls.ID, | |
| 266 } | |
| 267 if m.TerminalIndex < 0 { | |
| 268 // No terminal index, schedule pessimistic cleanup archival. | |
| 269 cat.Expiration = now.Add(params.CompletePeriod) | |
| 270 | |
| 271 log.Fields{ | |
| 272 "deadline": cat.Expiration, | |
| 273 }.Debugf(c, "Scheduling cleanup archival mutation.") | |
| 274 } else { | |
| 275 // Terminal index, schedule optimistic archival (mirrors Termina
teStream). | |
| 276 cat.SettleDelay = params.SettleDelay | |
| 277 cat.CompletePeriod = params.CompletePeriod | |
| 278 | |
| 279 // Schedule this mutation to execute after our settle delay. | |
| 280 cat.Expiration = now.Add(params.SettleDelay) | |
| 281 | |
| 282 log.Fields{ | |
| 283 "settleDelay": cat.SettleDelay, | |
| 284 "completePeriod": cat.CompletePeriod, | |
| 285 "scheduledAt": cat.Expiration, | |
| 286 }.Debugf(c, "Scheduling archival mutation.") | |
| 287 } | |
| 288 | |
| 289 aeParent, aeName := cat.TaskName(c) | |
| 290 if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutati
on{aeName: &cat}); err != nil { | |
| 291 log.WithError(err).Errorf(c, "Failed to write named mutations.") | |
| 292 return nil, grpcutil.Internal | |
| 293 } | |
| 294 | |
| 295 return nil, nil | |
| 296 } | |
| 297 | |
| 298 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { | |
| 299 return ds.KeyForObj(c, m.ls) | |
| 300 } | |
| OLD | NEW |