Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | |
| 8 "time" | 9 "time" |
| 9 | 10 |
| 10 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 11 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 15 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 15 "github.com/luci/luci-go/appengine/tumble" | 16 "github.com/luci/luci-go/appengine/tumble" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| (...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 95 | 96 |
| 96 pcfg, err := coordinator.CurrentProjectConfig(c) | 97 pcfg, err := coordinator.CurrentProjectConfig(c) |
| 97 if err != nil { | 98 if err != nil { |
| 98 log.WithError(err).Errorf(c, "Failed to load current project con figuration.") | 99 log.WithError(err).Errorf(c, "Failed to load current project con figuration.") |
| 99 return nil, grpcutil.Internal | 100 return nil, grpcutil.Internal |
| 100 } | 101 } |
| 101 | 102 |
| 102 // Determine the archival expiration. | 103 // Determine the archival expiration. |
| 103 archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg) | 104 archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg) |
| 104 | 105 |
| 105 » // Register our Prefix. | 106 » // Load our Prefix. It must be registered. |
| 106 » // | 107 » di := ds.Get(c) |
| 107 » // This will also verify that our request secret matches the registered one, | 108 |
| 108 » // if one is registered. | 109 » pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} |
| 109 » // | 110 » if err := di.Get(pfx); err != nil { |
| 110 » // Note: This step will not be necessary once a "register prefix" RPC ca ll is | 111 » » log.Fields{ |
| 111 » // implemented. | 112 » » » log.ErrorKey: err, |
| 112 » lsp := logStreamPrefix{ | 113 » » » "id": pfx.ID, |
| 113 » » prefix: string(prefix), | 114 » » » "prefix": prefix, |
| 114 » » secret: req.Secret, | 115 » » }.Errorf(c, "Failed to load log stream prefix.") |
| 116 » » if err == ds.ErrNoSuchEntity { | |
| 117 » » » return nil, grpcutil.Errf(codes.FailedPrecondition, "pre fix is not registered") | |
|
nodir
2016/05/17 02:45:59
include the prefix in the error msg?
dnj (Google)
2016/05/17 14:44:33
I'm choosing not to. The user knows what they've s
| |
| 118 » » } | |
| 119 » » return nil, grpcutil.Internal | |
| 115 } | 120 } |
| 116 » pfx, err := registerPrefix(c, &lsp) | 121 |
| 117 » if err != nil { | 122 » // The prefix secret much match the request secret. If it does, we know this |
|
nodir
2016/05/17 02:45:59
s/much/must
dnj (Google)
2016/05/17 14:44:33
Done.
| |
| 118 » » log.Errorf(c, "Failed to register/validate log stream prefix.") | 123 » // is a legitimate registration attempt. |
| 119 » » return nil, err | 124 » if subtle.ConstantTimeCompare(pfx.Secret, req.Secret) != 1 { |
| 125 » » log.Errorf(c, "Request secret does not match prefix secret.") | |
| 126 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid secret ") | |
| 120 } | 127 } |
| 121 log.Fields{ | |
| 122 "prefix": pfx.Prefix, | |
| 123 "prefixCreated": pfx.Created, | |
| 124 }.Debugf(c, "Loaded log stream prefix.") | |
| 125 | 128 |
|
nodir
2016/05/17 02:45:59
check that prefix did not expire
dnj (Google)
2016/05/17 14:44:33
Done.
nodir
2016/05/17 16:19:47
I don't see where is it done. You've added two pre
dnj (Google)
2016/05/17 16:32:54
Oh sorry meant to link the other CL: https://coder
| |
| 126 » di := ds.Get(c) | 129 » // Check for registration (non-transactional). |
| 127 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} | 130 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} |
| 128 lst := ls.State(di) | 131 lst := ls.State(di) |
| 129 | 132 |
| 130 » // Check for registration (non-transactional). | 133 » if err := di.GetMulti([]interface{}{ls, lst}); err != nil { |
|
nodir
2016/05/17 02:45:59
yeah.. GetMulti should have been made variadic. I
dnj (Google)
2016/05/17 14:44:33
Yeah we're actually going to make "Get" variadic f
| |
| 131 » if err := checkRegisterStream(di, ls, lst); err != nil { | |
| 132 if !anyNoSuchEntity(err) { | 134 if !anyNoSuchEntity(err) { |
| 133 log.WithError(err).Errorf(c, "Failed to check for log st ream.") | 135 log.WithError(err).Errorf(c, "Failed to check for log st ream.") |
| 134 return nil, err | 136 return nil, err |
| 135 } | 137 } |
| 136 | 138 |
| 137 // The stream is not registered. Perform a transactional registr ation via | 139 // The stream is not registered. Perform a transactional registr ation via |
| 138 // mutation. | 140 // mutation. |
| 139 // | 141 // |
| 140 // Determine which hierarchy components we need to add. | 142 // Determine which hierarchy components we need to add. |
| 141 comps := hierarchy.Components(path) | 143 comps := hierarchy.Components(path) |
| 142 if comps, err = hierarchy.Missing(di, comps); err != nil { | 144 if comps, err = hierarchy.Missing(di, comps); err != nil { |
| 143 log.WithError(err).Warningf(c, "Failed to probe for miss ing hierarchy components.") | 145 log.WithError(err).Warningf(c, "Failed to probe for miss ing hierarchy components.") |
| 144 } | 146 } |
| 145 | 147 |
| 146 // Before we go into transaction, try and put these entries. Thi s should not | 148 // Before we go into transaction, try and put these entries. Thi s should not |
| 147 // be contested, since components don't share an entity root. | 149 // be contested, since components don't share an entity root. |
| 148 if err := hierarchy.PutMulti(di, comps); err != nil { | 150 if err := hierarchy.PutMulti(di, comps); err != nil { |
| 149 » » » log.WithError(err).Infof(c, "Failed to add missing hiera rchy components.") | 151 » » » log.WithError(err).Errorf(c, "Failed to add missing hier archy components.") |
| 150 return nil, grpcutil.Internal | 152 return nil, grpcutil.Internal |
| 151 } | 153 } |
| 152 | 154 |
| 153 // The stream does not exist. Proceed with transactional registr ation. | 155 // The stream does not exist. Proceed with transactional registr ation. |
| 154 err = tumble.RunMutation(c, ®isterStreamMutation{ | 156 err = tumble.RunMutation(c, ®isterStreamMutation{ |
| 155 RegisterStreamRequest: req, | 157 RegisterStreamRequest: req, |
| 156 desc: &desc, | 158 desc: &desc, |
| 157 pfx: pfx, | 159 pfx: pfx, |
| 158 lst: lst, | 160 lst: lst, |
| 159 ls: ls, | 161 ls: ls, |
| 160 archiveDelay: archiveDelayMax, | 162 archiveDelay: archiveDelayMax, |
| 161 }) | 163 }) |
| 162 if err != nil { | 164 if err != nil { |
| 163 log.Fields{ | 165 log.Fields{ |
| 164 log.ErrorKey: err, | 166 log.ErrorKey: err, |
| 165 }.Errorf(c, "Failed to register LogStream.") | 167 }.Errorf(c, "Failed to register LogStream.") |
| 166 return nil, err | 168 return nil, err |
| 167 } | 169 } |
| 168 } | 170 } |
| 169 | 171 |
| 170 return &logdog.RegisterStreamResponse{ | 172 return &logdog.RegisterStreamResponse{ |
| 171 Id: string(ls.ID), | 173 Id: string(ls.ID), |
| 172 State: buildLogStreamState(ls, lst), | 174 State: buildLogStreamState(ls, lst), |
| 173 }, nil | 175 }, nil |
| 174 } | 176 } |
| 175 | 177 |
| 176 func checkRegisterStream(di ds.Interface, ls *coordinator.LogStream, lst *coordi nator.LogStreamState) error { | |
| 177 // Load the existing log stream state. | |
| 178 // | |
| 179 // We have already verified that the secrets match when the Prefix was | |
| 180 // registered. This will have to verify secrets when prefix registration | |
| 181 // moves its own RPC. | |
| 182 return di.GetMulti([]interface{}{ls, lst}) | |
| 183 } | |
| 184 | |
| 185 type registerStreamMutation struct { | 178 type registerStreamMutation struct { |
| 186 *logdog.RegisterStreamRequest | 179 *logdog.RegisterStreamRequest |
| 187 | 180 |
| 188 desc *logpb.LogStreamDescriptor | 181 desc *logpb.LogStreamDescriptor |
| 189 pfx *coordinator.LogPrefix | 182 pfx *coordinator.LogPrefix |
| 190 ls *coordinator.LogStream | 183 ls *coordinator.LogStream |
| 191 lst *coordinator.LogStreamState | 184 lst *coordinator.LogStreamState |
| 192 archiveDelay time.Duration | 185 archiveDelay time.Duration |
| 193 } | 186 } |
| 194 | 187 |
| 195 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) { | 188 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) { |
| 196 di := ds.Get(c) | 189 di := ds.Get(c) |
| 197 | 190 |
| 198 » // Check if our stream is registered (transactional). | 191 » // Load our state and stream (transactional). |
| 199 » if err := checkRegisterStream(di, m.ls, m.lst); err != nil { | 192 » if err := di.GetMulti([]interface{}{m.ls, m.lst}); err != nil { |
| 200 if !anyNoSuchEntity(err) { | 193 if !anyNoSuchEntity(err) { |
| 201 log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") | 194 log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") |
| 202 return nil, err | 195 return nil, err |
| 203 } | 196 } |
| 204 | 197 |
| 205 // The stream is not yet registered. | 198 // The stream is not yet registered. |
| 206 log.Infof(c, "Registering new log stream.") | 199 log.Infof(c, "Registering new log stream.") |
| 207 | 200 |
| 208 now := clock.Now(c).UTC() | 201 now := clock.Now(c).UTC() |
| 209 | 202 |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 252 return nil, grpcutil.Internal | 245 return nil, grpcutil.Internal |
| 253 } | 246 } |
| 254 } | 247 } |
| 255 | 248 |
| 256 return nil, nil | 249 return nil, nil |
| 257 } | 250 } |
| 258 | 251 |
| 259 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { | 252 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { |
| 260 return ds.Get(c).KeyForObj(m.ls) | 253 return ds.Get(c).KeyForObj(m.ls) |
| 261 } | 254 } |
| OLD | NEW |