| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "flag" | 8 "flag" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| 11 "net/url" |
| 12 "os" |
| 11 "os/exec" | 13 "os/exec" |
| 12 "sync" | 14 "sync" |
| 13 "time" | 15 "time" |
| 14 | 16 |
| 17 "infra/libs/infraenv" |
| 18 |
| 15 "github.com/luci/luci-go/client/logdog/annotee" | 19 "github.com/luci/luci-go/client/logdog/annotee" |
| 16 "github.com/luci/luci-go/client/logdog/annotee/annotation" | 20 "github.com/luci/luci-go/client/logdog/annotee/annotation" |
| 17 "github.com/luci/luci-go/client/logdog/butler" | 21 "github.com/luci/luci-go/client/logdog/butler" |
| 18 "github.com/luci/luci-go/client/logdog/butler/bootstrap" | 22 "github.com/luci/luci-go/client/logdog/butler/bootstrap" |
| 19 "github.com/luci/luci-go/client/logdog/butler/output" | 23 "github.com/luci/luci-go/client/logdog/butler/output" |
| 20 fileOut "github.com/luci/luci-go/client/logdog/butler/output/file" | 24 fileOut "github.com/luci/luci-go/client/logdog/butler/output/file" |
| 21 out "github.com/luci/luci-go/client/logdog/butler/output/logdog" | 25 out "github.com/luci/luci-go/client/logdog/butler/output/logdog" |
| 22 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient" | 26 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient" |
| 27 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 23 "github.com/luci/luci-go/common/auth" | 28 "github.com/luci/luci-go/common/auth" |
| 24 "github.com/luci/luci-go/common/config" | 29 "github.com/luci/luci-go/common/config" |
| 25 "github.com/luci/luci-go/common/ctxcmd" | 30 "github.com/luci/luci-go/common/ctxcmd" |
| 26 "github.com/luci/luci-go/common/environ" | 31 "github.com/luci/luci-go/common/environ" |
| 27 "github.com/luci/luci-go/common/errors" | 32 "github.com/luci/luci-go/common/errors" |
| 28 "github.com/luci/luci-go/common/logdog/types" | 33 "github.com/luci/luci-go/common/logdog/types" |
| 29 log "github.com/luci/luci-go/common/logging" | 34 log "github.com/luci/luci-go/common/logging" |
| 30 | 35 |
| 31 "github.com/golang/protobuf/proto" | 36 "github.com/golang/protobuf/proto" |
| 32 "golang.org/x/net/context" | 37 "golang.org/x/net/context" |
| 33 ) | 38 ) |
| 34 | 39 |
| 35 type cookLogDogParams struct { | 40 type cookLogDogParams struct { |
| 36 host string | 41 host string |
| 37 project string | 42 project string |
| 38 prefix types.StreamName | 43 prefix types.StreamName |
| 39 annotee bool | 44 annotee bool |
| 45 tee bool |
| 40 filePath string | 46 filePath string |
| 41 } | 47 } |
| 42 | 48 |
| 43 func (p *cookLogDogParams) addFlags(fs *flag.FlagSet) { | 49 func (p *cookLogDogParams) addFlags(fs *flag.FlagSet) { |
| 44 fs.StringVar( | 50 fs.StringVar( |
| 45 &p.host, | 51 &p.host, |
| 46 "logdog-host", | 52 "logdog-host", |
| 47 "", | 53 "", |
| 48 "The name of the LogDog host.") | 54 "The name of the LogDog host.") |
| 49 fs.StringVar( | 55 fs.StringVar( |
| 50 &p.project, | 56 &p.project, |
| 51 "logdog-project", | 57 "logdog-project", |
| 52 "", | 58 "", |
| 53 "The name of the LogDog project to log into. Projects have diffe
rent ACL sets, "+ | 59 "The name of the LogDog project to log into. Projects have diffe
rent ACL sets, "+ |
| 54 "so choose this appropriately.") | 60 "so choose this appropriately.") |
| 55 fs.Var( | 61 fs.Var( |
| 56 &p.prefix, | 62 &p.prefix, |
| 57 "logdog-prefix", | 63 "logdog-prefix", |
| 58 "The LogDog stream Prefix to use. If empty, one will be construc
ted from the Swarming "+ | 64 "The LogDog stream Prefix to use. If empty, one will be construc
ted from the Swarming "+ |
| 59 "task parameters (found in enviornment).") | 65 "task parameters (found in enviornment).") |
| 60 fs.BoolVar( | 66 fs.BoolVar( |
| 61 &p.annotee, | 67 &p.annotee, |
| 62 "logdog-enable-annotee", | 68 "logdog-enable-annotee", |
| 63 true, | 69 true, |
| 64 "Process bootstrap STDOUT/STDERR annotations through Annotee.") | 70 "Process bootstrap STDOUT/STDERR annotations through Annotee.") |
| 71 fs.BoolVar( |
| 72 &p.tee, |
| 73 "logdog-tee", |
| 74 true, |
| 75 "Tee bootstrapped STDOUT and STDERR through Kitchen. If false, t
hese will only be sent as LogDog streams") |
| 65 fs.StringVar( | 76 fs.StringVar( |
| 66 &p.filePath, | 77 &p.filePath, |
| 67 "logdog-debug-out-file", | 78 "logdog-debug-out-file", |
| 68 "", | 79 "", |
| 69 "If specified, write all generated logs to this path instead of
sending them.") | 80 "If specified, write all generated logs to this path instead of
sending them.") |
| 70 } | 81 } |
| 71 | 82 |
| 72 func (p *cookLogDogParams) active() bool { | 83 func (p *cookLogDogParams) active() bool { |
| 73 » return p.host != "" || p.project != "" || p.prefix != "" | 84 » return p.host != "" || p.project != "" || p.prefix != "" || p.tee |
| 74 } | 85 } |
| 75 | 86 |
| 76 func (p *cookLogDogParams) validate() error { | 87 func (p *cookLogDogParams) validate() error { |
| 77 if p.project == "" { | 88 if p.project == "" { |
| 78 return fmt.Errorf("a LogDog project must be supplied (-logdog-pr
oject)") | 89 return fmt.Errorf("a LogDog project must be supplied (-logdog-pr
oject)") |
| 79 } | 90 } |
| 80 return nil | 91 return nil |
| 81 } | 92 } |
| 82 | 93 |
| 83 func (p *cookLogDogParams) getPrefix(env environ.Env) (types.StreamName, error)
{ | 94 func (p *cookLogDogParams) getPrefix(env environ.Env) (types.StreamName, error)
{ |
| 84 if p.prefix != "" { | 95 if p.prefix != "" { |
| 85 return p.prefix, nil | 96 return p.prefix, nil |
| 86 } | 97 } |
| 87 | 98 |
| 88 » // Construct our LogDog prefix from the Swarming task parameters. | 99 » // Construct our LogDog prefix from the Swarming task parameters. The se
rver |
| 89 » host, _ := env.Get("SWARMING_SERVER") | 100 » // will be exported as a URL. We want the "host" parameter from this URL
. |
| 101 » server, _ := env.Get("SWARMING_SERVER") |
| 102 » serverURL, err := url.Parse(server) |
| 103 » if err != nil { |
| 104 » » return "", errors.Annotate(err).Reason("failed to parse SWARMING
_SERVER URL %(value)q"). |
| 105 » » » D("value", server).Err() |
| 106 » } |
| 107 |
| 108 » host := serverURL.Host |
| 109 » if serverURL.Scheme == "" { |
| 110 » » // SWARMING_SERVER is not a full URL, so use its Path instead of
its Host. |
| 111 » » host = serverURL.Path |
| 112 » } |
| 90 if host == "" { | 113 if host == "" { |
| 91 » » return "", errors.Reason("missing or empty SWARMING_SERVER").Err
() | 114 » » return "", errors.Reason("missing or empty SWARMING_SERVER host
in %(value)q"). |
| 115 » » » D("value", server).Err() |
| 92 } | 116 } |
| 117 |
| 93 taskID, _ := env.Get("SWARMING_TASK_ID") | 118 taskID, _ := env.Get("SWARMING_TASK_ID") |
| 94 if taskID == "" { | 119 if taskID == "" { |
| 95 return "", errors.Reason("missing or empty SWARMING_TASK_ID").Er
r() | 120 return "", errors.Reason("missing or empty SWARMING_TASK_ID").Er
r() |
| 96 } | 121 } |
| 97 | 122 |
| 98 return types.MakeStreamName("", "swarm", host, taskID) | 123 return types.MakeStreamName("", "swarm", host, taskID) |
| 99 } | 124 } |
| 100 | 125 |
| 101 // runWithLogdogButler rus the supplied command through the a LogDog Butler | 126 // runWithLogdogButler rus the supplied command through the a LogDog Butler |
| 102 // engine instance. This involves: | 127 // engine instance. This involves: |
| (...skipping 14 matching lines...) Expand all Loading... |
| 117 } | 142 } |
| 118 | 143 |
| 119 prefix, err := c.logdog.getPrefix(env) | 144 prefix, err := c.logdog.getPrefix(env) |
| 120 if err != nil { | 145 if err != nil { |
| 121 return 0, errors.Annotate(err).Reason("failed to get LogDog pref
ix").Err() | 146 return 0, errors.Annotate(err).Reason("failed to get LogDog pref
ix").Err() |
| 122 } | 147 } |
| 123 log.Fields{ | 148 log.Fields{ |
| 124 "prefix": prefix, | 149 "prefix": prefix, |
| 125 }.Infof(ctx, "Using LogDog prefix: %q", prefix) | 150 }.Infof(ctx, "Using LogDog prefix: %q", prefix) |
| 126 | 151 |
| 127 » authenticator := auth.NewAuthenticator(ctx, auth.SilentLogin, auth.Optio
ns{ | 152 » // Set up authentication. |
| 153 » authOpts := auth.Options{ |
| 128 Scopes: out.Scopes(), | 154 Scopes: out.Scopes(), |
| 129 » }) | 155 » } |
| 156 » if !infraenv.OnGCE() { |
| 157 » » // If we're not on GCE, we will need to explicitly supply the Lo
gDog |
| 158 » » // credentials path. |
| 159 » » credPath, err := infraenv.GetLogDogServiceAccountJSON() |
| 160 » » if err != nil { |
| 161 » » » return 0, errors.Annotate(err).Reason("failed to get Log
Dog service account JSON path").Err() |
| 162 » » } |
| 163 » » authOpts.ServiceAccountJSONPath = credPath |
| 164 » } |
| 165 » authenticator := auth.NewAuthenticator(ctx, auth.SilentLogin, authOpts) |
| 130 | 166 |
| 131 // Register and instantiate our LogDog Output. | 167 // Register and instantiate our LogDog Output. |
| 132 var o output.Output | 168 var o output.Output |
| 133 if c.logdog.filePath == "" { | 169 if c.logdog.filePath == "" { |
| 134 ocfg := out.Config{ | 170 ocfg := out.Config{ |
| 135 Auth: authenticator, | 171 Auth: authenticator, |
| 136 Host: c.logdog.host, | 172 Host: c.logdog.host, |
| 137 Project: config.ProjectName(c.logdog.project), | 173 Project: config.ProjectName(c.logdog.project), |
| 138 Prefix: prefix, | 174 Prefix: prefix, |
| 139 SourceInfo: []string{ | 175 SourceInfo: []string{ |
| 140 "Kitchen", | 176 "Kitchen", |
| 141 }, | 177 }, |
| 142 PublishContext: withNonCancel(ctx), | 178 PublishContext: withNonCancel(ctx), |
| 143 } | 179 } |
| 144 | 180 |
| 145 var err error | 181 var err error |
| 146 if o, err = ocfg.Register(ctx); err != nil { | 182 if o, err = ocfg.Register(ctx); err != nil { |
| 147 return 0, errors.Annotate(err).Reason("failed to create
LogDog Output instance").Err() | 183 return 0, errors.Annotate(err).Reason("failed to create
LogDog Output instance").Err() |
| 148 } | 184 } |
| 149 } else { | 185 } else { |
| 150 // Debug: Use a file output. | 186 // Debug: Use a file output. |
| 151 ocfg := fileOut.Options{ | 187 ocfg := fileOut.Options{ |
| 152 Path: c.logdog.filePath, | 188 Path: c.logdog.filePath, |
| 153 } | 189 } |
| 154 o = ocfg.New(ctx) | 190 o = ocfg.New(ctx) |
| 155 } | 191 } |
| 156 defer o.Close() | 192 defer o.Close() |
| 157 | 193 |
| 158 » ncCtx := withNonCancel(ctx) | 194 » butlerCfg := butler.Config{ |
| 159 » b, err := butler.New(ncCtx, butler.Config{ | |
| 160 Output: o, | 195 Output: o, |
| 161 Project: config.ProjectName(c.logdog.project), | 196 Project: config.ProjectName(c.logdog.project), |
| 162 Prefix: c.logdog.prefix, | 197 Prefix: c.logdog.prefix, |
| 163 BufferLogs: true, | 198 BufferLogs: true, |
| 164 » }) | 199 » } |
| 200 |
| 201 » // If we're teeing and we're not using Annotee, tee our subprocess' STDO
UT |
| 202 » // and STDERR through Kitchen's STDOUT/STDERR. |
| 203 » // |
| 204 » // If we're using Annotee, we will configure this in the Annotee setup |
| 205 » // directly. |
| 206 » if c.logdog.tee && !c.logdog.annotee { |
| 207 » » butlerCfg.TeeStdout = os.Stdout |
| 208 » » butlerCfg.TeeStderr = os.Stderr |
| 209 » } |
| 210 |
| 211 » ncCtx := withNonCancel(ctx) |
| 212 » b, err := butler.New(ncCtx, butlerCfg) |
| 165 if err != nil { | 213 if err != nil { |
| 166 err = errors.Annotate(err).Reason("failed to create Butler insta
nce").Err() | 214 err = errors.Annotate(err).Reason("failed to create Butler insta
nce").Err() |
| 167 return | 215 return |
| 168 } | 216 } |
| 169 defer func() { | 217 defer func() { |
| 170 b.Activate() | 218 b.Activate() |
| 171 if ierr := b.Wait(); ierr != nil { | 219 if ierr := b.Wait(); ierr != nil { |
| 172 ierr = errors.Annotate(ierr).Reason("failed to Wait() fo
r Butler").Err() | 220 ierr = errors.Annotate(ierr).Reason("failed to Wait() fo
r Butler").Err() |
| 173 logAnnotatedErr(ctx, ierr) | 221 logAnnotatedErr(ctx, ierr) |
| 174 | 222 |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 263 Annotate: true, | 311 Annotate: true, |
| 264 StripAnnotations: true, | 312 StripAnnotations: true, |
| 265 }, | 313 }, |
| 266 { | 314 { |
| 267 Reader: stderr, | 315 Reader: stderr, |
| 268 Name: annotee.STDERR, | 316 Name: annotee.STDERR, |
| 269 Annotate: true, | 317 Annotate: true, |
| 270 StripAnnotations: true, | 318 StripAnnotations: true, |
| 271 }, | 319 }, |
| 272 } | 320 } |
| 321 if c.logdog.tee { |
| 322 streams[0].Tee = os.Stdout |
| 323 streams[1].Tee = os.Stderr |
| 324 } |
| 273 | 325 |
| 274 // Run the process' output streams through Annotee. This will bl
ock until | 326 // Run the process' output streams through Annotee. This will bl
ock until |
| 275 // they are all consumed. | 327 // they are all consumed. |
| 276 if err = annoteeProcessor.RunStreams(streams); err != nil { | 328 if err = annoteeProcessor.RunStreams(streams); err != nil { |
| 277 err = errors.Annotate(err).Reason("failed to process str
eams through Annotee").Err() | 329 err = errors.Annotate(err).Reason("failed to process str
eams through Annotee").Err() |
| 278 return | 330 return |
| 279 } | 331 } |
| 280 } else { | 332 } else { |
| 281 // Get our STDOUT / STDERR stream flags. Tailor them to match An
notee. | 333 // Get our STDOUT / STDERR stream flags. Tailor them to match An
notee. |
| 282 stdoutFlags := annotee.TextStreamFlags(ctx, annotee.STDOUT) | 334 stdoutFlags := annotee.TextStreamFlags(ctx, annotee.STDOUT) |
| 283 stderrFlags := annotee.TextStreamFlags(ctx, annotee.STDERR) | 335 stderrFlags := annotee.TextStreamFlags(ctx, annotee.STDERR) |
| 336 if c.logdog.tee { |
| 337 stdoutFlags.Tee = streamproto.TeeStdout |
| 338 stderrFlags.Tee = streamproto.TeeStderr |
| 339 } |
| 284 | 340 |
| 285 // Wait for our STDOUT / STDERR streams to complete. | 341 // Wait for our STDOUT / STDERR streams to complete. |
| 286 var wg sync.WaitGroup | 342 var wg sync.WaitGroup |
| 287 stdout = &callbackReadCloser{stdout, wg.Done} | 343 stdout = &callbackReadCloser{stdout, wg.Done} |
| 288 stderr = &callbackReadCloser{stderr, wg.Done} | 344 stderr = &callbackReadCloser{stderr, wg.Done} |
| 289 wg.Add(2) | 345 wg.Add(2) |
| 290 | 346 |
| 291 // Explicitly add these streams to the Butler. | 347 // Explicitly add these streams to the Butler. |
| 292 if err = b.AddStream(stdout, *stdoutFlags.Properties()); err !=
nil { | 348 if err = b.AddStream(stdout, *stdoutFlags.Properties()); err !=
nil { |
| 293 err = errors.Annotate(err).Reason("failed to add STDOUT
stream to Butler").Err() | 349 err = errors.Annotate(err).Reason("failed to add STDOUT
stream to Butler").Err() |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 331 // callbackReadCloser invokes a callback method when closed. | 387 // callbackReadCloser invokes a callback method when closed. |
| 332 type callbackReadCloser struct { | 388 type callbackReadCloser struct { |
| 333 io.ReadCloser | 389 io.ReadCloser |
| 334 callback func() | 390 callback func() |
| 335 } | 391 } |
| 336 | 392 |
| 337 func (c *callbackReadCloser) Close() error { | 393 func (c *callbackReadCloser) Close() error { |
| 338 defer c.callback() | 394 defer c.callback() |
| 339 return c.ReadCloser.Close() | 395 return c.ReadCloser.Close() |
| 340 } | 396 } |
| OLD | NEW |