Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1263)

Side by Side Diff: logdog/appengine/coordinator/backend/archival.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package backend
16
17 import (
18 "encoding/json"
19 "fmt"
20 "io"
21 "net/http"
22 "time"
23
24 "github.com/luci/luci-go/logdog/appengine/coordinator"
25
26 log "github.com/luci/luci-go/common/logging"
27 "github.com/luci/luci-go/server/router"
28
29 ds "github.com/luci/gae/service/datastore"
30 "github.com/luci/gae/service/info"
31 "github.com/luci/gae/service/taskqueue"
32
33 "golang.org/x/net/context"
34 )
35
36 type ArchivalTaskTag string
37
38 const (
39 // ArchiveExpired is a tag that is applied to the initially-scheduled
40 // emergency cleanup task that will catch streams that never terminate.
41 ArchiveExpired ArchivalTaskTag = "expired"
42
43 // ArchiveTerminated is a tag that is applied to the standard terminal c leanup
44 // task.
45 ArchiveTerminated ArchivalTaskTag = "terminated"
46 )
47
48 // archivalTaskParams is the data handed in the POST body to the
49 // handleArchivalTask handler.
50 type archivalTaskParams struct {
51 // ID is the hash ID of the LogStream whose archive task is being create d.
52 //
53 // Note that the task will apply to the LogStreamState, not the stream
54 // entity itself.
55 ID coordinator.HashID `json:"id"`
56
57 // Tag is this task's tag.
58 Tag ArchivalTaskTag `json:"tag"`
59
60 // SettleDelay is the settle delay (see ArchivalParams).
61 SettleDelay time.Duration `json:"settleDelay,omitempty"`
62 // CompletePeriod is the complete period to use (see ArchivalParams).
63 CompletePeriod time.Duration `json:"completePeriod,omitempty"`
64 }
65
66 func wrapTask(ctx *router.Context, fn func() bool) {
67 if !fn() {
68 log.Errorf(ctx.Context, "Task reported transient failure, resche duling.")
69 ctx.Writer.WriteHeader(http.StatusInternalServerError)
70 return
71 }
72 }
73
74 // CreateArchivalTask adds a task to the task queue to initiate
75 // archival on an stream. The task will be delayed by "delay".
76 //
77 // The resulting task will be named, and can be cancelled by
78 // ClearArchivalTask.
79 func CreateArchivalTask(c context.Context, id coordinator.HashID, tag ArchivalTa skTag,
80 delay time.Duration, params *coordinator.ArchivalParams) error {
81
82 atp := archivalTaskParams{
83 ID: id,
84 Tag: tag,
85 SettleDelay: params.SettleDelay,
86 CompletePeriod: params.CompletePeriod,
87 }
88 payload, err := json.Marshal(&atp)
89 if err != nil {
90 log.WithError(err).Errorf(c, "Failed to generate payload JSON.")
91 return err
92 }
93
94 taskName := archivalTaskName(id, tag)
95 task := taskqueue.NewPOSTTask("/internal/tasks/archival", nil)
96 task.Name = taskName
97 task.Payload = payload
98 task.Delay = delay
99 if err := taskqueue.Add(c, ArchivalTaskQueue, task); err != nil {
100 if err == taskqueue.ErrTaskAlreadyAdded {
101 log.Warningf(c, "Task %q was already added; skipping.", task.Name)
102 return nil
103 }
104
105 log.Fields{
106 log.ErrorKey: err,
107 "taskName": task.Name,
108 }.Errorf(c, "Failed to add task to task queue.")
109 return err
110 }
111
112 log.Debugf(c, "Successfully created archival task: %q", taskName)
113 return nil
114 }
115
116 // ClearArchiveIncompleteTask deletes the enqueued task for the specified
117 // hash ID.
118 //
119 // This task will be created by CreateArchiveIncompleteTask.
120 func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error {
121 task := taskqueue.Task{Name: archivalTaskName(id, ArchiveExpired)}
122 log.Debugf(c, "Deleting archival task: %q", task.Name)
123 if err := taskqueue.Delete(c, ArchivalTaskQueue, &task); err != nil {
124 log.Fields{
125 log.ErrorKey: err,
126 "taskName": task.Name,
127 }.Errorf(c, "Failed to delete expired archival task.")
128 return err
129 }
130 return nil
131 }
132
133 // handleArchivalTask is an HTTP handler for the archival task.
134 func handleArchivalTask(ctx *router.Context) {
135 wrapTask(ctx, func() bool {
136 c := ctx.Context
dnj 2017/08/03 03:45:55 Note: This logic is largely a clone of the Tumble
137
138 // Read the request body.
139 if ctx.Request.Body == nil {
140 log.Errorf(c, "Request has no body.")
141 return true // Consume this task.
142 }
143 defer ctx.Request.Body.Close()
144
145 // Load the JSON body from the request.
146 var atp archivalTaskParams
147 ecr := errCheckReader{ctx.Request.Body, false}
148 if err := json.NewDecoder(&ecr).Decode(&atp); err != nil {
149 log.WithError(err).Errorf(c, "Failed to read requet body .")
150 return !ecr.wasError // If there was an I/O error, retur n transient.
151 }
152
153 log.Infof(c, "Scheduling archival for %q task in namespace %q: % q",
154 atp.Tag, info.GetNamespace(c), atp.ID)
155
156 // Get our archival publisher.
157 svc := coordinator.GetServices(c)
158 ap, err := svc.ArchivalPublisher(c)
159 if err != nil {
160 log.WithError(err).Errorf(c, "Failed to get archival pub lisher.")
161 return false // Transient
162 }
163 defer func() {
164 if err := ap.Close(); err != nil {
165 log.WithError(err).Warningf(c, "Failed to close archival publisher.")
166 }
167 }()
168
169 // Get the log stream.
170 stream := coordinator.LogStream{ID: atp.ID}
171 state := stream.State(c)
172
173 err = ds.RunInTransaction(c, func(c context.Context) error {
174 if err := ds.Get(c, state); err != nil {
175 if err == ds.ErrNoSuchEntity {
176 log.Warningf(c, "Log stream no longer ex ists.")
177 return nil
178 }
179
180 log.WithError(err).Errorf(c, "Failed to load arc hival log stream.")
181 return err
182 }
183
184 params := coordinator.ArchivalParams{
185 RequestID: info.RequestID(c),
186 SettleDelay: atp.SettleDelay,
187 CompletePeriod: atp.CompletePeriod,
188 }
189 if err = params.PublishTask(c, ap, state); err != nil {
190 if err == coordinator.ErrArchiveTasked {
191 log.Warningf(c, "Archival already tasked , skipping.")
192 return nil
193 }
194
195 log.WithError(err).Errorf(c, "Failed to publish archival task.")
196 return err
197 }
198
199 if err := ds.Put(c, state); err != nil {
200 log.WithError(err).Errorf(c, "Failed to update d atastore.")
201 return err
202 }
203
204 return nil
205 }, nil)
206 if err != nil {
207 log.WithError(err).Errorf(c, "Failed to publish archival task.")
208 return false // Transient.
209 }
210
211 log.Debugf(c, "Successfully published cleanup archival task.")
212 return true
213 })
214 }
215
216 func archivalTaskName(id coordinator.HashID, tag ArchivalTaskTag) string {
217 return fmt.Sprintf("archive_%s_%s", tag, id)
218 }
219
220 // errCheckReader is an io.Reader wrapper which tracks whether or not the
221 // underlying io.Reader encountered an error.
222 //
223 // We use this because we stream our Reader through a json.Decoder, which can
224 // also return an error (on invalid JSON), so we can't outright tell whether the
225 // error is in I/O (transient) or due to JSON content (non-transient).
226 //
227 // This lets us differentiate.
228 type errCheckReader struct {
229 inner io.Reader
230 wasError bool
231 }
232
233 func (r *errCheckReader) Read(p []byte) (int, error) {
234 v, err := r.inner.Read(p)
235 if err != nil {
236 r.wasError = true
237 }
238 return v, err
239 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698