| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package registration | 5 package registration |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 ds "github.com/luci/gae/service/datastore" |
| 9 "golang.org/x/net/context" |
| 10 "google.golang.org/grpc/codes" |
| 11 |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| 8 "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1" | 14 "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1" |
| 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/cryptorand" |
| 17 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 9 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
| 10 » "golang.org/x/net/context" | 19 » "github.com/luci/luci-go/common/logdog/types" |
| 20 » log "github.com/luci/luci-go/common/logging" |
| 11 ) | 21 ) |
| 12 | 22 |
| 13 func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixReq
uest) (*logdog.RegisterPrefixResponse, error) { | 23 func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixReq
uest) (*logdog.RegisterPrefixResponse, error) { |
| 14 » // TODO(dnj): Actually implement this endpoint. | 24 » log.Fields{ |
| 15 » return nil, grpcutil.Unimplemented | 25 » » "project": req.Project, |
| 26 » » "prefix": req.Prefix, |
| 27 » » "source": req.SourceInfo, |
| 28 » » "expiration": req.Expiration.Duration(), |
| 29 » }.Debugf(c, "Registering log prefix.") |
| 30 |
| 31 » // Confirm that the Prefix is a valid stream name. |
| 32 » prefix := types.StreamName(req.Prefix) |
| 33 » if err := prefix.Validate(); err != nil { |
| 34 » » log.WithError(err).Warningf(c, "Invalid prefix.") |
| 35 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix
") |
| 36 » } |
| 37 |
| 38 » // Has the prefix already been registered? |
| 39 » pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} |
| 40 |
| 41 » // Check for existing prefix registration (non-transactional). |
| 42 » di := ds.Get(c) |
| 43 » switch exists, err := di.Exists(di.KeyForObj(pfx)); { |
| 44 » case err != nil: |
| 45 » » log.WithError(err).Errorf(c, "Failed to check for existing prefi
x (non-transactional).") |
| 46 » » return nil, grpcutil.Internal |
| 47 |
| 48 » case exists: |
| 49 » » log.Errorf(c, "The prefix is already registered (non-transaction
al).") |
| 50 » » return nil, grpcutil.AlreadyExists |
| 51 » } |
| 52 |
| 53 » // Load our service and project configurations. |
| 54 » svcs := coordinator.GetServices(c) |
| 55 » cfg, err := svcs.Config(c) |
| 56 » if err != nil { |
| 57 » » log.WithError(err).Errorf(c, "Failed to load service configurati
on.") |
| 58 » » return nil, grpcutil.Internal |
| 59 » } |
| 60 |
| 61 » // Determine our Pub/Sub topic. |
| 62 » cfgTransport := cfg.Transport |
| 63 » if cfgTransport == nil { |
| 64 » » log.Errorf(c, "Missing transport configuration.") |
| 65 » » return nil, grpcutil.Internal |
| 66 » } |
| 67 |
| 68 » cfgTransportPubSub := cfgTransport.GetPubsub() |
| 69 » if cfgTransportPubSub == nil { |
| 70 » » log.Errorf(c, "Missing transport Pub/Sub configuration.") |
| 71 » » return nil, grpcutil.Internal |
| 72 » } |
| 73 |
| 74 » pubsubTopic := pubsub.NewTopic(cfgTransportPubSub.Project, cfgTransportP
ubSub.Topic) |
| 75 » if err := pubsubTopic.Validate(); err != nil { |
| 76 » » log.Fields{ |
| 77 » » » log.ErrorKey: err, |
| 78 » » » "topic": pubsubTopic, |
| 79 » » }.Errorf(c, "Invalid transport Pub/Sub topic.") |
| 80 » » return nil, grpcutil.Internal |
| 81 » } |
| 82 |
| 83 » // Best effort: register the stream prefix hierarchy components, includi
ng the |
| 84 » // separator. |
| 85 » // |
| 86 » // Determine which hierarchy components we need to add. |
| 87 » comps := hierarchy.Components(prefix.AsPathPrefix("")) |
| 88 » if comps, err = hierarchy.Missing(di, comps); err != nil { |
| 89 » » log.WithError(err).Warningf(c, "Failed to probe for missing hier
archy components.") |
| 90 » } |
| 91 |
| 92 » // Before we go into transaction, try and put these entries. This should
not |
| 93 » // be contested, since components don't share an entity root. |
| 94 » // |
| 95 » // If this fails, that's okay; we'll handle this when the stream gets |
| 96 » // registered. |
| 97 » if err := hierarchy.PutMulti(di, comps); err != nil { |
| 98 » » log.WithError(err).Infof(c, "Failed to add missing hierarchy com
ponents.") |
| 99 » } |
| 100 |
| 101 » // The prefix doesn't appear to be registered. Prepare to transactionall
y |
| 102 » // register it. |
| 103 » now := clock.Now(c).UTC() |
| 104 |
| 105 » // Generate a prefix secret. |
| 106 » secret := make(types.PrefixSecret, types.PrefixSecretLength) |
| 107 » if _, err := cryptorand.Read(c, []byte(secret)); err != nil { |
| 108 » » log.WithError(err).Errorf(c, "Failed to generate prefix secret."
) |
| 109 » » return nil, grpcutil.Internal |
| 110 » } |
| 111 » if err := secret.Validate(); err != nil { |
| 112 » » log.WithError(err).Errorf(c, "Generated invalid prefix secret.") |
| 113 » » return nil, grpcutil.Internal |
| 114 » } |
| 115 |
| 116 » // Transactionally register the prefix. |
| 117 » err = di.RunInTransaction(func(c context.Context) error { |
| 118 » » di := ds.Get(c) |
| 119 |
| 120 » » // Check if this Prefix exists (transactional). |
| 121 » » switch exists, err := di.Exists(di.KeyForObj(pfx)); { |
| 122 » » case err != nil: |
| 123 » » » log.WithError(err).Errorf(c, "Failed to check for existi
ng prefix (transactional).") |
| 124 » » » return grpcutil.Internal |
| 125 |
| 126 » » case exists: |
| 127 » » » log.Errorf(c, "The prefix is already registered (transac
tional).") |
| 128 » » » return grpcutil.AlreadyExists |
| 129 » » } |
| 130 |
| 131 » » // The Prefix is not registered, so let's register it. |
| 132 » » pfx.Created = now |
| 133 » » pfx.Prefix = string(prefix) |
| 134 » » pfx.Source = req.SourceInfo |
| 135 » » pfx.Secret = []byte(secret) |
| 136 |
| 137 » » if err := di.Put(pfx); err != nil { |
| 138 » » » log.WithError(err).Errorf(c, "Failed to register prefix.
") |
| 139 » » » return grpcutil.Internal |
| 140 » » } |
| 141 |
| 142 » » log.Infof(c, "The prefix was successfully registered.") |
| 143 » » return nil |
| 144 » }, nil) |
| 145 » if err != nil { |
| 146 » » log.WithError(err).Errorf(c, "Failed to register prefix (transac
tional).") |
| 147 » » return nil, err |
| 148 » } |
| 149 |
| 150 » return &logdog.RegisterPrefixResponse{ |
| 151 » » Secret: []byte(secret), |
| 152 » » LogBundleTopic: string(pubsubTopic), |
| 153 » }, nil |
| 16 } | 154 } |
| OLD | NEW |