Chromium Code Reviews| Index: logdog/appengine/coordinator/tasks/archival.go |
| diff --git a/logdog/appengine/coordinator/tasks/archival.go b/logdog/appengine/coordinator/tasks/archival.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8cee444f0ae744336376cffe12d7df40e0be27f2 |
| --- /dev/null |
| +++ b/logdog/appengine/coordinator/tasks/archival.go |
| @@ -0,0 +1,180 @@ |
| +// Copyright 2017 The LUCI Authors. |
| +// |
| +// Licensed under the Apache License, Version 2.0 (the "License"); |
| +// you may not use this file except in compliance with the License. |
| +// You may obtain a copy of the License at |
| +// |
| +// http://www.apache.org/licenses/LICENSE-2.0 |
| +// |
| +// Unless required by applicable law or agreed to in writing, software |
| +// distributed under the License is distributed on an "AS IS" BASIS, |
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| +// See the License for the specific language governing permissions and |
| +// limitations under the License. |
| + |
| +package tasks |
| + |
| +import ( |
| + "fmt" |
| + "io" |
| + "time" |
| + |
| + "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| + "github.com/luci/luci-go/logdog/appengine/coordinator" |
| + |
| + "github.com/luci/luci-go/appengine/tq" |
| + "github.com/luci/luci-go/common/errors" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/google" |
| + "github.com/luci/luci-go/common/retry/transient" |
| + |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/info" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +// CreateArchivalTask adds a task to the task queue to initiate |
| +// archival on an stream. The task will be delayed by "delay". |
| +func CreateArchivalTask(c context.Context, id coordinator.HashID, tag logdog.ArchiveDispatchTask_Tag, |
| + delay time.Duration, params *coordinator.ArchivalParams) error { |
| + |
| + task := makeArchivalTask(c, id, tag) |
| + task.Payload = &logdog.ArchiveDispatchTask{ |
| + Id: string(id), |
| + Tag: tag, |
| + SettleDelay: google.NewDuration(params.SettleDelay), |
| + CompletePeriod: google.NewDuration(params.CompletePeriod), |
| + } |
| + task.Delay = delay |
| + |
| + if err := taskDispatcher.AddTask(c, task); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "taskName": task.Name(), |
| + }.Errorf(c, "Failed to add task to task queue.") |
|
Ryan Tseng
2017/08/03 19:30:04
This will cause the same error to log multiple tim
dnj
2017/08/03 21:05:47
Done.
|
| + return err |
| + } |
| + |
| + log.Debugf(c, "Successfully created archival task: %q", task.Name()) |
| + return nil |
| +} |
| + |
| +// ClearArchiveIncompleteExpiredTask deletes the enqueued task for the specified |
| +// hash ID. |
| +// |
| +// This task will be created by CreateArchiveIncompleteTask. |
| +func ClearArchiveIncompleteExpiredTask(c context.Context, id coordinator.HashID) error { |
|
Ryan Tseng
2017/08/03 19:30:04
DeleteAllExpiredTasks()?
dnj
2017/08/03 21:05:46
Nope, this clears a single archival tasks for the
|
| + task := makeArchivalTask(c, id, logdog.ArchiveDispatchTask_EXPIRED) |
| + log.Debugf(c, "Deleting archival task: %q", task.Name()) |
| + if err := taskDispatcher.DeleteTask(c, task); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "taskName": task.Name(), |
| + }.Errorf(c, "Failed to delete expired archival task.") |
| + return err |
| + } |
| + |
| + log.Debugf(c, "Successfully removed EXPIRED archival task: %q", task.Name()) |
| + return nil |
| +} |
| + |
| +// handleArchiveDispatchTask is a tq.Handler for an ArchiveDispatchTask. |
|
Ryan Tseng
2017/08/03 19:30:04
What does it do?
dnj
2017/08/03 21:05:47
Updated comment.
|
| +func handleArchiveDispatchTask(c context.Context, payload proto.Message, execCount int) error { |
| + adt, ok := payload.(*logdog.ArchiveDispatchTask) |
| + if !ok { |
| + return errors.Reason("unexpected message type %T", payload).Err() |
| + } |
| + |
| + log.Infof(c, "Handling archival for %q task (#%d) in namespace %q: %q", |
| + adt.Tag, execCount, info.GetNamespace(c), adt.Id) |
| + |
| + // Get our archival publisher. |
| + svc := coordinator.GetServices(c) |
| + ap, err := svc.ArchivalPublisher(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get archival publisher.") |
| + return transient.Tag.Apply(err) |
| + } |
| + defer func() { |
| + if err := ap.Close(); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to close archival publisher.") |
| + } |
| + }() |
| + |
| + // Get the log stream. |
| + stream := coordinator.LogStream{ID: coordinator.HashID(adt.Id)} |
| + state := stream.State(c) |
| + |
| + err = ds.RunInTransaction(c, func(c context.Context) error { |
| + if err := ds.Get(c, state); err != nil { |
| + if err == ds.ErrNoSuchEntity { |
| + log.Warningf(c, "Log stream no longer exists.") |
| + return nil |
| + } |
| + |
| + log.WithError(err).Errorf(c, "Failed to load archival log stream.") |
|
Ryan Tseng
2017/08/03 19:30:04
Use errors.Annotate() instead for this transaction
dnj
2017/08/03 21:05:46
Done.
|
| + return err |
| + } |
| + |
| + params := coordinator.ArchivalParams{ |
| + RequestID: info.RequestID(c), |
| + SettleDelay: google.DurationFromProto(adt.SettleDelay), |
| + CompletePeriod: google.DurationFromProto(adt.CompletePeriod), |
| + } |
| + if err = params.PublishTask(c, ap, state); err != nil { |
| + if err == coordinator.ErrArchiveTasked { |
| + log.Warningf(c, "Archival already tasked, skipping.") |
| + return nil |
| + } |
| + |
| + log.WithError(err).Errorf(c, "Failed to publish archival task.") |
| + return err |
| + } |
| + |
| + if err := ds.Put(c, state); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to update datastore.") |
| + return err |
| + } |
| + |
| + return nil |
| + }, nil) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to publish archival task.") |
| + return transient.Tag.Apply(err) |
| + } |
| + |
| + log.Debugf(c, "Successfully published cleanup archival task.") |
| + return nil |
| +} |
| + |
| +func makeArchivalTask(c context.Context, id coordinator.HashID, tag logdog.ArchiveDispatchTask_Tag) *tq.Task { |
| + name := fmt.Sprintf("%s_%s", id, tag.String()) |
| + return &tq.Task{ |
| + NamePrefix: name, |
| + DeduplicationKey: info.GetNamespace(c), |
| + Title: name, |
| + } |
| +} |
| + |
| +// errCheckReader is an io.Reader wrapper which tracks whether or not the |
|
Ryan Tseng
2017/08/03 19:30:04
Is this used?
dnj
2017/08/03 21:05:47
No, now that I'm using "tq". Is deleted.
|
| +// underlying io.Reader encountered an error. |
| +// |
| +// We use this because we stream our Reader through a json.Decoder, which can |
| +// also return an error (on invalid JSON), so we can't outright tell whether the |
| +// error is in I/O (transient) or due to JSON content (non-transient). |
| +// |
| +// This lets us differentiate. |
| +type errCheckReader struct { |
| + inner io.Reader |
| + wasError bool |
| +} |
| + |
| +func (r *errCheckReader) Read(p []byte) (int, error) { |
| + v, err := r.inner.Read(p) |
| + if err != nil { |
| + r.wasError = true |
| + } |
| + return v, err |
| +} |