Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3052)

Unified Diff: appengine/logdog/coordinator/endpoints/registration/registerPrefix.go

Issue 1967273002: LogDog: Implement RegisterPrefix RPC. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-coordinator-endpoint
Patch Set: Add missing test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
diff --git a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
index 41b02d9e5127a0debab8a45af85adc4824803e77..40f090aea4dad22e2dd9f8efdae1b6effd364997 100644
--- a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
+++ b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
@@ -5,12 +5,155 @@
package registration
import (
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/appengine/logdog/coordinator"
+ "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy"
"github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/cryptorand"
+ "github.com/luci/luci-go/common/gcloud/pubsub"
"github.com/luci/luci-go/common/grpcutil"
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
+
"golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
nodir 2016/05/17 02:45:59 this block should go before the luci one
dnj (Google) 2016/05/17 14:44:33 Done.
)
func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixRequest) (*logdog.RegisterPrefixResponse, error) {
- // TODO(dnj): Actually implement this endpoint.
- return nil, grpcutil.Unimplemented
+ log.Fields{
+ "project": req.Project,
+ "prefix": req.Prefix,
+ "source": req.SourceInfo,
+ "expiration": req.Expiration.Duration(),
+ }.Debugf(c, "Registering log prefix.")
+
+ // Confirm that the Prefix is a valid stream name.
+ prefix := types.StreamName(req.Prefix)
+ if err := prefix.Validate(); err != nil {
+ log.WithError(err).Errorf(c, "Invalid prefix.")
nodir 2016/05/17 02:45:58 not sure about this. this is a user error, not a s
dnj (Google) 2016/05/17 14:44:33 Done.
+ return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix")
+ }
+
+ // Has the prefix already been registered?
+ pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)}
+
+ // Check for existing prefix registration (non-transactional).
+ di := ds.Get(c)
+ switch exists, err := di.Exists(di.KeyForObj(pfx)); {
+ case err != nil:
+ log.WithError(err).Errorf(c, "Failed to check for existing prefix (non-transactional).")
+ return nil, grpcutil.Internal
+
+ case exists:
+ log.Errorf(c, "The prefix is already registered (non-transactional).")
+ return nil, grpcutil.AlreadyExists
+ }
+
+ // Load our service and project configurations.
+ svcs := coordinator.GetServices(c)
+ cfg, err := svcs.Config(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to load service configuration.")
+ return nil, grpcutil.Internal
+ }
+
+ // Determine our Pub/Sub topic.
+ var pubsubTopic pubsub.Topic
+ switch t := cfg.Transport; t {
nodir 2016/05/17 02:45:58 switch is overkill here. Also you would have less
dnj (Google) 2016/05/17 14:44:33 Not sure that I agree here. Switch is a perfectly
+ case nil:
+ log.Errorf(c, "Missing transport configuration.")
+ return nil, grpcutil.Internal
+
+ default:
+ switch tps := t.GetPubsub(); tps {
nodir 2016/05/17 02:45:58 same here
+ case nil:
+ log.Errorf(c, "Missing transport Pub/Sub configuration.")
+ return nil, grpcutil.Internal
+ default:
+ pubsubTopic = pubsub.NewTopic(tps.Project, tps.Topic)
+ }
+ }
+ if err := pubsubTopic.Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "topic": pubsubTopic,
+ }.Errorf(c, "Invalid transport Pub/Sub topic.")
+ return nil, grpcutil.Internal
+ }
+
+ // Best effort: register the stream prefix hierarchy components, including the
+ // separator.
+ //
+ // Determine which hierarchy components we need to add.
+ comps := hierarchy.Components(prefix.AsPathPrefix(""))
+ if comps, err = hierarchy.Missing(di, comps); err != nil {
+ log.WithError(err).Warningf(c, "Failed to probe for missing hierarchy components.")
nodir 2016/05/17 02:45:58 now this is an error we want to notice as opposed
dnj (Google) 2016/05/17 14:44:33 This is actually not a big deal, since hierarchy c
+ }
+
+ // Before we go into transaction, try and put these entries. This should not
+ // be contested, since components don't share an entity root.
+ //
+ // If this fails, that's okay; we'll handle this when the stream gets
+ // registered.
+ if err := hierarchy.PutMulti(di, comps); err != nil {
nodir 2016/05/17 02:45:59 in hierarchy.go you need to update comment "This e
dnj (Google) 2016/05/17 14:44:33 Done.
+ log.WithError(err).Infof(c, "Failed to add missing hierarchy components.")
+ }
+
+ // The prefix doesn't appear to be registered. Prepare to transactionally
+ // register it.
+ now := clock.Now(c).UTC()
+
+ // Generate a prefix secret.
+ secret := make(types.PrefixSecret, types.PrefixSecretLength)
+ secretSize, err := cryptorand.Read(c, []byte(secret))
nodir 2016/05/17 02:45:58 secretSize is unnecessary because according to htt
dnj (Google) 2016/05/17 14:44:33 Done.
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to generate prefix secret.")
+ return nil, grpcutil.Internal
+ }
+
+ secret = secret[:secretSize] // Should be no-op.
nodir 2016/05/17 02:45:58 unnecessary
dnj (Google) 2016/05/17 14:44:33 Done.
+ if err := secret.Validate(); err != nil {
nodir 2016/05/17 02:45:59 will always return nil because it checks only leng
dnj (Google) 2016/05/17 14:44:32 Correct, but I am going to do it anyway, because i
+ log.WithError(err).Errorf(c, "Generated invalid prefix secret.")
+ return nil, grpcutil.Internal
+ }
+
+ // Transactionally register the prefix.
+ err = di.RunInTransaction(func(c context.Context) error {
+ di := ds.Get(c)
+
+ // Check if this Prefix exists (transactional).
+ switch exists, err := di.Exists(di.KeyForObj(pfx)); {
+ case err != nil:
+ log.WithError(err).Errorf(c, "Failed to check for existing prefix (transactional).")
+ return grpcutil.Internal
+
+ case exists:
+ log.Errorf(c, "The prefix is already registered (transactional).")
+ return grpcutil.AlreadyExists
+ }
+
+ // The Prefix is not registered, so let's register it.
+ pfx.Created = now
+ pfx.Prefix = string(prefix)
+ pfx.Source = req.SourceInfo
+ pfx.Secret = []byte(secret)
+
+ if err := di.Put(pfx); err != nil {
+ log.WithError(err).Errorf(c, "Failed to register prefix.")
+ return grpcutil.Internal
+ }
+
+ log.Infof(c, "The prefix was successfully registered.")
+ return nil
+ }, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to register prefix (transactional).")
nodir 2016/05/17 02:45:58 some cases of errors are already logged inside the
dnj (Google) 2016/05/17 14:44:33 Yes, but that's fine.
+ return nil, err
+ }
+
+ return &logdog.RegisterPrefixResponse{
+ Secret: []byte(secret),
nodir 2016/05/17 02:45:58 types.PrefixSecret has a comment "This is a Base6
dnj (Google) 2016/05/17 14:44:33 Done.
+ LogBundleTopic: string(pubsubTopic),
+ }, nil
}

Powered by Google App Engine
This is Rietveld 408576698