| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 "errors" | 9 "errors" |
| 10 "time" |
| 10 | 11 |
| 11 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 13 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 14 "github.com/luci/luci-go/appengine/tumble" | 15 "github.com/luci/luci-go/appengine/tumble" |
| 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 16 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
| 18 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
| 19 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 90 } | 91 } |
| 91 switch { | 92 switch { |
| 92 case req.Desc.Prefix != string(prefix): | 93 case req.Desc.Prefix != string(prefix): |
| 93 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor pre
fix does not match path (%s != %s)", | 94 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor pre
fix does not match path (%s != %s)", |
| 94 req.Desc.Prefix, prefix) | 95 req.Desc.Prefix, prefix) |
| 95 case req.Desc.Name != string(name): | 96 case req.Desc.Name != string(name): |
| 96 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor nam
e does not match path (%s != %s)", | 97 return nil, grpcutil.Errf(codes.InvalidArgument, "Descriptor nam
e does not match path (%s != %s)", |
| 97 req.Desc.Name, name) | 98 req.Desc.Name, name) |
| 98 } | 99 } |
| 99 | 100 |
| 101 // Load our config and archive expiration. |
| 102 _, cfg, err := coordinator.GetServices(c).Config(c) |
| 103 if err != nil { |
| 104 log.WithError(err).Errorf(c, "Failed to load configuration.") |
| 105 return nil, grpcutil.Internal |
| 106 } |
| 107 |
| 108 archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration() |
| 109 if archiveDelayMax < 0 { |
| 110 log.Fields{ |
| 111 "archiveDelayMax": archiveDelayMax, |
| 112 }.Errorf(c, "Must have positive maximum archive delay.") |
| 113 return nil, grpcutil.Internal |
| 114 } |
| 115 |
| 100 // Already registered? (Non-transactional). | 116 // Already registered? (Non-transactional). |
| 101 ls := coordinator.LogStreamFromPath(path) | 117 ls := coordinator.LogStreamFromPath(path) |
| 102 switch err := ds.Get(c).Get(ls); err { | 118 switch err := ds.Get(c).Get(ls); err { |
| 103 case nil: | 119 case nil: |
| 104 // We want this to be idempotent, so validate that it matches th
e current | 120 // We want this to be idempotent, so validate that it matches th
e current |
| 105 // configuration and return accordingly. | 121 // configuration and return accordingly. |
| 106 if err := matchesLogStream(req, ls); err != nil { | 122 if err := matchesLogStream(req, ls); err != nil { |
| 107 return nil, grpcutil.Errf(codes.AlreadyExists, "Log stre
am is already incompatibly registered: %v", err) | 123 return nil, grpcutil.Errf(codes.AlreadyExists, "Log stre
am is already incompatibly registered: %v", err) |
| 108 } | 124 } |
| 109 | 125 |
| 110 case ds.ErrNoSuchEntity: | 126 case ds.ErrNoSuchEntity: |
| 111 // The registration is valid, so retain it. | 127 // The registration is valid, so retain it. |
| 112 » » if err := tumble.RunMutation(c, ®isterStreamMutation{ls, req}
); err != nil { | 128 » » err = tumble.RunMutation(c, ®isterStreamMutation{ |
| 129 » » » LogStream: ls, |
| 130 » » » req: req, |
| 131 » » » archiveDelay: archiveDelayMax, |
| 132 » » }) |
| 133 » » if err != nil { |
| 113 log.Fields{ | 134 log.Fields{ |
| 114 log.ErrorKey: err, | 135 log.ErrorKey: err, |
| 115 }.Errorf(c, "Failed to register LogStream.") | 136 }.Errorf(c, "Failed to register LogStream.") |
| 116 return nil, filterError(err) | 137 return nil, filterError(err) |
| 117 } | 138 } |
| 118 | 139 |
| 119 default: | 140 default: |
| 120 log.WithError(err).Errorf(c, "Failed to check for log stream.") | 141 log.WithError(err).Errorf(c, "Failed to check for log stream.") |
| 121 return nil, grpcutil.Internal | 142 return nil, grpcutil.Internal |
| 122 } | 143 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 134 case grpc.Code(err) == codes.Unknown: | 155 case grpc.Code(err) == codes.Unknown: |
| 135 return grpcutil.Internal | 156 return grpcutil.Internal |
| 136 default: | 157 default: |
| 137 return err | 158 return err |
| 138 } | 159 } |
| 139 } | 160 } |
| 140 | 161 |
| 141 type registerStreamMutation struct { | 162 type registerStreamMutation struct { |
| 142 *coordinator.LogStream | 163 *coordinator.LogStream |
| 143 | 164 |
| 144 » req *logdog.RegisterStreamRequest | 165 » req *logdog.RegisterStreamRequest |
| 166 » archiveDelay time.Duration |
| 145 } | 167 } |
| 146 | 168 |
| 147 func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio
n, error) { | 169 func (m registerStreamMutation) RollForward(c context.Context) ([]tumble.Mutatio
n, error) { |
| 148 di := ds.Get(c) | 170 di := ds.Get(c) |
| 149 | 171 |
| 150 // Already registered? (Transactional). | 172 // Already registered? (Transactional). |
| 151 switch err := di.Get(m.LogStream); err { | 173 switch err := di.Get(m.LogStream); err { |
| 152 case ds.ErrNoSuchEntity: | 174 case ds.ErrNoSuchEntity: |
| 153 break | 175 break |
| 154 | 176 |
| (...skipping 27 matching lines...) Expand all Loading... |
| 182 m.Created = ds.RoundTime(clock.Now(c).UTC()) | 204 m.Created = ds.RoundTime(clock.Now(c).UTC()) |
| 183 m.TerminalIndex = -1 | 205 m.TerminalIndex = -1 |
| 184 | 206 |
| 185 if err := di.Put(m.LogStream); err != nil { | 207 if err := di.Put(m.LogStream); err != nil { |
| 186 log.Fields{ | 208 log.Fields{ |
| 187 log.ErrorKey: err, | 209 log.ErrorKey: err, |
| 188 }.Errorf(c, "Failed to Put() LogStream.") | 210 }.Errorf(c, "Failed to Put() LogStream.") |
| 189 return nil, grpcutil.Internal | 211 return nil, grpcutil.Internal |
| 190 } | 212 } |
| 191 | 213 |
| 214 // Add a named delayed mutation to archive this stream if it's not archi
ved |
| 215 // yet. We will cancel this in terminateStream once we dispatch an immed
iate |
| 216 // archival task. |
| 217 archiveExpiredMutation := mutations.CreateArchiveTask{ |
| 218 Path: m.Path(), |
| 219 Expiration: clock.Now(c).Add(m.archiveDelay), |
| 220 } |
| 221 aeParent, aeName := archiveExpiredMutation.TaskName(di) |
| 222 err := tumble.PutNamedMutations(c, aeParent, map[string]tumble.Mutation{ |
| 223 aeName: &archiveExpiredMutation, |
| 224 }) |
| 225 if err != nil { |
| 226 log.WithError(err).Errorf(c, "Failed to load named mutations.") |
| 227 return nil, grpcutil.Internal |
| 228 } |
| 229 |
| 192 return []tumble.Mutation{ | 230 return []tumble.Mutation{ |
| 193 &mutations.PutHierarchyMutation{ | 231 &mutations.PutHierarchyMutation{ |
| 194 Path: m.Path(), | 232 Path: m.Path(), |
| 195 }, | 233 }, |
| 196 }, nil | 234 }, nil |
| 197 } | 235 } |
| 198 | 236 |
| 199 func (m registerStreamMutation) Root(c context.Context) *ds.Key { | 237 func (m registerStreamMutation) Root(c context.Context) *ds.Key { |
| 200 return ds.Get(c).KeyForObj(m.LogStream) | 238 return ds.Get(c).KeyForObj(m.LogStream) |
| 201 } | 239 } |
| OLD | NEW |