| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 package services | 15 package services |
| 16 | 16 |
| 17 import ( | 17 import ( |
| 18 "crypto/subtle" | 18 "crypto/subtle" |
| 19 "time" |
| 19 | 20 |
| 20 "github.com/golang/protobuf/proto" | |
| 21 ds "github.com/luci/gae/service/datastore" | |
| 22 "github.com/luci/luci-go/common/clock" | 21 "github.com/luci/luci-go/common/clock" |
| 23 log "github.com/luci/luci-go/common/logging" | 22 log "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/grpc/grpcutil" | 23 "github.com/luci/luci-go/grpc/grpcutil" |
| 25 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 26 "github.com/luci/luci-go/logdog/api/logpb" | 25 "github.com/luci/luci-go/logdog/api/logpb" |
| 27 "github.com/luci/luci-go/logdog/appengine/coordinator" | 26 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 28 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" | 27 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" |
| 29 » "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" | 28 » "github.com/luci/luci-go/logdog/appengine/coordinator/tasks" |
| 30 "github.com/luci/luci-go/logdog/common/types" | 29 "github.com/luci/luci-go/logdog/common/types" |
| 31 » "github.com/luci/luci-go/tumble" | 30 |
| 31 » ds "github.com/luci/gae/service/datastore" |
| 32 |
| 33 » "github.com/golang/protobuf/proto" |
| 32 "golang.org/x/net/context" | 34 "golang.org/x/net/context" |
| 33 "google.golang.org/grpc/codes" | 35 "google.golang.org/grpc/codes" |
| 34 ) | 36 ) |
| 35 | 37 |
| 36 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { | 38 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { |
| 37 st := logdog.LogStreamState{ | 39 st := logdog.LogStreamState{ |
| 38 ProtoVersion: ls.ProtoVersion, | 40 ProtoVersion: ls.ProtoVersion, |
| 39 Secret: lst.Secret, | 41 Secret: lst.Secret, |
| 40 TerminalIndex: lst.TerminalIndex, | 42 TerminalIndex: lst.TerminalIndex, |
| 41 Archived: lst.ArchivalState().Archived(), | 43 Archived: lst.ArchivalState().Archived(), |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 146 ls := &coordinator.LogStream{ID: logStreamID} | 148 ls := &coordinator.LogStream{ID: logStreamID} |
| 147 lst := ls.State(c) | 149 lst := ls.State(c) |
| 148 | 150 |
| 149 if err := ds.Get(c, ls, lst); err != nil { | 151 if err := ds.Get(c, ls, lst); err != nil { |
| 150 if !anyNoSuchEntity(err) { | 152 if !anyNoSuchEntity(err) { |
| 151 log.WithError(err).Errorf(c, "Failed to check for log st
ream.") | 153 log.WithError(err).Errorf(c, "Failed to check for log st
ream.") |
| 152 return nil, err | 154 return nil, err |
| 153 } | 155 } |
| 154 | 156 |
| 155 // The stream does not exist. Proceed with transactional registr
ation. | 157 // The stream does not exist. Proceed with transactional registr
ation. |
| 156 » » lstKey := ds.KeyForObj(c, lst) | 158 » » err = ds.RunInTransaction(c, func(c context.Context) error { |
| 157 » » err = tumble.RunUnbuffered(c, lstKey, func(c context.Context) ([
]tumble.Mutation, error) { | |
| 158 // Load our state and stream (transactional). | 159 // Load our state and stream (transactional). |
| 159 switch err := ds.Get(c, ls, lst); { | 160 switch err := ds.Get(c, ls, lst); { |
| 160 case err == nil: | 161 case err == nil: |
| 161 // The stream is already registered. | 162 // The stream is already registered. |
| 162 » » » » return nil, nil | 163 » » » » return nil |
| 163 | 164 |
| 164 case !anyNoSuchEntity(err): | 165 case !anyNoSuchEntity(err): |
| 165 log.WithError(err).Errorf(c, "Failed to check fo
r stream registration (transactional).") | 166 log.WithError(err).Errorf(c, "Failed to check fo
r stream registration (transactional).") |
| 166 » » » » return nil, err | 167 » » » » return err |
| 167 } | 168 } |
| 168 | 169 |
| 169 // The stream is not yet registered. | 170 // The stream is not yet registered. |
| 170 log.Infof(c, "Registering new log stream.") | 171 log.Infof(c, "Registering new log stream.") |
| 171 | 172 |
| 172 // Construct our LogStreamState. | 173 // Construct our LogStreamState. |
| 173 now := clock.Now(c).UTC() | 174 now := clock.Now(c).UTC() |
| 174 lst.Created = now | 175 lst.Created = now |
| 175 lst.Updated = now | 176 lst.Updated = now |
| 176 lst.Secret = pfx.Secret // Copy Prefix Secret to reduce
datastore Gets. | 177 lst.Secret = pfx.Secret // Copy Prefix Secret to reduce
datastore Gets. |
| 177 | 178 |
| 178 // Construct our LogStream. | 179 // Construct our LogStream. |
| 179 ls.Created = now | 180 ls.Created = now |
| 180 ls.ProtoVersion = req.ProtoVersion | 181 ls.ProtoVersion = req.ProtoVersion |
| 181 | 182 |
| 182 if err := ls.LoadDescriptor(&desc); err != nil { | 183 if err := ls.LoadDescriptor(&desc); err != nil { |
| 183 log.Fields{ | 184 log.Fields{ |
| 184 log.ErrorKey: err, | 185 log.ErrorKey: err, |
| 185 }.Errorf(c, "Failed to load descriptor into LogS
tream.") | 186 }.Errorf(c, "Failed to load descriptor into LogS
tream.") |
| 186 » » » » return nil, grpcutil.Errf(codes.InvalidArgument,
"Failed to load descriptor.") | 187 » » » » return grpcutil.Errf(codes.InvalidArgument, "Fai
led to load descriptor.") |
| 187 } | 188 } |
| 188 | 189 |
| 189 // If our registration request included a terminal index
, terminate the | 190 // If our registration request included a terminal index
, terminate the |
| 190 // log stream state as well. | 191 // log stream state as well. |
| 191 if req.TerminalIndex >= 0 { | 192 if req.TerminalIndex >= 0 { |
| 192 log.Fields{ | 193 log.Fields{ |
| 193 "terminalIndex": req.TerminalIndex, | 194 "terminalIndex": req.TerminalIndex, |
| 194 }.Debugf(c, "Registration request included termi
nal index.") | 195 }.Debugf(c, "Registration request included termi
nal index.") |
| 195 | 196 |
| 196 lst.TerminalIndex = req.TerminalIndex | 197 lst.TerminalIndex = req.TerminalIndex |
| 197 lst.TerminatedTime = now | 198 lst.TerminatedTime = now |
| 198 } else { | 199 } else { |
| 199 lst.TerminalIndex = -1 | 200 lst.TerminalIndex = -1 |
| 200 } | 201 } |
| 201 | 202 |
| 202 if err := ds.Put(c, ls, lst); err != nil { | 203 if err := ds.Put(c, ls, lst); err != nil { |
| 203 log.Fields{ | 204 log.Fields{ |
| 204 log.ErrorKey: err, | 205 log.ErrorKey: err, |
| 205 }.Errorf(c, "Failed to Put LogStream.") | 206 }.Errorf(c, "Failed to Put LogStream.") |
| 206 » » » » return nil, grpcutil.Internal | 207 » » » » return grpcutil.Internal |
| 207 } | 208 } |
| 208 | 209 |
| 209 » » » // Add a named delayed mutation to archive this stream i
f it's not archived | 210 » » » // Add a named delayed task queue task to archive this s
tream if it's not |
| 210 » » » // yet. | 211 » » » // archived yet. |
| 211 // | 212 // |
| 212 // If the registration did not include a terminal index,
this will be our | 213 // If the registration did not include a terminal index,
this will be our |
| 213 » » » // pessimistic archival request, scheduled on registrati
on to catch streams | 214 » » » // pessimistic archival request, scheduled on registrati
on to catch |
| 214 » » » // that don't expire. This mutation will be replaced by
the optimistic | 215 » » » // streams that don't expire. This task will be replaced
by the optimistic |
| 215 » » » // archival mutation when/if the stream is terminated vi
a TerminateStream. | 216 » » » // archival task when/if the stream is terminated via Te
rminateStream. |
| 216 // | 217 // |
| 217 // If the registration included a terminal index, apply
our standard | 218 // If the registration included a terminal index, apply
our standard |
| 218 // parameters to the archival. Since TerminateStream wil
l not be called, | 219 // parameters to the archival. Since TerminateStream wil
l not be called, |
| 219 // this will be our formal optimistic archival task. | 220 // this will be our formal optimistic archival task. |
| 220 params := standardArchivalParams(cfg, pcfg) | 221 params := standardArchivalParams(cfg, pcfg) |
| 221 » » » cat := mutations.CreateArchiveTask{ | 222 |
| 222 » » » » ID: ls.ID, | 223 » » » var ( |
| 223 » » » } | 224 » » » » delay time.Duration |
| 225 » » » » archivalTag logdog.ArchiveDispatchTask_Tag |
| 226 » » » ) |
| 224 if req.TerminalIndex < 0 { | 227 if req.TerminalIndex < 0 { |
| 225 // No terminal index, schedule pessimistic clean
up archival. | 228 // No terminal index, schedule pessimistic clean
up archival. |
| 226 » » » » cat.Expiration = now.Add(params.CompletePeriod) | 229 » » » » delay = params.CompletePeriod |
| 230 » » » » archivalTag = logdog.ArchiveDispatchTask_EXPIRED |
| 231 |
| 232 » » » » // For cleanup, we instruct the archival to not
wait any longer or |
| 233 » » » » // allow the stream time extra time to become co
mplete (archive as-is). |
| 234 » » » » params.SettleDelay = 0 |
| 235 » » » » params.CompletePeriod = 0 |
| 227 | 236 |
| 228 log.Fields{ | 237 log.Fields{ |
| 229 » » » » » "deadline": cat.Expiration, | 238 » » » » » "deadline": delay, |
| 230 » » » » }.Debugf(c, "Scheduling cleanup archival mutatio
n.") | 239 » » » » }.Debugf(c, "Scheduling cleanup archival task.") |
| 231 } else { | 240 } else { |
| 232 » » » » // Terminal index, schedule optimistic archival
(mirrors TerminateStream). | 241 » » » » // Schedule this task to execute after our settl
e delay. |
| 233 » » » » cat.SettleDelay = params.SettleDelay | 242 » » » » delay = params.SettleDelay |
| 234 » » » » cat.CompletePeriod = params.CompletePeriod | 243 » » » » archivalTag = logdog.ArchiveDispatchTask_TERMINA
TED |
| 235 | |
| 236 » » » » // Schedule this mutation to execute after our s
ettle delay. | |
| 237 » » » » cat.Expiration = now.Add(params.SettleDelay) | |
| 238 | 244 |
| 239 log.Fields{ | 245 log.Fields{ |
| 240 » » » » » "settleDelay": cat.SettleDelay, | 246 » » » » » "settleDelay": params.SettleDelay, |
| 241 » » » » » "completePeriod": cat.CompletePeriod, | 247 » » » » » "completePeriod": params.CompletePeriod, |
| 242 » » » » » "scheduledAt": cat.Expiration, | 248 » » » » » "scheduledAt": delay, |
| 243 » » » » }.Debugf(c, "Scheduling archival mutation.") | 249 » » » » }.Debugf(c, "Scheduling archival task.") |
| 244 } | 250 } |
| 245 | 251 |
| 246 » » » aeName := cat.TaskName(c) | 252 » » » if err := tasks.CreateArchivalTask(c, logStreamID, archi
valTag, delay, params); err != nil { |
| 247 » » » if err := tumble.PutNamedMutations(c, lstKey, map[string
]tumble.Mutation{aeName: &cat}); err != nil { | 253 » » » » log.WithError(err).Errorf(c, "Failed to create a
rchival task.") |
| 248 » » » » log.WithError(err).Errorf(c, "Failed to write na
med mutations.") | 254 » » » » return grpcutil.Internal |
| 249 » » » » return nil, grpcutil.Internal | |
| 250 } | 255 } |
| 251 | 256 |
| 252 » » » return nil, nil | 257 » » » return nil |
| 253 » » }) | 258 » » }, nil) |
| 254 if err != nil { | 259 if err != nil { |
| 255 log.Fields{ | 260 log.Fields{ |
| 256 log.ErrorKey: err, | 261 log.ErrorKey: err, |
| 257 }.Errorf(c, "Failed to register LogStream.") | 262 }.Errorf(c, "Failed to register LogStream.") |
| 258 return nil, err | 263 return nil, err |
| 259 } | 264 } |
| 260 } | 265 } |
| 261 | 266 |
| 262 return &logdog.RegisterStreamResponse{ | 267 return &logdog.RegisterStreamResponse{ |
| 263 Id: string(ls.ID), | 268 Id: string(ls.ID), |
| 264 State: buildLogStreamState(ls, lst), | 269 State: buildLogStreamState(ls, lst), |
| 265 }, nil | 270 }, nil |
| 266 } | 271 } |
| OLD | NEW |