| Index: appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/registerStream.go b/appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| index 3b015185d6ec0450b1e3e5731202cc13294ed132..b8261765b644d10a6daa66a00a8e34a63e615275 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/registerStream.go
|
| @@ -5,6 +5,7 @@
|
| package services
|
|
|
| import (
|
| + "crypto/subtle"
|
| "time"
|
|
|
| "github.com/golang/protobuf/proto"
|
| @@ -92,33 +93,35 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| // Determine the archival expiration.
|
| archiveDelayMax := endpoints.MinDuration(cfg.Coordinator.ArchiveDelayMax, pcfg.MaxStreamAge)
|
|
|
| - // Register our Prefix.
|
| - //
|
| - // This will also verify that our request secret matches the registered one,
|
| - // if one is registered.
|
| - //
|
| - // Note: This step will not be necessary once a "register prefix" RPC call is
|
| - // implemented.
|
| - lsp := logStreamPrefix{
|
| - prefix: string(prefix),
|
| - secret: req.Secret,
|
| + // Load our Prefix. It must be registered.
|
| + di := ds.Get(c)
|
| +
|
| + pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)}
|
| + if err := di.Get(pfx); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "id": pfx.ID,
|
| + "prefix": prefix,
|
| + }.Errorf(c, "Failed to load log stream prefix.")
|
| + if err == ds.ErrNoSuchEntity {
|
| + return nil, grpcutil.Errf(codes.FailedPrecondition, "prefix is not registered")
|
| + }
|
| + return nil, grpcutil.Internal
|
| }
|
| - pfx, err := registerPrefix(c, &lsp)
|
| - if err != nil {
|
| - log.Errorf(c, "Failed to register/validate log stream prefix.")
|
| - return nil, err
|
| +
|
| + // The prefix secret must match the request secret. If it does, we know this
|
| + // is a legitimate registration attempt.
|
| + if subtle.ConstantTimeCompare(pfx.Secret, req.Secret) != 1 {
|
| + log.Errorf(c, "Request secret does not match prefix secret.")
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "invalid secret")
|
| }
|
| - log.Fields{
|
| - "prefix": pfx.Prefix,
|
| - "prefixCreated": pfx.Created,
|
| - }.Debugf(c, "Loaded log stream prefix.")
|
|
|
| - di := ds.Get(c)
|
| + // Check for registration, and that the prefix did not expire
|
| + // (non-transactional).
|
| ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)}
|
| lst := ls.State(di)
|
|
|
| - // Check for registration (non-transactional).
|
| - if err := checkRegisterStream(di, ls, lst); err != nil {
|
| + if err := di.GetMulti([]interface{}{ls, lst}); err != nil {
|
| if !anyNoSuchEntity(err) {
|
| log.WithError(err).Errorf(c, "Failed to check for log stream.")
|
| return nil, err
|
| @@ -136,7 +139,7 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| // Before we go into transaction, try and put these entries. This should not
|
| // be contested, since components don't share an entity root.
|
| if err := hierarchy.PutMulti(di, comps); err != nil {
|
| - log.WithError(err).Infof(c, "Failed to add missing hierarchy components.")
|
| + log.WithError(err).Errorf(c, "Failed to add missing hierarchy components.")
|
| return nil, grpcutil.Internal
|
| }
|
|
|
| @@ -163,15 +166,6 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
|
| }, nil
|
| }
|
|
|
| -func checkRegisterStream(di ds.Interface, ls *coordinator.LogStream, lst *coordinator.LogStreamState) error {
|
| - // Load the existing log stream state.
|
| - //
|
| - // We have already verified that the secrets match when the Prefix was
|
| - // registered. This will have to verify secrets when prefix registration
|
| - // moves its own RPC.
|
| - return di.GetMulti([]interface{}{ls, lst})
|
| -}
|
| -
|
| type registerStreamMutation struct {
|
| *logdog.RegisterStreamRequest
|
|
|
| @@ -185,8 +179,8 @@ type registerStreamMutation struct {
|
| func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) {
|
| di := ds.Get(c)
|
|
|
| - // Check if our stream is registered (transactional).
|
| - if err := checkRegisterStream(di, m.ls, m.lst); err != nil {
|
| + // Load our state and stream (transactional).
|
| + if err := di.GetMulti([]interface{}{m.ls, m.lst}); err != nil {
|
| if !anyNoSuchEntity(err) {
|
| log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
|
| return nil, err
|
|
|