Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/registerStream.go

Issue 1967273002: LogDog: Implement RegisterPrefix RPC. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-coordinator-endpoint
Patch Set: Add missing test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle"
8 "time" 9 "time"
9 10
10 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
11 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 13 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" 14 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy"
14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" 15 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
15 "github.com/luci/luci-go/appengine/tumble" 16 "github.com/luci/luci-go/appengine/tumble"
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
95 96
96 pcfg, err := coordinator.CurrentProjectConfig(c) 97 pcfg, err := coordinator.CurrentProjectConfig(c)
97 if err != nil { 98 if err != nil {
98 log.WithError(err).Errorf(c, "Failed to load current project con figuration.") 99 log.WithError(err).Errorf(c, "Failed to load current project con figuration.")
99 return nil, grpcutil.Internal 100 return nil, grpcutil.Internal
100 } 101 }
101 102
102 // Determine the archival expiration. 103 // Determine the archival expiration.
103 archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg) 104 archiveDelayMax := resolveArchiveDelay(cfg.Coordinator, pcfg)
104 105
105 » // Register our Prefix. 106 » // Load our Prefix. It must be registered.
106 » // 107 » di := ds.Get(c)
107 » // This will also verify that our request secret matches the registered one, 108
108 » // if one is registered. 109 » pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)}
109 » // 110 » if err := di.Get(pfx); err != nil {
110 » // Note: This step will not be necessary once a "register prefix" RPC ca ll is 111 » » log.Fields{
111 » // implemented. 112 » » » log.ErrorKey: err,
112 » lsp := logStreamPrefix{ 113 » » » "id": pfx.ID,
113 » » prefix: string(prefix), 114 » » » "prefix": prefix,
114 » » secret: req.Secret, 115 » » }.Errorf(c, "Failed to load log stream prefix.")
116 » » if err == ds.ErrNoSuchEntity {
117 » » » return nil, grpcutil.Errf(codes.FailedPrecondition, "pre fix is not registered")
nodir 2016/05/17 02:45:59 include the prefix in the error msg?
dnj (Google) 2016/05/17 14:44:33 I'm choosing not to. The user knows what they've s
118 » » }
119 » » return nil, grpcutil.Internal
115 } 120 }
116 » pfx, err := registerPrefix(c, &lsp) 121
117 » if err != nil { 122 » // The prefix secret much match the request secret. If it does, we know this
nodir 2016/05/17 02:45:59 s/much/must
dnj (Google) 2016/05/17 14:44:33 Done.
118 » » log.Errorf(c, "Failed to register/validate log stream prefix.") 123 » // is a legitimate registration attempt.
119 » » return nil, err 124 » if subtle.ConstantTimeCompare(pfx.Secret, req.Secret) != 1 {
125 » » log.Errorf(c, "Request secret does not match prefix secret.")
126 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid secret ")
120 } 127 }
121 log.Fields{
122 "prefix": pfx.Prefix,
123 "prefixCreated": pfx.Created,
124 }.Debugf(c, "Loaded log stream prefix.")
125 128
nodir 2016/05/17 02:45:59 check that prefix did not expire
dnj (Google) 2016/05/17 14:44:33 Done.
nodir 2016/05/17 16:19:47 I don't see where is it done. You've added two pre
dnj (Google) 2016/05/17 16:32:54 Oh sorry meant to link the other CL: https://coder
126 » di := ds.Get(c) 129 » // Check for registration (non-transactional).
127 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} 130 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)}
128 lst := ls.State(di) 131 lst := ls.State(di)
129 132
130 » // Check for registration (non-transactional). 133 » if err := di.GetMulti([]interface{}{ls, lst}); err != nil {
nodir 2016/05/17 02:45:59 yeah.. GetMulti should have been made variadic. I
dnj (Google) 2016/05/17 14:44:33 Yeah we're actually going to make "Get" variadic f
131 » if err := checkRegisterStream(di, ls, lst); err != nil {
132 if !anyNoSuchEntity(err) { 134 if !anyNoSuchEntity(err) {
133 log.WithError(err).Errorf(c, "Failed to check for log st ream.") 135 log.WithError(err).Errorf(c, "Failed to check for log st ream.")
134 return nil, err 136 return nil, err
135 } 137 }
136 138
137 // The stream is not registered. Perform a transactional registr ation via 139 // The stream is not registered. Perform a transactional registr ation via
138 // mutation. 140 // mutation.
139 // 141 //
140 // Determine which hierarchy components we need to add. 142 // Determine which hierarchy components we need to add.
141 comps := hierarchy.Components(path) 143 comps := hierarchy.Components(path)
142 if comps, err = hierarchy.Missing(di, comps); err != nil { 144 if comps, err = hierarchy.Missing(di, comps); err != nil {
143 log.WithError(err).Warningf(c, "Failed to probe for miss ing hierarchy components.") 145 log.WithError(err).Warningf(c, "Failed to probe for miss ing hierarchy components.")
144 } 146 }
145 147
146 // Before we go into transaction, try and put these entries. Thi s should not 148 // Before we go into transaction, try and put these entries. Thi s should not
147 // be contested, since components don't share an entity root. 149 // be contested, since components don't share an entity root.
148 if err := hierarchy.PutMulti(di, comps); err != nil { 150 if err := hierarchy.PutMulti(di, comps); err != nil {
149 » » » log.WithError(err).Infof(c, "Failed to add missing hiera rchy components.") 151 » » » log.WithError(err).Errorf(c, "Failed to add missing hier archy components.")
150 return nil, grpcutil.Internal 152 return nil, grpcutil.Internal
151 } 153 }
152 154
153 // The stream does not exist. Proceed with transactional registr ation. 155 // The stream does not exist. Proceed with transactional registr ation.
154 err = tumble.RunMutation(c, &registerStreamMutation{ 156 err = tumble.RunMutation(c, &registerStreamMutation{
155 RegisterStreamRequest: req, 157 RegisterStreamRequest: req,
156 desc: &desc, 158 desc: &desc,
157 pfx: pfx, 159 pfx: pfx,
158 lst: lst, 160 lst: lst,
159 ls: ls, 161 ls: ls,
160 archiveDelay: archiveDelayMax, 162 archiveDelay: archiveDelayMax,
161 }) 163 })
162 if err != nil { 164 if err != nil {
163 log.Fields{ 165 log.Fields{
164 log.ErrorKey: err, 166 log.ErrorKey: err,
165 }.Errorf(c, "Failed to register LogStream.") 167 }.Errorf(c, "Failed to register LogStream.")
166 return nil, err 168 return nil, err
167 } 169 }
168 } 170 }
169 171
170 return &logdog.RegisterStreamResponse{ 172 return &logdog.RegisterStreamResponse{
171 Id: string(ls.ID), 173 Id: string(ls.ID),
172 State: buildLogStreamState(ls, lst), 174 State: buildLogStreamState(ls, lst),
173 }, nil 175 }, nil
174 } 176 }
175 177
176 func checkRegisterStream(di ds.Interface, ls *coordinator.LogStream, lst *coordi nator.LogStreamState) error {
177 // Load the existing log stream state.
178 //
179 // We have already verified that the secrets match when the Prefix was
180 // registered. This will have to verify secrets when prefix registration
181 // moves its own RPC.
182 return di.GetMulti([]interface{}{ls, lst})
183 }
184
185 type registerStreamMutation struct { 178 type registerStreamMutation struct {
186 *logdog.RegisterStreamRequest 179 *logdog.RegisterStreamRequest
187 180
188 desc *logpb.LogStreamDescriptor 181 desc *logpb.LogStreamDescriptor
189 pfx *coordinator.LogPrefix 182 pfx *coordinator.LogPrefix
190 ls *coordinator.LogStream 183 ls *coordinator.LogStream
191 lst *coordinator.LogStreamState 184 lst *coordinator.LogStreamState
192 archiveDelay time.Duration 185 archiveDelay time.Duration
193 } 186 }
194 187
195 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) { 188 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) {
196 di := ds.Get(c) 189 di := ds.Get(c)
197 190
198 » // Check if our stream is registered (transactional). 191 » // Load our state and stream (transactional).
199 » if err := checkRegisterStream(di, m.ls, m.lst); err != nil { 192 » if err := di.GetMulti([]interface{}{m.ls, m.lst}); err != nil {
200 if !anyNoSuchEntity(err) { 193 if !anyNoSuchEntity(err) {
201 log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") 194 log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
202 return nil, err 195 return nil, err
203 } 196 }
204 197
205 // The stream is not yet registered. 198 // The stream is not yet registered.
206 log.Infof(c, "Registering new log stream.") 199 log.Infof(c, "Registering new log stream.")
207 200
208 now := clock.Now(c).UTC() 201 now := clock.Now(c).UTC()
209 202
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 return nil, grpcutil.Internal 245 return nil, grpcutil.Internal
253 } 246 }
254 } 247 }
255 248
256 return nil, nil 249 return nil, nil
257 } 250 }
258 251
259 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { 252 func (m *registerStreamMutation) Root(c context.Context) *ds.Key {
260 return ds.Get(c).KeyForObj(m.ls) 253 return ds.Get(c).KeyForObj(m.ls)
261 } 254 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698