Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(48)

Side by Side Diff: logdog/appengine/coordinator/tasks/archival.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: incorporate namespace in task name Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package tasks
16
17 import (
18 "fmt"
19 "io"
20 "time"
21
22 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
23 "github.com/luci/luci-go/logdog/appengine/coordinator"
24
25 "github.com/luci/luci-go/appengine/tq"
26 "github.com/luci/luci-go/common/errors"
27 log "github.com/luci/luci-go/common/logging"
28 "github.com/luci/luci-go/common/proto/google"
29 "github.com/luci/luci-go/common/retry/transient"
30
31 ds "github.com/luci/gae/service/datastore"
32 "github.com/luci/gae/service/info"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/context"
36 )
37
38 // CreateArchivalTask adds a task to the task queue to initiate
39 // archival on an stream. The task will be delayed by "delay".
40 func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Arc hiveDispatchTask_Tag,
41 delay time.Duration, params *coordinator.ArchivalParams) error {
42
43 task := makeArchivalTask(c, id, tag)
44 task.Payload = &logdog.ArchiveDispatchTask{
45 Id: string(id),
46 Tag: tag,
47 SettleDelay: google.NewDuration(params.SettleDelay),
48 CompletePeriod: google.NewDuration(params.CompletePeriod),
49 }
50 task.Delay = delay
51
52 if err := taskDispatcher.AddTask(c, task); err != nil {
53 log.Fields{
54 log.ErrorKey: err,
55 "taskName": task.Name(),
56 }.Errorf(c, "Failed to add task to task queue.")
Ryan Tseng 2017/08/03 19:30:04 This will cause the same error to log multiple tim
dnj 2017/08/03 21:05:47 Done.
57 return err
58 }
59
60 log.Debugf(c, "Successfully created archival task: %q", task.Name())
61 return nil
62 }
63
64 // ClearArchiveIncompleteExpiredTask deletes the enqueued task for the specified
65 // hash ID.
66 //
67 // This task will be created by CreateArchiveIncompleteTask.
68 func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error {
Ryan Tseng 2017/08/03 19:30:04 DeleteAllExpiredTasks()?
dnj 2017/08/03 21:05:46 Nope, this clears a single archival tasks for the
69 task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED)
70 log.Debugf(c, "Deleting archival task: %q", task.Name())
71 if err := taskDispatcher.DeleteTask(c, task); err != nil {
72 log.Fields{
73 log.ErrorKey: err,
74 "taskName": task.Name(),
75 }.Errorf(c, "Failed to delete expired archival task.")
76 return err
77 }
78
79 log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Nam e())
80 return nil
81 }
82
83 // handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask.
Ryan Tseng 2017/08/03 19:30:04 What does it do?
dnj 2017/08/03 21:05:47 Updated comment.
84 func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCou nt int) error {
85 adt, ok := payload.(*logdog.ArchiveDispatchTask)
86 if !ok {
87 return errors.Reason("unexpected message type %T", payload).Err( )
88 }
89
90 log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q",
91 adt.Tag, execCount, info.GetNamespace(c), adt.Id)
92
93 // Get our archival publisher.
94 svc := coordinator.GetServices(c)
95 ap, err := svc.ArchivalPublisher(c)
96 if err != nil {
97 log.WithError(err).Errorf(c, "Failed to get archival publisher." )
98 return transient.Tag.Apply(err)
99 }
100 defer func() {
101 if err := ap.Close(); err != nil {
102 log.WithError(err).Warningf(c, "Failed to close archival publisher.")
103 }
104 }()
105
106 // Get the log stream.
107 stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)}
108 state := stream.State(c)
109
110 err = ds.RunInTransaction(c, func(c context.Context) error {
111 if err := ds.Get(c, state); err != nil {
112 if err == ds.ErrNoSuchEntity {
113 log.Warningf(c, "Log stream no longer exists.")
114 return nil
115 }
116
117 log.WithError(err).Errorf(c, "Failed to load archival lo g stream.")
Ryan Tseng 2017/08/03 19:30:04 Use errors.Annotate() instead for this transaction
dnj 2017/08/03 21:05:46 Done.
118 return err
119 }
120
121 params := coordinator.ArchivalParams{
122 RequestID: info.RequestID(c),
123 SettleDelay: google.DurationFromProto(adt.SettleDelay ),
124 CompletePeriod: google.DurationFromProto(adt.CompletePer iod),
125 }
126 if err = params.PublishTask(c, ap, state); err != nil {
127 if err == coordinator.ErrArchiveTasked {
128 log.Warningf(c, "Archival already tasked, skippi ng.")
129 return nil
130 }
131
132 log.WithError(err).Errorf(c, "Failed to publish archival task.")
133 return err
134 }
135
136 if err := ds.Put(c, state); err != nil {
137 log.WithError(err).Errorf(c, "Failed to update datastore .")
138 return err
139 }
140
141 return nil
142 }, nil)
143 if err != nil {
144 log.WithError(err).Errorf(c, "Failed to publish archival task.")
145 return transient.Tag.Apply(err)
146 }
147
148 log.Debugf(c, "Successfully published cleanup archival task.")
149 return nil
150 }
151
152 func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Archi veDispatchTask_Tag) *tq.Task {
153 name := fmt.Sprintf("%s_%s", id, tag.String())
154 return &tq.Task{
155 NamePrefix: name,
156 DeduplicationKey: info.GetNamespace(c),
157 Title: name,
158 }
159 }
160
161 // errCheckReader is an io.Reader wrapper which tracks whether or not the
Ryan Tseng 2017/08/03 19:30:04 Is this used?
dnj 2017/08/03 21:05:47 No, now that I'm using "tq". Is deleted.
162 // underlying io.Reader encountered an error.
163 //
164 // We use this because we stream our Reader through a json.Decoder, which can
165 // also return an error (on invalid JSON), so we can't outright tell whether the
166 // error is in I/O (transient) or due to JSON content (non-transient).
167 //
168 // This lets us differentiate.
169 type errCheckReader struct {
170 inner io.Reader
171 wasError bool
172 }
173
174 func (r *errCheckReader) Read(p []byte) (int, error) {
175 v, err := r.inner.Read(p)
176 if err != nil {
177 r.wasError = true
178 }
179 return v, err
180 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698