| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "github.com/luci/luci-go/common/auth" | 8 "github.com/luci/luci-go/common/auth" |
| 9 "github.com/luci/luci-go/common/clock" | 9 "github.com/luci/luci-go/common/clock" |
| 10 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 103 } | 103 } |
| 104 | 104 |
| 105 log.Fields{ | 105 log.Fields{ |
| 106 "project": tqOpts.Project, | 106 "project": tqOpts.Project, |
| 107 "queue": tqOpts.Queue, | 107 "queue": tqOpts.Queue, |
| 108 }.Infof(c, "Pulling tasks from task queue.") | 108 }.Infof(c, "Pulling tasks from task queue.") |
| 109 taskqueueClient.RunTasks(shutdownCtx, tqOpts, func(c context.Context, t
taskqueueClient.Task) bool { | 109 taskqueueClient.RunTasks(shutdownCtx, tqOpts, func(c context.Context, t
taskqueueClient.Task) bool { |
| 110 c = log.SetField(c, "taskID", t.ID) | 110 c = log.SetField(c, "taskID", t.ID) |
| 111 | 111 |
| 112 startTime := clock.Now(c) | 112 startTime := clock.Now(c) |
| 113 » » err := ar.ArchiveTask(c, t.Payload) | 113 » » deleteTask := ar.ArchiveTask(c, t.Payload, t.Age(clock.Get(c))) |
| 114 duration := clock.Now(c).Sub(startTime) | 114 duration := clock.Now(c).Sub(startTime) |
| 115 | 115 |
| 116 » » switch { | 116 » » if !deleteTask { |
| 117 » » case errors.IsTransient(err): | |
| 118 » » » // Do not consume | |
| 119 » » » log.Fields{ | |
| 120 » » » » log.ErrorKey: err, | |
| 121 » » » » "duration": duration, | |
| 122 » » » }.Warningf(c, "TRANSIENT error processing task.") | |
| 123 » » » return false | |
| 124 | |
| 125 » » case err == nil: | |
| 126 log.Fields{ | 117 log.Fields{ |
| 127 "duration": duration, | 118 "duration": duration, |
| 128 » » » }.Infof(c, "Task successfully processed; deleting.") | 119 » » » }.Errorf(c, "Task processing failed. Not deleting.") |
| 129 » » » return true | 120 » » » return false |
| 121 » » } |
| 130 | 122 |
| 131 » » default: | 123 » » log.Fields{ |
| 132 » » » log.Fields{ | 124 » » » "duration": duration, |
| 133 » » » » log.ErrorKey: err, | 125 » » }.Infof(c, "Task successfully processed; deleting.") |
| 134 » » » » "duration": duration, | 126 » » return true |
| 135 » » » }.Errorf(c, "Non-transient error processing task; deleti
ng.") | |
| 136 » » » return true | |
| 137 » » } | |
| 138 }) | 127 }) |
| 139 | 128 |
| 140 log.Debugf(c, "Archivist finished.") | 129 log.Debugf(c, "Archivist finished.") |
| 141 return nil | 130 return nil |
| 142 } | 131 } |
| 143 | 132 |
| 144 // Entry point. | 133 // Entry point. |
| 145 func main() { | 134 func main() { |
| 146 a := application{} | 135 a := application{} |
| 147 a.Run(context.Background(), a.runArchivist) | 136 a.Run(context.Background(), a.runArchivist) |
| 148 } | 137 } |
| OLD | NEW |