| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package main | |
| 6 | |
| 7 import ( | |
| 8 "bufio" | |
| 9 "bytes" | |
| 10 "fmt" | |
| 11 "io" | |
| 12 "os" | |
| 13 "os/exec" | |
| 14 "sort" | |
| 15 "strings" | |
| 16 "sync" | |
| 17 "time" | |
| 18 | |
| 19 "github.com/luci/luci-go/common/clock" | |
| 20 "github.com/luci/luci-go/common/ctxcmd" | |
| 21 "github.com/luci/luci-go/common/errors" | |
| 22 log "github.com/luci/luci-go/common/logging" | |
| 23 "github.com/luci/luci-go/common/parallel" | |
| 24 | |
| 25 "golang.org/x/net/context" | |
| 26 ) | |
| 27 | |
const (
	// workExecDumpLineCount is the number of buffered subprocess output lines
	// that, once accumulated, causes workExecutor.run's pipe monitors to dump
	// them to the log.
	workExecDumpLineCount = 10

	// workExecDumpLineTimeout is the maximum time a pipe monitor lets buffered
	// output sit before dumping it to the log. Note that the monitor loop only
	// checks this after a successful read, so a fully-idle pipe will not dump
	// until output arrives.
	workExecDumpLineTimeout = 5 * time.Second
)
| 32 | |
// work bundles the state a unit of parallel work operates on: the Context it
// runs under, the MultiRunner used to schedule nested work, and the shared
// tools (declared elsewhere in this package). All three are embedded, so a
// *work can be used directly wherever a Context or MultiRunner is expected.
type work struct {
	context.Context
	parallel.MultiRunner
	*tools
}
| 38 | |
// workExecutor configures and runs a single subprocess invocation. It is
// built with execute, configured through chained setters (bootstrap, cwd,
// env, envPath, forwardOutput, outputAt), and executed with run or check.
type workExecutor struct {
	command string   // binary (or wrapper, after bootstrap) to invoke
	args    []string // command-line arguments
	workdir string   // working directory for the subprocess; "" inherits ours
	// envMap holds environment overrides keyed by variable name. When empty,
	// run leaves cmd.Env nil, so the subprocess inherits our environment.
	envMap map[string]string

	outputLevel log.Level // level at which captured output is dumped to the log
	// shouldForwardOutput, when true, streams subprocess output to our own
	// STDOUT/STDERR in addition to capturing it into the buffers below.
	shouldForwardOutput bool

	// Captured STDOUT/STDERR from the most recent run; reset at the start of
	// each run and reported by check on failure.
	stdout bytes.Buffer
	stderr bytes.Buffer
}
| 51 | |
| 52 func execute(cmd string, args ...string) *workExecutor { | |
| 53 return &workExecutor{ | |
| 54 command: cmd, | |
| 55 args: args, | |
| 56 outputLevel: log.Debug, | |
| 57 } | |
| 58 } | |
| 59 | |
| 60 func (x *workExecutor) bootstrap(command string, args ...string) *workExecutor { | |
| 61 nargs := make([]string, 0, 1+len(args)+len(x.args)) | |
| 62 nargs = append(append(append(nargs, args...), x.command), x.args...) | |
| 63 x.command, x.args = command, nargs | |
| 64 return x | |
| 65 } | |
| 66 | |
| 67 func (x *workExecutor) cwd(path string) *workExecutor { | |
| 68 x.workdir = path | |
| 69 return x | |
| 70 } | |
| 71 | |
| 72 func (x *workExecutor) loadEnv(e []string) *workExecutor { | |
| 73 for _, v := range e { | |
| 74 switch parts := strings.SplitN(v, "=", 2); len(parts) { | |
| 75 case 1: | |
| 76 x.env(parts[0], "") | |
| 77 | |
| 78 case 2: | |
| 79 x.env(parts[0], parts[1]) | |
| 80 } | |
| 81 } | |
| 82 return x | |
| 83 } | |
| 84 | |
| 85 func (x *workExecutor) env(key string, value string) *workExecutor { | |
| 86 if x.envMap == nil { | |
| 87 x.envMap = make(map[string]string) | |
| 88 } | |
| 89 x.envMap[key] = value | |
| 90 return x | |
| 91 } | |
| 92 | |
| 93 func (x *workExecutor) envPath(key string, value ...string) *workExecutor { | |
| 94 return x.env(key, strings.Join(value, string(os.PathListSeparator))) | |
| 95 } | |
| 96 | |
| 97 func (x *workExecutor) forwardOutput() *workExecutor { | |
| 98 x.shouldForwardOutput = true | |
| 99 return x | |
| 100 } | |
| 101 | |
| 102 func (x *workExecutor) outputAt(l log.Level) *workExecutor { | |
| 103 x.outputLevel = l | |
| 104 return x | |
| 105 } | |
| 106 | |
// run executes the configured command, capturing its STDOUT/STDERR into
// x.stdout/x.stderr.
//
// It returns (exitCode, nil) when the process was started and waited on
// successfully — a non-zero exit code is NOT an error here — and (-1, err)
// when pipe setup, Start, or Wait itself failed. Use check for the variant
// that converts non-zero exit codes into errors.
func (x *workExecutor) run(c context.Context) (int, error) {
	// Clear our buffers for this command.
	x.stdout.Reset()
	x.stderr.Reset()

	// Setup / execute the command.
	cmd := ctxcmd.CtxCmd{
		Cmd: exec.Command(x.command, x.args...),
	}
	cmd.Dir = x.workdir

	// Setup pipes and goroutines to dump pipes periodically so we can see
	// what's happening.
	var wg sync.WaitGroup
	switch {
	case x.shouldForwardOutput:
		// Tee directly: every write goes to our own STDOUT/STDERR and is
		// simultaneously captured. No goroutines needed.
		cmd.Stdout = &teeWriter{os.Stdout, &x.stdout}
		cmd.Stderr = &teeWriter{os.Stderr, &x.stderr}

	case log.IsLogging(c, x.outputLevel):
		// Get our command pipes. Wrap each one in a "closeOnceReader" so that our
		// reader goroutine can close on error and our outer loop can also close on
		// error without conflicting.
		stdoutPipe, err := cmd.StdoutPipe()
		if err != nil {
			return -1, errors.Annotate(err).Reason("failed to create STDOUT pipe").Err()
		}
		stdoutPipe = &closeOnceReader{ReadCloser: stdoutPipe}
		defer stdoutPipe.Close()

		stderrPipe, err := cmd.StderrPipe()
		if err != nil {
			return -1, errors.Annotate(err).Reason("failed to create STDERR pipe").Err()
		}
		stderrPipe = &closeOnceReader{ReadCloser: stderrPipe}
		defer stderrPipe.Close()

		// spawnMonitor starts a goroutine that reads lines from "in", tees the
		// raw bytes into "tee" (our capture buffer), and dumps batches of lines
		// to the log every workExecDumpLineCount lines or after
		// workExecDumpLineTimeout has elapsed since the last dump. Note the
		// timeout is only evaluated after a read completes, so an idle pipe
		// does not trigger a dump on its own.
		spawnMonitor := func(name string, in io.ReadCloser, tee io.Writer) {
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer in.Close()

				var (
					tr       = io.TeeReader(in, tee)
					reader   = bufio.NewReader(tr)
					lastDump = clock.Now(c)
					lines    = make([]string, 0, workExecDumpLineCount)
				)

				// dump flushes any accumulated lines to the log and records
				// "now" as the time of the last dump.
				dump := func(now time.Time) {
					if len(lines) > 0 {
						log.Logf(c, x.outputLevel, "Command %s %s output %s:\n%s",
							x.command, x.args, name, strings.Join(lines, ""))
					}
					lines = lines[:0]
					lastDump = now
				}

				for {
					// A partial line may accompany a read error (e.g. EOF);
					// keep it so no output is lost.
					line, err := reader.ReadString('\n')
					if len(line) > 0 {
						lines = append(lines, line)
					}
					if err != nil {
						break
					}

					dumpThreshold := lastDump.Add(workExecDumpLineTimeout)
					now := clock.Now(c)
					if len(lines) >= workExecDumpLineCount || dumpThreshold.Before(now) {
						dump(now)
					}
				}
				// Final flush of whatever remains after EOF/error.
				dump(time.Time{})
			}()
		}
		spawnMonitor("stdout", stdoutPipe, &x.stdout)
		spawnMonitor("stderr", stderrPipe, &x.stderr)

	default:
		// We wouldn't see the logs anyway, so buffer directly.
		cmd.Stdout = &x.stdout
		cmd.Stderr = &x.stderr
	}

	if len(x.envMap) > 0 {
		// Get a sorted list of keys (determinism).
		env := make([]string, 0, len(x.envMap))
		for k := range x.envMap {
			env = append(env, k)
		}
		sort.Strings(env)

		// Replace with environment.
		for i, k := range env {
			env[i] = fmt.Sprintf("%s=%s", k, x.envMap[k])
		}
		cmd.Env = env
	}

	log.Fields{
		"cwd": x.workdir,
	}.Debugf(c, "Running command: %s %s.", x.command, x.args)
	if err := cmd.Start(c); err != nil {
		return -1, errors.Annotate(err).Reason("failed to start command").
			D("command", x.command).D("args", x.args).D("cwd", x.workdir).Err()
	}

	// Wait for our stream processing to finish. This must happen before
	// cmd.Wait, which invalidates the pipes obtained from Stdout/StderrPipe.
	wg.Wait()

	if err := cmd.Wait(); err != nil {
		if rc, ok := ctxcmd.ExitCode(err); ok {
			// The process ran to completion with a non-zero exit code; report
			// the code itself rather than an error.
			log.Fields{
				"returnCode": rc,
			}.Debugf(c, "Command completed with non-zero return code: %s %s", x.command, x.args)
			return rc, nil
		}

		return -1, errors.Annotate(err).Reason("failed to wait for command").
			D("command", x.command).D("args", x.args).D("cwd", x.workdir).Err()
	}

	log.Debugf(c, "Command completed with zero return code: %s %s", x.command, x.args)
	return 0, nil
}
| 234 | |
| 235 func (x *workExecutor) check(c context.Context) error { | |
| 236 switch rc, err := x.run(c); { | |
| 237 case err != nil: | |
| 238 return errors.Annotate(err).Err() | |
| 239 | |
| 240 case rc != 0: | |
| 241 log.Fields{ | |
| 242 "returnCode": rc, | |
| 243 "command": x.command, | |
| 244 "args": x.args, | |
| 245 "cwd": x.workdir, | |
| 246 }.Errorf(c, "Command failed with error return code.\nSTDOUT:\n%s
\n\nSTDERR:\n%s", | |
| 247 x.stdout.String(), x.stderr.String()) | |
| 248 return errors.Reason("process exited with return code: %(returnC
ode)d"). | |
| 249 D("command", x.command).D("args", x.args).D("cwd", x.wor
kdir).D("returnCode", rc).Err() | |
| 250 | |
| 251 default: | |
| 252 return nil | |
| 253 } | |
| 254 } | |
| 255 | |
| 256 func runWork(c context.Context, workers int, tools *tools, f func(w *work) error
) error { | |
| 257 return parallel.RunMulti(c, workers, func(mr parallel.MultiRunner) error
{ | |
| 258 return f(&work{ | |
| 259 Context: c, | |
| 260 tools: tools, | |
| 261 MultiRunner: mr, | |
| 262 }) | |
| 263 }) | |
| 264 } | |
| 265 | |
| 266 type closeOnceReader struct { | |
| 267 io.ReadCloser | |
| 268 close sync.Once | |
| 269 } | |
| 270 | |
| 271 func (r *closeOnceReader) Close() (err error) { | |
| 272 r.close.Do(func() { | |
| 273 err = r.ReadCloser.Close() | |
| 274 }) | |
| 275 return | |
| 276 } | |
| 277 | |
| 278 func addGoEnv(goPath []string, x *workExecutor) *workExecutor { | |
| 279 return x.loadEnv(os.Environ()).envPath("GOPATH", goPath...) | |
| 280 } | |
| 281 | |
| 282 // teeWriter writes data to the base writer, then to the supplied tee writer. | |
| 283 // The output of the base Write will be returned. If the tee write failed, Write | |
| 284 // will panic. | |
| 285 type teeWriter struct { | |
| 286 base io.Writer | |
| 287 tee io.Writer | |
| 288 } | |
| 289 | |
| 290 func (w *teeWriter) Write(d []byte) (amt int, err error) { | |
| 291 amt, err = w.base.Write(d) | |
| 292 if amt > 0 { | |
| 293 if _, ierr := w.tee.Write(d[:amt]); ierr != nil { | |
| 294 panic(errors.Annotate(ierr).Reason("failed to write to t
ee writer").Err()) | |
| 295 } | |
| 296 } | |
| 297 return | |
| 298 } | |
| OLD | NEW |