Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(155)

Side by Side Diff: deploytool/cmd/work.go

Issue 2182213002: deploytool: Add README.md, migrate docs to it. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rename to "luci_deploy" Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « deploytool/cmd/version_test.go ('k') | luci-deploy.cfg » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package main
6
7 import (
8 "bufio"
9 "bytes"
10 "fmt"
11 "io"
12 "os"
13 "os/exec"
14 "sort"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/ctxcmd"
21 "github.com/luci/luci-go/common/errors"
22 log "github.com/luci/luci-go/common/logging"
23 "github.com/luci/luci-go/common/parallel"
24
25 "golang.org/x/net/context"
26 )
27
28 const (
29 workExecDumpLineCount = 10
30 workExecDumpLineTimeout = 5 * time.Second
31 )
32
33 type work struct {
34 context.Context
35 parallel.MultiRunner
36 *tools
37 }
38
39 type workExecutor struct {
40 command string
41 args []string
42 workdir string
43 envMap map[string]string
44
45 outputLevel log.Level
46 shouldForwardOutput bool
47
48 stdout bytes.Buffer
49 stderr bytes.Buffer
50 }
51
52 func execute(cmd string, args ...string) *workExecutor {
53 return &workExecutor{
54 command: cmd,
55 args: args,
56 outputLevel: log.Debug,
57 }
58 }
59
60 func (x *workExecutor) bootstrap(command string, args ...string) *workExecutor {
61 nargs := make([]string, 0, 1+len(args)+len(x.args))
62 nargs = append(append(append(nargs, args...), x.command), x.args...)
63 x.command, x.args = command, nargs
64 return x
65 }
66
67 func (x *workExecutor) cwd(path string) *workExecutor {
68 x.workdir = path
69 return x
70 }
71
72 func (x *workExecutor) loadEnv(e []string) *workExecutor {
73 for _, v := range e {
74 switch parts := strings.SplitN(v, "=", 2); len(parts) {
75 case 1:
76 x.env(parts[0], "")
77
78 case 2:
79 x.env(parts[0], parts[1])
80 }
81 }
82 return x
83 }
84
85 func (x *workExecutor) env(key string, value string) *workExecutor {
86 if x.envMap == nil {
87 x.envMap = make(map[string]string)
88 }
89 x.envMap[key] = value
90 return x
91 }
92
93 func (x *workExecutor) envPath(key string, value ...string) *workExecutor {
94 return x.env(key, strings.Join(value, string(os.PathListSeparator)))
95 }
96
97 func (x *workExecutor) forwardOutput() *workExecutor {
98 x.shouldForwardOutput = true
99 return x
100 }
101
102 func (x *workExecutor) outputAt(l log.Level) *workExecutor {
103 x.outputLevel = l
104 return x
105 }
106
107 func (x *workExecutor) run(c context.Context) (int, error) {
108 // Clear our buffers for this command.
109 x.stdout.Reset()
110 x.stderr.Reset()
111
112 // Setup / execute the command.
113 cmd := ctxcmd.CtxCmd{
114 Cmd: exec.Command(x.command, x.args...),
115 }
116 cmd.Dir = x.workdir
117
118 // Setup pipes and goroutines to dump pipes periodically so we can see
119 // what's happening.
120 var wg sync.WaitGroup
121 switch {
122 case x.shouldForwardOutput:
123 cmd.Stdout = &teeWriter{os.Stdout, &x.stdout}
124 cmd.Stderr = &teeWriter{os.Stderr, &x.stderr}
125
126 case log.IsLogging(c, x.outputLevel):
127 // Get our command pipes. Wrap each one in a "closeOnceReader" s o that our
128 // reader goroutine can close on error and our outer loop can al so close on
129 // error without conflicting.
130 stdoutPipe, err := cmd.StdoutPipe()
131 if err != nil {
132 return -1, errors.Annotate(err).Reason("failed to create STDOUT pipe").Err()
133 }
134 stdoutPipe = &closeOnceReader{ReadCloser: stdoutPipe}
135 defer stdoutPipe.Close()
136
137 stderrPipe, err := cmd.StderrPipe()
138 if err != nil {
139 return -1, errors.Annotate(err).Reason("failed to create STDERR pipe").Err()
140 }
141 stderrPipe = &closeOnceReader{ReadCloser: stderrPipe}
142 defer stderrPipe.Close()
143
144 spawnMonitor := func(name string, in io.ReadCloser, tee io.Write r) {
145 wg.Add(1)
146 go func() {
147 defer wg.Done()
148 defer in.Close()
149
150 var (
151 tr = io.TeeReader(in, tee)
152 reader = bufio.NewReader(tr)
153 lastDump = clock.Now(c)
154 lines = make([]string, 0, workExecDum pLineCount)
155 )
156
157 dump := func(now time.Time) {
158 if len(lines) > 0 {
159 log.Logf(c, x.outputLevel, "Comm and %s %s output %s:\n%s",
160 x.command, x.args, name, strings.Join(lines, ""))
161 }
162 lines = lines[:0]
163 lastDump = now
164 }
165
166 for {
167 line, err := reader.ReadString('\n')
168 if len(line) > 0 {
169 lines = append(lines, line)
170 }
171 if err != nil {
172 break
173 }
174
175 dumpThreshold := lastDump.Add(workExecDu mpLineTimeout)
176 now := clock.Now(c)
177 if len(lines) >= workExecDumpLineCount | | dumpThreshold.Before(now) {
178 dump(now)
179 }
180 }
181 dump(time.Time{})
182 }()
183 }
184 spawnMonitor("stdout", stdoutPipe, &x.stdout)
185 spawnMonitor("stderr", stderrPipe, &x.stderr)
186
187 default:
188 // We wouldn't see the logs anyway, so buffer directly.
189 cmd.Stdout = &x.stdout
190 cmd.Stderr = &x.stderr
191 }
192
193 if len(x.envMap) > 0 {
194 // Get a sorted list of keys (determinism).
195 env := make([]string, 0, len(x.envMap))
196 for k := range x.envMap {
197 env = append(env, k)
198 }
199 sort.Strings(env)
200
201 // Replace with environment.
202 for i, k := range env {
203 env[i] = fmt.Sprintf("%s=%s", k, x.envMap[k])
204 }
205 cmd.Env = env
206 }
207
208 log.Fields{
209 "cwd": x.workdir,
210 }.Debugf(c, "Running command: %s %s.", x.command, x.args)
211 if err := cmd.Start(c); err != nil {
212 return -1, errors.Annotate(err).Reason("failed to start command" ).
213 D("command", x.command).D("args", x.args).D("cwd", x.wor kdir).Err()
214 }
215
216 // Wait for our stream processing to finish.
217 wg.Wait()
218
219 if err := cmd.Wait(); err != nil {
220 if rc, ok := ctxcmd.ExitCode(err); ok {
221 log.Fields{
222 "returnCode": rc,
223 }.Debugf(c, "Command completed with non-zero return code : %s %s", x.command, x.args)
224 return rc, nil
225 }
226
227 return -1, errors.Annotate(err).Reason("failed to wait for comma nd").
228 D("command", x.command).D("args", x.args).D("cwd", x.wor kdir).Err()
229 }
230
231 log.Debugf(c, "Command completed with zero return code: %s %s", x.comman d, x.args)
232 return 0, nil
233 }
234
235 func (x *workExecutor) check(c context.Context) error {
236 switch rc, err := x.run(c); {
237 case err != nil:
238 return errors.Annotate(err).Err()
239
240 case rc != 0:
241 log.Fields{
242 "returnCode": rc,
243 "command": x.command,
244 "args": x.args,
245 "cwd": x.workdir,
246 }.Errorf(c, "Command failed with error return code.\nSTDOUT:\n%s \n\nSTDERR:\n%s",
247 x.stdout.String(), x.stderr.String())
248 return errors.Reason("process exited with return code: %(returnC ode)d").
249 D("command", x.command).D("args", x.args).D("cwd", x.wor kdir).D("returnCode", rc).Err()
250
251 default:
252 return nil
253 }
254 }
255
256 func runWork(c context.Context, workers int, tools *tools, f func(w *work) error ) error {
257 return parallel.RunMulti(c, workers, func(mr parallel.MultiRunner) error {
258 return f(&work{
259 Context: c,
260 tools: tools,
261 MultiRunner: mr,
262 })
263 })
264 }
265
266 type closeOnceReader struct {
267 io.ReadCloser
268 close sync.Once
269 }
270
271 func (r *closeOnceReader) Close() (err error) {
272 r.close.Do(func() {
273 err = r.ReadCloser.Close()
274 })
275 return
276 }
277
278 func addGoEnv(goPath []string, x *workExecutor) *workExecutor {
279 return x.loadEnv(os.Environ()).envPath("GOPATH", goPath...)
280 }
281
282 // teeWriter writes data to the base writer, then to the supplied tee writer.
283 // The output of the base Write will be returned. If the tee write failed, Write
284 // will panic.
285 type teeWriter struct {
286 base io.Writer
287 tee io.Writer
288 }
289
290 func (w *teeWriter) Write(d []byte) (amt int, err error) {
291 amt, err = w.base.Write(d)
292 if amt > 0 {
293 if _, ierr := w.tee.Write(d[:amt]); ierr != nil {
294 panic(errors.Annotate(ierr).Reason("failed to write to t ee writer").Err())
295 }
296 }
297 return
298 }
OLDNEW
« no previous file with comments | « deploytool/cmd/version_test.go ('k') | luci-deploy.cfg » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698