Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1780)

Unified Diff: appengine/logdog/coordinator/endpoints/registration/registerPrefix.go

Issue 1967273002: LogDog: Implement RegisterPrefix RPC. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-coordinator-endpoint
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
diff --git a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
index 41b02d9e5127a0debab8a45af85adc4824803e77..775d91d77ab8eb5db725a4493f7386b46e2fd50f 100644
--- a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
+++ b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go
@@ -5,12 +5,150 @@
package registration
import (
+ ds "github.com/luci/gae/service/datastore"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+
+ "github.com/luci/luci-go/appengine/logdog/coordinator"
+ "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy"
"github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/cryptorand"
+ "github.com/luci/luci-go/common/gcloud/pubsub"
"github.com/luci/luci-go/common/grpcutil"
- "golang.org/x/net/context"
+ "github.com/luci/luci-go/common/logdog/types"
+ log "github.com/luci/luci-go/common/logging"
)
func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixRequest) (*logdog.RegisterPrefixResponse, error) {
- // TODO(dnj): Actually implement this endpoint.
- return nil, grpcutil.Unimplemented
+ log.Fields{
+ "project": req.Project,
+ "prefix": req.Prefix,
+ "source": req.SourceInfo,
+ "expiration": req.Expiration.Duration(),
+ }.Debugf(c, "Registering log prefix.")
+
+ // Confirm that the Prefix is a valid stream name.
+ prefix := types.StreamName(req.Prefix)
+ if err := prefix.Validate(); err != nil {
+ log.WithError(err).Warningf(c, "Invalid prefix.")
+ return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix")
+ }
+
+ // Has the prefix already been registered?
+ pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)}
+
+ // Check for existing prefix registration (non-transactional).
+ di := ds.Get(c)
+ switch exists, err := di.Exists(di.KeyForObj(pfx)); {
+ case err != nil:
+ log.WithError(err).Errorf(c, "Failed to check for existing prefix (non-transactional).")
+ return nil, grpcutil.Internal
+
+ case exists:
+ log.Errorf(c, "The prefix is already registered (non-transactional).")
+ return nil, grpcutil.AlreadyExists
+ }
+
+ // Load our service and project configurations.
+ svcs := coordinator.GetServices(c)
+ cfg, err := svcs.Config(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to load service configuration.")
+ return nil, grpcutil.Internal
+ }
+
+ // Determine our Pub/Sub topic.
+ cfgTransport := cfg.Transport
+ if cfgTransport == nil {
+ log.Errorf(c, "Missing transport configuration.")
+ return nil, grpcutil.Internal
+ }
+
+ cfgTransportPubSub := cfgTransport.GetPubsub()
+ if cfgTransportPubSub == nil {
+ log.Errorf(c, "Missing transport Pub/Sub configuration.")
+ return nil, grpcutil.Internal
+ }
+
+ pubsubTopic := pubsub.NewTopic(cfgTransportPubSub.Project, cfgTransportPubSub.Topic)
+ if err := pubsubTopic.Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "topic": pubsubTopic,
+ }.Errorf(c, "Invalid transport Pub/Sub topic.")
+ return nil, grpcutil.Internal
+ }
+
+ // Best effort: register the stream prefix hierarchy components, including the
+ // separator.
+ //
+ // Determine which hierarchy components we need to add.
+ comps := hierarchy.Components(prefix.AsPathPrefix(""))
+ if comps, err = hierarchy.Missing(di, comps); err != nil {
+ log.WithError(err).Warningf(c, "Failed to probe for missing hierarchy components.")
+ }
+
+ // Before we go into transaction, try and put these entries. This should not
+ // be contested, since components don't share an entity root.
+ //
+ // If this fails, that's okay; we'll handle this when the stream gets
+ // registered.
+ if err := hierarchy.PutMulti(di, comps); err != nil {
+ log.WithError(err).Infof(c, "Failed to add missing hierarchy components.")
+ }
+
+ // The prefix doesn't appear to be registered. Prepare to transactionally
+ // register it.
+ now := clock.Now(c).UTC()
+
+ // Generate a prefix secret.
+ secret := make(types.PrefixSecret, types.PrefixSecretLength)
+ if _, err := cryptorand.Read(c, []byte(secret)); err != nil {
+ log.WithError(err).Errorf(c, "Failed to generate prefix secret.")
+ return nil, grpcutil.Internal
+ }
+ if err := secret.Validate(); err != nil {
+ log.WithError(err).Errorf(c, "Generated invalid prefix secret.")
+ return nil, grpcutil.Internal
+ }
+
+ // Transactionally register the prefix.
+ err = di.RunInTransaction(func(c context.Context) error {
+ di := ds.Get(c)
+
+ // Check if this Prefix exists (transactional).
+ switch exists, err := di.Exists(di.KeyForObj(pfx)); {
+ case err != nil:
+ log.WithError(err).Errorf(c, "Failed to check for existing prefix (transactional).")
+ return grpcutil.Internal
+
+ case exists:
+ log.Errorf(c, "The prefix is already registered (transactional).")
+ return grpcutil.AlreadyExists
+ }
+
+ // The Prefix is not registered, so let's register it.
+ pfx.Created = now
+ pfx.Prefix = string(prefix)
+ pfx.Source = req.SourceInfo
+ pfx.Secret = []byte(secret)
+
+ if err := di.Put(pfx); err != nil {
+ log.WithError(err).Errorf(c, "Failed to register prefix.")
+ return grpcutil.Internal
+ }
+
+ log.Infof(c, "The prefix was successfully registered.")
+ return nil
+ }, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to register prefix (transactional).")
+ return nil, err
+ }
+
+ return &logdog.RegisterPrefixResponse{
+ Secret: []byte(secret),
+ LogBundleTopic: string(pubsubTopic),
+ }, nil
}

Powered by Google App Engine
This is Rietveld 408576698