| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 package tasks |
| 16 |
| 17 import ( |
| 18 "fmt" |
| 19 "time" |
| 20 |
| 21 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 22 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 23 |
| 24 "github.com/luci/luci-go/appengine/tq" |
| 25 "github.com/luci/luci-go/common/errors" |
| 26 log "github.com/luci/luci-go/common/logging" |
| 27 "github.com/luci/luci-go/common/proto/google" |
| 28 "github.com/luci/luci-go/common/retry/transient" |
| 29 |
| 30 ds "github.com/luci/gae/service/datastore" |
| 31 "github.com/luci/gae/service/info" |
| 32 |
| 33 "github.com/golang/protobuf/proto" |
| 34 "golang.org/x/net/context" |
| 35 ) |
| 36 |
| 37 // emptyArchiveDispatchTask is a singleton empty ArchiveDispatchTask. |
| 38 // |
| 39 // This is used because tq.Task are typed to their Payload, so every task needs |
| 40 // a Payload. This is the default Payload value, and is overridden when the |
| 41 // Payload needs to actually be populated for publishing. |
| 42 var emptyArchiveDispatchTask = &logdog.ArchiveDispatchTask{} |
| 43 |
| 44 // CreateArchivalTask adds a task to the task queue to initiate |
| 45 // archival on an stream. The task will be delayed by "delay". |
| 46 func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Arc
hiveDispatchTask_Tag, |
| 47 delay time.Duration, params *coordinator.ArchivalParams) error { |
| 48 |
| 49 task := makeArchivalTask(c, id, tag) |
| 50 task.Payload = &logdog.ArchiveDispatchTask{ |
| 51 Id: string(id), |
| 52 Tag: tag, |
| 53 SettleDelay: google.NewDuration(params.SettleDelay), |
| 54 CompletePeriod: google.NewDuration(params.CompletePeriod), |
| 55 } |
| 56 task.Delay = delay |
| 57 |
| 58 // Add the task outside of a transaction, since it's a named task. |
| 59 // |
| 60 // This introduces two possibilities: |
| 61 // - During transaction retries, the task may be added multiple times. T
his |
| 62 // is fine, since it will naturally deduplicate. |
| 63 // - If the overall transaction fails, the task may be added for a log s
tream |
| 64 // that never exists. We handle this in the handler by warning (but no
t |
| 65 // failing) on non-existent log streams. |
| 66 if err := taskDispatcher.AddTask(ds.WithoutTransaction(c), task); err !=
nil { |
| 67 log.Fields{ |
| 68 log.ErrorKey: err, |
| 69 "taskName": task.Name(), |
| 70 }.Errorf(c, "Failed to add task to task queue.") |
| 71 return errors.Annotate(err, "failed to add task to task queue").
Err() |
| 72 } |
| 73 |
| 74 log.Debugf(c, "Successfully created archival task: %q", task.Name()) |
| 75 return nil |
| 76 } |
| 77 |
| 78 // DeleteArchiveStreamExpiredTask deletes stream EXPIRED task associated with |
| 79 // id. If the task has already been deleted, this will do nothing. |
| 80 func DeleteArchiveStreamExpiredTask(c context.Context, id coordinator.HashID) er
ror { |
| 81 task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED) |
| 82 |
| 83 log.Debugf(c, "Deleting archival task: %q", task.Name()) |
| 84 if err := taskDispatcher.DeleteTask(c, task); err != nil { |
| 85 log.Fields{ |
| 86 log.ErrorKey: err, |
| 87 "taskName": task.Name(), |
| 88 }.Errorf(c, "Failed to delete expired archival task.") |
| 89 return errors.Annotate(err, "failed to delete expired archival t
ask").Err() |
| 90 } |
| 91 |
| 92 log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Nam
e()) |
| 93 return nil |
| 94 } |
| 95 |
| 96 // handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask. |
| 97 // |
| 98 // This task is associated with a log stream and some archival parameters. It |
| 99 // will verify that the log stream hasn't had archival dispatched yet. If it |
| 100 // has, the task will terminate without further operation. |
| 101 // |
| 102 // For streams that haven't been archived, this task will transactionally |
| 103 // dispatch an archival task to the Archivist fleet and update the stream's |
| 104 // status. |
| 105 func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCou
nt int) error { |
| 106 adt, ok := payload.(*logdog.ArchiveDispatchTask) |
| 107 if !ok { |
| 108 return errors.Reason("unexpected message type %T", payload).Err(
) |
| 109 } |
| 110 |
| 111 log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q", |
| 112 adt.Tag, execCount, info.GetNamespace(c), adt.Id) |
| 113 |
| 114 stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)} |
| 115 state := stream.State(c) |
| 116 |
| 117 // Check if we're already archived (non-transactional). |
| 118 if err := ds.Get(c, state); err != nil { |
| 119 if err == ds.ErrNoSuchEntity { |
| 120 log.Warningf(c, "Log stream no longer exists.") |
| 121 return nil |
| 122 } |
| 123 |
| 124 log.WithError(err).Errorf(c, "Failed to load archival log stream
.") |
| 125 return errors.Annotate(err, "failed to load archival log stream"
).Tag(transient.Tag).Err() |
| 126 } |
| 127 if state.ArchivalState() != coordinator.NotArchived { |
| 128 log.Infof(c, "Log stream archival is already tasked.") |
| 129 return nil |
| 130 } |
| 131 |
| 132 // Get our archival publisher. |
| 133 svc := coordinator.GetServices(c) |
| 134 ap, err := svc.ArchivalPublisher(c) |
| 135 if err != nil { |
| 136 log.WithError(err).Errorf(c, "Failed to get archival publisher."
) |
| 137 return errors.Annotate(err, "failed to get archival publisher").
Tag(transient.Tag).Err() |
| 138 } |
| 139 defer func() { |
| 140 if err := ap.Close(); err != nil { |
| 141 log.WithError(err).Warningf(c, "Failed to close archival
publisher.") |
| 142 } |
| 143 }() |
| 144 |
| 145 params := coordinator.ArchivalParams{ |
| 146 RequestID: info.RequestID(c), |
| 147 SettleDelay: google.DurationFromProto(adt.SettleDelay), |
| 148 CompletePeriod: google.DurationFromProto(adt.CompletePeriod), |
| 149 } |
| 150 |
| 151 err = ds.RunInTransaction(c, func(c context.Context) error { |
| 152 // Check if we're already archived (transactional). |
| 153 if err := ds.Get(c, state); err != nil { |
| 154 if err == ds.ErrNoSuchEntity { |
| 155 log.Warningf(c, "(Transactional) Log stream no l
onger exists.") |
| 156 return nil |
| 157 } |
| 158 |
| 159 log.WithError(err).Errorf(c, "(Transactional) Failed to
load archival log stream.") |
| 160 return errors.Annotate(err, "failed to load archival str
eam").Err() |
| 161 } |
| 162 if state.ArchivalState() != coordinator.NotArchived { |
| 163 log.Infof(c, "(Transactional) Log stream archival is alr
eady tasked.") |
| 164 return nil |
| 165 } |
| 166 |
| 167 if err = params.PublishTask(c, ap, state); err != nil { |
| 168 if err == coordinator.ErrArchiveTasked { |
| 169 log.Warningf(c, "Archival already tasked, skippi
ng.") |
| 170 return nil |
| 171 } |
| 172 |
| 173 log.WithError(err).Errorf(c, "Failed to publish archival
task.") |
| 174 return errors.Annotate(err, "failed to publish archival
task").Err() |
| 175 } |
| 176 |
| 177 if err := ds.Put(c, state); err != nil { |
| 178 log.WithError(err).Errorf(c, "Failed to update datastore
.") |
| 179 return errors.Annotate(err, "failed to update datastore"
).Err() |
| 180 } |
| 181 |
| 182 return nil |
| 183 }, nil) |
| 184 if err != nil { |
| 185 log.WithError(err).Errorf(c, "Failed to publish archival task.") |
| 186 return errors.Annotate(err, "failed to publish archival task").T
ag(transient.Tag).Err() |
| 187 } |
| 188 |
| 189 log.Debugf(c, "Successfully published cleanup archival task.") |
| 190 return nil |
| 191 } |
| 192 |
| 193 func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Archi
veDispatchTask_Tag) *tq.Task { |
| 194 name := fmt.Sprintf("%s_%s", id, tag) |
| 195 return &tq.Task{ |
| 196 Payload: emptyArchiveDispatchTask, |
| 197 NamePrefix: name, |
| 198 DeduplicationKey: info.GetNamespace(c), |
| 199 Title: name, |
| 200 } |
| 201 } |
| OLD | NEW |