Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package parallel | 5 package parallel |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "sync" | 8 "sync" |
| 9 | 9 |
| 10 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
| 11 ) | 11 ) |
| 12 | 12 |
| 13 // WorkPool creates a fixed-size pool of worker goroutines. A supplied generator | 13 // WorkPool creates a fixed-size pool of worker goroutines. A supplied generator |
| 14 // method creates task functions and passes them through to the work pool. | 14 // method creates task functions and passes them through to the work pool. |
| 15 // Available workers will consume tasks from the pool and execute them until the | 15 // Available workers will consume tasks from the pool and execute them until the |
| 16 // generator is finished. | 16 // generator is finished. |
| 17 // | 17 // |
| 18 // WorkPool blocks until all the generator completes and all workers have | 18 // WorkPool blocks until all the generator completes and all workers have |
| 19 // finished their tasks. | 19 // finished their tasks. |
| 20 func WorkPool(workers int, gen func(chan<- func() error)) error { | 20 func WorkPool(workers int, gen func(chan<- func() error)) error { |
| 21 if workers < 0 { | 21 if workers < 0 { |
| 22 return errors.New("invalid number of workers") | 22 return errors.New("invalid number of workers") |
| 23 } | 23 } |
| 24 | 24 |
| 25 sem := make(Semaphore, workers) | 25 sem := make(Semaphore, workers) |
| 26 » errchan := make(chan error, workers) | 26 » return Run(sem, gen) |
| 27 » funchan := make(chan func() error, workers) | 27 } |
| 28 | |
| 29 // Run executes task functions produced by a generator method. Execution is | |
| 30 // throttled by an optional Semaphore, requiring a token prior to dispatch. | |
| 31 // | |
| 32 // Run blocks until all the generator completes and all workers have finished | |
| 33 // their tasks, returning a MultiError if a failure was encountered. | |
| 34 func Run(sem Semaphore, gen func(chan<- func() error)) error { | |
|
dnj (Google)
2016/01/21 04:36:24
Useful b/c the FanOutIn model is great, but someti
| |
| 35 » errchan := make(chan error, cap(sem)) | |
| 36 » funchan := make(chan func() error, cap(sem)) | |
| 28 | 37 |
| 29 go func() { | 38 go func() { |
| 30 defer close(funchan) | 39 defer close(funchan) |
| 31 gen(funchan) | 40 gen(funchan) |
| 32 }() | 41 }() |
| 33 | 42 |
| 34 go func() { | 43 go func() { |
| 35 grp := sync.WaitGroup{} | 44 grp := sync.WaitGroup{} |
| 36 | 45 |
| 37 for fn := range funchan { | 46 for fn := range funchan { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 49 errchan <- fn() | 58 errchan <- fn() |
| 50 }() | 59 }() |
| 51 } | 60 } |
| 52 | 61 |
| 53 grp.Wait() | 62 grp.Wait() |
| 54 close(errchan) | 63 close(errchan) |
| 55 }() | 64 }() |
| 56 | 65 |
| 57 return errors.MultiErrorFromErrors(errchan) | 66 return errors.MultiErrorFromErrors(errchan) |
| 58 } | 67 } |
| OLD | NEW |