Index: logdog/server/cmd/logdog_archivist/task.go |
diff --git a/logdog/server/cmd/logdog_archivist/task.go b/logdog/server/cmd/logdog_archivist/task.go |
index e3f375967663f18aa41a2e9720efa770d72ea1e6..b09958f687489c1b64a98d8ae003557f126c6617 100644 |
--- a/logdog/server/cmd/logdog_archivist/task.go |
+++ b/logdog/server/cmd/logdog_archivist/task.go |
@@ -5,25 +5,16 @@ |
package main |
import ( |
- "time" |
- |
"github.com/golang/protobuf/proto" |
- "github.com/luci/luci-go/common/gcloud/pubsub" |
- log "github.com/luci/luci-go/common/logging" |
- "github.com/luci/luci-go/common/retry" |
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
+ |
+ gcps "cloud.google.com/go/pubsub" |
"golang.org/x/net/context" |
- gcps "google.golang.org/cloud/pubsub" |
) |
// pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask |
// Pub/Sub message. |
type pubSubArchivistTask struct { |
- // Context is a cloud package authenticated Context that can be used for raw |
- // Pub/Sub interaction. This is necessary because ModifyAckDeadline is not |
- // available to the "new API" Client. |
- context.Context |
- |
// subscriptionName is the name of the subscription that this task was pulled |
// from. This is NOT the full subscription path. |
subscriptionName string |
@@ -37,11 +28,10 @@ type pubSubArchivistTask struct { |
consumed bool |
} |
-func makePubSubArchivistTask(c context.Context, s string, msg *gcps.Message) (*pubSubArchivistTask, error) { |
+func makePubSubArchivistTask(s string, msg *gcps.Message) (*pubSubArchivistTask, error) { |
// If we can't decode the archival task, we can't decide whether or not to |
// delete it, so we will leave it in the queue. |
t := pubSubArchivistTask{ |
- Context: c, |
subscriptionName: s, |
msg: msg, |
} |
@@ -65,21 +55,4 @@ func (t *pubSubArchivistTask) Consume() { |
t.consumed = true |
} |
-func (t *pubSubArchivistTask) AssertLease(c context.Context) error { |
- return retry.Retry(c, retry.Default, func() error { |
- // Call ModifyAckDeadline directly, since we need immediate confirmation of |
- // our continued ownership of the ACK. This will change the ACK's state |
- // from that expected by the Message Iterator's keepalive system; however, |
- // since we're extending it to the maximum deadline, worst-case the |
- // keepalive will underestimate it and aggressively modify it. |
- // |
- // In practice, we tell the keepalive to use the maximum ACK deadline too, |
- // so the disconnect will be minor at best. |
- return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID, pubsub.MaxACKDeadline) |
- }, func(err error, d time.Duration) { |
- log.Fields{ |
- log.ErrorKey: err, |
- "delay": d, |
- }.Warningf(c, "Failed to modify ACK deadline. Retrying...") |
- }) |
-} |
+func (t *pubSubArchivistTask) AssertLease(c context.Context) error { return nil } |
nodir
2016/08/05 17:15:39
i think you can delete this method now
|