OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "time" | 8 "time" |
9 | 9 |
10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
15 "github.com/luci/luci-go/appengine/tumble" | 15 "github.com/luci/luci-go/appengine/tumble" |
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
17 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
18 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
20 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
22 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
22 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
23 "google.golang.org/grpc/codes" | 24 "google.golang.org/grpc/codes" |
24 ) | 25 ) |
25 | 26 |
26 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt ate) *logdog.LogStreamState { | 27 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt ate) *logdog.LogStreamState { |
27 st := logdog.LogStreamState{ | 28 st := logdog.LogStreamState{ |
28 ProtoVersion: ls.ProtoVersion, | 29 ProtoVersion: ls.ProtoVersion, |
29 Secret: lst.Secret, | 30 Secret: lst.Secret, |
30 TerminalIndex: lst.TerminalIndex, | 31 TerminalIndex: lst.TerminalIndex, |
31 Archived: lst.ArchivalState().Archived(), | 32 Archived: lst.ArchivalState().Archived(), |
32 Purged: ls.Purged, | 33 Purged: ls.Purged, |
33 } | 34 } |
34 if !lst.Terminated() { | 35 if !lst.Terminated() { |
35 st.TerminalIndex = -1 | 36 st.TerminalIndex = -1 |
36 } | 37 } |
37 return &st | 38 return &st |
38 } | 39 } |
39 | 40 |
41 func resolveArchiveDelay(cfg *svcconfig.Coordinator, pcfg *svcconfig.ProjectConf ig) (delay time.Duration) { | |
42 if d := cfg.ArchiveDelayMax.Duration(); d > 0 { | |
43 delay = d | |
44 } | |
45 if d := pcfg.MaxStreamAge.Duration(); d > 0 && d < delay { | |
nodir
2016/05/19 00:38:58
if it is not set in global config, but set in pro
dnj (Google)
2016/05/19 16:12:34
Good catch. Since I use this same method to constr
| |
46 delay = d | |
47 } | |
48 return | |
49 } | |
50 | |
40 // RegisterStream is an idempotent stream state register operation. | 51 // RegisterStream is an idempotent stream state register operation. |
41 // | 52 // |
42 // Successive operations will succeed if they have the correct secret for their | 53 // Successive operations will succeed if they have the correct secret for their |
43 // registered stream, regardless of whether the contents of their request match | 54 // registered stream, regardless of whether the contents of their request match |
44 // the currently registered state. | 55 // the currently registered state. |
45 func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq uest) (*logdog.RegisterStreamResponse, error) { | 56 func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq uest) (*logdog.RegisterStreamResponse, error) { |
46 var path types.StreamPath | 57 var path types.StreamPath |
47 | 58 |
48 // Unmarshal the serialized protobuf. | 59 // Unmarshal the serialized protobuf. |
49 var desc logpb.LogStreamDescriptor | 60 var desc logpb.LogStreamDescriptor |
(...skipping 18 matching lines...) Expand all Loading... | |
68 log.Fields{ | 79 log.Fields{ |
69 "project": req.Project, | 80 "project": req.Project, |
70 "path": path, | 81 "path": path, |
71 }.Infof(c, "Registration request for log stream.") | 82 }.Infof(c, "Registration request for log stream.") |
72 | 83 |
73 if err := desc.Validate(true); err != nil { | 84 if err := desc.Validate(true); err != nil { |
74 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid log st ream descriptor: %s", err) | 85 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid log st ream descriptor: %s", err) |
75 } | 86 } |
76 prefix, _ := path.Split() | 87 prefix, _ := path.Split() |
77 | 88 |
78 » // Load our config and archive expiration. | 89 » // Load our service and project configs. |
79 cfg, err := coordinator.GetServices(c).Config(c) | 90 cfg, err := coordinator.GetServices(c).Config(c) |
80 if err != nil { | 91 if err != nil { |
81 log.WithError(err).Errorf(c, "Failed to load configuration.") | 92 log.WithError(err).Errorf(c, "Failed to load configuration.") |
82 return nil, grpcutil.Internal | 93 return nil, grpcutil.Internal |
83 } | 94 } |
84 | 95 |
85 » archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() | 96 » pcfg, err := coordinator.CurrentProjectConfig(c) |
86 » if archiveDelayMax < 0 { | 97 » if err != nil { |
87 » » log.Fields{ | 98 » » log.WithError(err).Errorf(c, "Failed to load current project con figuration.") |
88 » » » "archiveDelayMax": archiveDelayMax, | |
89 » » }.Errorf(c, "Must have positive maximum archive delay.") | |
90 return nil, grpcutil.Internal | 99 return nil, grpcutil.Internal |
91 } | 100 } |
92 | 101 |
102 // Determine the archival expiration. | |
103 archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg) | |
104 | |
93 // Register our Prefix. | 105 // Register our Prefix. |
94 // | 106 // |
95 // This will also verify that our request secret matches the registered one, | 107 // This will also verify that our request secret matches the registered one, |
96 // if one is registered. | 108 // if one is registered. |
97 // | 109 // |
98 // Note: This step will not be necessary once a "register prefix" RPC ca ll is | 110 // Note: This step will not be necessary once a "register prefix" RPC ca ll is |
99 // implemented. | 111 // implemented. |
100 lsp := logStreamPrefix{ | 112 lsp := logStreamPrefix{ |
101 prefix: string(prefix), | 113 prefix: string(prefix), |
102 secret: req.Secret, | 114 secret: req.Secret, |
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
240 return nil, grpcutil.Internal | 252 return nil, grpcutil.Internal |
241 } | 253 } |
242 } | 254 } |
243 | 255 |
244 return nil, nil | 256 return nil, nil |
245 } | 257 } |
246 | 258 |
247 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { | 259 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { |
248 return ds.Get(c).KeyForObj(m.ls) | 260 return ds.Get(c).KeyForObj(m.ls) |
249 } | 261 } |
OLD | NEW |