Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| diff --git a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| index 41b02d9e5127a0debab8a45af85adc4824803e77..775d91d77ab8eb5db725a4493f7386b46e2fd50f 100644 |
| --- a/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| +++ b/appengine/logdog/coordinator/endpoints/registration/registerPrefix.go |
| @@ -5,12 +5,150 @@ |
| package registration |
| import ( |
| + ds "github.com/luci/gae/service/datastore" |
| + "golang.org/x/net/context" |
| + "google.golang.org/grpc/codes" |
|
nodir
2016/05/17 16:19:47
A tool that organizes blocks of imports will proba
|
| + |
| + "github.com/luci/luci-go/appengine/logdog/coordinator" |
| + "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/cryptorand" |
| + "github.com/luci/luci-go/common/gcloud/pubsub" |
| "github.com/luci/luci-go/common/grpcutil" |
| - "golang.org/x/net/context" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + log "github.com/luci/luci-go/common/logging" |
| ) |
| func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixRequest) (*logdog.RegisterPrefixResponse, error) { |
| - // TODO(dnj): Actually implement this endpoint. |
| - return nil, grpcutil.Unimplemented |
| + log.Fields{ |
| + "project": req.Project, |
| + "prefix": req.Prefix, |
| + "source": req.SourceInfo, |
| + "expiration": req.Expiration.Duration(), |
| + }.Debugf(c, "Registering log prefix.") |
| + |
| + // Confirm that the Prefix is a valid stream name. |
| + prefix := types.StreamName(req.Prefix) |
| + if err := prefix.Validate(); err != nil { |
| + log.WithError(err).Warningf(c, "Invalid prefix.") |
| + return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix") |
| + } |
| + |
| + // Has the prefix already been registered? |
| + pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} |
| + |
| + // Check for existing prefix registration (non-transactional). |
| + di := ds.Get(c) |
| + switch exists, err := di.Exists(di.KeyForObj(pfx)); { |
| + case err != nil: |
| + log.WithError(err).Errorf(c, "Failed to check for existing prefix (non-transactional).") |
| + return nil, grpcutil.Internal |
| + |
| + case exists: |
| + log.Errorf(c, "The prefix is already registered (non-transactional).") |
| + return nil, grpcutil.AlreadyExists |
| + } |
| + |
| + // Load our service and project configurations. |
| + svcs := coordinator.GetServices(c) |
| + cfg, err := svcs.Config(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load service configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + // Determine our Pub/Sub topic. |
| + cfgTransport := cfg.Transport |
|
nodir
2016/05/17 16:19:47
Why do you introduce a new variable for that?
dnj (Google)
2016/05/17 16:32:54
I don't want to dereference the same thing three t
nodir
2016/05/19 00:48:32
without variable performance is about the same and
|
| + if cfgTransport == nil { |
| + log.Errorf(c, "Missing transport configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + cfgTransportPubSub := cfgTransport.GetPubsub() |
| + if cfgTransportPubSub == nil { |
| + log.Errorf(c, "Missing transport Pub/Sub configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + pubsubTopic := pubsub.NewTopic(cfgTransportPubSub.Project, cfgTransportPubSub.Topic) |
| + if err := pubsubTopic.Validate(); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "topic": pubsubTopic, |
| + }.Errorf(c, "Invalid transport Pub/Sub topic.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + // Best effort: register the stream prefix hierarchy components, including the |
| + // separator. |
| + // |
| + // Determine which hierarchy components we need to add. |
| + comps := hierarchy.Components(prefix.AsPathPrefix("")) |
| + if comps, err = hierarchy.Missing(di, comps); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to probe for missing hierarchy components.") |
| + } |
| + |
| + // Before we go into transaction, try and put these entries. This should not |
| + // be contested, since components don't share an entity root. |
| + // |
| + // If this fails, that's okay; we'll handle this when the stream gets |
| + // registered. |
| + if err := hierarchy.PutMulti(di, comps); err != nil { |
| + log.WithError(err).Infof(c, "Failed to add missing hierarchy components.") |
| + } |
| + |
| + // The prefix doesn't appear to be registered. Prepare to transactionally |
| + // register it. |
| + now := clock.Now(c).UTC() |
| + |
| + // Generate a prefix secret. |
| + secret := make(types.PrefixSecret, types.PrefixSecretLength) |
| + if _, err := cryptorand.Read(c, []byte(secret)); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to generate prefix secret.") |
| + return nil, grpcutil.Internal |
| + } |
| + if err := secret.Validate(); err != nil { |
| + log.WithError(err).Errorf(c, "Generated invalid prefix secret.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + // Transactionally register the prefix. |
| + err = di.RunInTransaction(func(c context.Context) error { |
| + di := ds.Get(c) |
| + |
| + // Check if this Prefix exists (transactional). |
| + switch exists, err := di.Exists(di.KeyForObj(pfx)); { |
| + case err != nil: |
| + log.WithError(err).Errorf(c, "Failed to check for existing prefix (transactional).") |
| + return grpcutil.Internal |
| + |
| + case exists: |
| + log.Errorf(c, "The prefix is already registered (transactional).") |
| + return grpcutil.AlreadyExists |
| + } |
| + |
| + // The Prefix is not registered, so let's register it. |
| + pfx.Created = now |
| + pfx.Prefix = string(prefix) |
| + pfx.Source = req.SourceInfo |
| + pfx.Secret = []byte(secret) |
| + |
| + if err := di.Put(pfx); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to register prefix.") |
| + return grpcutil.Internal |
| + } |
| + |
| + log.Infof(c, "The prefix was successfully registered.") |
| + return nil |
| + }, nil) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to register prefix (transactional).") |
| + return nil, err |
| + } |
| + |
| + return &logdog.RegisterPrefixResponse{ |
| + Secret: []byte(secret), |
| + LogBundleTopic: string(pubsubTopic), |
| + }, nil |
| } |