Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| diff --git a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| index 41b02d9e5127a0debab8a45af85adc4824803e77..40f090aea4dad22e2dd9f8efdae1b6effd364997 100644 |
| --- a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| +++ b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| @@ -5,12 +5,155 @@ |
| package registration |
| import ( |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/luci-go/appengine/logdog/coordinator" |
| + "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/cryptorand" |
| + "github.com/luci/luci-go/common/gcloud/pubsub" |
| "github.com/luci/luci-go/common/grpcutil" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + log "github.com/luci/luci-go/common/logging" |
| + |
| "golang.org/x/net/context" |
| + "google.golang.org/grpc/codes" |
|
nodir
2016/05/17 02:45:59
this block should go before the luci one
dnj (Google)
2016/05/17 14:44:33
Done.
|
| ) |
| func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixRequest) (*logdog.RegisterPrefixResponse, error) { |
| - // TODO(dnj): Actually implement this endpoint. |
| - return nil, grpcutil.Unimplemented |
| + log.Fields{ |
| + "project": req.Project, |
| + "prefix": req.Prefix, |
| + "source": req.SourceInfo, |
| + "expiration": req.Expiration.Duration(), |
| + }.Debugf(c, "Registering log prefix.") |
| + |
| + // Confirm that the Prefix is a valid stream name. |
| + prefix := types.StreamName(req.Prefix) |
| + if err := prefix.Validate(); err != nil { |
| + log.WithError(err).Errorf(c, "Invalid prefix.") |
|
nodir
2016/05/17 02:45:58
not sure about this. this is a user error, not a s
dnj (Google)
2016/05/17 14:44:33
Done.
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix") |
| + } |
| + |
| + // Has the prefix already been registered? |
| + pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} |
| + |
| + // Check for existing prefix registration (non-transactional). |
| + di := ds.Get(c) |
| + switch exists, err := di.Exists(di.KeyForObj(pfx)); { |
| + case err != nil: |
| + log.WithError(err).Errorf(c, "Failed to check for existing prefix (non-transactional).") |
| + return nil, grpcutil.Internal |
| + |
| + case exists: |
| + log.Errorf(c, "The prefix is already registered (non-transactional).") |
| + return nil, grpcutil.AlreadyExists |
| + } |
| + |
| + // Load our service and project configurations. |
| + svcs := coordinator.GetServices(c) |
| + cfg, err := svcs.Config(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load service configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + // Determine our Pub/Sub topic. |
| + var pubsubTopic pubsub.Topic |
| + switch t := cfg.Transport; t { |
|
nodir
2016/05/17 02:45:58
switch is overkill here. Also you would have less
dnj (Google)
2016/05/17 14:44:33
Not sure that I agree here. Switch is a perfectly
|
| + case nil: |
| + log.Errorf(c, "Missing transport configuration.") |
| + return nil, grpcutil.Internal |
| + |
| + default: |
| + switch tps := t.GetPubsub(); tps { |
|
nodir
2016/05/17 02:45:58
same here
|
| + case nil: |
| + log.Errorf(c, "Missing transport Pub/Sub configuration.") |
| + return nil, grpcutil.Internal |
| + default: |
| + pubsubTopic = pubsub.NewTopic(tps.Project, tps.Topic) |
| + } |
| + } |
| + if err := pubsubTopic.Validate(); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "topic": pubsubTopic, |
| + }.Errorf(c, "Invalid transport Pub/Sub topic.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + // Best effort: register the stream prefix hierarchy components, including the |
| + // separator. |
| + // |
| + // Determine which hierarchy components we need to add. |
| + comps := hierarchy.Components(prefix.AsPathPrefix("")) |
| + if comps, err = hierarchy.Missing(di, comps); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to probe for missing hierarchy components.") |
|
nodir
2016/05/17 02:45:58
now this is an error we want to notice as opposed
dnj (Google)
2016/05/17 14:44:33
This is actually not a big deal, since hierarchy c
|
| + } |
| + |
| + // Before we go into transaction, try and put these entries. This should not |
| + // be contested, since components don't share an entity root. |
| + // |
| + // If this fails, that's okay; we'll handle this when the stream gets |
| + // registered. |
| + if err := hierarchy.PutMulti(di, comps); err != nil { |
|
nodir
2016/05/17 02:45:59
in hierarchy.go you need to update comment
"This e
dnj (Google)
2016/05/17 14:44:33
Done.
|
| + log.WithError(err).Infof(c, "Failed to add missing hierarchy components.") |
| + } |
| + |
| + // The prefix doesn't appear to be registered. Prepare to transactionally |
| + // register it. |
| + now := clock.Now(c).UTC() |
| + |
| + // Generate a prefix secret. |
| + secret := make(types.PrefixSecret, types.PrefixSecretLength) |
| + secretSize, err := cryptorand.Read(c, []byte(secret)) |
|
nodir
2016/05/17 02:45:58
secretSize is unnecessary because according to htt
dnj (Google)
2016/05/17 14:44:33
Done.
|
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to generate prefix secret.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + secret = secret[:secretSize] // Should be no-op. |
|
nodir
2016/05/17 02:45:58
unnecessary
dnj (Google)
2016/05/17 14:44:33
Done.
|
| + if err := secret.Validate(); err != nil { |
|
nodir
2016/05/17 02:45:59
will always return nil because it checks only leng
dnj (Google)
2016/05/17 14:44:32
Correct, but I am going to do it anyway, because i
|
| + log.WithError(err).Errorf(c, "Generated invalid prefix secret.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + // Transactionally register the prefix. |
| + err = di.RunInTransaction(func(c context.Context) error { |
| + di := ds.Get(c) |
| + |
| + // Check if this Prefix exists (transactional). |
| + switch exists, err := di.Exists(di.KeyForObj(pfx)); { |
| + case err != nil: |
| + log.WithError(err).Errorf(c, "Failed to check for existing prefix (transactional).") |
| + return grpcutil.Internal |
| + |
| + case exists: |
| + log.Errorf(c, "The prefix is already registered (transactional).") |
| + return grpcutil.AlreadyExists |
| + } |
| + |
| + // The Prefix is not registered, so let's register it. |
| + pfx.Created = now |
| + pfx.Prefix = string(prefix) |
| + pfx.Source = req.SourceInfo |
| + pfx.Secret = []byte(secret) |
| + |
| + if err := di.Put(pfx); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to register prefix.") |
| + return grpcutil.Internal |
| + } |
| + |
| + log.Infof(c, "The prefix was successfully registered.") |
| + return nil |
| + }, nil) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to register prefix (transactional).") |
|
nodir
2016/05/17 02:45:58
some cases of errors are already logged inside the
dnj (Google)
2016/05/17 14:44:33
Yes, but that's fine.
|
| + return nil, err |
| + } |
| + |
| + return &logdog.RegisterPrefixResponse{ |
| + Secret: []byte(secret), |
|
nodir
2016/05/17 02:45:58
types.PrefixSecret has a comment
"This is a Base6
dnj (Google)
2016/05/17 14:44:33
Done.
|
| + LogBundleTopic: string(pubsubTopic), |
| + }, nil |
| } |