Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(421)

Side by Side Diff: appengine/logdog/coordinator/archivalPublisher.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/logdog/coordinator/archival.go ('k') | appengine/logdog/coordinator/auth.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "time"
9
10 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/retry"
14 "golang.org/x/net/context"
15 gcps "google.golang.org/cloud/pubsub"
16 )
17
18 // ArchivalPublisher is capable of publishing archival requests.
19 type ArchivalPublisher interface {
20 // Publish publishes the supplied ArchiveTask.
21 Publish(context.Context, *logdog.ArchiveTask) error
22 }
23
24 type pubsubArchivalPublisher struct {
25 // topic is the authenticated Pub/Sub topic handle to publish to.
26 topic *gcps.TopicHandle
27 }
28
29 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa sk) error {
30 d, err := proto.Marshal(t)
31 if err != nil {
32 log.WithError(err).Errorf(c, "Failed to marshal task.")
33 return err
34 }
35
36 // TODO: Route this through some system (e.g., task queue, tumble) that can
37 // impose a dispatch delay for the settle period.
38 msg := gcps.Message{
39 Data: d,
40 }
41
42 return retry.Retry(c, retry.Default, func() error {
43 log.Fields{
44 "path": t.Path,
45 "key": t.Key,
46 }.Infof(c, "Publishing archival message for stream.")
47
48 _, err := p.topic.Publish(c, &msg)
49 return err
50 }, func(err error, d time.Duration) {
51 log.Fields{
52 log.ErrorKey: err,
53 "delay": d,
54 }.Warningf(c, "Failed to publish task. Retrying...")
55 })
56 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/archival.go ('k') | appengine/logdog/coordinator/auth.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698