| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" |
| 8 "time" | 9 "time" |
| 9 | 10 |
| 10 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 11 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints" |
| 14 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" | 15 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| 15 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 16 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 16 "github.com/luci/luci-go/appengine/tumble" | 17 "github.com/luci/luci-go/appengine/tumble" |
| 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 85 | 86 |
| 86 pcfg, err := coordinator.CurrentProjectConfig(c) | 87 pcfg, err := coordinator.CurrentProjectConfig(c) |
| 87 if err != nil { | 88 if err != nil { |
| 88 log.WithError(err).Errorf(c, "Failed to load current project con
figuration.") | 89 log.WithError(err).Errorf(c, "Failed to load current project con
figuration.") |
| 89 return nil, grpcutil.Internal | 90 return nil, grpcutil.Internal |
| 90 } | 91 } |
| 91 | 92 |
| 92 // Determine the archival expiration. | 93 // Determine the archival expiration. |
| 93 archiveDelayMax := endpoints.MinDuration(cfg.Coordinator.ArchiveDelayMax
, pcfg.MaxStreamAge) | 94 archiveDelayMax := endpoints.MinDuration(cfg.Coordinator.ArchiveDelayMax
, pcfg.MaxStreamAge) |
| 94 | 95 |
| 95 » // Register our Prefix. | 96 » // Load our Prefix. It must be registered. |
| 96 » // | 97 » di := ds.Get(c) |
| 97 » // This will also verify that our request secret matches the registered
one, | 98 |
| 98 » // if one is registered. | 99 » pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)} |
| 99 » // | 100 » if err := di.Get(pfx); err != nil { |
| 100 » // Note: This step will not be necessary once a "register prefix" RPC ca
ll is | 101 » » log.Fields{ |
| 101 » // implemented. | 102 » » » log.ErrorKey: err, |
| 102 » lsp := logStreamPrefix{ | 103 » » » "id": pfx.ID, |
| 103 » » prefix: string(prefix), | 104 » » » "prefix": prefix, |
| 104 » » secret: req.Secret, | 105 » » }.Errorf(c, "Failed to load log stream prefix.") |
| 106 » » if err == ds.ErrNoSuchEntity { |
| 107 » » » return nil, grpcutil.Errf(codes.FailedPrecondition, "pre
fix is not registered") |
| 108 » » } |
| 109 » » return nil, grpcutil.Internal |
| 105 } | 110 } |
| 106 » pfx, err := registerPrefix(c, &lsp) | 111 |
| 107 » if err != nil { | 112 » // The prefix secret must match the request secret. If it does, we know
this |
| 108 » » log.Errorf(c, "Failed to register/validate log stream prefix.") | 113 » // is a legitimate registration attempt. |
| 109 » » return nil, err | 114 » if subtle.ConstantTimeCompare(pfx.Secret, req.Secret) != 1 { |
| 115 » » log.Errorf(c, "Request secret does not match prefix secret.") |
| 116 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid secret
") |
| 110 } | 117 } |
| 111 log.Fields{ | |
| 112 "prefix": pfx.Prefix, | |
| 113 "prefixCreated": pfx.Created, | |
| 114 }.Debugf(c, "Loaded log stream prefix.") | |
| 115 | 118 |
| 116 » di := ds.Get(c) | 119 » // Check for registration, and that the prefix did not expire |
| 120 » // (non-transactional). |
| 117 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} | 121 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} |
| 118 lst := ls.State(di) | 122 lst := ls.State(di) |
| 119 | 123 |
| 120 » // Check for registration (non-transactional). | 124 » if err := di.GetMulti([]interface{}{ls, lst}); err != nil { |
| 121 » if err := checkRegisterStream(di, ls, lst); err != nil { | |
| 122 if !anyNoSuchEntity(err) { | 125 if !anyNoSuchEntity(err) { |
| 123 log.WithError(err).Errorf(c, "Failed to check for log st
ream.") | 126 log.WithError(err).Errorf(c, "Failed to check for log st
ream.") |
| 124 return nil, err | 127 return nil, err |
| 125 } | 128 } |
| 126 | 129 |
| 127 // The stream is not registered. Perform a transactional registr
ation via | 130 // The stream is not registered. Perform a transactional registr
ation via |
| 128 // mutation. | 131 // mutation. |
| 129 // | 132 // |
| 130 // Determine which hierarchy components we need to add. | 133 // Determine which hierarchy components we need to add. |
| 131 comps := hierarchy.Components(path) | 134 comps := hierarchy.Components(path) |
| 132 if comps, err = hierarchy.Missing(di, comps); err != nil { | 135 if comps, err = hierarchy.Missing(di, comps); err != nil { |
| 133 log.WithError(err).Warningf(c, "Failed to probe for miss
ing hierarchy components.") | 136 log.WithError(err).Warningf(c, "Failed to probe for miss
ing hierarchy components.") |
| 134 } | 137 } |
| 135 | 138 |
| 136 // Before we go into transaction, try and put these entries. Thi
s should not | 139 // Before we go into transaction, try and put these entries. Thi
s should not |
| 137 // be contested, since components don't share an entity root. | 140 // be contested, since components don't share an entity root. |
| 138 if err := hierarchy.PutMulti(di, comps); err != nil { | 141 if err := hierarchy.PutMulti(di, comps); err != nil { |
| 139 » » » log.WithError(err).Infof(c, "Failed to add missing hiera
rchy components.") | 142 » » » log.WithError(err).Errorf(c, "Failed to add missing hier
archy components.") |
| 140 return nil, grpcutil.Internal | 143 return nil, grpcutil.Internal |
| 141 } | 144 } |
| 142 | 145 |
| 143 // The stream does not exist. Proceed with transactional registr
ation. | 146 // The stream does not exist. Proceed with transactional registr
ation. |
| 144 err = tumble.RunMutation(c, ®isterStreamMutation{ | 147 err = tumble.RunMutation(c, ®isterStreamMutation{ |
| 145 RegisterStreamRequest: req, | 148 RegisterStreamRequest: req, |
| 146 desc: &desc, | 149 desc: &desc, |
| 147 pfx: pfx, | 150 pfx: pfx, |
| 148 lst: lst, | 151 lst: lst, |
| 149 ls: ls, | 152 ls: ls, |
| 150 archiveDelay: archiveDelayMax, | 153 archiveDelay: archiveDelayMax, |
| 151 }) | 154 }) |
| 152 if err != nil { | 155 if err != nil { |
| 153 log.Fields{ | 156 log.Fields{ |
| 154 log.ErrorKey: err, | 157 log.ErrorKey: err, |
| 155 }.Errorf(c, "Failed to register LogStream.") | 158 }.Errorf(c, "Failed to register LogStream.") |
| 156 return nil, err | 159 return nil, err |
| 157 } | 160 } |
| 158 } | 161 } |
| 159 | 162 |
| 160 return &logdog.RegisterStreamResponse{ | 163 return &logdog.RegisterStreamResponse{ |
| 161 Id: string(ls.ID), | 164 Id: string(ls.ID), |
| 162 State: buildLogStreamState(ls, lst), | 165 State: buildLogStreamState(ls, lst), |
| 163 }, nil | 166 }, nil |
| 164 } | 167 } |
| 165 | 168 |
| 166 func checkRegisterStream(di ds.Interface, ls *coordinator.LogStream, lst *coordi
nator.LogStreamState) error { | |
| 167 // Load the existing log stream state. | |
| 168 // | |
| 169 // We have already verified that the secrets match when the Prefix was | |
| 170 // registered. This will have to verify secrets when prefix registration | |
| 171 // moves its own RPC. | |
| 172 return di.GetMulti([]interface{}{ls, lst}) | |
| 173 } | |
| 174 | |
| 175 type registerStreamMutation struct { | 169 type registerStreamMutation struct { |
| 176 *logdog.RegisterStreamRequest | 170 *logdog.RegisterStreamRequest |
| 177 | 171 |
| 178 desc *logpb.LogStreamDescriptor | 172 desc *logpb.LogStreamDescriptor |
| 179 pfx *coordinator.LogPrefix | 173 pfx *coordinator.LogPrefix |
| 180 ls *coordinator.LogStream | 174 ls *coordinator.LogStream |
| 181 lst *coordinator.LogStreamState | 175 lst *coordinator.LogStreamState |
| 182 archiveDelay time.Duration | 176 archiveDelay time.Duration |
| 183 } | 177 } |
| 184 | 178 |
| 185 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati
on, error) { | 179 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati
on, error) { |
| 186 di := ds.Get(c) | 180 di := ds.Get(c) |
| 187 | 181 |
| 188 » // Check if our stream is registered (transactional). | 182 » // Load our state and stream (transactional). |
| 189 » if err := checkRegisterStream(di, m.ls, m.lst); err != nil { | 183 » if err := di.GetMulti([]interface{}{m.ls, m.lst}); err != nil { |
| 190 if !anyNoSuchEntity(err) { | 184 if !anyNoSuchEntity(err) { |
| 191 log.WithError(err).Errorf(c, "Failed to check for stream
registration (transactional).") | 185 log.WithError(err).Errorf(c, "Failed to check for stream
registration (transactional).") |
| 192 return nil, err | 186 return nil, err |
| 193 } | 187 } |
| 194 | 188 |
| 195 // The stream is not yet registered. | 189 // The stream is not yet registered. |
| 196 log.Infof(c, "Registering new log stream.") | 190 log.Infof(c, "Registering new log stream.") |
| 197 | 191 |
| 198 now := clock.Now(c).UTC() | 192 now := clock.Now(c).UTC() |
| 199 | 193 |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 242 return nil, grpcutil.Internal | 236 return nil, grpcutil.Internal |
| 243 } | 237 } |
| 244 } | 238 } |
| 245 | 239 |
| 246 return nil, nil | 240 return nil, nil |
| 247 } | 241 } |
| 248 | 242 |
| 249 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { | 243 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { |
| 250 return ds.Get(c).KeyForObj(m.ls) | 244 return ds.Get(c).KeyForObj(m.ls) |
| 251 } | 245 } |
| OLD | NEW |