OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "flag" |
| 9 "fmt" |
| 10 "io" |
| 11 "io/ioutil" |
| 12 "runtime" |
| 13 |
| 14 "github.com/dustin/go-humanize" |
| 15 ) |
| 16 |
// maxworkers caps the number of concurrent hash worker goroutines.
// The effective count is further clamped to GOMAXPROCS / NumCPU when
// the processor is created.
var maxworkers = flag.Int("maxworkers", 100, "Maximum number of workers to use.")
| 18 |
// ToHash is one unit of work for a hash worker: a file path together
// with an open reader for its contents. The worker that receives the
// item is responsible for closing r.
type ToHash struct {
	path string
	r    io.ReadCloser
}
| 23 |
// ParallelHashFileProcessor implements FileProcessor. It generates a hash for the contents
// of each found file using multiple threads.
//
// NOTE(review): queue being *chan is unusual — channels are already
// reference types, so a plain chan ToHash would suffice; left as-is
// because every method dereferences it.
type ParallelHashFileProcessor struct {
	BaseFileProcessor
	obuf     io.Writer    // destination for progress and per-file digest output
	workers  int          // number of worker goroutines started
	queue    *chan ToHash // work queue shared with workers; set to nil by Complete
	finished chan bool    // each worker sends one value here when it exits
}
| 33 |
| 34 func ParallelHashFileProcessorWorker(name int, obuf io.Writer, queue <-chan ToHa
sh, finished chan<- bool) { |
| 35 fmt.Fprintf(obuf, "Starting hash worker %d\n", name) |
| 36 |
| 37 var filecount uint64 |
| 38 var bytecount uint64 |
| 39 for tohash := range queue { |
| 40 filecount++ |
| 41 |
| 42 digest, bytes, err := hash(tohash.r) |
| 43 tohash.r.Close() |
| 44 if err != nil { |
| 45 // FIXME(mithro): Do something here? |
| 46 continue |
| 47 } |
| 48 bytecount += bytes |
| 49 fmt.Fprintf(obuf, "%s: %v\n", tohash.path, digest) |
| 50 } |
| 51 fmt.Fprintf(obuf, "Finished hash worker %d (hashed %d files, %s)\n", nam
e, filecount, humanize.Bytes(bytecount)) |
| 52 finished <- true |
| 53 } |
| 54 |
| 55 func CreateParallelHashFileProcessor(obuf io.Writer) *ParallelHashFileProcessor
{ |
| 56 max := *maxworkers |
| 57 |
| 58 maxProcs := runtime.GOMAXPROCS(0) |
| 59 if maxProcs < max { |
| 60 max = maxProcs |
| 61 } |
| 62 |
| 63 numCPU := runtime.NumCPU() |
| 64 if numCPU < maxProcs { |
| 65 max = numCPU |
| 66 } |
| 67 |
| 68 if max < *maxworkers { |
| 69 // FIXME: Warn |
| 70 } |
| 71 |
| 72 p := ParallelHashFileProcessor{obuf: obuf, workers: max, finished: make(
chan bool)} |
| 73 q := make(chan ToHash, p.workers) |
| 74 p.queue = &q |
| 75 for i := 0; i < p.workers; i++ { |
| 76 go ParallelHashFileProcessorWorker(i, p.obuf, *p.queue, p.finish
ed) |
| 77 } |
| 78 return &p |
| 79 } |
| 80 |
// SmallFile queues the file for hashing on the worker pool, then forwards
// it to the embedded BaseFileProcessor (wrapped in NopCloser so only the
// hash worker closes the reader).
//
// NOTE(review): the same reader r is handed to a hash worker goroutine
// (which reads and closes it) AND to BaseFileProcessor.SmallFile. If the
// base processor also reads from r this is a concurrent read of a single
// stream — verify BaseFileProcessor only records metadata.
func (p *ParallelHashFileProcessor) SmallFile(path string, r io.ReadCloser) {
	*p.queue <- ToHash{path: path, r: r}
	p.BaseFileProcessor.SmallFile(path, ioutil.NopCloser(r))
}
| 85 |
// LargeFile queues the file for hashing on the worker pool, then forwards
// it to the embedded BaseFileProcessor (wrapped in NopCloser so only the
// hash worker closes the reader).
//
// NOTE(review): as with SmallFile, the reader is shared between a hash
// worker goroutine and BaseFileProcessor.LargeFile — a concurrent read of
// one stream unless the base processor only records metadata; confirm.
func (p *ParallelHashFileProcessor) LargeFile(path string, r io.ReadCloser) {
	*p.queue <- ToHash{path: path, r: r}
	p.BaseFileProcessor.LargeFile(path, ioutil.NopCloser(r))
}
| 90 |
| 91 func (p *ParallelHashFileProcessor) Complete(path string) { |
| 92 close(*p.queue) |
| 93 for i := 0; i < p.workers; i++ { |
| 94 <-p.finished |
| 95 } |
| 96 fmt.Fprintln(p.obuf, "All workers finished.") |
| 97 p.queue = nil |
| 98 } |
OLD | NEW |