Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Unified Diff: client/cmd/logdog_butler/output_logdog.go

Issue 1975683002: LogDog: Implement prefix registration in Butler. (Closed) Base URL: https://github.com/luci/luci-go@logdog-butler-register-collector
Patch Set: Rebarse Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/cmd/logdog_butler/output_log.go ('k') | client/cmd/logdog_butler/output_pubsub.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/cmd/logdog_butler/output_logdog.go
diff --git a/client/cmd/logdog_butler/output_logdog.go b/client/cmd/logdog_butler/output_logdog.go
new file mode 100644
index 0000000000000000000000000000000000000000..8cd2786f469ce5cbc88f9ce28e3fa859215f027b
--- /dev/null
+++ b/client/cmd/logdog_butler/output_logdog.go
@@ -0,0 +1,204 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package main
+
+import (
+ "errors"
+ "fmt"
+ "runtime"
+ "strings"
+ "time"
+
+ "github.com/luci/luci-go/client/internal/logdog/butler/output"
+ out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub"
+ api "github.com/luci/luci-go/common/api/logdog_coordinator/registration/v1"
+ "github.com/luci/luci-go/common/auth"
+ "github.com/luci/luci-go/common/clock/clockflag"
+ "github.com/luci/luci-go/common/flag/multiflag"
+ ps "github.com/luci/luci-go/common/gcloud/pubsub"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/prpc"
+ "github.com/luci/luci-go/common/retry"
+ "golang.org/x/net/context"
+ "google.golang.org/cloud"
+ "google.golang.org/cloud/pubsub"
+)
+
+func init() {
+ registerOutputFactory(new(logdogOutputFactory))
+}
+
+// logdogOutputFactory for publishing logs using a LogDog Coordinator host.
+type logdogOutputFactory struct {
+ host string
+ prefixExpiration clockflag.Duration
+
+ track bool
+}
+
+var _ outputFactory = (*logdogOutputFactory)(nil)
+
+func (f *logdogOutputFactory) option() multiflag.Option {
+ opt := newOutputOption("logdog", "Output to a LogDog Coordinator instance.", f)
+
+ flags := opt.Flags()
+ flags.StringVar(&f.host, "host", "",
+ "The LogDog Coordinator host name.")
+ flags.Var(&f.prefixExpiration, "prefix-expiration",
+ "Amount of time after registration that the prefix will be active. If omitted, the service "+
+ "default will be used. This should exceed the expected lifetime of the job by a fair margin.")
+
+ // TODO(dnj): Default to false when mandatory debugging is finished.
+ flags.BoolVar(&f.track, "track", true,
+ "Track each sent message. This adds CPU/memory overhead.")
+
+ return opt
+}
+
+func (f *logdogOutputFactory) configOutput(a *application) (output.Output, error) {
+ // Open a pRPC client to our Coordinator instance.
+ authenticator, err := a.authenticator(a)
+ if err != nil {
+ log.WithError(err).Errorf(a, "Failed to get authenticator.")
+ return nil, err
+ }
+ httpClient, err := authenticator.Client()
+ if err != nil {
+ log.WithError(err).Errorf(a, "Failed to get authenticated HTTP client.")
+ return nil, err
+ }
+
+ // Configure our pRPC client.
+ client := prpc.Client{
+ C: httpClient,
+ Host: f.host,
+ Options: prpc.DefaultOptions(),
+ }
+
+ // If our host begins with "localhost", set insecure option automatically.
+ if isLocalHost(f.host) {
+ log.Infof(a, "Detected localhost; enabling insecure RPC connection.")
+ client.Options.Insecure = true
+ }
+
+ // Register our Prefix with the Coordinator.
+ log.Fields{
+ "prefix": a.prefix,
+ "host": f.host,
+ }.Debugf(a, "Registering prefix space with Coordinator service.")
+
+ svc := api.NewRegistrationPRPCClient(&client)
+ resp, err := svc.RegisterPrefix(a, &api.RegisterPrefixRequest{
+ Project: string(a.project),
+ Prefix: string(a.prefix),
+ SourceInfo: []string{
+ "LogDog Butler",
+ fmt.Sprintf("GOARCH=%s", runtime.GOARCH),
+ fmt.Sprintf("GOOS=%s", runtime.GOOS),
+ },
+ Expiration: google.NewDuration(time.Duration(f.prefixExpiration)),
+ })
+ if err != nil {
+ log.WithError(err).Errorf(a, "Failed to register prefix with Coordinator service.")
+ return nil, err
+ }
+ log.Fields{
+ "prefix": a.prefix,
+ "bundleTopic": resp.LogBundleTopic,
+ }.Debugf(a, "Successfully registered log stream prefix.")
+
+ // Validate the response topic.
+ fullTopic := ps.Topic(resp.LogBundleTopic)
+ if err := fullTopic.Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "fullTopic": fullTopic,
+ }.Errorf(a, "Coordinator returned invalid Pub/Sub topic.")
+ return nil, err
+ }
+
+ // Split our topic into project and topic name. This must succeed, since we
+ // just finished validating the topic.
+ proj, topic := fullTopic.Split()
+
+ // Instantiate our Pub/Sub instance.
+ //
+ // We will use the non-cancelling context, for all Pub/Sub calls, as we want
+ // the Pub/Sub system to drain without interruption if the application is
+ // otherwise canceled.
+ psClient, err := pubsub.NewClient(a.ncCtx, proj, cloud.WithTokenSource(authenticator.TokenSource()))
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": proj,
+ }.Errorf(a, "Failed to create Pub/Sub client.")
+ return nil, errors.New("failed to get Pub/Sub client")
+ }
+ psTopic := psClient.Topic(topic)
+
+ // Assert that our Topic exists.
+ exists, err := retryTopicExists(a, psTopic)
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "project": proj,
+ "topic": topic,
+ }.Errorf(a, "Failed to check for Pub/Sub topic.")
+ return nil, errors.New("failed to check for Pub/Sub topic")
+ }
+ if !exists {
+ log.Fields{
+ "fullTopic": fullTopic,
+ }.Errorf(a, "Pub/Sub Topic does not exist.")
+ return nil, errors.New("PubSub topic does not exist")
+ }
+
+ // We own the prefix and all verifiable parameters have been validated.
+ // Successfully return our Output instance.
+ //
+ // Note that we use our non-cancelling context here.
+ return out.New(a.ncCtx, out.Config{
+ Topic: psTopic,
+ Secret: resp.Secret,
+ Compress: true,
+ Track: f.track,
+ }), nil
+}
+
+func (f *logdogOutputFactory) scopes() []string {
+ // E-mail scope needed for Coordinator authentication.
+ scopes := []string{auth.OAuthScopeEmail}
+ // Publisher scope needed to publish to Pub/Sub transport.
+ scopes = append(scopes, ps.PublisherScopes...)
+ return scopes
+}
+
+func retryTopicExists(ctx context.Context, t *pubsub.Topic) (bool, error) {
+ var exists bool
+ err := retry.Retry(ctx, retry.Default, func() (err error) {
+ exists, err = t.Exists(ctx)
+ return
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Errorf(ctx, "Failed to check if topic exists; retrying...")
+ })
+ return exists, err
+}
+
+func isLocalHost(host string) bool {
+ switch {
+ case host == "localhost", strings.HasPrefix(host, "localhost:"):
+ case host == "127.0.0.1", strings.HasPrefix(host, "127.0.0.1:"):
+ case host == "[::1]", strings.HasPrefix(host, "[::1]:"):
+ case strings.HasPrefix(host, ":"):
+
+ default:
+ return false
+ }
+ return true
+}
« no previous file with comments | « client/cmd/logdog_butler/output_log.go ('k') | client/cmd/logdog_butler/output_pubsub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698