Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: common/parallel/workPool.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package parallel 5 package parallel
6 6
7 import ( 7 import (
8 "sync" 8 "sync"
9 9
10 "github.com/luci/luci-go/common/errors" 10 "github.com/luci/luci-go/common/errors"
11 ) 11 )
12 12
13 // WorkPool creates a fixed-size pool of worker goroutines. A supplied generator 13 // WorkPool creates a fixed-size pool of worker goroutines. A supplied generator
14 // method creates task functions and passes them through to the work pool. 14 // method creates task functions and passes them through to the work pool.
15 // Available workers will consume tasks from the pool and execute them until the 15 // Available workers will consume tasks from the pool and execute them until the
16 // generator is finished. 16 // generator is finished.
17 // 17 //
18 // WorkPool blocks until all the generator completes and all workers have 18 // WorkPool blocks until all the generator completes and all workers have
19 // finished their tasks. 19 // finished their tasks.
20 func WorkPool(workers int, gen func(chan<- func() error)) error { 20 func WorkPool(workers int, gen func(chan<- func() error)) error {
21 if workers < 0 { 21 if workers < 0 {
22 return errors.New("invalid number of workers") 22 return errors.New("invalid number of workers")
23 } 23 }
24 24
25 sem := make(Semaphore, workers) 25 sem := make(Semaphore, workers)
26 » errchan := make(chan error, workers) 26 » return Run(sem, gen)
27 » funchan := make(chan func() error, workers) 27 }
28
29 // Run executes task functions produced by a generator method. Execution is
30 // throttled by an optional Semaphore, requiring a token prior to dispatch.
31 //
32 // Run blocks until all the generator completes and all workers have finished
33 // their tasks, returning a MultiError if a failure was encountered.
34 func Run(sem Semaphore, gen func(chan<- func() error)) error {
dnj (Google) 2016/01/21 04:36:24 Useful b/c the FanOutIn model is great, but someti
35 » errchan := make(chan error, cap(sem))
36 » funchan := make(chan func() error, cap(sem))
28 37
29 go func() { 38 go func() {
30 defer close(funchan) 39 defer close(funchan)
31 gen(funchan) 40 gen(funchan)
32 }() 41 }()
33 42
34 go func() { 43 go func() {
35 grp := sync.WaitGroup{} 44 grp := sync.WaitGroup{}
36 45
37 for fn := range funchan { 46 for fn := range funchan {
(...skipping 11 matching lines...) Expand all
49 errchan <- fn() 58 errchan <- fn()
50 }() 59 }()
51 } 60 }
52 61
53 grp.Wait() 62 grp.Wait()
54 close(errchan) 63 close(errchan)
55 }() 64 }()
56 65
57 return errors.MultiErrorFromErrors(errchan) 66 return errors.MultiErrorFromErrors(errchan)
58 } 67 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698