| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 gcps "cloud.google.com/go/pubsub" | 10 gcps "cloud.google.com/go/pubsub" |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/retry" | 13 "github.com/luci/luci-go/common/retry" |
| 14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 // ArchivalPublisher is capable of publishing archival requests. | 18 // ArchivalPublisher is capable of publishing archival requests. |
| 19 type ArchivalPublisher interface { | 19 type ArchivalPublisher interface { |
| 20 // Close shutdowns this publisher, releasing all its resources. |
| 21 Close() error |
| 22 |
| 20 // Publish publishes the supplied ArchiveTask. | 23 // Publish publishes the supplied ArchiveTask. |
| 21 Publish(context.Context, *logdog.ArchiveTask) error | 24 Publish(context.Context, *logdog.ArchiveTask) error |
| 22 | 25 |
| 23 // NewPublishIndex returns a new publish index. Each publish index is un
ique | 26 // NewPublishIndex returns a new publish index. Each publish index is un
ique |
| 24 // within its request. | 27 // within its request. |
| 25 NewPublishIndex() uint64 | 28 NewPublishIndex() uint64 |
| 26 } | 29 } |
| 27 | 30 |
| 28 type pubsubArchivalPublisher struct { | 31 type pubsubArchivalPublisher struct { |
| 32 // client is Pub/Sub client used by the publisher. |
| 33 client *gcps.Client |
| 34 |
| 29 // topic is the authenticated Pub/Sub topic handle to publish to. | 35 // topic is the authenticated Pub/Sub topic handle to publish to. |
| 30 topic *gcps.Topic | 36 topic *gcps.Topic |
| 31 | 37 |
| 32 // publishIndexFunc is a function that will return a unique publish inde
x | 38 // publishIndexFunc is a function that will return a unique publish inde
x |
| 33 // for this request. | 39 // for this request. |
| 34 publishIndexFunc func() uint64 | 40 publishIndexFunc func() uint64 |
| 35 } | 41 } |
| 36 | 42 |
| 43 func (p *pubsubArchivalPublisher) Close() error { |
| 44 return p.client.Close() |
| 45 } |
| 46 |
| 37 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa
sk) error { | 47 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa
sk) error { |
| 38 d, err := proto.Marshal(t) | 48 d, err := proto.Marshal(t) |
| 39 if err != nil { | 49 if err != nil { |
| 40 log.WithError(err).Errorf(c, "Failed to marshal task.") | 50 log.WithError(err).Errorf(c, "Failed to marshal task.") |
| 41 return err | 51 return err |
| 42 } | 52 } |
| 43 | 53 |
| 44 // TODO: Route this through some system (e.g., task queue, tumble) that
can | 54 // TODO: Route this through some system (e.g., task queue, tumble) that
can |
| 45 // impose a dispatch delay for the settle period. | 55 // impose a dispatch delay for the settle period. |
| 46 msg := gcps.Message{ | 56 msg := gcps.Message{ |
| (...skipping 13 matching lines...) Expand all Loading... |
| 60 log.Fields{ | 70 log.Fields{ |
| 61 log.ErrorKey: err, | 71 log.ErrorKey: err, |
| 62 "delay": d, | 72 "delay": d, |
| 63 }.Warningf(c, "Failed to publish task. Retrying...") | 73 }.Warningf(c, "Failed to publish task. Retrying...") |
| 64 }) | 74 }) |
| 65 } | 75 } |
| 66 | 76 |
| 67 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { | 77 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { |
| 68 return p.publishIndexFunc() | 78 return p.publishIndexFunc() |
| 69 } | 79 } |
| OLD | NEW |