Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/services/registerStream.go |
| diff --git a/appengine/logdog/coordinator/endpoints/services/registerStream.go b/appengine/logdog/coordinator/endpoints/services/registerStream.go |
| index 3c3bc808fdd74c973c85dc8adcba663bcde080f3..2b57fb17b991e6e521107fba450584b8288a9540 100644 |
| --- a/appengine/logdog/coordinator/endpoints/services/registerStream.go |
| +++ b/appengine/logdog/coordinator/endpoints/services/registerStream.go |
| @@ -5,6 +5,7 @@ |
| package services |
| import ( |
| + "crypto/subtle" |
| "time" |
| "github.com/golang/protobuf/proto" |
| @@ -102,33 +103,34 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
| // Determine the archival expiration. |
| archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg) |
| - // Register our Prefix. |
| - // |
| - // This will also verify that our request secret matches the registered one, |
| - // if one is registered. |
| - // |
| - // Note: This step will not be necessary once a "register prefix" RPC call is |
| - // implemented. |
| - lsp := logStreamPrefix{ |
| - prefix: string(prefix), |
| - secret: req.Secret, |
| + // Load our Prefix. It must be registered. |
| + di := ds.Get(c) |
| + |
| + pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} |
| + if err := di.Get(pfx); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "id": pfx.ID, |
| + "prefix": prefix, |
| + }.Errorf(c, "Failed to load log stream prefix.") |
| + if err == ds.ErrNoSuchEntity { |
| + return nil, grpcutil.Errf(codes.FailedPrecondition, "prefix is not registered") |
|
nodir
2016/05/17 02:45:59
include the prefix in the error msg?
dnj (Google)
2016/05/17 14:44:33
I'm choosing not to. The user knows what they've s
|
| + } |
| + return nil, grpcutil.Internal |
| } |
| - pfx, err := registerPrefix(c, &lsp) |
| - if err != nil { |
| - log.Errorf(c, "Failed to register/validate log stream prefix.") |
| - return nil, err |
| + |
| + // The prefix secret much match the request secret. If it does, we know this |
|
nodir
2016/05/17 02:45:59
s/much/must
dnj (Google)
2016/05/17 14:44:33
Done.
|
| + // is a legitimate registration attempt. |
| + if subtle.ConstantTimeCompare(pfx.Secret, req.Secret) != 1 { |
| + log.Errorf(c, "Request secret does not match prefix secret.") |
| + return nil, grpcutil.Errf(codes.InvalidArgument, "invalid secret") |
| } |
| - log.Fields{ |
| - "prefix": pfx.Prefix, |
| - "prefixCreated": pfx.Created, |
| - }.Debugf(c, "Loaded log stream prefix.") |
|
nodir
2016/05/17 02:45:59
check that prefix did not expire
dnj (Google)
2016/05/17 14:44:33
Done.
nodir
2016/05/17 16:19:47
I don't see where is it done. You've added two pre
dnj (Google)
2016/05/17 16:32:54
Oh sorry meant to link the other CL: https://coder
|
| - di := ds.Get(c) |
| + // Check for registration (non-transactional). |
| ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} |
| lst := ls.State(di) |
| - // Check for registration (non-transactional). |
| - if err := checkRegisterStream(di, ls, lst); err != nil { |
| + if err := di.GetMulti([]interface{}{ls, lst}); err != nil { |
|
nodir
2016/05/17 02:45:59
yeah.. GetMulti should have been made variadic. I
dnj (Google)
2016/05/17 14:44:33
Yeah we're actually going to make "Get" variadic f
|
| if !anyNoSuchEntity(err) { |
| log.WithError(err).Errorf(c, "Failed to check for log stream.") |
| return nil, err |
| @@ -146,7 +148,7 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
| // Before we go into transaction, try and put these entries. This should not |
| // be contested, since components don't share an entity root. |
| if err := hierarchy.PutMulti(di, comps); err != nil { |
| - log.WithError(err).Infof(c, "Failed to add missing hierarchy components.") |
| + log.WithError(err).Errorf(c, "Failed to add missing hierarchy components.") |
| return nil, grpcutil.Internal |
| } |
| @@ -173,15 +175,6 @@ func (s *server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq |
| }, nil |
| } |
| -func checkRegisterStream(di ds.Interface, ls *coordinator.LogStream, lst *coordinator.LogStreamState) error { |
| - // Load the existing log stream state. |
| - // |
| - // We have already verified that the secrets match when the Prefix was |
| - // registered. This will have to verify secrets when prefix registration |
| - // moves its own RPC. |
| - return di.GetMulti([]interface{}{ls, lst}) |
| -} |
| - |
| type registerStreamMutation struct { |
| *logdog.RegisterStreamRequest |
| @@ -195,8 +188,8 @@ type registerStreamMutation struct { |
| func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutation, error) { |
| di := ds.Get(c) |
| - // Check if our stream is registered (transactional). |
| - if err := checkRegisterStream(di, m.ls, m.lst); err != nil { |
| + // Load our state and stream (transactional). |
| + if err := di.GetMulti([]interface{}{m.ls, m.lst}); err != nil { |
| if !anyNoSuchEntity(err) { |
| log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") |
| return nil, err |