Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(373)

Side by Side Diff: logdog/appengine/coordinator/endpoints/services/registerStream.go

Issue 2592753002: Create unbuffered Tumble entry point for LogDog. (Closed)
Patch Set: Add bench, update. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | logdog/appengine/coordinator/endpoints/services/registerStream_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 9
10 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
11 ds "github.com/luci/gae/service/datastore" 11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
13 log "github.com/luci/luci-go/common/logging" 13 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/grpc/grpcutil" 14 "github.com/luci/luci-go/grpc/grpcutil"
15 "github.com/luci/luci-go/logdog/api/config/svcconfig"
16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
17 "github.com/luci/luci-go/logdog/api/logpb" 16 "github.com/luci/luci-go/logdog/api/logpb"
18 "github.com/luci/luci-go/logdog/appengine/coordinator" 17 "github.com/luci/luci-go/logdog/appengine/coordinator"
19 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
20 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" 18 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
21 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" 19 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy"
22 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" 20 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
23 "github.com/luci/luci-go/logdog/common/types" 21 "github.com/luci/luci-go/logdog/common/types"
24 "github.com/luci/luci-go/tumble" 22 "github.com/luci/luci-go/tumble"
25 "golang.org/x/net/context" 23 "golang.org/x/net/context"
26 "google.golang.org/grpc/codes" 24 "google.golang.org/grpc/codes"
27 ) 25 )
28 26
29 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt ate) *logdog.LogStreamState { 27 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt ate) *logdog.LogStreamState {
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
155 } 153 }
156 154
157 // Before we go into transaction, try and put these entries. Thi s should not 155 // Before we go into transaction, try and put these entries. Thi s should not
158 // be contested, since components don't share an entity root. 156 // be contested, since components don't share an entity root.
159 if err := hierarchy.PutMulti(c, comps); err != nil { 157 if err := hierarchy.PutMulti(c, comps); err != nil {
160 log.WithError(err).Errorf(c, "Failed to add missing hier archy components.") 158 log.WithError(err).Errorf(c, "Failed to add missing hier archy components.")
161 return nil, grpcutil.Internal 159 return nil, grpcutil.Internal
162 } 160 }
163 161
164 // The stream does not exist. Proceed with transactional registr ation. 162 // The stream does not exist. Proceed with transactional registr ation.
165 » » err = tumble.RunMutation(c, &registerStreamMutation{ 163 » » lsKey := ds.KeyForObj(c, ls)
166 » » » RegisterStreamRequest: req, 164 » » err = tumble.RunUnbuffered(c, lsKey, func(c context.Context) ([] tumble.Mutation, error) {
167 » » » cfg: cfg, 165 » » » // Load our state and stream (transactional).
168 » » » pcfg: pcfg, 166 » » » switch err := ds.Get(c, ls, lst); {
169 » » » desc: &desc, 167 » » » case err == nil:
170 » » » pfx: pfx, 168 » » » » // The stream is already registered.
171 » » » lst: lst, 169 » » » » return nil, nil
172 » » » ls: ls, 170
171 » » » case !anyNoSuchEntity(err):
172 » » » » log.WithError(err).Errorf(c, "Failed to check fo r stream registration (transactional).")
173 » » » » return nil, err
174 » » » }
175
176 » » » // The stream is not yet registered.
177 » » » log.Infof(c, "Registering new log stream.")
178
179 » » » // Construct our LogStreamState.
180 » » » now := clock.Now(c).UTC()
181 » » » lst.Created = now
182 » » » lst.Updated = now
183 » » » lst.Secret = pfx.Secret // Copy Prefix Secret to reduce datastore Gets.
184
185 » » » // Construct our LogStream.
186 » » » ls.Created = now
187 » » » ls.ProtoVersion = req.ProtoVersion
188
189 » » » if err := ls.LoadDescriptor(&desc); err != nil {
190 » » » » log.Fields{
191 » » » » » log.ErrorKey: err,
192 » » » » }.Errorf(c, "Failed to load descriptor into LogS tream.")
193 » » » » return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
194 » » » }
195
196 » » » // If our registration request included a terminal index , terminate the
197 » » » // log stream state as well.
198 » » » if req.TerminalIndex >= 0 {
199 » » » » log.Fields{
200 » » » » » "terminalIndex": req.TerminalIndex,
201 » » » » }.Debugf(c, "Registration request included termi nal index.")
202
203 » » » » lst.TerminalIndex = req.TerminalIndex
204 » » » » lst.TerminatedTime = now
205 » » » } else {
206 » » » » lst.TerminalIndex = -1
207 » » » }
208
209 » » » if err := ds.Put(c, ls, lst); err != nil {
210 » » » » log.Fields{
211 » » » » » log.ErrorKey: err,
212 » » » » }.Errorf(c, "Failed to Put LogStream.")
213 » » » » return nil, grpcutil.Internal
214 » » » }
215
216 » » » // Add a named delayed mutation to archive this stream i f it's not archived
217 » » » // yet.
218 » » » //
219 » » » // If the registration did not include a terminal index, this will be our
220 » » » // pessimistic archival request, scheduled on registrati on to catch streams
221 » » » // that don't expire. This mutation will be replaced by the optimistic
222 » » » // archival mutation when/if the stream is terminated vi a TerminateStream.
223 » » » //
224 » » » // If the registration included a terminal index, apply our standard
225 » » » // parameters to the archival. Since TerminateStream wil l not be called,
226 » » » // this will be our formal optimistic archival task.
227 » » » params := standardArchivalParams(cfg, pcfg)
228 » » » cat := mutations.CreateArchiveTask{
229 » » » » ID: ls.ID,
230 » » » }
231 » » » if req.TerminalIndex < 0 {
232 » » » » // No terminal index, schedule pessimistic clean up archival.
233 » » » » cat.Expiration = now.Add(params.CompletePeriod)
234
235 » » » » log.Fields{
236 » » » » » "deadline": cat.Expiration,
237 » » » » }.Debugf(c, "Scheduling cleanup archival mutatio n.")
238 » » » } else {
239 » » » » // Terminal index, schedule optimistic archival (mirrors TerminateStream).
240 » » » » cat.SettleDelay = params.SettleDelay
241 » » » » cat.CompletePeriod = params.CompletePeriod
242
243 » » » » // Schedule this mutation to execute after our s ettle delay.
244 » » » » cat.Expiration = now.Add(params.SettleDelay)
245
246 » » » » log.Fields{
247 » » » » » "settleDelay": cat.SettleDelay,
248 » » » » » "completePeriod": cat.CompletePeriod,
249 » » » » » "scheduledAt": cat.Expiration,
250 » » » » }.Debugf(c, "Scheduling archival mutation.")
251 » » » }
252
253 » » » aeName := cat.TaskName(c)
254 » » » if err := tumble.PutNamedMutations(c, lsKey, map[string] tumble.Mutation{aeName: &cat}); err != nil {
255 » » » » log.WithError(err).Errorf(c, "Failed to write na med mutations.")
256 » » » » return nil, grpcutil.Internal
257 » » » }
258
259 » » » return nil, nil
173 }) 260 })
174 if err != nil { 261 if err != nil {
175 log.Fields{ 262 log.Fields{
176 log.ErrorKey: err, 263 log.ErrorKey: err,
177 }.Errorf(c, "Failed to register LogStream.") 264 }.Errorf(c, "Failed to register LogStream.")
178 return nil, err 265 return nil, err
179 } 266 }
180 } 267 }
181 268
182 return &logdog.RegisterStreamResponse{ 269 return &logdog.RegisterStreamResponse{
183 Id: string(ls.ID), 270 Id: string(ls.ID),
184 State: buildLogStreamState(ls, lst), 271 State: buildLogStreamState(ls, lst),
185 }, nil 272 }, nil
186 } 273 }
187
188 type registerStreamMutation struct {
189 *logdog.RegisterStreamRequest
190
191 cfg *config.Config
192 pcfg *svcconfig.ProjectConfig
193
194 desc *logpb.LogStreamDescriptor
195 pfx *coordinator.LogPrefix
196 ls *coordinator.LogStream
197 lst *coordinator.LogStreamState
198 }
199
200 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) {
201 // Load our state and stream (transactional).
202 switch err := ds.Get(c, m.ls, m.lst); {
203 case err == nil:
204 // The stream is already registered.
205 return nil, nil
206
207 case !anyNoSuchEntity(err):
208 log.WithError(err).Errorf(c, "Failed to check for stream registr ation (transactional).")
209 return nil, err
210 }
211
212 // The stream is not yet registered.
213 log.Infof(c, "Registering new log stream.")
214
215 // Construct our LogStreamState.
216 now := clock.Now(c).UTC()
217 m.lst.Created = now
218 m.lst.Updated = now
219 m.lst.Secret = m.pfx.Secret // Copy Prefix Secret to reduce datastore Ge ts.
220
221 // Construct our LogStream.
222 m.ls.Created = now
223 m.ls.ProtoVersion = m.ProtoVersion
224
225 if err := m.ls.LoadDescriptor(m.desc); err != nil {
226 log.Fields{
227 log.ErrorKey: err,
228 }.Errorf(c, "Failed to load descriptor into LogStream.")
229 return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.")
230 }
231
232 // If our registration request included a terminal index, terminate the
233 // log stream state as well.
234 if m.TerminalIndex >= 0 {
235 log.Fields{
236 "terminalIndex": m.TerminalIndex,
237 }.Debugf(c, "Registration request included terminal index.")
238
239 m.lst.TerminalIndex = m.TerminalIndex
240 m.lst.TerminatedTime = now
241 } else {
242 m.lst.TerminalIndex = -1
243 }
244
245 if err := ds.Put(c, m.ls, m.lst); err != nil {
246 log.Fields{
247 log.ErrorKey: err,
248 }.Errorf(c, "Failed to Put LogStream.")
249 return nil, grpcutil.Internal
250 }
251
252 // Add a named delayed mutation to archive this stream if it's not archi ved
253 // yet.
254 //
255 // If the registration did not include a terminal index, this will be ou r
256 // pessimistic archival request, scheduled on registration to catch stre ams
257 // that don't expire. This mutation will be replaced by the optimistic
258 // archival mutation when/if the stream is terminated via TerminateStrea m.
259 //
260 // If the registration included a terminal index, apply our standard
261 // parameters to the archival. Since TerminateStream will not be called,
262 // this will be our formal optimistic archival task.
263 params := standardArchivalParams(m.cfg, m.pcfg)
264 cat := mutations.CreateArchiveTask{
265 ID: m.ls.ID,
266 }
267 if m.TerminalIndex < 0 {
268 // No terminal index, schedule pessimistic cleanup archival.
269 cat.Expiration = now.Add(params.CompletePeriod)
270
271 log.Fields{
272 "deadline": cat.Expiration,
273 }.Debugf(c, "Scheduling cleanup archival mutation.")
274 } else {
275 // Terminal index, schedule optimistic archival (mirrors Termina teStream).
276 cat.SettleDelay = params.SettleDelay
277 cat.CompletePeriod = params.CompletePeriod
278
279 // Schedule this mutation to execute after our settle delay.
280 cat.Expiration = now.Add(params.SettleDelay)
281
282 log.Fields{
283 "settleDelay": cat.SettleDelay,
284 "completePeriod": cat.CompletePeriod,
285 "scheduledAt": cat.Expiration,
286 }.Debugf(c, "Scheduling archival mutation.")
287 }
288
289 aeParent, aeName := cat.TaskName(c)
290 if err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutati on{aeName: &cat}); err != nil {
291 log.WithError(err).Errorf(c, "Failed to write named mutations.")
292 return nil, grpcutil.Internal
293 }
294
295 return nil, nil
296 }
297
298 func (m *registerStreamMutation) Root(c context.Context) *ds.Key {
299 return ds.KeyForObj(c, m.ls)
300 }
OLDNEW
« no previous file with comments | « no previous file | logdog/appengine/coordinator/endpoints/services/registerStream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698