| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 gcps "cloud.google.com/go/pubsub" | 10 gcps "cloud.google.com/go/pubsub" |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/retry" | 13 "github.com/luci/luci-go/common/retry" |
| 14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 14 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 // ArchivalPublisher is capable of publishing archival requests. | 18 // ArchivalPublisher is capable of publishing archival requests. |
| 19 type ArchivalPublisher interface { | 19 type ArchivalPublisher interface { |
| 20 » // Close shutdowns this publisher, releasing all its resources. | 20 » // Close shutdowns this publisher instance, releasing all its resources. |
| 21 Close() error | 21 Close() error |
| 22 | 22 |
| 23 // Publish publishes the supplied ArchiveTask. | 23 // Publish publishes the supplied ArchiveTask. |
| 24 Publish(context.Context, *logdog.ArchiveTask) error | 24 Publish(context.Context, *logdog.ArchiveTask) error |
| 25 | 25 |
| 26 // NewPublishIndex returns a new publish index. Each publish index is un
ique | 26 // NewPublishIndex returns a new publish index. Each publish index is un
ique |
| 27 // within its request. | 27 // within its request. |
| 28 NewPublishIndex() uint64 | 28 NewPublishIndex() uint64 |
| 29 } | 29 } |
| 30 | 30 |
| 31 type pubsubArchivalPublisher struct { | 31 type pubsubArchivalPublisher struct { |
| 32 // client is Pub/Sub client used by the publisher. | 32 // client is Pub/Sub client used by the publisher. |
| 33 // |
| 34 // This client is owned by the prodServicesInst that created this instna
ce, |
| 35 // and should not be closed on shutdown here. |
| 33 client *gcps.Client | 36 client *gcps.Client |
| 34 | 37 |
| 35 // topic is the authenticated Pub/Sub topic handle to publish to. | 38 // topic is the authenticated Pub/Sub topic handle to publish to. |
| 36 topic *gcps.Topic | 39 topic *gcps.Topic |
| 37 | 40 |
| 38 // publishIndexFunc is a function that will return a unique publish inde
x | 41 // publishIndexFunc is a function that will return a unique publish inde
x |
| 39 // for this request. | 42 // for this request. |
| 40 publishIndexFunc func() uint64 | 43 publishIndexFunc func() uint64 |
| 41 } | 44 } |
| 42 | 45 |
| 43 func (p *pubsubArchivalPublisher) Close() error { | 46 func (p *pubsubArchivalPublisher) Close() error { return nil } |
| 44 » return p.client.Close() | |
| 45 } | |
| 46 | 47 |
| 47 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa
sk) error { | 48 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa
sk) error { |
| 48 d, err := proto.Marshal(t) | 49 d, err := proto.Marshal(t) |
| 49 if err != nil { | 50 if err != nil { |
| 50 log.WithError(err).Errorf(c, "Failed to marshal task.") | 51 log.WithError(err).Errorf(c, "Failed to marshal task.") |
| 51 return err | 52 return err |
| 52 } | 53 } |
| 53 | 54 |
| 54 // TODO: Route this through some system (e.g., task queue, tumble) that
can | 55 // TODO: Route this through some system (e.g., task queue, tumble) that
can |
| 55 // impose a dispatch delay for the settle period. | 56 // impose a dispatch delay for the settle period. |
| (...skipping 14 matching lines...) Expand all Loading... |
| 70 log.Fields{ | 71 log.Fields{ |
| 71 log.ErrorKey: err, | 72 log.ErrorKey: err, |
| 72 "delay": d, | 73 "delay": d, |
| 73 }.Warningf(c, "Failed to publish task. Retrying...") | 74 }.Warningf(c, "Failed to publish task. Retrying...") |
| 74 }) | 75 }) |
| 75 } | 76 } |
| 76 | 77 |
| 77 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { | 78 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { |
| 78 return p.publishIndexFunc() | 79 return p.publishIndexFunc() |
| 79 } | 80 } |
| OLD | NEW |