Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package main | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "fmt" | |
| 10 "runtime" | |
| 11 "strings" | |
| 12 "time" | |
| 13 | |
| 14 "github.com/luci/luci-go/client/internal/logdog/butler/output" | |
| 15 out "github.com/luci/luci-go/client/internal/logdog/butler/output/pubsub " | |
| 16 api "github.com/luci/luci-go/common/api/logdog_coordinator/registration/ v1" | |
| 17 "github.com/luci/luci-go/common/auth" | |
| 18 "github.com/luci/luci-go/common/clock/clockflag" | |
| 19 "github.com/luci/luci-go/common/flag/multiflag" | |
| 20 ps "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 21 log "github.com/luci/luci-go/common/logging" | |
| 22 "github.com/luci/luci-go/common/proto/google" | |
| 23 "github.com/luci/luci-go/common/prpc" | |
| 24 "github.com/luci/luci-go/common/retry" | |
| 25 "golang.org/x/net/context" | |
| 26 "google.golang.org/cloud" | |
| 27 "google.golang.org/cloud/pubsub" | |
| 28 ) | |
| 29 | |
| 30 func init() { | |
| 31 registerOutputFactory(new(logdogOutputFactory)) | |
| 32 } | |
| 33 | |
| 34 // logdogOutputFactory for publishing logs using a LogDog Coordinator host. | |
| 35 type logdogOutputFactory struct { | |
| 36 host string | |
| 37 prefixExpiration clockflag.Duration | |
| 38 | |
| 39 track bool | |
| 40 } | |
| 41 | |
| 42 var _ outputFactory = (*logdogOutputFactory)(nil) | |
| 43 | |
| 44 func (f *logdogOutputFactory) option() multiflag.Option { | |
| 45 opt := newOutputOption("logdog", "Output to a LogDog Coordinator instanc e.", f) | |
| 46 | |
| 47 flags := opt.Flags() | |
| 48 flags.StringVar(&f.host, "host", "", | |
| 49 "The LogDog Coordinator host name.") | |
| 50 flags.Var(&f.prefixExpiration, "prefix-expiration", | |
| 51 "Amount of time after registration that the prefix will be activ e. If omitted, the service "+ | |
| 52 "default will be used. This should exceed the expected l ifetime of the job by a fair margin.") | |
| 53 | |
| 54 // TODO(dnj): Default to false when mandatory debugging is finished. | |
| 55 flags.BoolVar(&f.track, "track", true, | |
| 56 "Track each sent message. This adds CPU/memory overhead.") | |
| 57 | |
| 58 return opt | |
| 59 } | |
| 60 | |
| 61 func (f *logdogOutputFactory) configOutput(a *application) (output.Output, error ) { | |
| 62 // Open a pRPC client to our Coordinator instance. | |
| 63 authenticator, err := a.authenticator(a) | |
| 64 if err != nil { | |
| 65 log.WithError(err).Errorf(a, "Failed to get authenticator.") | |
| 66 return nil, err | |
| 67 } | |
| 68 httpClient, err := authenticator.Client() | |
| 69 if err != nil { | |
| 70 log.WithError(err).Errorf(a, "Failed to get authenticated HTTP c lient.") | |
| 71 return nil, err | |
| 72 } | |
| 73 | |
| 74 // Configure our pRPC client. | |
| 75 client := prpc.Client{ | |
| 76 C: httpClient, | |
| 77 Host: f.host, | |
| 78 Options: prpc.DefaultOptions(), | |
| 79 } | |
| 80 | |
| 81 // If our host begins with "localhost", set insecure option automaticall y. | |
| 82 if isLocalHost(f.host) { | |
| 83 log.Infof(a, "Detected localhost; enabling insecure RPC connecti on.") | |
| 84 client.Options.Insecure = true | |
| 85 } | |
| 86 | |
| 87 // Register our Prefix with the Coordinator. | |
| 88 log.Fields{ | |
| 89 "prefix": a.prefix, | |
| 90 "host": f.host, | |
| 91 }.Debugf(a, "Registering prefix space with Coordinator service.") | |
| 92 | |
| 93 svc := retryRegistrationClient{api.NewRegistrationPRPCClient(&client)} | |
| 94 resp, err := svc.RegisterPrefix(a, &api.RegisterPrefixRequest{ | |
| 95 Project: string(a.project), | |
| 96 Prefix: string(a.prefix), | |
| 97 SourceInfo: []string{ | |
| 98 "LogDog Butler", | |
| 99 fmt.Sprintf("GOARCH=%s", runtime.GOARCH), | |
| 100 fmt.Sprintf("GOOS=%s", runtime.GOOS), | |
| 101 }, | |
| 102 Expiration: google.NewDuration(time.Duration(f.prefixExpiration) ), | |
| 103 }) | |
| 104 if err != nil { | |
| 105 log.WithError(err).Errorf(a, "Failed to register prefix with Coo rdinator service.") | |
| 106 return nil, err | |
| 107 } | |
| 108 log.Fields{ | |
| 109 "prefix": a.prefix, | |
| 110 "bundleTopic": resp.LogBundleTopic, | |
| 111 }.Debugf(a, "Successfully registered log stream prefix.") | |
| 112 | |
| 113 // Validate the response topic. | |
| 114 fullTopic := ps.Topic(resp.LogBundleTopic) | |
| 115 if err := fullTopic.Validate(); err != nil { | |
| 116 log.Fields{ | |
| 117 log.ErrorKey: err, | |
| 118 "fullTopic": fullTopic, | |
| 119 }.Errorf(a, "Coordinator returned invalid Pub/Sub topic.") | |
| 120 return nil, err | |
| 121 } | |
| 122 | |
| 123 // Split our topic into project and topic name. This must succeed, since we | |
| 124 // just finished validating the topic. | |
| 125 proj, topic := fullTopic.Split() | |
| 126 | |
| 127 // Instantiate our Pub/Sub instance. | |
| 128 // | |
| 129 // We will use the non-cancelling context, for all Pub/Sub calls, as we want | |
| 130 // the Pub/Sub system to drain without interruption if the application i s | |
| 131 // otherwise canceled. | |
| 132 psClient, err := pubsub.NewClient(a.ncCtx, proj, cloud.WithTokenSource(a uthenticator.TokenSource())) | |
| 133 if err != nil { | |
| 134 log.Fields{ | |
| 135 log.ErrorKey: err, | |
| 136 "project": proj, | |
| 137 }.Errorf(a, "Failed to create Pub/Sub client.") | |
| 138 return nil, errors.New("failed to get Pub/Sub client") | |
| 139 } | |
| 140 psTopic := psClient.Topic(topic) | |
| 141 | |
| 142 // Assert that our Topic exists. | |
| 143 exists, err := retryTopicExists(a, psTopic) | |
| 144 if err != nil { | |
| 145 log.Fields{ | |
| 146 log.ErrorKey: err, | |
| 147 "project": proj, | |
| 148 "topic": topic, | |
| 149 }.Errorf(a, "Failed to check for Pub/Sub topic.") | |
| 150 return nil, errors.New("failed to check for Pub/Sub topic") | |
| 151 } | |
| 152 if !exists { | |
| 153 log.Fields{ | |
| 154 "fullTopic": fullTopic, | |
| 155 }.Errorf(a, "Pub/Sub Topic does not exist.") | |
| 156 return nil, errors.New("PubSub topic does not exist") | |
| 157 } | |
| 158 | |
| 159 // We own the prefix and all verifiable parameters have been validated. | |
| 160 // Successfully return our Output instance. | |
| 161 // | |
| 162 // Note that we use our non-cancelling context here. | |
| 163 return out.New(a.ncCtx, out.Config{ | |
| 164 Topic: psTopic, | |
| 165 Secret: resp.Secret, | |
| 166 Compress: true, | |
| 167 Track: f.track, | |
| 168 }), nil | |
| 169 } | |
| 170 | |
| 171 func (f *logdogOutputFactory) scopes() []string { | |
| 172 // E-mail scope needed for Coordinator authentication. | |
| 173 scopes := []string{auth.OAuthScopeEmail} | |
| 174 // Publisher scope needed to publish to Pub/Sub transport. | |
| 175 scopes = append(scopes, ps.PublisherScopes...) | |
| 176 return scopes | |
| 177 } | |
| 178 | |
| 179 type retryRegistrationClient struct { | |
| 180 inner api.RegistrationClient | |
| 181 } | |
| 182 | |
| 183 func (rc *retryRegistrationClient) RegisterPrefix(ctx context.Context, req *api. RegisterPrefixRequest) ( | |
| 184 *api.RegisterPrefixResponse, error) { | |
| 185 | |
| 186 var resp *api.RegisterPrefixResponse | |
| 187 err := retry.Retry(ctx, retry.TransientOnly(retry.Default), func() (err error) { | |
|
nodir
2016/05/19 02:04:44
I am not sure rc.inner.RegisterPrefix ever returns
dnj (Google)
2016/05/19 02:09:35
Okay, I'll remove this entire retry block.
| |
| 188 resp, err = rc.inner.RegisterPrefix(ctx, req) | |
|
nodir
2016/05/19 02:04:44
it already does retries
dnj (Google)
2016/05/19 02:09:35
Good point. I'll remove this. But weren't we plann
nodir
2016/05/19 16:46:39
yeah, we were planning to make a "luci" client and
| |
| 189 return | |
| 190 }, func(err error, d time.Duration) { | |
| 191 log.Fields{ | |
| 192 log.ErrorKey: err, | |
| 193 "delay": d, | |
| 194 }.Warningf(ctx, "Transient failure registering prefix; retrying. ..") | |
| 195 }) | |
| 196 return resp, err | |
| 197 } | |
| 198 | |
| 199 func retryTopicExists(ctx context.Context, t *pubsub.TopicHandle) (bool, error) { | |
| 200 var exists bool | |
| 201 err := retry.Retry(ctx, retry.Default, func() (err error) { | |
| 202 exists, err = t.Exists(ctx) | |
| 203 return | |
| 204 }, func(err error, d time.Duration) { | |
| 205 log.Fields{ | |
| 206 log.ErrorKey: err, | |
| 207 "delay": d, | |
| 208 }.Errorf(ctx, "Failed to check if topic exists; retrying...") | |
| 209 }) | |
| 210 return exists, err | |
| 211 } | |
| 212 | |
| 213 func isLocalHost(host string) bool { | |
| 214 switch { | |
| 215 case host == "localhost", strings.HasPrefix(host, "localhost:"): | |
| 216 case host == "127.0.0.1", strings.HasPrefix(host, "127.0.0.1:"): | |
| 217 case host == "[::1]", strings.HasPrefix(host, "[::1]:"): | |
| 218 case strings.HasPrefix(host, ":"): | |
| 219 | |
| 220 default: | |
| 221 return false | |
| 222 } | |
| 223 return true | |
| 224 } | |
| OLD | NEW |