| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package services | |
| 6 | |
| 7 import ( | |
| 8 "crypto/subtle" | |
| 9 "errors" | |
| 10 | |
| 11 ds "github.com/luci/gae/service/datastore" | |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | |
| 13 "github.com/luci/luci-go/common/clock" | |
| 14 "github.com/luci/luci-go/common/grpcutil" | |
| 15 "github.com/luci/luci-go/common/logdog/types" | |
| 16 log "github.com/luci/luci-go/common/logging" | |
| 17 "golang.org/x/net/context" | |
| 18 "google.golang.org/grpc/codes" | |
| 19 ) | |
| 20 | |
| 21 // logStreamPrefix is a placeholder for a future "register prefix" RPC call. | |
| 22 // | |
| 23 // It represents an application's intent to register a log stream prefix. | |
| 24 type logStreamPrefix struct { | |
| 25 prefix string | |
| 26 secret []byte | |
| 27 } | |
| 28 | |
| 29 // registerPrefix registers a log stream's Prefix value. | |
| 30 // | |
| 31 // This function behaves like an RPC call, returning a gRPC error code on | |
| 32 // failure. This is because it will eventually be a separate RPC call when | |
| 33 // Butler prefix registration is implemented. | |
| 34 func registerPrefix(c context.Context, lsp *logStreamPrefix) (*coordinator.LogPr
efix, error) { | |
| 35 log.Fields{ | |
| 36 "prefix": lsp.prefix, | |
| 37 }.Debugf(c, "Registering log prefix.") | |
| 38 | |
| 39 prefix := types.StreamName(lsp.prefix) | |
| 40 if err := prefix.Validate(); err != nil { | |
| 41 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid prefix
(%s): %s", lsp.prefix, err) | |
| 42 } | |
| 43 | |
| 44 secret := types.PrefixSecret(lsp.secret) | |
| 45 if err := secret.Validate(); err != nil { | |
| 46 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid prefix
secret: %s", err) | |
| 47 } | |
| 48 | |
| 49 pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} | |
| 50 | |
| 51 // Check for existing prefix registration (non-transactional). | |
| 52 di := ds.Get(c) | |
| 53 switch err := checkRegisterPrefix(ds.Get(c), lsp, pfx); err { | |
| 54 case nil: | |
| 55 // The prefix is already compatibly registered. | |
| 56 return pfx, nil | |
| 57 | |
| 58 case ds.ErrNoSuchEntity: | |
| 59 // The prefix does not exist. Proceed with transactional registr
ation. | |
| 60 break | |
| 61 | |
| 62 default: | |
| 63 log.WithError(err).Errorf(c, "Failed to register prefix (non-tra
nsactional).") | |
| 64 return nil, err | |
| 65 } | |
| 66 | |
| 67 // The Prefix isn't registered. Register it transactionally. | |
| 68 now := clock.Now(c).UTC() | |
| 69 err := di.RunInTransaction(func(c context.Context) error { | |
| 70 di := ds.Get(c) | |
| 71 | |
| 72 // Check if this Prefix exists (transactional). | |
| 73 switch err := checkRegisterPrefix(di, lsp, pfx); err { | |
| 74 case nil: | |
| 75 // The prefix is already compatibly registered. | |
| 76 log.Debugf(c, "Prefix is already registered.") | |
| 77 return nil | |
| 78 | |
| 79 case ds.ErrNoSuchEntity: | |
| 80 // The Prefix is not registered, so let's register it. | |
| 81 pfx.Prefix = string(prefix) | |
| 82 pfx.Secret = []byte(secret) | |
| 83 pfx.Created = now | |
| 84 | |
| 85 if err := di.Put(pfx); err != nil { | |
| 86 log.WithError(err).Errorf(c, "Failed to register
prefix.") | |
| 87 return grpcutil.Internal | |
| 88 } | |
| 89 log.Infof(c, "The prefix was successfully registered.") | |
| 90 return nil | |
| 91 | |
| 92 default: | |
| 93 // Unexpected error. | |
| 94 log.WithError(err).Errorf(c, "Failed to check prefix reg
istration.") | |
| 95 return err | |
| 96 } | |
| 97 }, nil) | |
| 98 if err != nil { | |
| 99 log.WithError(err).Errorf(c, "Failed to register prefix (transac
tional).") | |
| 100 return nil, err | |
| 101 } | |
| 102 | |
| 103 return pfx, nil | |
| 104 } | |
| 105 | |
| 106 // checkRegisterPrefix is our registration logic. It will be executed once | |
| 107 // non-transactionally and (if registration is needed) again transactionally. | |
| 108 // | |
| 109 // If the prefix is already compatibly registered, nil will be returned. If no | |
| 110 // such entity exists, ds.ErrNoSuchEntity will be returned. | |
| 111 // | |
| 112 // gRPC errors returned by this method will be forwarded to the caller. | |
| 113 func checkRegisterPrefix(di ds.Interface, lsp *logStreamPrefix, pfx *coordinator
.LogPrefix) error { | |
| 114 switch err := di.Get(pfx); err { | |
| 115 case nil: | |
| 116 // Prefix registered, does it match? If so, this is an idempoten
t operation. | |
| 117 if err := matchesLogPrefix(lsp, pfx); err != nil { | |
| 118 return grpcutil.Errf(codes.AlreadyExists, "Log prefix is
already registered: %v", err) | |
| 119 } | |
| 120 | |
| 121 return nil | |
| 122 | |
| 123 case ds.ErrNoSuchEntity: | |
| 124 return err | |
| 125 | |
| 126 default: | |
| 127 return grpcutil.Internal | |
| 128 } | |
| 129 } | |
| 130 | |
| 131 func matchesLogPrefix(lsp *logStreamPrefix, pfx *coordinator.LogPrefix) error { | |
| 132 if lsp.prefix != pfx.Prefix { | |
| 133 return errors.New("prefixes do not match") | |
| 134 } | |
| 135 if subtle.ConstantTimeCompare(lsp.secret, pfx.Secret) != 1 { | |
| 136 return errors.New("secrets do not match") | |
| 137 } | |
| 138 return nil | |
| 139 } | |
| OLD | NEW |