OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "crypto/subtle" | 8 "crypto/subtle" |
9 | 9 |
10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
13 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
14 "github.com/luci/luci-go/grpc/grpcutil" | 14 "github.com/luci/luci-go/grpc/grpcutil" |
15 "github.com/luci/luci-go/logdog/api/config/svcconfig" | |
16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
17 "github.com/luci/luci-go/logdog/api/logpb" | 16 "github.com/luci/luci-go/logdog/api/logpb" |
18 "github.com/luci/luci-go/logdog/appengine/coordinator" | 17 "github.com/luci/luci-go/logdog/appengine/coordinator" |
19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | |
20 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" | 18 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" |
21 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" |
22 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" | 20 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" |
23 "github.com/luci/luci-go/logdog/common/types" | 21 "github.com/luci/luci-go/logdog/common/types" |
24 "github.com/luci/luci-go/tumble" | 22 "github.com/luci/luci-go/tumble" |
25 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
26 "google.golang.org/grpc/codes" | 24 "google.golang.org/grpc/codes" |
27 ) | 25 ) |
28 | 26 |
29 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { | 27 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
155 } | 153 } |
156 | 154 |
157 // Before we go into transaction, try and put these entries. Thi
s should not | 155 // Before we go into transaction, try and put these entries. Thi
s should not |
158 // be contested, since components don't share an entity root. | 156 // be contested, since components don't share an entity root. |
159 if err := hierarchy.PutMulti(c, comps); err != nil { | 157 if err := hierarchy.PutMulti(c, comps); err != nil { |
160 log.WithError(err).Errorf(c, "Failed to add missing hier
archy components.") | 158 log.WithError(err).Errorf(c, "Failed to add missing hier
archy components.") |
161 return nil, grpcutil.Internal | 159 return nil, grpcutil.Internal |
162 } | 160 } |
163 | 161 |
164 // The stream does not exist. Proceed with transactional registr
ation. | 162 // The stream does not exist. Proceed with transactional registr
ation. |
165 » » err = tumble.RunMutation(c, ®isterStreamMutation{ | 163 » » lsKey := ds.KeyForObj(c, ls) |
166 » » » RegisterStreamRequest: req, | 164 » » err = tumble.RunUnbuffered(c, lsKey, func(c context.Context) ([]
tumble.Mutation, error) { |
167 » » » cfg: cfg, | 165 » » » // Load our state and stream (transactional). |
168 » » » pcfg: pcfg, | 166 » » » switch err := ds.Get(c, ls, lst); { |
169 » » » desc: &desc, | 167 » » » case err == nil: |
170 » » » pfx: pfx, | 168 » » » » // The stream is already registered. |
171 » » » lst: lst, | 169 » » » » return nil, nil |
172 » » » ls: ls, | 170 |
| 171 » » » case !anyNoSuchEntity(err): |
| 172 » » » » log.WithError(err).Errorf(c, "Failed to check fo
r stream registration (transactional).") |
| 173 » » » » return nil, err |
| 174 » » » } |
| 175 |
| 176 » » » // The stream is not yet registered. |
| 177 » » » log.Infof(c, "Registering new log stream.") |
| 178 |
| 179 » » » // Construct our LogStreamState. |
| 180 » » » now := clock.Now(c).UTC() |
| 181 » » » lst.Created = now |
| 182 » » » lst.Updated = now |
| 183 » » » lst.Secret = pfx.Secret // Copy Prefix Secret to reduce
datastore Gets. |
| 184 |
| 185 » » » // Construct our LogStream. |
| 186 » » » ls.Created = now |
| 187 » » » ls.ProtoVersion = req.ProtoVersion |
| 188 |
| 189 » » » if err := ls.LoadDescriptor(&desc); err != nil { |
| 190 » » » » log.Fields{ |
| 191 » » » » » log.ErrorKey: err, |
| 192 » » » » }.Errorf(c, "Failed to load descriptor into LogS
tream.") |
| 193 » » » » return nil, grpcutil.Errf(codes.InvalidArgument,
"Failed to load descriptor.") |
| 194 » » » } |
| 195 |
| 196 » » » // If our registration request included a terminal index
, terminate the |
| 197 » » » // log stream state as well. |
| 198 » » » if req.TerminalIndex >= 0 { |
| 199 » » » » log.Fields{ |
| 200 » » » » » "terminalIndex": req.TerminalIndex, |
| 201 » » » » }.Debugf(c, "Registration request included termi
nal index.") |
| 202 |
| 203 » » » » lst.TerminalIndex = req.TerminalIndex |
| 204 » » » » lst.TerminatedTime = now |
| 205 » » » } else { |
| 206 » » » » lst.TerminalIndex = -1 |
| 207 » » » } |
| 208 |
| 209 » » » if err := ds.Put(c, ls, lst); err != nil { |
| 210 » » » » log.Fields{ |
| 211 » » » » » log.ErrorKey: err, |
| 212 » » » » }.Errorf(c, "Failed to Put LogStream.") |
| 213 » » » » return nil, grpcutil.Internal |
| 214 » » » } |
| 215 |
| 216 » » » // Add a named delayed mutation to archive this stream i
f it's not archived |
| 217 » » » // yet. |
| 218 » » » // |
| 219 » » » // If the registration did not include a terminal index,
this will be our |
| 220 » » » // pessimistic archival request, scheduled on registrati
on to catch streams |
| 221 » » » // that don't expire. This mutation will be replaced by
the optimistic |
| 222 » » » // archival mutation when/if the stream is terminated vi
a TerminateStream. |
| 223 » » » // |
| 224 » » » // If the registration included a terminal index, apply
our standard |
| 225 » » » // parameters to the archival. Since TerminateStream wil
l not be called, |
| 226 » » » // this will be our formal optimistic archival task. |
| 227 » » » params := standardArchivalParams(cfg, pcfg) |
| 228 » » » cat := mutations.CreateArchiveTask{ |
| 229 » » » » ID: ls.ID, |
| 230 » » » } |
| 231 » » » if req.TerminalIndex < 0 { |
| 232 » » » » // No terminal index, schedule pessimistic clean
up archival. |
| 233 » » » » cat.Expiration = now.Add(params.CompletePeriod) |
| 234 |
| 235 » » » » log.Fields{ |
| 236 » » » » » "deadline": cat.Expiration, |
| 237 » » » » }.Debugf(c, "Scheduling cleanup archival mutatio
n.") |
| 238 » » » } else { |
| 239 » » » » // Terminal index, schedule optimistic archival
(mirrors TerminateStream). |
| 240 » » » » cat.SettleDelay = params.SettleDelay |
| 241 » » » » cat.CompletePeriod = params.CompletePeriod |
| 242 |
| 243 » » » » // Schedule this mutation to execute after our s
ettle delay. |
| 244 » » » » cat.Expiration = now.Add(params.SettleDelay) |
| 245 |
| 246 » » » » log.Fields{ |
| 247 » » » » » "settleDelay": cat.SettleDelay, |
| 248 » » » » » "completePeriod": cat.CompletePeriod, |
| 249 » » » » » "scheduledAt": cat.Expiration, |
| 250 » » » » }.Debugf(c, "Scheduling archival mutation.") |
| 251 » » » } |
| 252 |
| 253 » » » aeName := cat.TaskName(c) |
| 254 » » » if err := tumble.PutNamedMutations(c, lsKey, map[string]
tumble.Mutation{aeName: &cat}); err != nil { |
| 255 » » » » log.WithError(err).Errorf(c, "Failed to write na
med mutations.") |
| 256 » » » » return nil, grpcutil.Internal |
| 257 » » » } |
| 258 |
| 259 » » » return nil, nil |
173 }) | 260 }) |
174 if err != nil { | 261 if err != nil { |
175 log.Fields{ | 262 log.Fields{ |
176 log.ErrorKey: err, | 263 log.ErrorKey: err, |
177 }.Errorf(c, "Failed to register LogStream.") | 264 }.Errorf(c, "Failed to register LogStream.") |
178 return nil, err | 265 return nil, err |
179 } | 266 } |
180 } | 267 } |
181 | 268 |
182 return &logdog.RegisterStreamResponse{ | 269 return &logdog.RegisterStreamResponse{ |
183 Id: string(ls.ID), | 270 Id: string(ls.ID), |
184 State: buildLogStreamState(ls, lst), | 271 State: buildLogStreamState(ls, lst), |
185 }, nil | 272 }, nil |
186 } | 273 } |
187 | |
188 type registerStreamMutation struct { | |
189 *logdog.RegisterStreamRequest | |
190 | |
191 cfg *config.Config | |
192 pcfg *svcconfig.ProjectConfig | |
193 | |
194 desc *logpb.LogStreamDescriptor | |
195 pfx *coordinator.LogPrefix | |
196 ls *coordinator.LogStream | |
197 lst *coordinator.LogStreamState | |
198 } | |
199 | |
200 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati
on, error) { | |
201 // Load our state and stream (transactional). | |
202 switch err := ds.Get(c, m.ls, m.lst); { | |
203 case err == nil: | |
204 // The stream is already registered. | |
205 return nil, nil | |
206 | |
207 case !anyNoSuchEntity(err): | |
208 log.WithError(err).Errorf(c, "Failed to check for stream registr
ation (transactional).") | |
209 return nil, err | |
210 } | |
211 | |
212 // The stream is not yet registered. | |
213 log.Infof(c, "Registering new log stream.") | |
214 | |
215 // Construct our LogStreamState. | |
216 now := clock.Now(c).UTC() | |
217 m.lst.Created = now | |
218 m.lst.Updated = now | |
219 m.lst.Secret = m.pfx.Secret // Copy Prefix Secret to reduce datastore Ge
ts. | |
220 | |
221 // Construct our LogStream. | |
222 m.ls.Created = now | |
223 m.ls.ProtoVersion = m.ProtoVersion | |
224 | |
225 if err := m.ls.LoadDescriptor(m.desc); err != nil { | |
226 log.Fields{ | |
227 log.ErrorKey: err, | |
228 }.Errorf(c, "Failed to load descriptor into LogStream.") | |
229 return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load
descriptor.") | |
230 } | |
231 | |
232 // If our registration request included a terminal index, terminate the | |
233 // log stream state as well. | |
234 if m.TerminalIndex >= 0 { | |
235 log.Fields{ | |
236 "terminalIndex": m.TerminalIndex, | |
237 }.Debugf(c, "Registration request included terminal index.") | |
238 | |
239 m.lst.TerminalIndex = m.TerminalIndex | |
240 m.lst.TerminatedTime = now | |
241 } else { | |
242 m.lst.TerminalIndex = -1 | |
243 } | |
244 | |
245 if err := ds.Put(c, m.ls, m.lst); err != nil { | |
246 log.Fields{ | |
247 log.ErrorKey: err, | |
248 }.Errorf(c, "Failed to Put LogStream.") | |
249 return nil, grpcutil.Internal | |
250 } | |
251 | |
252 // Add a named delayed mutation to archive this stream if it's not archi
ved | |
253 // yet. | |
254 // | |
255 // If the registration did not include a terminal index, this will be ou
r | |
256 // pessimistic archival request, scheduled on registration to catch stre
ams | |
257 // that don't expire. This mutation will be replaced by the optimistic | |
258 // archival mutation when/if the stream is terminated via TerminateStrea
m. | |
259 // | |
260 // If the registration included a terminal index, apply our standard | |
261 // parameters to the archival. Since TerminateStream will not be called, | |
262 // this will be our formal optimistic archival task. | |
263 params := standardArchivalParams(m.cfg, m.pcfg) | |
264 cat := mutations.CreateArchiveTask{ | |
265 ID: m.ls.ID, | |
266 } | |
267 if m.TerminalIndex < 0 { | |
268 // No terminal index, schedule pessimistic cleanup archival. | |
269 cat.Expiration = now.Add(params.CompletePeriod) | |
270 | |
271 log.Fields{ | |
272 "deadline": cat.Expiration, | |
273 }.Debugf(c, "Scheduling cleanup archival mutation.") | |
274 } else { | |
275 // Terminal index, schedule optimistic archival (mirrors Termina
teStream). | |
276 cat.SettleDelay = params.SettleDelay | |
277 cat.CompletePeriod = params.CompletePeriod | |
278 | |
279 // Schedule this mutation to execute after our settle delay. | |
280 cat.Expiration = now.Add(params.SettleDelay) | |
281 | |
282 log.Fields{ | |
283 "settleDelay": cat.SettleDelay, | |
284 "completePeriod": cat.CompletePeriod, | |
285 "scheduledAt": cat.Expiration, | |
286 }.Debugf(c, "Scheduling archival mutation.") | |
287 } | |
288 | |
289 aeParent, aeName := cat.TaskName(c) | |
290 if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutati
on{aeName: &cat}); err != nil { | |
291 log.WithError(err).Errorf(c, "Failed to write named mutations.") | |
292 return nil, grpcutil.Internal | |
293 } | |
294 | |
295 return nil, nil | |
296 } | |
297 | |
298 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { | |
299 return ds.KeyForObj(c, m.ls) | |
300 } | |
OLD | NEW |