Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: logdog/appengine/coordinator/endpoints/services/registerStream.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 package services 15 package services
16 16
17 import ( 17 import (
18 "crypto/subtle" 18 "crypto/subtle"
19 "time"
19 20
20 "github.com/golang/protobuf/proto"
21 ds "github.com/luci/gae/service/datastore"
22 "github.com/luci/luci-go/common/clock" 21 "github.com/luci/luci-go/common/clock"
23 log "github.com/luci/luci-go/common/logging" 22 log "github.com/luci/luci-go/common/logging"
24 "github.com/luci/luci-go/grpc/grpcutil" 23 "github.com/luci/luci-go/grpc/grpcutil"
25 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
26 "github.com/luci/luci-go/logdog/api/logpb" 25 "github.com/luci/luci-go/logdog/api/logpb"
27 "github.com/luci/luci-go/logdog/appengine/coordinator" 26 "github.com/luci/luci-go/logdog/appengine/coordinator"
28 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" 27 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
29 » "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" 28 » "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
30 "github.com/luci/luci-go/logdog/common/types" 29 "github.com/luci/luci-go/logdog/common/types"
31 » "github.com/luci/luci-go/tumble" 30
31 » ds "github.com/luci/gae/service/datastore"
32
33 » "github.com/golang/protobuf/proto"
32 "golang.org/x/net/context" 34 "golang.org/x/net/context"
33 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/codes"
34 ) 36 )
35 37
36 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt ate) *logdog.LogStreamState { 38 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt ate) *logdog.LogStreamState {
37 st := logdog.LogStreamState{ 39 st := logdog.LogStreamState{
38 ProtoVersion: ls.ProtoVersion, 40 ProtoVersion: ls.ProtoVersion,
39 Secret: lst.Secret, 41 Secret: lst.Secret,
40 TerminalIndex: lst.TerminalIndex, 42 TerminalIndex: lst.TerminalIndex,
41 Archived: lst.ArchivalState().Archived(), 43 Archived: lst.ArchivalState().Archived(),
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
146 ls := &coordinator.LogStream{ID: logStreamID} 148 ls := &coordinator.LogStream{ID: logStreamID}
147 lst := ls.State(c) 149 lst := ls.State(c)
148 150
149 if err := ds.Get(c, ls, lst); err != nil { 151 if err := ds.Get(c, ls, lst); err != nil {
150 if !anyNoSuchEntity(err) { 152 if !anyNoSuchEntity(err) {
151 log.WithError(err).Errorf(c, "Failed to check for log st ream.") 153 log.WithError(err).Errorf(c, "Failed to check for log st ream.")
152 return nil, err 154 return nil, err
153 } 155 }
154 156
155 // The stream does not exist. Proceed with transactional registr ation. 157 // The stream does not exist. Proceed with transactional registr ation.
156 » » lstKey := ds.KeyForObj(c, lst) 158 » » err = ds.RunInTransaction(c, func(c context.Context) error {
157 » » err = tumble.RunUnbuffered(c, lstKey, func(c context.Context) ([ ]tumble.Mutation, error) {
158 // Load our state and stream (transactional). 159 // Load our state and stream (transactional).
159 switch err := ds.Get(c, ls, lst); { 160 switch err := ds.Get(c, ls, lst); {
160 case err == nil: 161 case err == nil:
161 // The stream is already registered. 162 // The stream is already registered.
162 » » » » return nil, nil 163 » » » » return nil
163 164
164 case !anyNoSuchEntity(err): 165 case !anyNoSuchEntity(err):
165 log.WithError(err).Errorf(c, "Failed to check fo r stream registration (transactional).") 166 log.WithError(err).Errorf(c, "Failed to check fo r stream registration (transactional).")
166 » » » » return nil, err 167 » » » » return err
167 } 168 }
168 169
169 // The stream is not yet registered. 170 // The stream is not yet registered.
170 log.Infof(c, "Registering new log stream.") 171 log.Infof(c, "Registering new log stream.")
171 172
172 // Construct our LogStreamState. 173 // Construct our LogStreamState.
173 now := clock.Now(c).UTC() 174 now := clock.Now(c).UTC()
174 lst.Created = now 175 lst.Created = now
175 lst.Updated = now 176 lst.Updated = now
176 lst.Secret = pfx.Secret // Copy Prefix Secret to reduce datastore Gets. 177 lst.Secret = pfx.Secret // Copy Prefix Secret to reduce datastore Gets.
177 178
178 // Construct our LogStream. 179 // Construct our LogStream.
179 ls.Created = now 180 ls.Created = now
180 ls.ProtoVersion = req.ProtoVersion 181 ls.ProtoVersion = req.ProtoVersion
181 182
182 if err := ls.LoadDescriptor(&desc); err != nil { 183 if err := ls.LoadDescriptor(&desc); err != nil {
183 log.Fields{ 184 log.Fields{
184 log.ErrorKey: err, 185 log.ErrorKey: err,
185 }.Errorf(c, "Failed to load descriptor into LogS tream.") 186 }.Errorf(c, "Failed to load descriptor into LogS tream.")
186 » » » » return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load descriptor.") 187 » » » » return grpcutil.Errf(codes.InvalidArgument, "Fai led to load descriptor.")
187 } 188 }
188 189
189 // If our registration request included a terminal index , terminate the 190 // If our registration request included a terminal index , terminate the
190 // log stream state as well. 191 // log stream state as well.
191 if req.TerminalIndex >= 0 { 192 if req.TerminalIndex >= 0 {
192 log.Fields{ 193 log.Fields{
193 "terminalIndex": req.TerminalIndex, 194 "terminalIndex": req.TerminalIndex,
194 }.Debugf(c, "Registration request included termi nal index.") 195 }.Debugf(c, "Registration request included termi nal index.")
195 196
196 lst.TerminalIndex = req.TerminalIndex 197 lst.TerminalIndex = req.TerminalIndex
197 lst.TerminatedTime = now 198 lst.TerminatedTime = now
198 } else { 199 } else {
199 lst.TerminalIndex = -1 200 lst.TerminalIndex = -1
200 } 201 }
201 202
202 if err := ds.Put(c, ls, lst); err != nil { 203 if err := ds.Put(c, ls, lst); err != nil {
203 log.Fields{ 204 log.Fields{
204 log.ErrorKey: err, 205 log.ErrorKey: err,
205 }.Errorf(c, "Failed to Put LogStream.") 206 }.Errorf(c, "Failed to Put LogStream.")
206 » » » » return nil, grpcutil.Internal 207 » » » » return grpcutil.Internal
207 } 208 }
208 209
209 » » » // Add a named delayed mutation to archive this stream i f it's not archived 210 » » » // Add a named delayed task queue task to archive this s tream if it's not
210 » » » // yet. 211 » » » // archived yet.
211 // 212 //
212 // If the registration did not include a terminal index, this will be our 213 // If the registration did not include a terminal index, this will be our
213 » » » // pessimistic archival request, scheduled on registrati on to catch streams 214 » » » // pessimistic archival request, scheduled on registrati on to catch
214 » » » // that don't expire. This mutation will be replaced by the optimistic 215 » » » // streams that don't expire. This task will be replaced by the optimistic
215 » » » // archival mutation when/if the stream is terminated vi a TerminateStream. 216 » » » // archival task when/if the stream is terminated via Te rminateStream.
216 // 217 //
217 // If the registration included a terminal index, apply our standard 218 // If the registration included a terminal index, apply our standard
218 // parameters to the archival. Since TerminateStream wil l not be called, 219 // parameters to the archival. Since TerminateStream wil l not be called,
219 // this will be our formal optimistic archival task. 220 // this will be our formal optimistic archival task.
220 params := standardArchivalParams(cfg, pcfg) 221 params := standardArchivalParams(cfg, pcfg)
221 » » » cat := mutations.CreateArchiveTask{ 222
222 » » » » ID: ls.ID, 223 » » » var (
223 » » » } 224 » » » » delay time.Duration
225 » » » » archivalTag logdog.ArchiveDispatchTask_Tag
226 » » » )
224 if req.TerminalIndex < 0 { 227 if req.TerminalIndex < 0 {
225 // No terminal index, schedule pessimistic clean up archival. 228 // No terminal index, schedule pessimistic clean up archival.
226 » » » » cat.Expiration = now.Add(params.CompletePeriod) 229 » » » » delay = params.CompletePeriod
230 » » » » archivalTag = logdog.ArchiveDispatchTask_EXPIRED
231
232 » » » » // For cleanup, we instruct the archival to not wait any longer or
233 » » » » // allow the stream time extra time to become co mplete (archive as-is).
234 » » » » params.SettleDelay = 0
235 » » » » params.CompletePeriod = 0
227 236
228 log.Fields{ 237 log.Fields{
229 » » » » » "deadline": cat.Expiration, 238 » » » » » "deadline": delay,
230 » » » » }.Debugf(c, "Scheduling cleanup archival mutatio n.") 239 » » » » }.Debugf(c, "Scheduling cleanup archival task.")
231 } else { 240 } else {
232 » » » » // Terminal index, schedule optimistic archival (mirrors TerminateStream). 241 » » » » // Schedule this task to execute after our settl e delay.
233 » » » » cat.SettleDelay = params.SettleDelay 242 » » » » delay = params.SettleDelay
234 » » » » cat.CompletePeriod = params.CompletePeriod 243 » » » » archivalTag = logdog.ArchiveDispatchTask_TERMINA TED
235
236 » » » » // Schedule this mutation to execute after our s ettle delay.
237 » » » » cat.Expiration = now.Add(params.SettleDelay)
238 244
239 log.Fields{ 245 log.Fields{
240 » » » » » "settleDelay": cat.SettleDelay, 246 » » » » » "settleDelay": params.SettleDelay,
241 » » » » » "completePeriod": cat.CompletePeriod, 247 » » » » » "completePeriod": params.CompletePeriod,
242 » » » » » "scheduledAt": cat.Expiration, 248 » » » » » "scheduledAt": delay,
243 » » » » }.Debugf(c, "Scheduling archival mutation.") 249 » » » » }.Debugf(c, "Scheduling archival task.")
244 } 250 }
245 251
246 » » » aeName := cat.TaskName(c) 252 » » » if err := tasks.CreateArchivalTask(c, logStreamID, archi valTag, delay, params); err != nil {
247 » » » if err := tumble.PutNamedMutations(c, lstKey, map[string ]tumble.Mutation{aeName: &cat}); err != nil { 253 » » » » log.WithError(err).Errorf(c, "Failed to create a rchival task.")
248 » » » » log.WithError(err).Errorf(c, "Failed to write na med mutations.") 254 » » » » return grpcutil.Internal
249 » » » » return nil, grpcutil.Internal
250 } 255 }
251 256
252 » » » return nil, nil 257 » » » return nil
253 » » }) 258 » » }, nil)
254 if err != nil { 259 if err != nil {
255 log.Fields{ 260 log.Fields{
256 log.ErrorKey: err, 261 log.ErrorKey: err,
257 }.Errorf(c, "Failed to register LogStream.") 262 }.Errorf(c, "Failed to register LogStream.")
258 return nil, err 263 return nil, err
259 } 264 }
260 } 265 }
261 266
262 return &logdog.RegisterStreamResponse{ 267 return &logdog.RegisterStreamResponse{
263 Id: string(ls.ID), 268 Id: string(ls.ID),
264 State: buildLogStreamState(ls, lst), 269 State: buildLogStreamState(ls, lst),
265 }, nil 270 }, nil
266 } 271 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698