Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(296)

Side by Side Diff: logdog/server/cmd/logdog_archivist/task.go

Issue 2219023003: Update APIs to use new Google cloud paths. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 » "time" 8 » "github.com/golang/protobuf/proto"
9 » "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
9 10
10 » "github.com/golang/protobuf/proto" 11 » gcps "cloud.google.com/go/pubsub"
11 » "github.com/luci/luci-go/common/gcloud/pubsub"
12 » log "github.com/luci/luci-go/common/logging"
13 » "github.com/luci/luci-go/common/retry"
14 » "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
15 "golang.org/x/net/context" 12 "golang.org/x/net/context"
16 gcps "google.golang.org/cloud/pubsub"
17 ) 13 )
18 14
19 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask 15 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask
20 // Pub/Sub message. 16 // Pub/Sub message.
21 type pubSubArchivistTask struct { 17 type pubSubArchivistTask struct {
22 // Context is a cloud package authenticated Context that can be used for raw
23 // Pub/Sub interaction. This is necessary because ModifyAckDeadline is n ot
24 // available to the "new API" Client.
25 context.Context
26
27 // subscriptionName is the name of the subscription that this task was p ulled 18 // subscriptionName is the name of the subscription that this task was p ulled
28 // from. This is NOT the full subscription path. 19 // from. This is NOT the full subscription path.
29 subscriptionName string 20 subscriptionName string
30 // msg is the message that this task is bound to. 21 // msg is the message that this task is bound to.
31 msg *gcps.Message 22 msg *gcps.Message
32 23
33 // at is the unmarshalled ArchiveTask from msg. 24 // at is the unmarshalled ArchiveTask from msg.
34 at logdog.ArchiveTask 25 at logdog.ArchiveTask
35 26
36 // consumed is true if this task has been marked for consumption. 27 // consumed is true if this task has been marked for consumption.
37 consumed bool 28 consumed bool
38 } 29 }
39 30
40 func makePubSubArchivistTask(c context.Context, s string, msg *gcps.Message) (*p ubSubArchivistTask, error) { 31 func makePubSubArchivistTask(s string, msg *gcps.Message) (*pubSubArchivistTask, error) {
41 // If we can't decode the archival task, we can't decide whether or not to 32 // If we can't decode the archival task, we can't decide whether or not to
42 // delete it, so we will leave it in the queue. 33 // delete it, so we will leave it in the queue.
43 t := pubSubArchivistTask{ 34 t := pubSubArchivistTask{
44 Context: c,
45 subscriptionName: s, 35 subscriptionName: s,
46 msg: msg, 36 msg: msg,
47 } 37 }
48 38
49 if err := proto.Unmarshal(msg.Data, &t.at); err != nil { 39 if err := proto.Unmarshal(msg.Data, &t.at); err != nil {
50 return nil, err 40 return nil, err
51 } 41 }
52 return &t, nil 42 return &t, nil
53 } 43 }
54 44
55 func (t *pubSubArchivistTask) UniqueID() string { 45 func (t *pubSubArchivistTask) UniqueID() string {
56 // The Message's AckID is guaranteed to be unique for a single lease. 46 // The Message's AckID is guaranteed to be unique for a single lease.
57 return t.msg.AckID 47 return t.msg.AckID
58 } 48 }
59 49
60 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask { 50 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask {
61 return &t.at 51 return &t.at
62 } 52 }
63 53
64 func (t *pubSubArchivistTask) Consume() { 54 func (t *pubSubArchivistTask) Consume() {
65 t.consumed = true 55 t.consumed = true
66 } 56 }
67 57
68 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { 58 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { return nil }
nodir 2016/08/05 17:15:39 i think you can delete this method now
69 » return retry.Retry(c, retry.Default, func() error {
70 » » // Call ModifyAckDeadline directly, since we need immediate conf irmation of
71 » » // our continued ownership of the ACK. This will change the ACK' s state
72 » » // from that expected by the Message Iterator's keepalive system ; however,
73 » » // since we're extending it to the maximum deadline, worst-case the
74 » » // keepalive will underestimate it and aggressively modify it.
75 » » //
76 » » // In practice, we tell the keepalive to use the maximum ACK dea dline too,
77 » » // so the disconnect will be minor at best.
78 » » return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID , pubsub.MaxACKDeadline)
79 » }, func(err error, d time.Duration) {
80 » » log.Fields{
81 » » » log.ErrorKey: err,
82 » » » "delay": d,
83 » » }.Warningf(c, "Failed to modify ACK deadline. Retrying...")
84 » })
85 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698