| Index: common/dirwalk/walk_parallel.go
|
| diff --git a/common/dirwalk/walk_parallel.go b/common/dirwalk/walk_parallel.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..45e8257e17248e6ff2b28b5eb90e6728caab7462
|
| --- /dev/null
|
| +++ b/common/dirwalk/walk_parallel.go
|
| @@ -0,0 +1,97 @@
|
| +// Copyright 2016 The LUCI Authors. All rights reserved.
|
| +// Use of this source code is governed under the Apache License, Version 2.0
|
| +// that can be found in the LICENSE file.
|
| +
|
| +package dirwalk
|
| +
|
| +import (
|
| + "os"
|
| + "path/filepath"
|
| + "sync/atomic"
|
| +
|
| + "github.com/eapache/channels"
|
| +)
|
| +
|
| +type fileQueue struct {
|
| + queued uint64
|
| + finished uint64
|
| + items channels.Channel
|
| + waiton chan bool
|
| +}
|
| +
|
| +func (q *fileQueue) add(s string) {
|
| + atomic.AddUint64(&q.queued, 1)
|
| + q.items.In() <- s
|
| +}
|
| +
|
| +func (q *fileQueue) done() {
|
| + atomic.AddUint64(&q.finished, 1)
|
| +
|
| + if q.queued == q.finished {
|
| + q.items.Close()
|
| + q.waiton <- true
|
| + }
|
| +}
|
| +
|
| +func (q *fileQueue) wait() {
|
| + <-q.waiton
|
| +}
|
| +
|
| +func examinePath(queue *fileQueue, callback WalkFunc) {
|
| + for ipath := range queue.items.Out() {
|
| + path := ipath.(string)
|
| +
|
| + fi, err := os.Stat(path)
|
| + if err != nil {
|
| + callback(path, -1, nil, err)
|
| + return
|
| + }
|
| +
|
| + if fi.IsDir() {
|
| + d, err := os.Open(path)
|
| + if err != nil {
|
| + callback(path, -1, nil, err)
|
| + }
|
| +
|
| + dircontents, err := d.Readdirnames(-1)
|
| + if err != nil {
|
| + callback(path, -1, nil, err)
|
| + }
|
| + for _, name := range dircontents {
|
| + fname := filepath.Join(path, name)
|
| + queue.add(fname)
|
| + }
|
| + } else {
|
| + f, err := os.Open(path)
|
| + if err != nil {
|
| + callback(path, -1, nil, err)
|
| + } else {
|
| + callback(path, fi.Size(), f, nil)
|
| + }
|
| + }
|
| + queue.done()
|
| + }
|
| +}
|
| +
|
| +// WalkParallel is a directory walking function which uses multiple threads to
|
| +// walk a directory tree.
|
| +//
|
| +// On the majority of systems testing shows that this function is either
|
| +// slower than (or at best comparable) the non-parallel version while consuming
|
| +// many times the resources.
|
| +//
|
| +// Linux Kernel versions newer than >4.8 which disable locks in stat path can
|
| +// make this version faster.
|
| +//
|
| +// Use the performance tests to determine the correct walker for your platform
|
| +// and system!
|
| +func WalkParallel(root string, callback WalkFunc) {
|
| + queue := fileQueue{queued: 0, finished: 0, items: channels.NewInfiniteChannel(), waiton: make(chan bool)}
|
| +
|
| + for w := 0; w <= 10; w++ {
|
| + go examinePath(&queue, callback)
|
| + }
|
| +
|
| + queue.add(root)
|
| + queue.wait()
|
| +}
|
|
|