Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 » "time" | 8 » "github.com/golang/protobuf/proto" |
| 9 » "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | |
| 9 | 10 |
| 10 » "github.com/golang/protobuf/proto" | 11 » gcps "cloud.google.com/go/pubsub" |
| 11 » "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 12 » log "github.com/luci/luci-go/common/logging" | |
| 13 » "github.com/luci/luci-go/common/retry" | |
| 14 » "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | |
| 15 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 16 gcps "google.golang.org/cloud/pubsub" | |
| 17 ) | 13 ) |
| 18 | 14 |
| 19 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask | 15 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask |
| 20 // Pub/Sub message. | 16 // Pub/Sub message. |
| 21 type pubSubArchivistTask struct { | 17 type pubSubArchivistTask struct { |
| 22 // Context is a cloud package authenticated Context that can be used for raw | |
| 23 // Pub/Sub interaction. This is necessary because ModifyAckDeadline is n ot | |
| 24 // available to the "new API" Client. | |
| 25 context.Context | |
| 26 | |
| 27 // subscriptionName is the name of the subscription that this task was p ulled | 18 // subscriptionName is the name of the subscription that this task was p ulled |
| 28 // from. This is NOT the full subscription path. | 19 // from. This is NOT the full subscription path. |
| 29 subscriptionName string | 20 subscriptionName string |
| 30 // msg is the message that this task is bound to. | 21 // msg is the message that this task is bound to. |
| 31 msg *gcps.Message | 22 msg *gcps.Message |
| 32 | 23 |
| 33 // at is the unmarshalled ArchiveTask from msg. | 24 // at is the unmarshalled ArchiveTask from msg. |
| 34 at logdog.ArchiveTask | 25 at logdog.ArchiveTask |
| 35 | 26 |
| 36 // consumed is true if this task has been marked for consumption. | 27 // consumed is true if this task has been marked for consumption. |
| 37 consumed bool | 28 consumed bool |
| 38 } | 29 } |
| 39 | 30 |
| 40 func makePubSubArchivistTask(c context.Context, s string, msg *gcps.Message) (*p ubSubArchivistTask, error) { | 31 func makePubSubArchivistTask(s string, msg *gcps.Message) (*pubSubArchivistTask, error) { |
| 41 // If we can't decode the archival task, we can't decide whether or not to | 32 // If we can't decode the archival task, we can't decide whether or not to |
| 42 // delete it, so we will leave it in the queue. | 33 // delete it, so we will leave it in the queue. |
| 43 t := pubSubArchivistTask{ | 34 t := pubSubArchivistTask{ |
| 44 Context: c, | |
| 45 subscriptionName: s, | 35 subscriptionName: s, |
| 46 msg: msg, | 36 msg: msg, |
| 47 } | 37 } |
| 48 | 38 |
| 49 if err := proto.Unmarshal(msg.Data, &t.at); err != nil { | 39 if err := proto.Unmarshal(msg.Data, &t.at); err != nil { |
| 50 return nil, err | 40 return nil, err |
| 51 } | 41 } |
| 52 return &t, nil | 42 return &t, nil |
| 53 } | 43 } |
| 54 | 44 |
| 55 func (t *pubSubArchivistTask) UniqueID() string { | 45 func (t *pubSubArchivistTask) UniqueID() string { |
| 56 // The Message's AckID is guaranteed to be unique for a single lease. | 46 // The Message's AckID is guaranteed to be unique for a single lease. |
| 57 return t.msg.AckID | 47 return t.msg.AckID |
| 58 } | 48 } |
| 59 | 49 |
| 60 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask { | 50 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask { |
| 61 return &t.at | 51 return &t.at |
| 62 } | 52 } |
| 63 | 53 |
| 64 func (t *pubSubArchivistTask) Consume() { | 54 func (t *pubSubArchivistTask) Consume() { |
| 65 t.consumed = true | 55 t.consumed = true |
| 66 } | 56 } |
| 67 | 57 |
| 68 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { | 58 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { return nil } |
|
nodir
2016/08/05 17:15:39
i think you can delete this method now
| |
| 69 » return retry.Retry(c, retry.Default, func() error { | |
| 70 » » // Call ModifyAckDeadline directly, since we need immediate conf irmation of | |
| 71 » » // our continued ownership of the ACK. This will change the ACK' s state | |
| 72 » » // from that expected by the Message Iterator's keepalive system ; however, | |
| 73 » » // since we're extending it to the maximum deadline, worst-case the | |
| 74 » » // keepalive will underestimate it and aggressively modify it. | |
| 75 » » // | |
| 76 » » // In practice, we tell the keepalive to use the maximum ACK dea dline too, | |
| 77 » » // so the disconnect will be minor at best. | |
| 78 » » return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID , pubsub.MaxACKDeadline) | |
| 79 » }, func(err error, d time.Duration) { | |
| 80 » » log.Fields{ | |
| 81 » » » log.ErrorKey: err, | |
| 82 » » » "delay": d, | |
| 83 » » }.Warningf(c, "Failed to modify ACK deadline. Retrying...") | |
| 84 » }) | |
| 85 } | |
| OLD | NEW |