Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 package backend | |
| 16 | |
| 17 import ( | |
| 18 "encoding/json" | |
| 19 "fmt" | |
| 20 "io" | |
| 21 "net/http" | |
| 22 "time" | |
| 23 | |
| 24 "github.com/luci/luci-go/logdog/appengine/coordinator" | |
| 25 | |
| 26 log "github.com/luci/luci-go/common/logging" | |
| 27 "github.com/luci/luci-go/server/router" | |
| 28 | |
| 29 ds "github.com/luci/gae/service/datastore" | |
| 30 "github.com/luci/gae/service/info" | |
| 31 "github.com/luci/gae/service/taskqueue" | |
| 32 | |
| 33 "golang.org/x/net/context" | |
| 34 ) | |
| 35 | |
// ArchivalTaskTag distinguishes the kind of archival cleanup task that was
// scheduled for a log stream; it is embedded in the task name and payload.
type ArchivalTaskTag string

const (
	// ArchiveExpired is a tag that is applied to the initially-scheduled
	// emergency cleanup task that will catch streams that never terminate.
	ArchiveExpired ArchivalTaskTag = "expired"

	// ArchiveTerminated is a tag that is applied to the standard terminal
	// cleanup task.
	ArchiveTerminated ArchivalTaskTag = "terminated"
)
| 47 | |
// archivalTaskParams is the data handed in the POST body to the
// handleArchivalTask handler.
//
// It is serialized with encoding/json; the Duration fields marshal as
// int64 nanosecond counts.
type archivalTaskParams struct {
	// ID is the hash ID of the LogStream whose archive task is being created.
	//
	// Note that the task will apply to the LogStreamState, not the stream
	// entity itself.
	ID coordinator.HashID `json:"id"`

	// Tag is this task's tag (ArchiveExpired or ArchiveTerminated).
	Tag ArchivalTaskTag `json:"tag"`

	// SettleDelay is the settle delay (see ArchivalParams).
	SettleDelay time.Duration `json:"settleDelay,omitempty"`
	// CompletePeriod is the complete period to use (see ArchivalParams).
	CompletePeriod time.Duration `json:"completePeriod,omitempty"`
}
| 65 | |
| 66 func wrapTask(ctx *router.Context, fn func() bool) { | |
| 67 if !fn() { | |
| 68 log.Errorf(ctx.Context, "Task reported transient failure, resche duling.") | |
| 69 ctx.Writer.WriteHeader(http.StatusInternalServerError) | |
| 70 return | |
| 71 } | |
| 72 } | |
| 73 | |
| 74 // CreateArchivalTask adds a task to the task queue to initiate | |
| 75 // archival on an stream. The task will be delayed by "delay". | |
| 76 // | |
| 77 // The resulting task will be named, and can be cancelled by | |
| 78 // ClearArchivalTask. | |
| 79 func CreateArchivalTask(c context.Context, id coordinator.HashID, tag ArchivalTa skTag, | |
| 80 delay time.Duration, params *coordinator.ArchivalParams) error { | |
| 81 | |
| 82 atp := archivalTaskParams{ | |
| 83 ID: id, | |
| 84 Tag: tag, | |
| 85 SettleDelay: params.SettleDelay, | |
| 86 CompletePeriod: params.CompletePeriod, | |
| 87 } | |
| 88 payload, err := json.Marshal(&atp) | |
| 89 if err != nil { | |
| 90 log.WithError(err).Errorf(c, "Failed to generate payload JSON.") | |
| 91 return err | |
| 92 } | |
| 93 | |
| 94 taskName := archivalTaskName(id, tag) | |
| 95 task := taskqueue.NewPOSTTask("/internal/tasks/archival", nil) | |
| 96 task.Name = taskName | |
| 97 task.Payload = payload | |
| 98 task.Delay = delay | |
| 99 if err := taskqueue.Add(c, ArchivalTaskQueue, task); err != nil { | |
| 100 if err == taskqueue.ErrTaskAlreadyAdded { | |
| 101 log.Warningf(c, "Task %q was already added; skipping.", task.Name) | |
| 102 return nil | |
| 103 } | |
| 104 | |
| 105 log.Fields{ | |
| 106 log.ErrorKey: err, | |
| 107 "taskName": task.Name, | |
| 108 }.Errorf(c, "Failed to add task to task queue.") | |
| 109 return err | |
| 110 } | |
| 111 | |
| 112 log.Debugf(c, "Successfully created archival task: %q", taskName) | |
| 113 return nil | |
| 114 } | |
| 115 | |
// ClearArchiveIncompleteExpiredTask deletes the enqueued ArchiveExpired
// archival task for the specified hash ID.
//
// This task will have been created by CreateArchivalTask with the
// ArchiveExpired tag; deleting it by its deterministic name cancels the
// emergency cleanup for a stream that has since terminated normally.
func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error {
	task := taskqueue.Task{Name: archivalTaskName(id, ArchiveExpired)}
	log.Debugf(c, "Deleting archival task: %q", task.Name)
	if err := taskqueue.Delete(c, ArchivalTaskQueue, &task); err != nil {
		log.Fields{
			log.ErrorKey: err,
			"taskName":   task.Name,
		}.Errorf(c, "Failed to delete expired archival task.")
		return err
	}
	return nil
}
| 132 | |
| 133 // handleArchivalTask is an HTTP handler for the archival task. | |
| 134 func handleArchivalTask(ctx *router.Context) { | |
| 135 wrapTask(ctx, func() bool { | |
| 136 c := ctx.Context | |
|
dnj
2017/08/03 03:45:55
Note: This logic is largely a clone of the Tumble
| |
| 137 | |
| 138 // Read the request body. | |
| 139 if ctx.Request.Body == nil { | |
| 140 log.Errorf(c, "Request has no body.") | |
| 141 return true // Consume this task. | |
| 142 } | |
| 143 defer ctx.Request.Body.Close() | |
| 144 | |
| 145 // Load the JSON body from the request. | |
| 146 var atp archivalTaskParams | |
| 147 ecr := errCheckReader{ctx.Request.Body, false} | |
| 148 if err := json.NewDecoder(&ecr).Decode(&atp); err != nil { | |
| 149 log.WithError(err).Errorf(c, "Failed to read requet body .") | |
| 150 return !ecr.wasError // If there was an I/O error, retur n transient. | |
| 151 } | |
| 152 | |
| 153 log.Infof(c, "Scheduling archival for %q task in namespace %q: % q", | |
| 154 atp.Tag, info.GetNamespace(c), atp.ID) | |
| 155 | |
| 156 // Get our archival publisher. | |
| 157 svc := coordinator.GetServices(c) | |
| 158 ap, err := svc.ArchivalPublisher(c) | |
| 159 if err != nil { | |
| 160 log.WithError(err).Errorf(c, "Failed to get archival pub lisher.") | |
| 161 return false // Transient | |
| 162 } | |
| 163 defer func() { | |
| 164 if err := ap.Close(); err != nil { | |
| 165 log.WithError(err).Warningf(c, "Failed to close archival publisher.") | |
| 166 } | |
| 167 }() | |
| 168 | |
| 169 // Get the log stream. | |
| 170 stream := coordinator.LogStream{ID: atp.ID} | |
| 171 state := stream.State(c) | |
| 172 | |
| 173 err = ds.RunInTransaction(c, func(c context.Context) error { | |
| 174 if err := ds.Get(c, state); err != nil { | |
| 175 if err == ds.ErrNoSuchEntity { | |
| 176 log.Warningf(c, "Log stream no longer ex ists.") | |
| 177 return nil | |
| 178 } | |
| 179 | |
| 180 log.WithError(err).Errorf(c, "Failed to load arc hival log stream.") | |
| 181 return err | |
| 182 } | |
| 183 | |
| 184 params := coordinator.ArchivalParams{ | |
| 185 RequestID: info.RequestID(c), | |
| 186 SettleDelay: atp.SettleDelay, | |
| 187 CompletePeriod: atp.CompletePeriod, | |
| 188 } | |
| 189 if err = params.PublishTask(c, ap, state); err != nil { | |
| 190 if err == coordinator.ErrArchiveTasked { | |
| 191 log.Warningf(c, "Archival already tasked , skipping.") | |
| 192 return nil | |
| 193 } | |
| 194 | |
| 195 log.WithError(err).Errorf(c, "Failed to publish archival task.") | |
| 196 return err | |
| 197 } | |
| 198 | |
| 199 if err := ds.Put(c, state); err != nil { | |
| 200 log.WithError(err).Errorf(c, "Failed to update d atastore.") | |
| 201 return err | |
| 202 } | |
| 203 | |
| 204 return nil | |
| 205 }, nil) | |
| 206 if err != nil { | |
| 207 log.WithError(err).Errorf(c, "Failed to publish archival task.") | |
| 208 return false // Transient. | |
| 209 } | |
| 210 | |
| 211 log.Debugf(c, "Successfully published cleanup archival task.") | |
| 212 return true | |
| 213 }) | |
| 214 } | |
| 215 | |
// archivalTaskName derives the deterministic task queue task name for the
// archival task associated with the given stream hash ID and tag. The name
// is what makes CreateArchivalTask idempotent and ClearArchiveIncompleteExpiredTask
// able to cancel a pending task.
func archivalTaskName(id coordinator.HashID, tag ArchivalTaskTag) string {
	return fmt.Sprintf("archive_%s_%s", tag, id)
}
| 219 | |
| 220 // errCheckReader is an io.Reader wrapper which tracks whether or not the | |
| 221 // underlying io.Reader encountered an error. | |
| 222 // | |
| 223 // We use this because we stream our Reader through a json.Decoder, which can | |
| 224 // also return an error (on invalid JSON), so we can't outright tell whether the | |
| 225 // error is in I/O (transient) or due to JSON content (non-transient). | |
| 226 // | |
| 227 // This lets us differentiate. | |
| 228 type errCheckReader struct { | |
| 229 inner io.Reader | |
| 230 wasError bool | |
| 231 } | |
| 232 | |
| 233 func (r *errCheckReader) Read(p []byte) (int, error) { | |
| 234 v, err := r.inner.Read(p) | |
| 235 if err != nil { | |
| 236 r.wasError = true | |
| 237 } | |
| 238 return v, err | |
| 239 } | |
| OLD | NEW |