OLD | NEW |
---|---|
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package main | 5 package main |
6 | 6 |
7 import ( | 7 import ( |
8 » "time" | 8 » "github.com/golang/protobuf/proto" |
9 » "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | |
9 | 10 |
10 » "github.com/golang/protobuf/proto" | 11 » gcps "cloud.google.com/go/pubsub" |
11 » "github.com/luci/luci-go/common/gcloud/pubsub" | |
12 » log "github.com/luci/luci-go/common/logging" | |
13 » "github.com/luci/luci-go/common/retry" | |
14 » "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | |
15 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
16 gcps "google.golang.org/cloud/pubsub" | |
17 ) | 13 ) |
18 | 14 |
19 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask | 15 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask |
20 // Pub/Sub message. | 16 // Pub/Sub message. |
21 type pubSubArchivistTask struct { | 17 type pubSubArchivistTask struct { |
22 // Context is a cloud package authenticated Context that can be used for raw | |
23 // Pub/Sub interaction. This is necessary because ModifyAckDeadline is n ot | |
24 // available to the "new API" Client. | |
25 context.Context | |
26 | |
27 // subscriptionName is the name of the subscription that this task was p ulled | 18 // subscriptionName is the name of the subscription that this task was p ulled |
28 // from. This is NOT the full subscription path. | 19 // from. This is NOT the full subscription path. |
29 subscriptionName string | 20 subscriptionName string |
30 // msg is the message that this task is bound to. | 21 // msg is the message that this task is bound to. |
31 msg *gcps.Message | 22 msg *gcps.Message |
32 | 23 |
33 // at is the unmarshalled ArchiveTask from msg. | 24 // at is the unmarshalled ArchiveTask from msg. |
34 at logdog.ArchiveTask | 25 at logdog.ArchiveTask |
35 | 26 |
36 // consumed is true if this task has been marked for consumption. | 27 // consumed is true if this task has been marked for consumption. |
37 consumed bool | 28 consumed bool |
38 } | 29 } |
39 | 30 |
40 func makePubSubArchivistTask(c context.Context, s string, msg *gcps.Message) (*p ubSubArchivistTask, error) { | 31 func makePubSubArchivistTask(s string, msg *gcps.Message) (*pubSubArchivistTask, error) { |
41 // If we can't decode the archival task, we can't decide whether or not to | 32 // If we can't decode the archival task, we can't decide whether or not to |
42 // delete it, so we will leave it in the queue. | 33 // delete it, so we will leave it in the queue. |
43 t := pubSubArchivistTask{ | 34 t := pubSubArchivistTask{ |
44 Context: c, | |
45 subscriptionName: s, | 35 subscriptionName: s, |
46 msg: msg, | 36 msg: msg, |
47 } | 37 } |
48 | 38 |
49 if err := proto.Unmarshal(msg.Data, &t.at); err != nil { | 39 if err := proto.Unmarshal(msg.Data, &t.at); err != nil { |
50 return nil, err | 40 return nil, err |
51 } | 41 } |
52 return &t, nil | 42 return &t, nil |
53 } | 43 } |
54 | 44 |
55 func (t *pubSubArchivistTask) UniqueID() string { | 45 func (t *pubSubArchivistTask) UniqueID() string { |
56 // The Message's AckID is guaranteed to be unique for a single lease. | 46 // The Message's AckID is guaranteed to be unique for a single lease. |
57 return t.msg.AckID | 47 return t.msg.AckID |
58 } | 48 } |
59 | 49 |
60 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask { | 50 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask { |
61 return &t.at | 51 return &t.at |
62 } | 52 } |
63 | 53 |
64 func (t *pubSubArchivistTask) Consume() { | 54 func (t *pubSubArchivistTask) Consume() { |
65 t.consumed = true | 55 t.consumed = true |
66 } | 56 } |
67 | 57 |
68 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { | 58 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { return nil } |
nodir
2016/08/05 17:15:39
i think you can delete this method now
| |
69 » return retry.Retry(c, retry.Default, func() error { | |
70 » » // Call ModifyAckDeadline directly, since we need immediate conf irmation of | |
71 » » // our continued ownership of the ACK. This will change the ACK' s state | |
72 » » // from that expected by the Message Iterator's keepalive system ; however, | |
73 » » // since we're extending it to the maximum deadline, worst-case the | |
74 » » // keepalive will underestimate it and aggressively modify it. | |
75 » » // | |
76 » » // In practice, we tell the keepalive to use the maximum ACK dea dline too, | |
77 » » // so the disconnect will be minor at best. | |
78 » » return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID , pubsub.MaxACKDeadline) | |
79 » }, func(err error, d time.Duration) { | |
80 » » log.Fields{ | |
81 » » » log.ErrorKey: err, | |
82 » » » "delay": d, | |
83 » » }.Warningf(c, "Failed to modify ACK deadline. Retrying...") | |
84 » }) | |
85 } | |
OLD | NEW |