Chromium Code Reviews| Index: common/dirwalk/walkparallel.go |
| diff --git a/common/dirwalk/walkparallel.go b/common/dirwalk/walkparallel.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..0c862d41f534f9a731e9062afa363aa315264c44 |
| --- /dev/null |
| +++ b/common/dirwalk/walkparallel.go |
| @@ -0,0 +1,101 @@ |
| +// Copyright 2016 The LUCI Authors. All rights reserved. |
| +// Use of this source code is governed under the Apache License, Version 2.0 |
| +// that can be found in the LICENSE file. |
| + |
| +/** |
| + |
| +WARNING: THIS FUNCTION IS SLOWER THAN THE NON-PARALLEL VERSION! |
|
M-A Ruel
2016/09/15 14:31:03
// Testing shows that this function is slower than
mithro
2016/09/20 12:41:45
Done.
The problem is frequently around locking of
M-A Ruel
2016/09/20 16:37:27
That is actually a more helpful comment that shoul
|
| + |
| +**/ |
| +package dirwalk |
| + |
| +import ( |
| + "io/ioutil" |
| + "os" |
| + "path/filepath" |
| + "sort" |
| + "sync/atomic" |
| + |
| + "github.com/eapache/channels" |
| +) |
| + |
| +type fileQueue struct { |
| + queued uint64 |
| + finished uint64 |
| + items channels.Channel |
| + waiton chan bool |
| +} |
| + |
| +func (q *fileQueue) add(s string) { |
| + atomic.AddUint64(&q.queued, 1) |
| + q.items.In() <- s |
| +} |
| + |
| +func (q *fileQueue) done() { |
| + atomic.AddUint64(&q.finished, 1) |
| + |
| + if q.queued == q.finished { |
| + q.items.Close() |
| + q.waiton <- true |
| + } |
| +} |
| + |
| +func (q *fileQueue) wait() { |
| + <-q.waiton |
| +} |
| + |
| +func examinePath(queue *fileQueue, smallfile_limit int64, obs WalkObserver) { |
| + for ipath := range queue.items.Out() { |
| + path := ipath.(string) |
| + |
| + fi, err := os.Stat(path) |
| + if err != nil { |
| + obs.Error(path, err) |
| + return |
| + } |
| + |
| + if fi.IsDir() { |
| + f, err := os.Open(path) |
| + if err != nil { |
| + obs.Error(path, err) |
| + } |
| + |
| + dircontents, err := f.Readdirnames(-1) |
| + if err != nil { |
| + obs.Error(path, err) |
| + } |
| + sort.Strings(dircontents) |
| + for _, name := range dircontents { |
| + fname := filepath.Join(path, name) |
| + queue.add(fname) |
| + } |
| + } else { |
| + if fi.Size() < smallfile_limit { |
| + data, err := ioutil.ReadFile(path) |
| + if err != nil { |
| + obs.Error(path, err) |
| + return |
| + } |
| + if int64(len(data)) != fi.Size() { |
| + panic("file size was wrong!") |
| + } |
| + obs.SmallFile(path, data) |
| + } else { |
| + obs.LargeFile(path) |
| + } |
| + } |
| + queue.done() |
| + } |
| +} |
| + |
| +func WalkParallel(root string, smallfile_limit int64, obs WalkObserver) { |
| + queue := fileQueue{queued: 0, finished: 0, items: channels.NewInfiniteChannel(), waiton: make(chan bool)} |
| + |
| + for w := 0; w <= 10; w++ { |
| + go examinePath(&queue, smallfile_limit, obs) |
| + } |
| + |
| + queue.add(root) |
| + queue.wait() |
| + obs.Finished() |
| +} |