Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(228)

Unified Diff: go/redisutil/rtc.go

Issue 2200833004: Fixing RedisRTC implementation and test (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Incorporated Feedback Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | go/redisutil/rtc_test.go » ('j') | go/redisutil/rtc_test.go » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: go/redisutil/rtc.go
diff --git a/go/redisutil/rtc.go b/go/redisutil/rtc.go
index 81b5b4b4b0908a976e33323bd046c43f2975746e..b171aae3e2b15651a652f6fb19fdebfd598647c1 100644
--- a/go/redisutil/rtc.go
+++ b/go/redisutil/rtc.go
@@ -2,10 +2,11 @@ package redisutil
import (
"fmt"
- "time"
+ "sync"
"github.com/garyburd/redigo/redis"
"github.com/skia-dev/glog"
+
"go.skia.org/infra/go/rtcache"
"go.skia.org/infra/go/util"
)
@@ -33,9 +34,9 @@ type RedisRTC struct {
// produce the desired resul.
errKeyPrefix string
- // queueSubChannel is the channel where keyspace notifications are generated
- // when a new item is added to the task queue.
- queueSubChannel string
+ // workReadyKey is the name of the queue that indicates that there is work
+ // available.
+ workReadyKey string
// finishedChannel is the key of the PubSub channel where finished tasked are
// announced.
@@ -50,6 +51,12 @@ type RedisRTC struct {
// doneCh is a channel to subscribe to be notified when a task is done.
doneCh chan<- *doneSub
+
+ // shutdownCh is used to signal by closing it that all go-routines need to shut down.
+ shutdownCh chan bool
+
+ // shutdownWg is the WaitGroup used to synchronize the shutdown.
+ shutdownWg sync.WaitGroup
}
// workerTask is an auxiliary struct that contains the ID of a task
@@ -88,13 +95,14 @@ func NewReadThroughCache(redisPool *RedisPool, queueName string, worker rtcache.
ret := &RedisRTC{
redisPool: redisPool,
queueKey: queueKey,
+ workReadyKey: queueKey + ":wr",
inProgressKey: queueKey + ":inp",
finishedChannel: finishedChannel,
keyPrefix: keyPrefix,
errKeyPrefix: errKeyPrefix,
- queueSubChannel: fmt.Sprintf("__keyspace@%d__:%s", redisPool.db, queueKey),
codec: codec,
worker: worker,
+ shutdownCh: make(chan bool),
}
// Start the feeder process.
@@ -133,31 +141,55 @@ func (r *RedisRTC) startWorkers(nWorkers int) error {
}
for i := 0; i < nWorkers; i++ {
+ r.shutdownWg.Add(1)
go func() {
+ var task *workerTask
+ WorkerLoop:
for {
- task := <-workCh
+ // Wait for a task or the signal to shut down.
+ select {
+ case task = <-workCh:
+ case <-r.shutdownCh:
+ break WorkerLoop
+ }
+
r.writeWorkerResult(task.priority, task.id)
}
+ r.shutdownWg.Done()
}()
}
return nil
}
+// shutdown terminates all go-routines started by this instance.
+func (r *RedisRTC) shutdown() {
+ close(r.shutdownCh)
+ r.shutdownWg.Wait()
+}
+
// getWorkChannel returns a channel that sends tasks to be processed.
func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) {
ret := make(chan *workerTask)
- subCh, err := r.getKeyEventChannel(r.queueSubChannel, false)
- if err != nil {
- return nil, err
- }
-
+ r.shutdownWg.Add(1)
go func() {
- hasMore := true
+ hasMore := false
+ WorkLoop:
for {
if !hasMore {
- <-subCh
+ hasMore = r.checkForWork()
+ }
+
+ // Check if we need to shutdown.
+ select {
+ case <-r.shutdownCh:
+ break WorkLoop
+ default:
+ }
+
+ if !hasMore {
+ continue
}
// Get the next task.
@@ -172,6 +204,7 @@ func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) {
ret <- workTask
}
}
+ r.shutdownWg.Done()
}()
return ret, nil
@@ -183,7 +216,7 @@ func (r *RedisRTC) dequeue() (*workerTask, int, error) {
c := r.redisPool.Get()
defer util.Close(c)
- // TODO: this needs to be a transaction.
+ // Remove the first element in the transaction.
util.LogErr(c.Send("MULTI"))
util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES"))
util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0))
@@ -254,60 +287,27 @@ func (r *RedisRTC) writeWorkerResult(priority int64, id string) {
}
}
-// getKeyEventChannel returns a channel that sends events for key changes in
-// Redis.
-func (r *RedisRTC) getKeyEventChannel(channelOrPattern string, isPattern bool) (<-chan string, error) {
- // Listen for changes on the queue continously.
- psc := redis.PubSubConn{Conn: r.redisPool.Get()}
+// waitForWork waits until there is either a work indicator in the queue
+// or 1 second has passed. It will only return false if there was an error.
+func (r *RedisRTC) checkForWork() bool {
+ c := r.redisPool.Get()
+ defer util.Close(c)
+ _, err := redis.Values(c.Do("BLPOP", r.workReadyKey, 1))
- subscribe := func() error {
- if isPattern {
- return psc.PSubscribe(channelOrPattern)
- } else {
- return psc.Subscribe(channelOrPattern)
- }
+ // If we timed out return true because there could be work and it will
+ // cause the caller to poll the queue.
dogben 2016/08/09 13:50:49 Why checkForWork() at all? Why not just poll the q
+ if err == redis.ErrNil {
+ return true
}
- // Subscribe to the key events
- if err := subscribe(); err != nil {
- return nil, err
+ // If there was an error.
+ if err != nil {
+ glog.Errorf("Error retrieving list: %s", err)
+ return false
}
- readyCh := make(chan bool)
- ret := make(chan string)
- go func() {
- for {
- Loop:
- for {
- switch v := psc.Receive().(type) {
- case redis.PMessage:
- ret <- string(v.Data)
- case redis.Message:
- ret <- string(v.Data)
- case redis.Subscription:
- if readyCh != nil {
- readyCh <- true
- close(readyCh)
- }
- case error:
- glog.Errorf("Error waiting for key events: %s", v)
- glog.Infof("Reconnecting.")
- util.Close(psc)
- break Loop
- }
- }
-
- readyCh = nil
- psc = redis.PubSubConn{Conn: r.redisPool.Get()}
- if err := subscribe(); err != nil {
- glog.Errorf("Error re-connecting: %s", err)
- time.Sleep(time.Second)
- }
- }
- }()
- <-readyCh
-
- return ret, nil
+ // Now we got something from the queue and should check for work.
+ return true
}
// enqueue adds the given task and priority to the task queue. Updating the
@@ -342,7 +342,10 @@ func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) {
}
if saveId {
- if _, err = c.Do("ZADD", r.queueKey, priority, id); err != nil {
+ util.LogErr(c.Send("MULTI"))
+ util.LogErr(c.Send("ZADD", r.queueKey, priority, id))
+ util.LogErr(c.Send("RPUSH", r.workReadyKey, []byte("W")))
+ if _, err = c.Do("EXEC"); err != nil {
return false, err
}
}
@@ -395,6 +398,7 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) {
}
doneCh := make(chan *doneSub)
+ r.shutdownWg.Add(1)
go func() {
watchIds := map[string][]chan bool{}
notifyChannels := func(id string) {
@@ -413,8 +417,11 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) {
}
}
+ WorkLoop:
for {
select {
+ case <-r.shutdownCh:
+ break WorkLoop
case subscription := <-doneCh:
watchIds[subscription.id] = append(watchIds[subscription.id], subscription.notifyCh)
if r.isFinished(subscription.id) {
@@ -429,6 +436,7 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) {
}
}
}
+ r.shutdownWg.Done()
}()
return doneCh, nil
« no previous file with comments | « no previous file | go/redisutil/rtc_test.go » ('j') | go/redisutil/rtc_test.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698