Index: go/redisutil/rtc.go |
diff --git a/go/redisutil/rtc.go b/go/redisutil/rtc.go |
index 81b5b4b4b0908a976e33323bd046c43f2975746e..b171aae3e2b15651a652f6fb19fdebfd598647c1 100644 |
--- a/go/redisutil/rtc.go |
+++ b/go/redisutil/rtc.go |
@@ -2,10 +2,11 @@ package redisutil |
import ( |
"fmt" |
- "time" |
+ "sync" |
"github.com/garyburd/redigo/redis" |
"github.com/skia-dev/glog" |
+ |
"go.skia.org/infra/go/rtcache" |
"go.skia.org/infra/go/util" |
) |
@@ -33,9 +34,9 @@ type RedisRTC struct { |
// produce the desired resul. |
errKeyPrefix string |
- // queueSubChannel is the channel where keyspace notifications are generated |
- // when a new item is added to the task queue. |
- queueSubChannel string |
+ // workReadyKey is the name of the queue that indicates that there is work |
+ // available. |
+ workReadyKey string |
// finishedChannel is the key of the PubSub channel where finished tasked are |
// announced. |
@@ -50,6 +51,12 @@ type RedisRTC struct { |
// doneCh is a channel to subscribe to be notified when a task is done. |
doneCh chan<- *doneSub |
+ |
+ // shutdownCh is used to signal by closing it that all go-routines need to shut down. |
+ shutdownCh chan bool |
+ |
+ // shutdownWg is the WaitGroup used to synchronize the shutdown. |
+ shutdownWg sync.WaitGroup |
} |
// workerTask is an auxiliary struct that contains the ID of a task |
@@ -88,13 +95,14 @@ func NewReadThroughCache(redisPool *RedisPool, queueName string, worker rtcache. |
ret := &RedisRTC{ |
redisPool: redisPool, |
queueKey: queueKey, |
+ workReadyKey: queueKey + ":wr", |
inProgressKey: queueKey + ":inp", |
finishedChannel: finishedChannel, |
keyPrefix: keyPrefix, |
errKeyPrefix: errKeyPrefix, |
- queueSubChannel: fmt.Sprintf("__keyspace@%d__:%s", redisPool.db, queueKey), |
codec: codec, |
worker: worker, |
+ shutdownCh: make(chan bool), |
} |
// Start the feeder process. |
@@ -133,31 +141,55 @@ func (r *RedisRTC) startWorkers(nWorkers int) error { |
} |
for i := 0; i < nWorkers; i++ { |
+ r.shutdownWg.Add(1) |
go func() { |
+ var task *workerTask |
+ WorkerLoop: |
for { |
- task := <-workCh |
+ // Wait for a task or the signal to shut down. |
+ select { |
+ case task = <-workCh: |
+ case <-r.shutdownCh: |
+ break WorkerLoop |
+ } |
+ |
r.writeWorkerResult(task.priority, task.id) |
} |
+ r.shutdownWg.Done() |
}() |
} |
return nil |
} |
+// shutdown terminates all go-routines started by this instance. |
+func (r *RedisRTC) shutdown() { |
+ close(r.shutdownCh) |
+ r.shutdownWg.Wait() |
+} |
+ |
// getWorkChannel returns a channel that sends tasks to be processed. |
func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { |
ret := make(chan *workerTask) |
- subCh, err := r.getKeyEventChannel(r.queueSubChannel, false) |
- if err != nil { |
- return nil, err |
- } |
- |
+ r.shutdownWg.Add(1) |
go func() { |
- hasMore := true |
+ hasMore := false |
+ WorkLoop: |
for { |
if !hasMore { |
- <-subCh |
+ hasMore = r.checkForWork() |
+ } |
+ |
+ // Check if we need to shutdown. |
+ select { |
+ case <-r.shutdownCh: |
+ break WorkLoop |
+ default: |
+ } |
+ |
+ if !hasMore { |
+ continue |
} |
// Get the next task. |
@@ -172,6 +204,7 @@ func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { |
ret <- workTask |
} |
} |
+ r.shutdownWg.Done() |
}() |
return ret, nil |
@@ -183,7 +216,7 @@ func (r *RedisRTC) dequeue() (*workerTask, int, error) { |
c := r.redisPool.Get() |
defer util.Close(c) |
- // TODO: this needs to be a transaction. |
+ // Remove the first element in the transaction. |
util.LogErr(c.Send("MULTI")) |
util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) |
util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) |
@@ -254,60 +287,27 @@ func (r *RedisRTC) writeWorkerResult(priority int64, id string) { |
} |
} |
-// getKeyEventChannel returns a channel that sends events for key changes in |
-// Redis. |
-func (r *RedisRTC) getKeyEventChannel(channelOrPattern string, isPattern bool) (<-chan string, error) { |
- // Listen for changes on the queue continously. |
- psc := redis.PubSubConn{Conn: r.redisPool.Get()} |
+// waitForWork waits until there is either a work indicator in the queue |
+// or 1 second has passed. It will only return false if there was an error. |
+func (r *RedisRTC) checkForWork() bool { |
+ c := r.redisPool.Get() |
+ defer util.Close(c) |
+ _, err := redis.Values(c.Do("BLPOP", r.workReadyKey, 1)) |
- subscribe := func() error { |
- if isPattern { |
- return psc.PSubscribe(channelOrPattern) |
- } else { |
- return psc.Subscribe(channelOrPattern) |
- } |
+ // If we timed out return true because there could be work and it will |
+ // cause the caller to poll the queue. |
dogben
2016/08/09 13:50:49
Why checkForWork() at all? Why not just poll the q
|
+ if err == redis.ErrNil { |
+ return true |
} |
- // Subscribe to the key events |
- if err := subscribe(); err != nil { |
- return nil, err |
+ // If there was an error. |
+ if err != nil { |
+ glog.Errorf("Error retrieving list: %s", err) |
+ return false |
} |
- readyCh := make(chan bool) |
- ret := make(chan string) |
- go func() { |
- for { |
- Loop: |
- for { |
- switch v := psc.Receive().(type) { |
- case redis.PMessage: |
- ret <- string(v.Data) |
- case redis.Message: |
- ret <- string(v.Data) |
- case redis.Subscription: |
- if readyCh != nil { |
- readyCh <- true |
- close(readyCh) |
- } |
- case error: |
- glog.Errorf("Error waiting for key events: %s", v) |
- glog.Infof("Reconnecting.") |
- util.Close(psc) |
- break Loop |
- } |
- } |
- |
- readyCh = nil |
- psc = redis.PubSubConn{Conn: r.redisPool.Get()} |
- if err := subscribe(); err != nil { |
- glog.Errorf("Error re-connecting: %s", err) |
- time.Sleep(time.Second) |
- } |
- } |
- }() |
- <-readyCh |
- |
- return ret, nil |
+ // Now we got something from the queue and should check for work. |
+ return true |
} |
// enqueue adds the given task and priority to the task queue. Updating the |
@@ -342,7 +342,10 @@ func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { |
} |
if saveId { |
- if _, err = c.Do("ZADD", r.queueKey, priority, id); err != nil { |
+ util.LogErr(c.Send("MULTI")) |
+ util.LogErr(c.Send("ZADD", r.queueKey, priority, id)) |
+ util.LogErr(c.Send("RPUSH", r.workReadyKey, []byte("W"))) |
+ if _, err = c.Do("EXEC"); err != nil { |
return false, err |
} |
} |
@@ -395,6 +398,7 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
} |
doneCh := make(chan *doneSub) |
+ r.shutdownWg.Add(1) |
go func() { |
watchIds := map[string][]chan bool{} |
notifyChannels := func(id string) { |
@@ -413,8 +417,11 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
} |
} |
+ WorkLoop: |
for { |
select { |
+ case <-r.shutdownCh: |
+ break WorkLoop |
case subscription := <-doneCh: |
watchIds[subscription.id] = append(watchIds[subscription.id], subscription.notifyCh) |
if r.isFinished(subscription.id) { |
@@ -429,6 +436,7 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
} |
} |
} |
+ r.shutdownWg.Done() |
}() |
return doneCh, nil |