Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/registerStream.go

Issue 1967273002: LogDog: Implement RegisterPrefix RPC. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-coordinator-endpoint
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle"
8 "time" 9 "time"
9 10
10 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
11 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 13 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints" 14 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints"
14 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" 15 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy"
15 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" 16 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
16 "github.com/luci/luci-go/appengine/tumble" 17 "github.com/luci/luci-go/appengine/tumble"
17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
85 86
86 pcfg, err := coordinator.CurrentProjectConfig(c) 87 pcfg, err := coordinator.CurrentProjectConfig(c)
87 if err != nil { 88 if err != nil {
88 log.WithError(err).Errorf(c, "Failed to load current project con figuration.") 89 log.WithError(err).Errorf(c, "Failed to load current project con figuration.")
89 return nil, grpcutil.Internal 90 return nil, grpcutil.Internal
90 } 91 }
91 92
92 // Determine the archival expiration. 93 // Determine the archival expiration.
93 archiveDelayMax := endpoints.MinDuration(cfg.Coordinator.ArchiveDelayMax , pcfg.MaxStreamAge) 94 archiveDelayMax := endpoints.MinDuration(cfg.Coordinator.ArchiveDelayMax , pcfg.MaxStreamAge)
94 95
95 » // Register our Prefix. 96 » // Load our Prefix. It must be registered.
96 » // 97 » di := ds.Get(c)
97 » // This will also verify that our request secret matches the registered one, 98
98 » // if one is registered. 99 » pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)}
99 » // 100 » if err := di.Get(pfx); err != nil {
100 » // Note: This step will not be necessary once a "register prefix" RPC ca ll is 101 » » log.Fields{
101 » // implemented. 102 » » » log.ErrorKey: err,
102 » lsp := logStreamPrefix{ 103 » » » "id": pfx.ID,
103 » » prefix: string(prefix), 104 » » » "prefix": prefix,
104 » » secret: req.Secret, 105 » » }.Errorf(c, "Failed to load log stream prefix.")
106 » » if err == ds.ErrNoSuchEntity {
107 » » » return nil, grpcutil.Errf(codes.FailedPrecondition, "pre fix is not registered")
108 » » }
109 » » return nil, grpcutil.Internal
105 } 110 }
106 » pfx, err := registerPrefix(c, &lsp) 111
107 » if err != nil { 112 » // The prefix secret must match the request secret. If it does, we know this
108 » » log.Errorf(c, "Failed to register/validate log stream prefix.") 113 » // is a legitimate registration attempt.
109 » » return nil, err 114 » if subtle.ConstantTimeCompare(pfx.Secret, req.Secret) != 1 {
115 » » log.Errorf(c, "Request secret does not match prefix secret.")
116 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid secret ")
110 } 117 }
111 log.Fields{
112 "prefix": pfx.Prefix,
113 "prefixCreated": pfx.Created,
114 }.Debugf(c, "Loaded log stream prefix.")
115 118
116 » di := ds.Get(c) 119 » // Check for registration, and that the prefix did not expire
120 » // (non-transactional).
117 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} 121 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)}
118 lst := ls.State(di) 122 lst := ls.State(di)
119 123
120 » // Check for registration (non-transactional). 124 » if err := di.GetMulti([]interface{}{ls, lst}); err != nil {
121 » if err := checkRegisterStream(di, ls, lst); err != nil {
122 if !anyNoSuchEntity(err) { 125 if !anyNoSuchEntity(err) {
123 log.WithError(err).Errorf(c, "Failed to check for log st ream.") 126 log.WithError(err).Errorf(c, "Failed to check for log st ream.")
124 return nil, err 127 return nil, err
125 } 128 }
126 129
127 // The stream is not registered. Perform a transactional registr ation via 130 // The stream is not registered. Perform a transactional registr ation via
128 // mutation. 131 // mutation.
129 // 132 //
130 // Determine which hierarchy components we need to add. 133 // Determine which hierarchy components we need to add.
131 comps := hierarchy.Components(path) 134 comps := hierarchy.Components(path)
132 if comps, err = hierarchy.Missing(di, comps); err != nil { 135 if comps, err = hierarchy.Missing(di, comps); err != nil {
133 log.WithError(err).Warningf(c, "Failed to probe for miss ing hierarchy components.") 136 log.WithError(err).Warningf(c, "Failed to probe for miss ing hierarchy components.")
134 } 137 }
135 138
136 // Before we go into transaction, try and put these entries. Thi s should not 139 // Before we go into transaction, try and put these entries. Thi s should not
137 // be contested, since components don't share an entity root. 140 // be contested, since components don't share an entity root.
138 if err := hierarchy.PutMulti(di, comps); err != nil { 141 if err := hierarchy.PutMulti(di, comps); err != nil {
139 » » » log.WithError(err).Infof(c, "Failed to add missing hiera rchy components.") 142 » » » log.WithError(err).Errorf(c, "Failed to add missing hier archy components.")
140 return nil, grpcutil.Internal 143 return nil, grpcutil.Internal
141 } 144 }
142 145
143 // The stream does not exist. Proceed with transactional registr ation. 146 // The stream does not exist. Proceed with transactional registr ation.
144 err = tumble.RunMutation(c, &registerStreamMutation{ 147 err = tumble.RunMutation(c, &registerStreamMutation{
145 RegisterStreamRequest: req, 148 RegisterStreamRequest: req,
146 desc: &desc, 149 desc: &desc,
147 pfx: pfx, 150 pfx: pfx,
148 lst: lst, 151 lst: lst,
149 ls: ls, 152 ls: ls,
150 archiveDelay: archiveDelayMax, 153 archiveDelay: archiveDelayMax,
151 }) 154 })
152 if err != nil { 155 if err != nil {
153 log.Fields{ 156 log.Fields{
154 log.ErrorKey: err, 157 log.ErrorKey: err,
155 }.Errorf(c, "Failed to register LogStream.") 158 }.Errorf(c, "Failed to register LogStream.")
156 return nil, err 159 return nil, err
157 } 160 }
158 } 161 }
159 162
160 return &logdog.RegisterStreamResponse{ 163 return &logdog.RegisterStreamResponse{
161 Id: string(ls.ID), 164 Id: string(ls.ID),
162 State: buildLogStreamState(ls, lst), 165 State: buildLogStreamState(ls, lst),
163 }, nil 166 }, nil
164 } 167 }
165 168
166 func checkRegisterStream(di ds.Interface, ls *coordinator.LogStream, lst *coordi nator.LogStreamState) error {
167 // Load the existing log stream state.
168 //
169 // We have already verified that the secrets match when the Prefix was
170 // registered. This will have to verify secrets when prefix registration
171 // moves its own RPC.
172 return di.GetMulti([]interface{}{ls, lst})
173 }
174
175 type registerStreamMutation struct { 169 type registerStreamMutation struct {
176 *logdog.RegisterStreamRequest 170 *logdog.RegisterStreamRequest
177 171
178 desc *logpb.LogStreamDescriptor 172 desc *logpb.LogStreamDescriptor
179 pfx *coordinator.LogPrefix 173 pfx *coordinator.LogPrefix
180 ls *coordinator.LogStream 174 ls *coordinator.LogStream
181 lst *coordinator.LogStreamState 175 lst *coordinator.LogStreamState
182 archiveDelay time.Duration 176 archiveDelay time.Duration
183 } 177 }
184 178
185 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) { 179 func (m *registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutati on, error) {
186 di := ds.Get(c) 180 di := ds.Get(c)
187 181
188 » // Check if our stream is registered (transactional). 182 » // Load our state and stream (transactional).
189 » if err := checkRegisterStream(di, m.ls, m.lst); err != nil { 183 » if err := di.GetMulti([]interface{}{m.ls, m.lst}); err != nil {
190 if !anyNoSuchEntity(err) { 184 if !anyNoSuchEntity(err) {
191 log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).") 185 log.WithError(err).Errorf(c, "Failed to check for stream registration (transactional).")
192 return nil, err 186 return nil, err
193 } 187 }
194 188
195 // The stream is not yet registered. 189 // The stream is not yet registered.
196 log.Infof(c, "Registering new log stream.") 190 log.Infof(c, "Registering new log stream.")
197 191
198 now := clock.Now(c).UTC() 192 now := clock.Now(c).UTC()
199 193
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
242 return nil, grpcutil.Internal 236 return nil, grpcutil.Internal
243 } 237 }
244 } 238 }
245 239
246 return nil, nil 240 return nil, nil
247 } 241 }
248 242
249 func (m *registerStreamMutation) Root(c context.Context) *ds.Key { 243 func (m *registerStreamMutation) Root(c context.Context) *ds.Key {
250 return ds.Get(c).KeyForObj(m.ls) 244 return ds.Get(c).KeyForObj(m.ls)
251 } 245 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698