| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "fmt" |
| 10 "runtime" |
| 11 "strings" |
| 12 "time" |
| 13 |
| 14 "github.com/luci/luci-go/client/internal/logdog/butler/output" |
| 15 out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub
" |
| 16 api "github.com/luci/luci-go/common/api/logdog_coordinator/registration/
v1" |
| 17 "github.com/luci/luci-go/common/auth" |
| 18 "github.com/luci/luci-go/common/clock/clockflag" |
| 19 "github.com/luci/luci-go/common/flag/multiflag" |
| 20 ps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 21 log "github.com/luci/luci-go/common/logging" |
| 22 "github.com/luci/luci-go/common/proto/google" |
| 23 "github.com/luci/luci-go/common/prpc" |
| 24 "github.com/luci/luci-go/common/retry" |
| 25 "golang.org/x/net/context" |
| 26 "google.golang.org/cloud" |
| 27 "google.golang.org/cloud/pubsub" |
| 28 ) |
| 29 |
| 30 func init() { |
| 31 registerOutputFactory(new(logdogOutputFactory)) |
| 32 } |
| 33 |
| 34 // logdogOutputFactory for publishing logs using a LogDog Coordinator host. |
| 35 type logdogOutputFactory struct { |
| 36 host string |
| 37 prefixExpiration clockflag.Duration |
| 38 |
| 39 track bool |
| 40 } |
| 41 |
| 42 var _ outputFactory = (*logdogOutputFactory)(nil) |
| 43 |
| 44 func (f *logdogOutputFactory) option() multiflag.Option { |
| 45 opt := newOutputOption("logdog", "Output to a LogDog Coordinator instanc
e.", f) |
| 46 |
| 47 flags := opt.Flags() |
| 48 flags.StringVar(&f.host, "host", "", |
| 49 "The LogDog Coordinator host name.") |
| 50 flags.Var(&f.prefixExpiration, "prefix-expiration", |
| 51 "Amount of time after registration that the prefix will be activ
e. If omitted, the service "+ |
| 52 "default will be used. This should exceed the expected l
ifetime of the job by a fair margin.") |
| 53 |
| 54 // TODO(dnj): Default to false when mandatory debugging is finished. |
| 55 flags.BoolVar(&f.track, "track", true, |
| 56 "Track each sent message. This adds CPU/memory overhead.") |
| 57 |
| 58 return opt |
| 59 } |
| 60 |
| 61 func (f *logdogOutputFactory) configOutput(a *application) (output.Output, error
) { |
| 62 // Open a pRPC client to our Coordinator instance. |
| 63 authenticator, err := a.authenticator(a) |
| 64 if err != nil { |
| 65 log.WithError(err).Errorf(a, "Failed to get authenticator.") |
| 66 return nil, err |
| 67 } |
| 68 httpClient, err := authenticator.Client() |
| 69 if err != nil { |
| 70 log.WithError(err).Errorf(a, "Failed to get authenticated HTTP c
lient.") |
| 71 return nil, err |
| 72 } |
| 73 |
| 74 // Configure our pRPC client. |
| 75 client := prpc.Client{ |
| 76 C: httpClient, |
| 77 Host: f.host, |
| 78 Options: prpc.DefaultOptions(), |
| 79 } |
| 80 |
| 81 // If our host begins with "localhost", set insecure option automaticall
y. |
| 82 if isLocalHost(f.host) { |
| 83 log.Infof(a, "Detected localhost; enabling insecure RPC connecti
on.") |
| 84 client.Options.Insecure = true |
| 85 } |
| 86 |
| 87 // Register our Prefix with the Coordinator. |
| 88 log.Fields{ |
| 89 "prefix": a.prefix, |
| 90 "host": f.host, |
| 91 }.Debugf(a, "Registering prefix space with Coordinator service.") |
| 92 |
| 93 svc := api.NewRegistrationPRPCClient(&client) |
| 94 resp, err := svc.RegisterPrefix(a, &api.RegisterPrefixRequest{ |
| 95 Project: string(a.project), |
| 96 Prefix: string(a.prefix), |
| 97 SourceInfo: []string{ |
| 98 "LogDog Butler", |
| 99 fmt.Sprintf("GOARCH=%s", runtime.GOARCH), |
| 100 fmt.Sprintf("GOOS=%s", runtime.GOOS), |
| 101 }, |
| 102 Expiration: google.NewDuration(time.Duration(f.prefixExpiration)
), |
| 103 }) |
| 104 if err != nil { |
| 105 log.WithError(err).Errorf(a, "Failed to register prefix with Coo
rdinator service.") |
| 106 return nil, err |
| 107 } |
| 108 log.Fields{ |
| 109 "prefix": a.prefix, |
| 110 "bundleTopic": resp.LogBundleTopic, |
| 111 }.Debugf(a, "Successfully registered log stream prefix.") |
| 112 |
| 113 // Validate the response topic. |
| 114 fullTopic := ps.Topic(resp.LogBundleTopic) |
| 115 if err := fullTopic.Validate(); err != nil { |
| 116 log.Fields{ |
| 117 log.ErrorKey: err, |
| 118 "fullTopic": fullTopic, |
| 119 }.Errorf(a, "Coordinator returned invalid Pub/Sub topic.") |
| 120 return nil, err |
| 121 } |
| 122 |
| 123 // Split our topic into project and topic name. This must succeed, since
we |
| 124 // just finished validating the topic. |
| 125 proj, topic := fullTopic.Split() |
| 126 |
| 127 // Instantiate our Pub/Sub instance. |
| 128 // |
| 129 // We will use the non-cancelling context, for all Pub/Sub calls, as we
want |
| 130 // the Pub/Sub system to drain without interruption if the application i
s |
| 131 // otherwise canceled. |
| 132 psClient, err := pubsub.NewClient(a.ncCtx, proj, cloud.WithTokenSource(a
uthenticator.TokenSource())) |
| 133 if err != nil { |
| 134 log.Fields{ |
| 135 log.ErrorKey: err, |
| 136 "project": proj, |
| 137 }.Errorf(a, "Failed to create Pub/Sub client.") |
| 138 return nil, errors.New("failed to get Pub/Sub client") |
| 139 } |
| 140 psTopic := psClient.Topic(topic) |
| 141 |
| 142 // Assert that our Topic exists. |
| 143 exists, err := retryTopicExists(a, psTopic) |
| 144 if err != nil { |
| 145 log.Fields{ |
| 146 log.ErrorKey: err, |
| 147 "project": proj, |
| 148 "topic": topic, |
| 149 }.Errorf(a, "Failed to check for Pub/Sub topic.") |
| 150 return nil, errors.New("failed to check for Pub/Sub topic") |
| 151 } |
| 152 if !exists { |
| 153 log.Fields{ |
| 154 "fullTopic": fullTopic, |
| 155 }.Errorf(a, "Pub/Sub Topic does not exist.") |
| 156 return nil, errors.New("PubSub topic does not exist") |
| 157 } |
| 158 |
| 159 // We own the prefix and all verifiable parameters have been validated. |
| 160 // Successfully return our Output instance. |
| 161 // |
| 162 // Note that we use our non-cancelling context here. |
| 163 return out.New(a.ncCtx, out.Config{ |
| 164 Topic: psTopic, |
| 165 Secret: resp.Secret, |
| 166 Compress: true, |
| 167 Track: f.track, |
| 168 }), nil |
| 169 } |
| 170 |
| 171 func (f *logdogOutputFactory) scopes() []string { |
| 172 // E-mail scope needed for Coordinator authentication. |
| 173 scopes := []string{auth.OAuthScopeEmail} |
| 174 // Publisher scope needed to publish to Pub/Sub transport. |
| 175 scopes = append(scopes, ps.PublisherScopes...) |
| 176 return scopes |
| 177 } |
| 178 |
| 179 func retryTopicExists(ctx context.Context, t *pubsub.Topic) (bool, error) { |
| 180 var exists bool |
| 181 err := retry.Retry(ctx, retry.Default, func() (err error) { |
| 182 exists, err = t.Exists(ctx) |
| 183 return |
| 184 }, func(err error, d time.Duration) { |
| 185 log.Fields{ |
| 186 log.ErrorKey: err, |
| 187 "delay": d, |
| 188 }.Errorf(ctx, "Failed to check if topic exists; retrying...") |
| 189 }) |
| 190 return exists, err |
| 191 } |
| 192 |
| 193 func isLocalHost(host string) bool { |
| 194 switch { |
| 195 case host == "localhost", strings.HasPrefix(host, "localhost:"): |
| 196 case host == "127.0.0.1", strings.HasPrefix(host, "127.0.0.1:"): |
| 197 case host == "[::1]", strings.HasPrefix(host, "[::1]:"): |
| 198 case strings.HasPrefix(host, ":"): |
| 199 |
| 200 default: |
| 201 return false |
| 202 } |
| 203 return true |
| 204 } |
| OLD | NEW |