Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(669)

Side by Side Diff: server/cmd/logdog_archivist/task.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package main
6
7 import (
8 "time"
9
10 "github.com/luci/luci-go/common/gcloud/pubsub"
11 log "github.com/luci/luci-go/common/logging"
12 "github.com/luci/luci-go/common/retry"
13 "golang.org/x/net/context"
14 gcps "google.golang.org/cloud/pubsub"
15 )
16
17 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask
18 // Pub/Sub message.
19 type pubSubArchivistTask struct {
20 // Context is a cloud package authenticated Context that can be used for raw
21 // Pub/Sub interaction. This is necessary because ModifyAckDeadline is n ot
22 // available to the "new API" Client.
23 context.Context
24
25 // subscriptionName is the name of the subscription that this task was p ulled
26 // from. This is NOT the full subscription path.
27 subscriptionName string
28 // msg is the message that this task is bound to.
29 msg *gcps.Message
30 }
31
32 func (t *pubSubArchivistTask) UniqueID() string {
33 // The Message's AckID is guaranteed to be unique for a single lease.
34 return t.msg.AckID
35 }
36
37 func (t *pubSubArchivistTask) Data() []byte {
38 return t.msg.Data
39 }
40
41 func (t *pubSubArchivistTask) AssertLease(c context.Context) error {
42 return retry.Retry(c, retry.Default, func() error {
43 // Call ModifyAckDeadline directly, since we need immediate conf irmation of
44 // our continued ownership of the ACK. This will change the ACK' s state
45 // from that expected by the Message Iterator's keepalive system ; however,
46 // since we're extending it to the maximum deadline, worst-case the
47 // keepalive will underestimate it and aggressively modify it.
48 //
49 // In practice, we tell the keepalive to use the maximum ACK dea dline too,
50 // so the disconnect will be minor at best.
51 return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID , pubsub.MaxACKDeadline)
52 }, func(err error, d time.Duration) {
53 log.Fields{
54 log.ErrorKey: err,
55 "delay": d,
56 }.Warningf(c, "Failed to modify ACK deadline. Retrying...")
57 })
58 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698