Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Side by Side Diff: logdog/appengine/coordinator/tasks/archival.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/tasks/routes.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package tasks
16
17 import (
18 "fmt"
19 "time"
20
21 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
22 "github.com/luci/luci-go/logdog/appengine/coordinator"
23
24 "github.com/luci/luci-go/appengine/tq"
25 "github.com/luci/luci-go/common/errors"
26 log "github.com/luci/luci-go/common/logging"
27 "github.com/luci/luci-go/common/proto/google"
28 "github.com/luci/luci-go/common/retry/transient"
29
30 ds "github.com/luci/gae/service/datastore"
31 "github.com/luci/gae/service/info"
32
33 "github.com/golang/protobuf/proto"
34 "golang.org/x/net/context"
35 )
36
37 // emptyArchiveDispatchTask is a singleton empty ArchiveDispatchTask.
38 //
39 // This is used because tq.Task are typed to their Payload, so every task needs
40 // a Payload. This is the default Payload value, and is overridden when the
41 // Payload needs to actually be populated for publishing.
42 var emptyArchiveDispatchTask = &logdog.ArchiveDispatchTask{}
43
44 // CreateArchivalTask adds a task to the task queue to initiate
45 // archival on an stream. The task will be delayed by "delay".
46 func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Arc hiveDispatchTask_Tag,
47 delay time.Duration, params *coordinator.ArchivalParams) error {
48
49 task := makeArchivalTask(c, id, tag)
50 task.Payload = &logdog.ArchiveDispatchTask{
51 Id: string(id),
52 Tag: tag,
53 SettleDelay: google.NewDuration(params.SettleDelay),
54 CompletePeriod: google.NewDuration(params.CompletePeriod),
55 }
56 task.Delay = delay
57
58 // Add the task outside of a transaction, since it's a named task.
59 //
60 // This introduces two possibilities:
61 // - During transaction retries, the task may be added multiple times. T his
62 // is fine, since it will naturally deduplicate.
63 // - If the overall transaction fails, the task may be added for a log s tream
64 // that never exists. We handle this in the handler by warning (but no t
65 // failing) on non-existent log streams.
66 if err := taskDispatcher.AddTask(ds.WithoutTransaction(c), task); err != nil {
67 log.Fields{
68 log.ErrorKey: err,
69 "taskName": task.Name(),
70 }.Errorf(c, "Failed to add task to task queue.")
71 return errors.Annotate(err, "failed to add task to task queue"). Err()
72 }
73
74 log.Debugf(c, "Successfully created archival task: %q", task.Name())
75 return nil
76 }
77
78 // DeleteArchiveStreamExpiredTask deletes stream EXPIRED task associated with
79 // id. If the task has already been deleted, this will do nothing.
80 func DeleteArchiveStreamExpiredTask(c context.Context, id coordinator.HashID) er ror {
81 task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED)
82
83 log.Debugf(c, "Deleting archival task: %q", task.Name())
84 if err := taskDispatcher.DeleteTask(c, task); err != nil {
85 log.Fields{
86 log.ErrorKey: err,
87 "taskName": task.Name(),
88 }.Errorf(c, "Failed to delete expired archival task.")
89 return errors.Annotate(err, "failed to delete expired archival t ask").Err()
90 }
91
92 log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Nam e())
93 return nil
94 }
95
96 // handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask.
97 //
98 // This task is associated with a log stream and some archival parameters. It
99 // will verify that the log stream hasn't had archival dispatched yet. If it
100 // has, the task will terminate without further operation.
101 //
102 // For streams that haven't been archived, this task will transactionally
103 // dispatch an archival task to the Archivist fleet and update the stream's
104 // status.
105 func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCou nt int) error {
106 adt, ok := payload.(*logdog.ArchiveDispatchTask)
107 if !ok {
108 return errors.Reason("unexpected message type %T", payload).Err( )
109 }
110
111 log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q",
112 adt.Tag, execCount, info.GetNamespace(c), adt.Id)
113
114 stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)}
115 state := stream.State(c)
116
117 // Check if we're already archived (non-transactional).
118 if err := ds.Get(c, state); err != nil {
119 if err == ds.ErrNoSuchEntity {
120 log.Warningf(c, "Log stream no longer exists.")
121 return nil
122 }
123
124 log.WithError(err).Errorf(c, "Failed to load archival log stream .")
125 return errors.Annotate(err, "failed to load archival log stream" ).Tag(transient.Tag).Err()
126 }
127 if state.ArchivalState() != coordinator.NotArchived {
128 log.Infof(c, "Log stream archival is already tasked.")
129 return nil
130 }
131
132 // Get our archival publisher.
133 svc := coordinator.GetServices(c)
134 ap, err := svc.ArchivalPublisher(c)
135 if err != nil {
136 log.WithError(err).Errorf(c, "Failed to get archival publisher." )
137 return errors.Annotate(err, "failed to get archival publisher"). Tag(transient.Tag).Err()
138 }
139 defer func() {
140 if err := ap.Close(); err != nil {
141 log.WithError(err).Warningf(c, "Failed to close archival publisher.")
142 }
143 }()
144
145 params := coordinator.ArchivalParams{
146 RequestID: info.RequestID(c),
147 SettleDelay: google.DurationFromProto(adt.SettleDelay),
148 CompletePeriod: google.DurationFromProto(adt.CompletePeriod),
149 }
150
151 err = ds.RunInTransaction(c, func(c context.Context) error {
152 // Check if we're already archived (transactional).
153 if err := ds.Get(c, state); err != nil {
154 if err == ds.ErrNoSuchEntity {
155 log.Warningf(c, "(Transactional) Log stream no l onger exists.")
156 return nil
157 }
158
159 log.WithError(err).Errorf(c, "(Transactional) Failed to load archival log stream.")
160 return errors.Annotate(err, "failed to load archival str eam").Err()
161 }
162 if state.ArchivalState() != coordinator.NotArchived {
163 log.Infof(c, "(Transactional) Log stream archival is alr eady tasked.")
164 return nil
165 }
166
167 if err = params.PublishTask(c, ap, state); err != nil {
168 if err == coordinator.ErrArchiveTasked {
169 log.Warningf(c, "Archival already tasked, skippi ng.")
170 return nil
171 }
172
173 log.WithError(err).Errorf(c, "Failed to publish archival task.")
174 return errors.Annotate(err, "failed to publish archival task").Err()
175 }
176
177 if err := ds.Put(c, state); err != nil {
178 log.WithError(err).Errorf(c, "Failed to update datastore .")
179 return errors.Annotate(err, "failed to update datastore" ).Err()
180 }
181
182 return nil
183 }, nil)
184 if err != nil {
185 log.WithError(err).Errorf(c, "Failed to publish archival task.")
186 return errors.Annotate(err, "failed to publish archival task").T ag(transient.Tag).Err()
187 }
188
189 log.Debugf(c, "Successfully published cleanup archival task.")
190 return nil
191 }
192
193 func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Archi veDispatchTask_Tag) *tq.Task {
194 name := fmt.Sprintf("%s_%s", id, tag)
195 return &tq.Task{
196 Payload: emptyArchiveDispatchTask,
197 NamePrefix: name,
198 DeduplicationKey: info.GetNamespace(c),
199 Title: name,
200 }
201 }
OLDNEW
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/tasks/routes.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698