Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package registration | 5 package registration |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 ds "github.com/luci/gae/service/datastore" | |
| 9 "golang.org/x/net/context" | |
| 10 "google.golang.org/grpc/codes" | |
|
nodir
2016/05/17 16:19:47
A tool that organizes blocks of imports will proba
| |
| 11 | |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" | |
| 8 "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1" | 14 "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1" |
| 15 "github.com/luci/luci-go/common/clock" | |
| 16 "github.com/luci/luci-go/common/cryptorand" | |
| 17 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 9 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
| 10 » "golang.org/x/net/context" | 19 » "github.com/luci/luci-go/common/logdog/types" |
| 20 » log "github.com/luci/luci-go/common/logging" | |
| 11 ) | 21 ) |
| 12 | 22 |
| 13 func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixReq uest) (*logdog.RegisterPrefixResponse, error) { | 23 func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixReq uest) (*logdog.RegisterPrefixResponse, error) { |
| 14 » // TODO(dnj): Actually implement this endpoint. | 24 » log.Fields{ |
| 15 » return nil, grpcutil.Unimplemented | 25 » » "project": req.Project, |
| 26 » » "prefix": req.Prefix, | |
| 27 » » "source": req.SourceInfo, | |
| 28 » » "expiration": req.Expiration.Duration(), | |
| 29 » }.Debugf(c, "Registering log prefix.") | |
| 30 | |
| 31 » // Confirm that the Prefix is a valid stream name. | |
| 32 » prefix := types.StreamName(req.Prefix) | |
| 33 » if err := prefix.Validate(); err != nil { | |
| 34 » » log.WithError(err).Warningf(c, "Invalid prefix.") | |
| 35 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix ") | |
| 36 » } | |
| 37 | |
| 38 » // Has the prefix already been registered? | |
| 39 » pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} | |
| 40 | |
| 41 » // Check for existing prefix registration (non-transactional). | |
| 42 » di := ds.Get(c) | |
| 43 » switch exists, err := di.Exists(di.KeyForObj(pfx)); { | |
| 44 » case err != nil: | |
| 45 » » log.WithError(err).Errorf(c, "Failed to check for existing prefi x (non-transactional).") | |
| 46 » » return nil, grpcutil.Internal | |
| 47 | |
| 48 » case exists: | |
| 49 » » log.Errorf(c, "The prefix is already registered (non-transaction al).") | |
| 50 » » return nil, grpcutil.AlreadyExists | |
| 51 » } | |
| 52 | |
| 53 » // Load our service and project configurations. | |
| 54 » svcs := coordinator.GetServices(c) | |
| 55 » cfg, err := svcs.Config(c) | |
| 56 » if err != nil { | |
| 57 » » log.WithError(err).Errorf(c, "Failed to load service configurati on.") | |
| 58 » » return nil, grpcutil.Internal | |
| 59 » } | |
| 60 | |
| 61 » // Determine our Pub/Sub topic. | |
| 62 » cfgTransport := cfg.Transport | |
|
nodir
2016/05/17 16:19:47
Why do you introduce a new variable for that?
dnj (Google)
2016/05/17 16:32:54
I don't want to dereference the same thing three t
nodir
2016/05/19 00:48:32
without variable performance is about the same and
| |
| 63 » if cfgTransport == nil { | |
| 64 » » log.Errorf(c, "Missing transport configuration.") | |
| 65 » » return nil, grpcutil.Internal | |
| 66 » } | |
| 67 | |
| 68 » cfgTransportPubSub := cfgTransport.GetPubsub() | |
| 69 » if cfgTransportPubSub == nil { | |
| 70 » » log.Errorf(c, "Missing transport Pub/Sub configuration.") | |
| 71 » » return nil, grpcutil.Internal | |
| 72 » } | |
| 73 | |
| 74 » pubsubTopic := pubsub.NewTopic(cfgTransportPubSub.Project, cfgTransportP ubSub.Topic) | |
| 75 » if err := pubsubTopic.Validate(); err != nil { | |
| 76 » » log.Fields{ | |
| 77 » » » log.ErrorKey: err, | |
| 78 » » » "topic": pubsubTopic, | |
| 79 » » }.Errorf(c, "Invalid transport Pub/Sub topic.") | |
| 80 » » return nil, grpcutil.Internal | |
| 81 » } | |
| 82 | |
| 83 » // Best effort: register the stream prefix hierarchy components, includi ng the | |
| 84 » // separator. | |
| 85 » // | |
| 86 » // Determine which hierarchy components we need to add. | |
| 87 » comps := hierarchy.Components(prefix.AsPathPrefix("")) | |
| 88 » if comps, err = hierarchy.Missing(di, comps); err != nil { | |
| 89 » » log.WithError(err).Warningf(c, "Failed to probe for missing hier archy components.") | |
| 90 » } | |
| 91 | |
| 92 » // Before we go into transaction, try and put these entries. This should not | |
| 93 » // be contested, since components don't share an entity root. | |
| 94 » // | |
| 95 » // If this fails, that's okay; we'll handle this when the stream gets | |
| 96 » // registered. | |
| 97 » if err := hierarchy.PutMulti(di, comps); err != nil { | |
| 98 » » log.WithError(err).Infof(c, "Failed to add missing hierarchy com ponents.") | |
| 99 » } | |
| 100 | |
| 101 » // The prefix doesn't appear to be registered. Prepare to transactionall y | |
| 102 » // register it. | |
| 103 » now := clock.Now(c).UTC() | |
| 104 | |
| 105 » // Generate a prefix secret. | |
| 106 » secret := make(types.PrefixSecret, types.PrefixSecretLength) | |
| 107 » if _, err := cryptorand.Read(c, []byte(secret)); err != nil { | |
| 108 » » log.WithError(err).Errorf(c, "Failed to generate prefix secret." ) | |
| 109 » » return nil, grpcutil.Internal | |
| 110 » } | |
| 111 » if err := secret.Validate(); err != nil { | |
| 112 » » log.WithError(err).Errorf(c, "Generated invalid prefix secret.") | |
| 113 » » return nil, grpcutil.Internal | |
| 114 » } | |
| 115 | |
| 116 » // Transactionally register the prefix. | |
| 117 » err = di.RunInTransaction(func(c context.Context) error { | |
| 118 » » di := ds.Get(c) | |
| 119 | |
| 120 » » // Check if this Prefix exists (transactional). | |
| 121 » » switch exists, err := di.Exists(di.KeyForObj(pfx)); { | |
| 122 » » case err != nil: | |
| 123 » » » log.WithError(err).Errorf(c, "Failed to check for existi ng prefix (transactional).") | |
| 124 » » » return grpcutil.Internal | |
| 125 | |
| 126 » » case exists: | |
| 127 » » » log.Errorf(c, "The prefix is already registered (transac tional).") | |
| 128 » » » return grpcutil.AlreadyExists | |
| 129 » » } | |
| 130 | |
| 131 » » // The Prefix is not registered, so let's register it. | |
| 132 » » pfx.Created = now | |
| 133 » » pfx.Prefix = string(prefix) | |
| 134 » » pfx.Source = req.SourceInfo | |
| 135 » » pfx.Secret = []byte(secret) | |
| 136 | |
| 137 » » if err := di.Put(pfx); err != nil { | |
| 138 » » » log.WithError(err).Errorf(c, "Failed to register prefix. ") | |
| 139 » » » return grpcutil.Internal | |
| 140 » » } | |
| 141 | |
| 142 » » log.Infof(c, "The prefix was successfully registered.") | |
| 143 » » return nil | |
| 144 » }, nil) | |
| 145 » if err != nil { | |
| 146 » » log.WithError(err).Errorf(c, "Failed to register prefix (transac tional).") | |
| 147 » » return nil, err | |
| 148 » } | |
| 149 | |
| 150 » return &logdog.RegisterPrefixResponse{ | |
| 151 » » Secret: []byte(secret), | |
| 152 » » LogBundleTopic: string(pubsubTopic), | |
| 153 » }, nil | |
| 16 } | 154 } |
| OLD | NEW |