Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1093)

Side by Side Diff: common/dirwalk/tests/tools/walkdir/fileprocessor_phash.go

Issue 2054763004: luci-go/common/dirwalk: Code for walking a directory tree efficiently Base URL: https://github.com/luci/luci-go@master
Patch Set: Major rewrite of the code. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package main
6
7 import (
8 "flag"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "runtime"
13
14 "github.com/dustin/go-humanize"
15 )
16
// maxworkers caps the number of hashing goroutines. The effective worker
// count is further limited by GOMAXPROCS and the CPU count in
// CreateParallelHashFileProcessor.
var maxworkers = flag.Int("maxworkers", 100, "Maximum number of workers to use.")

// ToHash is one unit of work for a hash worker: a file path and an open
// reader for that file's contents. The worker that receives it is
// responsible for closing r.
type ToHash struct {
	path string        // file path, used only to label the output line
	r    io.ReadCloser // contents to hash; closed by the receiving worker
}
23
// ParallelHashFileProcessor implements FileProcessor. It generates a hash for the contents
// of each found file using multiple threads.
type ParallelHashFileProcessor struct {
	BaseFileProcessor
	obuf     io.Writer // destination for per-file digest lines and worker logs
	workers  int       // number of worker goroutines started
	// queue carries work to the hash workers; set to nil by Complete.
	// NOTE(review): a pointer to a channel is unnecessary — channels are
	// reference types; a plain `chan ToHash` would do. Confirm before changing.
	queue    *chan ToHash
	finished chan bool // each worker sends exactly one value here on exit
}
33
34 func ParallelHashFileProcessorWorker(name int, obuf io.Writer, queue <-chan ToHa sh, finished chan<- bool) {
35 fmt.Fprintf(obuf, "Starting hash worker %d\n", name)
36
37 var filecount uint64
38 var bytecount uint64
39 for tohash := range queue {
40 filecount++
41
42 digest, bytes, err := hash(tohash.r)
43 tohash.r.Close()
44 if err != nil {
45 // FIXME(mithro): Do something here?
46 continue
47 }
48 bytecount += bytes
49 fmt.Fprintf(obuf, "%s: %v\n", tohash.path, digest)
50 }
51 fmt.Fprintf(obuf, "Finished hash worker %d (hashed %d files, %s)\n", nam e, filecount, humanize.Bytes(bytecount))
52 finished <- true
53 }
54
55 func CreateParallelHashFileProcessor(obuf io.Writer) *ParallelHashFileProcessor {
56 max := *maxworkers
57
58 maxProcs := runtime.GOMAXPROCS(0)
59 if maxProcs < max {
60 max = maxProcs
61 }
62
63 numCPU := runtime.NumCPU()
64 if numCPU < maxProcs {
65 max = numCPU
66 }
67
68 if max < *maxworkers {
69 // FIXME: Warn
70 }
71
72 p := ParallelHashFileProcessor{obuf: obuf, workers: max, finished: make( chan bool)}
73 q := make(chan ToHash, p.workers)
74 p.queue = &q
75 for i := 0; i < p.workers; i++ {
76 go ParallelHashFileProcessorWorker(i, p.obuf, *p.queue, p.finish ed)
77 }
78 return &p
79 }
80
// SmallFile queues the file's contents for hashing by a worker goroutine and
// forwards the event to the embedded BaseFileProcessor.
// NOTE(review): r is sent to a worker (which reads and closes it) while the
// same reader is also handed to BaseFileProcessor via NopCloser — if the base
// processor reads from r this is a data race / double-read. Confirm that
// BaseFileProcessor.SmallFile only records the path.
func (p *ParallelHashFileProcessor) SmallFile(path string, r io.ReadCloser) {
	*p.queue <- ToHash{path: path, r: r}
	p.BaseFileProcessor.SmallFile(path, ioutil.NopCloser(r))
}
85
// LargeFile queues the file's contents for hashing by a worker goroutine and
// forwards the event to the embedded BaseFileProcessor.
// NOTE(review): same sharing hazard as SmallFile — r is both sent to a worker
// and passed (wrapped in NopCloser) to the base processor; verify the base
// processor does not read from it.
func (p *ParallelHashFileProcessor) LargeFile(path string, r io.ReadCloser) {
	*p.queue <- ToHash{path: path, r: r}
	p.BaseFileProcessor.LargeFile(path, ioutil.NopCloser(r))
}
90
91 func (p *ParallelHashFileProcessor) Complete(path string) {
92 close(*p.queue)
93 for i := 0; i < p.workers; i++ {
94 <-p.finished
95 }
96 fmt.Fprintln(p.obuf, "All workers finished.")
97 p.queue = nil
98 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698