| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
| 11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 15 "github.com/luci/luci-go/appengine/tumble" | 15 "github.com/luci/luci-go/appengine/tumble" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
| 18 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
| 19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
| 20 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
| 21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 22 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 22 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 23 "google.golang.org/grpc/codes" | 24 "google.golang.org/grpc/codes" |
| 24 ) | 25 ) |
| 25 | 26 |
| 26 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { | 27 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { |
| 27 st := logdog.LogStreamState{ | 28 st := logdog.LogStreamState{ |
| 28 ProtoVersion: ls.ProtoVersion, | 29 ProtoVersion: ls.ProtoVersion, |
| 29 Secret: lst.Secret, | 30 Secret: lst.Secret, |
| 30 TerminalIndex: lst.TerminalIndex, | 31 TerminalIndex: lst.TerminalIndex, |
| 31 Archived: lst.ArchivalState().Archived(), | 32 Archived: lst.ArchivalState().Archived(), |
| 32 Purged: ls.Purged, | 33 Purged: ls.Purged, |
| 33 } | 34 } |
| 34 if !lst.Terminated() { | 35 if !lst.Terminated() { |
| 35 st.TerminalIndex = -1 | 36 st.TerminalIndex = -1 |
| 36 } | 37 } |
| 37 return &st | 38 return &st |
| 38 } | 39 } |
| 39 | 40 |
| 41 func resolveArchiveDelay(cfg *svcconfig.Coordinator, pcfg *svcconfig.ProjectConf
ig) (delay time.Duration) { |
| 42 if d := cfg.ArchiveDelayMax.Duration(); d > 0 { |
| 43 delay = d |
| 44 } |
| 45 if d := pcfg.MaxStreamAge.Duration(); d > 0 && d < delay { |
| 46 delay = d |
| 47 } |
| 48 return |
| 49 } |
| 50 |
| 40 // RegisterStream is an idempotent stream state register operation. | 51 // RegisterStream is an idempotent stream state register operation. |
| 41 // | 52 // |
| 42 // Successive operations will succeed if they have the correct secret for their | 53 // Successive operations will succeed if they have the correct secret for their |
| 43 // registered stream, regardless of whether the contents of their request match | 54 // registered stream, regardless of whether the contents of their request match |
| 44 // the currently registered state. | 55 // the currently registered state. |
| 45 func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
uest) (*logdog.RegisterStreamResponse, error) { | 56 func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
uest) (*logdog.RegisterStreamResponse, error) { |
| 46 var path types.StreamPath | 57 var path types.StreamPath |
| 47 | 58 |
| 48 // Unmarshal the serialized protobuf. | 59 // Unmarshal the serialized protobuf. |
| 49 var desc logpb.LogStreamDescriptor | 60 var desc logpb.LogStreamDescriptor |
| (...skipping 18 matching lines...) Expand all Loading... |
| 68 log.Fields{ | 79 log.Fields{ |
| 69 "project": req.Project, | 80 "project": req.Project, |
| 70 "path": path, | 81 "path": path, |
| 71 }.Infof(c, "Registration request for log stream.") | 82 }.Infof(c, "Registration request for log stream.") |
| 72 | 83 |
| 73 if err := desc.Validate(true); err != nil { | 84 if err := desc.Validate(true); err != nil { |
| 74 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid log st
ream descriptor: %s", err) | 85 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid log st
ream descriptor: %s", err) |
| 75 } | 86 } |
| 76 prefix, _ := path.Split() | 87 prefix, _ := path.Split() |
| 77 | 88 |
| 78 » // Load our config and archive expiration. | 89 » // Load our service and project configs. |
| 79 cfg, err := coordinator.GetServices(c).Config(c) | 90 cfg, err := coordinator.GetServices(c).Config(c) |
| 80 if err != nil { | 91 if err != nil { |
| 81 log.WithError(err).Errorf(c, "Failed to load configuration.") | 92 log.WithError(err).Errorf(c, "Failed to load configuration.") |
| 82 return nil, grpcutil.Internal | 93 return nil, grpcutil.Internal |
| 83 } | 94 } |
| 84 | 95 |
| 85 » archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() | 96 » pcfg, err := coordinator.CurrentProjectConfig(c) |
| 86 » if archiveDelayMax < 0 { | 97 » if err != nil { |
| 87 » » log.Fields{ | 98 » » log.WithError(err).Errorf(c, "Failed to load current project con
figuration.") |
| 88 » » » "archiveDelayMax": archiveDelayMax, | |
| 89 » » }.Errorf(c, "Must have positive maximum archive delay.") | |
| 90 return nil, grpcutil.Internal | 99 return nil, grpcutil.Internal |
| 91 } | 100 } |
| 92 | 101 |
| 102 // Determine the archival expiration. |
| 103 archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg) |
| 104 |
| 93 // Register our Prefix. | 105 // Register our Prefix. |
| 94 // | 106 // |
| 95 // This will also verify that our request secret matches the registered
one, | 107 // This will also verify that our request secret matches the registered
one, |
| 96 // if one is registered. | 108 // if one is registered. |
| 97 // | 109 // |
| 98 // Note: This step will not be necessary once a "register prefix" RPC ca
ll is | 110 // Note: This step will not be necessary once a "register prefix" RPC ca
ll is |
| 99 // implemented. | 111 // implemented. |
| 100 lsp := logStreamPrefix{ | 112 lsp := logStreamPrefix{ |
| 101 prefix: string(prefix), | 113 prefix: string(prefix), |
| 102 secret: req.Secret, | 114 secret: req.Secret, |
| (...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 240 return nil, grpcutil.Internal | 252 return nil, grpcutil.Internal |
| 241 } | 253 } |
| 242 } | 254 } |
| 243 | 255 |
| 244 return nil, nil | 256 return nil, nil |
| 245 } | 257 } |
| 246 | 258 |
| 247 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { | 259 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { |
| 248 return ds.Get(c).KeyForObj(m.ls) | 260 return ds.Get(c).KeyForObj(m.ls) |
| 249 } | 261 } |
| OLD | NEW |