| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 package services | 15 package services |
| 16 | 16 |
| 17 import ( | 17 import ( |
| 18 "crypto/subtle" | 18 "crypto/subtle" |
| 19 | 19 |
| 20 "github.com/golang/protobuf/ptypes/empty" | 20 "github.com/golang/protobuf/ptypes/empty" |
| 21 |
| 21 ds "github.com/luci/gae/service/datastore" | 22 ds "github.com/luci/gae/service/datastore" |
| 23 |
| 22 "github.com/luci/luci-go/common/clock" | 24 "github.com/luci/luci-go/common/clock" |
| 23 log "github.com/luci/luci-go/common/logging" | 25 log "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/common/proto/google" | 26 "github.com/luci/luci-go/common/proto/google" |
| 25 "github.com/luci/luci-go/grpc/grpcutil" | 27 "github.com/luci/luci-go/grpc/grpcutil" |
| 26 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 28 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 29 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 28 "github.com/luci/luci-go/logdog/appengine/coordinator" | 30 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 29 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 31 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 30 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" | 32 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" |
| 31 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" | 33 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" |
| 34 "github.com/luci/luci-go/logdog/appengine/coordinator/tasks" |
| 32 "github.com/luci/luci-go/tumble" | 35 "github.com/luci/luci-go/tumble" |
| 36 |
| 33 "golang.org/x/net/context" | 37 "golang.org/x/net/context" |
| 34 "google.golang.org/grpc/codes" | 38 "google.golang.org/grpc/codes" |
| 35 ) | 39 ) |
| 36 | 40 |
| 37 // TerminateStream is an idempotent stream state terminate operation. | 41 // TerminateStream is an idempotent stream state terminate operation. |
| 38 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*empty.Empty, error) { | 42 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*empty.Empty, error) { |
| 39 log.Fields{ | 43 log.Fields{ |
| 40 "project": req.Project, | 44 "project": req.Project, |
| 41 "id": req.Id, | 45 "id": req.Id, |
| 42 "terminalIndex": req.TerminalIndex, | 46 "terminalIndex": req.TerminalIndex, |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 109 lst.TerminalIndex = req.TerminalIndex | 113 lst.TerminalIndex = req.TerminalIndex |
| 110 lst.TerminatedTime = now | 114 lst.TerminatedTime = now |
| 111 | 115 |
| 112 if err := ds.Put(c, lst); err != nil { | 116 if err := ds.Put(c, lst); err != nil { |
| 113 log.Fields{ | 117 log.Fields{ |
| 114 log.ErrorKey: err, | 118 log.ErrorKey: err, |
| 115 }.Errorf(c, "Failed to Put() LogStream.") | 119 }.Errorf(c, "Failed to Put() LogStream.") |
| 116 return grpcutil.Internal | 120 return grpcutil.Internal |
| 117 } | 121 } |
| 118 | 122 |
| 119 » » » // Replace the pessimistic archive expiration mutation s
cheduled in | 123 » » » // Replace the pessimistic archive expiration task sched
uled in |
| 120 » » » // RegisterStream with an optimistic archival mutation. | 124 » » » // RegisterStream with an optimistic archival task. |
| 121 » » » cat := mutations.CreateArchiveTask{ | 125 » » » if err := tasks.CreateArchivalTask(c, id, logdog.Archive
DispatchTask_TERMINATED, |
| 122 » » » » ID: id, | 126 » » » » params.SettleDelay, params); err != nil { |
| 123 | 127 |
| 124 » » » » // Optimistic parameters. | 128 » » » » log.WithError(err).Errorf(c, "Failed to create t
erminated archival task.") |
| 125 » » » » SettleDelay: params.SettleDelay, | 129 » » » » return grpcutil.Internal |
| 126 » » » » CompletePeriod: params.CompletePeriod, | |
| 127 | |
| 128 » » » » // Schedule this mutation to execute after our s
ettle delay. | |
| 129 » » » » Expiration: now.Add(params.SettleDelay), | |
| 130 } | 130 } |
| 131 | 131 |
| 132 » » » aeParent, aeName := ds.KeyForObj(c, lst), cat.TaskName(c
) | 132 » » » if err := tasks.DeleteArchiveStreamExpiredTask(c, id); e
rr != nil { |
| 133 » » » if err := tumble.PutNamedMutations(c, aeParent, map[stri
ng]tumble.Mutation{aeName: &cat}); err != nil { | 133 » » » » // If we can't delete this task, it will just ru
n, notice that the |
| 134 » » » » log.WithError(err).Errorf(c, "Failed to replace
archive expiration mutation.") | 134 » » » » // stream is archived, and quit. No big deal. |
| 135 » » » » return grpcutil.Internal | 135 » » » » log.WithError(err).Warningf(c, "(Non-fatal) Fail
ed to delete expired archival task.") |
| 136 » » » } |
| 137 |
| 138 » » » // In case the stream was *registered* with Tumble, but
is now being |
| 139 » » » // processed with task queue code, clear the Tumble arch
ival mutation. |
| 140 » » » // |
| 141 » » » // TODO(dnj): Remove this once Tumble is drained. |
| 142 » » » archiveMutation := mutations.CreateArchiveTask{ID: id} |
| 143 » » » if err := tumble.CancelNamedMutations(c, archiveMutation
.Root(c), archiveMutation.TaskName(c)); err != nil { |
| 144 » » » » log.WithError(err).Warningf(c, "(Non-fatal) Fail
ed to cancel archive mutation.") |
| 136 } | 145 } |
| 137 | 146 |
| 138 log.Fields{ | 147 log.Fields{ |
| 139 "terminalIndex": lst.TerminalIndex, | 148 "terminalIndex": lst.TerminalIndex, |
| 140 » » » » "settleDelay": cat.SettleDelay, | 149 » » » » "settleDelay": params.SettleDelay, |
| 141 » » » » "completePeriod": cat.CompletePeriod, | 150 » » » » "completePeriod": params.CompletePeriod, |
| 142 » » » » "scheduledAt": cat.Expiration, | 151 » » » }.Debugf(c, "Terminal index was set, and archival task w
as scheduled.") |
| 143 » » » }.Debugf(c, "Terminal index was set, and archival mutati
on was scheduled.") | |
| 144 return nil | 152 return nil |
| 145 } | 153 } |
| 146 }, nil) | 154 }, nil) |
| 147 if err != nil { | 155 if err != nil { |
| 148 log.Fields{ | 156 log.Fields{ |
| 149 log.ErrorKey: err, | 157 log.ErrorKey: err, |
| 150 }.Errorf(c, "Failed to update LogStream.") | 158 }.Errorf(c, "Failed to update LogStream.") |
| 151 return nil, err | 159 return nil, err |
| 152 } | 160 } |
| 153 | 161 |
| 154 return &empty.Empty{}, nil | 162 return &empty.Empty{}, nil |
| 155 } | 163 } |
| 156 | 164 |
| 157 func standardArchivalParams(cfg *config.Config, pcfg *svcconfig.ProjectConfig) *
coordinator.ArchivalParams { | 165 func standardArchivalParams(cfg *config.Config, pcfg *svcconfig.ProjectConfig) *
coordinator.ArchivalParams { |
| 158 return &coordinator.ArchivalParams{ | 166 return &coordinator.ArchivalParams{ |
| 159 SettleDelay: google.DurationFromProto(cfg.Coordinator.Archive
SettleDelay), | 167 SettleDelay: google.DurationFromProto(cfg.Coordinator.Archive
SettleDelay), |
| 160 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel
ayMax, pcfg.MaxStreamAge), | 168 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel
ayMax, pcfg.MaxStreamAge), |
| 161 } | 169 } |
| 162 } | 170 } |
| OLD | NEW |