Index: common/dirwalk/tests/tools/walkdir/fileprocessor_phash.go
diff --git a/common/dirwalk/tests/tools/walkdir/fileprocessor_phash.go b/common/dirwalk/tests/tools/walkdir/fileprocessor_phash.go
new file mode 100644
index 0000000000000000000000000000000000000000..83278b59faf94abc14241e2f137285a683e5c70b
--- /dev/null
+++ b/common/dirwalk/tests/tools/walkdir/fileprocessor_phash.go
@@ -0,0 +1,117 @@
+// Copyright 2016 The LUCI Authors. All rights reserved. |
+// Use of this source code is governed under the Apache License, Version 2.0 |
+// that can be found in the LICENSE file. |
+ |
+package main |
+ |
+import ( |
+ "flag" |
+ "fmt" |
+ "io" |
+ "io/ioutil" |
+ "runtime" |
+ |
+ "github.com/dustin/go-humanize" |
+) |
+ |
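+// maxworkers caps the size of the hash worker pool; the effective count is
+// further limited by the CPU count in CreateParallelHashFileProcessor.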
+var maxworkers = flag.Int("maxworkers", 100, "Maximum number of workers to use.") |
+ |
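+// ToHash pairs a file path with an open reader whose contents should be hashed.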
+type ToHash struct { |
+ path string |
+ r io.ReadCloser |
+} |
+ |
+// ParallelHashFileProcessor implements FileProcessor. It generates a hash of
+// the contents of each found file using a pool of worker goroutines.
+type ParallelHashFileProcessor struct {
+	BaseFileProcessor
+	obuf     io.Writer
+	workers  int
+	queue    chan ToHash
+	finished chan bool
+}
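+
+// A minimal sketch of how a caller might drive the processor (hypothetical
+// wiring; the real caller is the walker that owns this FileProcessor):
+//
+//	p := CreateParallelHashFileProcessor(os.Stderr)
+//	// ... the walk hands each file to p.SmallFile or p.LargeFile ...
+//	p.Complete(root)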
+ |
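+// ParallelHashFileProcessorWorker drains queue, hashing and closing each
+// reader, then signals on finished once the queue has been closed and drained.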
+func ParallelHashFileProcessorWorker(name int, obuf io.Writer, queue <-chan ToHash, finished chan<- bool) { |
+ fmt.Fprintf(obuf, "Starting hash worker %d\n", name) |
+ |
+ var filecount uint64 |
+ var bytecount uint64 |
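+	// hash() is assumed to be a helper defined elsewhere in this package,
+	// returning the digest and the number of bytes read.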
+ for tohash := range queue { |
+ filecount++ |
+ |
+		digest, bytes, err := hash(tohash.r)
+		tohash.r.Close()
+		if err != nil {
+			fmt.Fprintf(obuf, "%s: hash error: %v\n", tohash.path, err)
+			continue
+		}
+ bytecount += bytes |
+ fmt.Fprintf(obuf, "%s: %v\n", tohash.path, digest) |
+ } |
+ fmt.Fprintf(obuf, "Finished hash worker %d (hashed %d files, %s)\n", name, filecount, humanize.Bytes(bytecount)) |
+ finished <- true |
+} |
+ |
+// CreateParallelHashFileProcessor starts a pool of hash workers, capped at
+// the smallest of -maxworkers, GOMAXPROCS and the CPU count.
+func CreateParallelHashFileProcessor(obuf io.Writer) *ParallelHashFileProcessor {
+	max := *maxworkers
+
+	if maxProcs := runtime.GOMAXPROCS(0); maxProcs < max {
+		max = maxProcs
+	}
+	if numCPU := runtime.NumCPU(); numCPU < max {
+		max = numCPU
+	}
+	if max < *maxworkers {
+		fmt.Fprintf(obuf, "Limiting workers to %d (%d requested)\n", max, *maxworkers)
+	}
+
+	p := ParallelHashFileProcessor{
+		obuf:     obuf,
+		workers:  max,
+		queue:    make(chan ToHash, max),
+		finished: make(chan bool),
+	}
+	for i := 0; i < p.workers; i++ {
+		go ParallelHashFileProcessorWorker(i, p.obuf, p.queue, p.finished)
+	}
+	return &p
+}
+ |
+// SmallFile queues the file for hashing. The worker owns r and will close it;
+// the base processor receives a NopCloser so it cannot close r early.
+func (p *ParallelHashFileProcessor) SmallFile(path string, r io.ReadCloser) {
+	p.queue <- ToHash{path: path, r: r}
+	p.BaseFileProcessor.SmallFile(path, ioutil.NopCloser(r))
+}
+
+// LargeFile queues the file for hashing, exactly as SmallFile does.
+func (p *ParallelHashFileProcessor) LargeFile(path string, r io.ReadCloser) {
+	p.queue <- ToHash{path: path, r: r}
+	p.BaseFileProcessor.LargeFile(path, ioutil.NopCloser(r))
+}
+ |
+// Complete closes the queue and blocks until every worker has drained it.
+func (p *ParallelHashFileProcessor) Complete(path string) {
+	close(p.queue)
+	for i := 0; i < p.workers; i++ {
+		<-p.finished
+	}
+	fmt.Fprintln(p.obuf, "All workers finished.")
+	p.queue = nil
+}