Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 package tasks | |
| 16 | |
| 17 import ( | |
| 18 "fmt" | |
| 19 "io" | |
| 20 "time" | |
| 21 | |
| 22 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | |
| 23 "github.com/luci/luci-go/logdog/appengine/coordinator" | |
| 24 | |
| 25 "github.com/luci/luci-go/appengine/tq" | |
| 26 "github.com/luci/luci-go/common/errors" | |
| 27 log "github.com/luci/luci-go/common/logging" | |
| 28 "github.com/luci/luci-go/common/proto/google" | |
| 29 "github.com/luci/luci-go/common/retry/transient" | |
| 30 | |
| 31 ds "github.com/luci/gae/service/datastore" | |
| 32 "github.com/luci/gae/service/info" | |
| 33 | |
| 34 "github.com/golang/protobuf/proto" | |
| 35 "golang.org/x/net/context" | |
| 36 ) | |
| 37 | |
| 38 // CreateArchivalTask adds a task to the task queue to initiate | |
| 39 // archival on an stream. The task will be delayed by "delay". | |
| 40 func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Arc hiveDispatchTask_Tag, | |
| 41 delay time.Duration, params *coordinator.ArchivalParams) error { | |
| 42 | |
| 43 task := makeArchivalTask(c, id, tag) | |
| 44 task.Payload = &logdog.ArchiveDispatchTask{ | |
| 45 Id: string(id), | |
| 46 Tag: tag, | |
| 47 SettleDelay: google.NewDuration(params.SettleDelay), | |
| 48 CompletePeriod: google.NewDuration(params.CompletePeriod), | |
| 49 } | |
| 50 task.Delay = delay | |
| 51 | |
| 52 if err := taskDispatcher.AddTask(c, task); err != nil { | |
| 53 log.Fields{ | |
| 54 log.ErrorKey: err, | |
| 55 "taskName": task.Name(), | |
| 56 }.Errorf(c, "Failed to add task to task queue.") | |
|
Ryan Tseng
2017/08/03 19:30:04
This will cause the same error to log multiple tim
dnj
2017/08/03 21:05:47
Done.
| |
| 57 return err | |
| 58 } | |
| 59 | |
| 60 log.Debugf(c, "Successfully created archival task: %q", task.Name()) | |
| 61 return nil | |
| 62 } | |
| 63 | |
| 64 // ClearArchiveIncompleteExpiredTask deletes the enqueued task for the specified | |
| 65 // hash ID. | |
| 66 // | |
| 67 // This task will be created by CreateArchiveIncompleteTask. | |
| 68 func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error { | |
|
Ryan Tseng
2017/08/03 19:30:04
DeleteAllExpiredTasks()?
dnj
2017/08/03 21:05:46
Nope, this clears a single archival tasks for the
| |
| 69 task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED) | |
| 70 log.Debugf(c, "Deleting archival task: %q", task.Name()) | |
| 71 if err := taskDispatcher.DeleteTask(c, task); err != nil { | |
| 72 log.Fields{ | |
| 73 log.ErrorKey: err, | |
| 74 "taskName": task.Name(), | |
| 75 }.Errorf(c, "Failed to delete expired archival task.") | |
| 76 return err | |
| 77 } | |
| 78 | |
| 79 log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Nam e()) | |
| 80 return nil | |
| 81 } | |
| 82 | |
| 83 // handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask. | |
|
Ryan Tseng
2017/08/03 19:30:04
What does it do?
dnj
2017/08/03 21:05:47
Updated comment.
| |
| 84 func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCou nt int) error { | |
| 85 adt, ok := payload.(*logdog.ArchiveDispatchTask) | |
| 86 if !ok { | |
| 87 return errors.Reason("unexpected message type %T", payload).Err( ) | |
| 88 } | |
| 89 | |
| 90 log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q", | |
| 91 adt.Tag, execCount, info.GetNamespace(c), adt.Id) | |
| 92 | |
| 93 // Get our archival publisher. | |
| 94 svc := coordinator.GetServices(c) | |
| 95 ap, err := svc.ArchivalPublisher(c) | |
| 96 if err != nil { | |
| 97 log.WithError(err).Errorf(c, "Failed to get archival publisher." ) | |
| 98 return transient.Tag.Apply(err) | |
| 99 } | |
| 100 defer func() { | |
| 101 if err := ap.Close(); err != nil { | |
| 102 log.WithError(err).Warningf(c, "Failed to close archival publisher.") | |
| 103 } | |
| 104 }() | |
| 105 | |
| 106 // Get the log stream. | |
| 107 stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)} | |
| 108 state := stream.State(c) | |
| 109 | |
| 110 err = ds.RunInTransaction(c, func(c context.Context) error { | |
| 111 if err := ds.Get(c, state); err != nil { | |
| 112 if err == ds.ErrNoSuchEntity { | |
| 113 log.Warningf(c, "Log stream no longer exists.") | |
| 114 return nil | |
| 115 } | |
| 116 | |
| 117 log.WithError(err).Errorf(c, "Failed to load archival lo g stream.") | |
|
Ryan Tseng
2017/08/03 19:30:04
Use errors.Annotate() instead for this transaction
dnj
2017/08/03 21:05:46
Done.
| |
| 118 return err | |
| 119 } | |
| 120 | |
| 121 params := coordinator.ArchivalParams{ | |
| 122 RequestID: info.RequestID(c), | |
| 123 SettleDelay: google.DurationFromProto(adt.SettleDelay ), | |
| 124 CompletePeriod: google.DurationFromProto(adt.CompletePer iod), | |
| 125 } | |
| 126 if err = params.PublishTask(c, ap, state); err != nil { | |
| 127 if err == coordinator.ErrArchiveTasked { | |
| 128 log.Warningf(c, "Archival already tasked, skippi ng.") | |
| 129 return nil | |
| 130 } | |
| 131 | |
| 132 log.WithError(err).Errorf(c, "Failed to publish archival task.") | |
| 133 return err | |
| 134 } | |
| 135 | |
| 136 if err := ds.Put(c, state); err != nil { | |
| 137 log.WithError(err).Errorf(c, "Failed to update datastore .") | |
| 138 return err | |
| 139 } | |
| 140 | |
| 141 return nil | |
| 142 }, nil) | |
| 143 if err != nil { | |
| 144 log.WithError(err).Errorf(c, "Failed to publish archival task.") | |
| 145 return transient.Tag.Apply(err) | |
| 146 } | |
| 147 | |
| 148 log.Debugf(c, "Successfully published cleanup archival task.") | |
| 149 return nil | |
| 150 } | |
| 151 | |
| 152 func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.Archi veDispatchTask_Tag) *tq.Task { | |
| 153 name := fmt.Sprintf("%s_%s", id, tag.String()) | |
| 154 return &tq.Task{ | |
| 155 NamePrefix: name, | |
| 156 DeduplicationKey: info.GetNamespace(c), | |
| 157 Title: name, | |
| 158 } | |
| 159 } | |
| 160 | |
| 161 // errCheckReader is an io.Reader wrapper which tracks whether or not the | |
|
Ryan Tseng
2017/08/03 19:30:04
Is this used?
dnj
2017/08/03 21:05:47
No, now that I'm using "tq". Is deleted.
| |
| 162 // underlying io.Reader encountered an error. | |
| 163 // | |
| 164 // We use this because we stream our Reader through a json.Decoder, which can | |
| 165 // also return an error (on invalid JSON), so we can't outright tell whether the | |
| 166 // error is in I/O (transient) or due to JSON content (non-transient). | |
| 167 // | |
| 168 // This lets us differentiate. | |
| 169 type errCheckReader struct { | |
| 170 inner io.Reader | |
| 171 wasError bool | |
| 172 } | |
| 173 | |
| 174 func (r *errCheckReader) Read(p []byte) (int, error) { | |
| 175 v, err := r.inner.Read(p) | |
| 176 if err != nil { | |
| 177 r.wasError = true | |
| 178 } | |
| 179 return v, err | |
| 180 } | |
| OLD | NEW |