| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. | 1 // Copyright 2016 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 package coordinator | 15 package coordinator |
| 16 | 16 |
| 17 import ( | 17 import ( |
| 18 "time" | 18 "time" |
| 19 | 19 |
| 20 » gcps "cloud.google.com/go/pubsub" | 20 » "github.com/luci/luci-go/common/gcloud/pubsub" |
| 21 » "github.com/golang/protobuf/proto" | |
| 22 log "github.com/luci/luci-go/common/logging" | 21 log "github.com/luci/luci-go/common/logging" |
| 23 "github.com/luci/luci-go/common/retry" | 22 "github.com/luci/luci-go/common/retry" |
| 24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 23 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 24 |
| 25 gcps "cloud.google.com/go/pubsub" |
| 26 "github.com/golang/protobuf/proto" |
| 27 |
| 25 "golang.org/x/net/context" | 28 "golang.org/x/net/context" |
| 26 ) | 29 ) |
| 27 | 30 |
| 28 // ArchivalPublisher is capable of publishing archival requests. | 31 // ArchivalPublisher is capable of publishing archival requests. |
| 29 type ArchivalPublisher interface { | 32 type ArchivalPublisher interface { |
| 30 // Close shutdowns this publisher instance, releasing all its resources. | 33 // Close shutdowns this publisher instance, releasing all its resources. |
| 31 Close() error | 34 Close() error |
| 32 | 35 |
| 33 // Publish publishes the supplied ArchiveTask. | 36 // Publish publishes the supplied ArchiveTask. |
| 34 Publish(context.Context, *logdog.ArchiveTask) error | 37 Publish(context.Context, *logdog.ArchiveTask) error |
| 35 | 38 |
| 36 // NewPublishIndex returns a new publish index. Each publish index is un
ique | 39 // NewPublishIndex returns a new publish index. Each publish index is un
ique |
| 37 // within its request. | 40 // within its request. |
| 38 NewPublishIndex() uint64 | 41 NewPublishIndex() uint64 |
| 39 } | 42 } |
| 40 | 43 |
| 41 type pubsubArchivalPublisher struct { | 44 type pubsubArchivalPublisher struct { |
| 42 » // client is Pub/Sub client used by the publisher. | 45 » // publisher is the client used to publish messages. |
| 43 » // | 46 » publisher pubsub.Publisher |
| 44 » // This client is owned by the prodServicesInst that created this instna
ce, | |
| 45 » // and should not be closed on shutdown here. | |
| 46 » client *gcps.Client | |
| 47 | |
| 48 » // topic is the authenticated Pub/Sub topic handle to publish to. | |
| 49 » topic *gcps.Topic | |
| 50 | 47 |
| 51 // publishIndexFunc is a function that will return a unique publish inde
x | 48 // publishIndexFunc is a function that will return a unique publish inde
x |
| 52 // for this request. | 49 // for this request. |
| 53 publishIndexFunc func() uint64 | 50 publishIndexFunc func() uint64 |
| 54 } | 51 } |
| 55 | 52 |
| 56 func (p *pubsubArchivalPublisher) Close() error { return nil } | 53 func (p *pubsubArchivalPublisher) Close() error { return nil } |
| 57 | 54 |
| 58 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa
sk) error { | 55 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa
sk) error { |
| 59 d, err := proto.Marshal(t) | 56 d, err := proto.Marshal(t) |
| 60 if err != nil { | 57 if err != nil { |
| 61 log.WithError(err).Errorf(c, "Failed to marshal task.") | 58 log.WithError(err).Errorf(c, "Failed to marshal task.") |
| 62 return err | 59 return err |
| 63 } | 60 } |
| 64 | 61 |
| 65 // TODO: Route this through some system (e.g., task queue, tumble) that
can | 62 // TODO: Route this through some system (e.g., task queue, tumble) that
can |
| 66 // impose a dispatch delay for the settle period. | 63 // impose a dispatch delay for the settle period. |
| 67 msg := gcps.Message{ | 64 msg := gcps.Message{ |
| 68 Data: d, | 65 Data: d, |
| 69 } | 66 } |
| 70 | 67 |
| 71 return retry.Retry(c, retry.Default, func() error { | 68 return retry.Retry(c, retry.Default, func() error { |
| 72 log.Fields{ | 69 log.Fields{ |
| 73 "project": t.Project, | 70 "project": t.Project, |
| 74 "hash": t.Id, | 71 "hash": t.Id, |
| 75 "key": t.Key, | 72 "key": t.Key, |
| 76 }.Infof(c, "Publishing archival message for stream.") | 73 }.Infof(c, "Publishing archival message for stream.") |
| 77 | 74 |
| 78 » » _, err := p.topic.Publish(c, &msg).Get(c) | 75 » » _, err := p.publisher.Publish(c, &msg) |
| 79 return err | 76 return err |
| 80 }, func(err error, d time.Duration) { | 77 }, func(err error, d time.Duration) { |
| 81 log.Fields{ | 78 log.Fields{ |
| 82 log.ErrorKey: err, | 79 log.ErrorKey: err, |
| 83 "delay": d, | 80 "delay": d, |
| 84 }.Warningf(c, "Failed to publish task. Retrying...") | 81 }.Warningf(c, "Failed to publish task. Retrying...") |
| 85 }) | 82 }) |
| 86 } | 83 } |
| 87 | 84 |
| 88 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { | 85 func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 { |
| 89 return p.publishIndexFunc() | 86 return p.publishIndexFunc() |
| 90 } | 87 } |
| OLD | NEW |