Chromium Code Reviews| Index: go/redisutil/rtc.go |
| diff --git a/go/redisutil/rtc.go b/go/redisutil/rtc.go |
| index 81b5b4b4b0908a976e33323bd046c43f2975746e..b171aae3e2b15651a652f6fb19fdebfd598647c1 100644 |
| --- a/go/redisutil/rtc.go |
| +++ b/go/redisutil/rtc.go |
| @@ -2,10 +2,11 @@ package redisutil |
| import ( |
| "fmt" |
| - "time" |
| + "sync" |
| "github.com/garyburd/redigo/redis" |
| "github.com/skia-dev/glog" |
| + |
| "go.skia.org/infra/go/rtcache" |
| "go.skia.org/infra/go/util" |
| ) |
| @@ -33,9 +34,9 @@ type RedisRTC struct { |
| // produce the desired result. |
| errKeyPrefix string |
| - // queueSubChannel is the channel where keyspace notifications are generated |
| - // when a new item is added to the task queue. |
| - queueSubChannel string |
| + // workReadyKey is the name of the queue that indicates that there is work |
| + // available. |
| + workReadyKey string |
| // finishedChannel is the key of the PubSub channel where finished tasks are |
| // announced. |
| @@ -50,6 +51,12 @@ type RedisRTC struct { |
| // doneCh is a channel to subscribe to be notified when a task is done. |
| doneCh chan<- *doneSub |
| + |
| + // shutdownCh is closed to signal that all go-routines need to shut down. |
| + shutdownCh chan bool |
| + |
| + // shutdownWg is the WaitGroup used to synchronize the shutdown. |
| + shutdownWg sync.WaitGroup |
| } |
| // workerTask is an auxiliary struct that contains the ID of a task |
| @@ -88,13 +95,14 @@ func NewReadThroughCache(redisPool *RedisPool, queueName string, worker rtcache. |
| ret := &RedisRTC{ |
| redisPool: redisPool, |
| queueKey: queueKey, |
| + workReadyKey: queueKey + ":wr", |
| inProgressKey: queueKey + ":inp", |
| finishedChannel: finishedChannel, |
| keyPrefix: keyPrefix, |
| errKeyPrefix: errKeyPrefix, |
| - queueSubChannel: fmt.Sprintf("__keyspace@%d__:%s", redisPool.db, queueKey), |
| codec: codec, |
| worker: worker, |
| + shutdownCh: make(chan bool), |
| } |
| // Start the feeder process. |
| @@ -133,31 +141,55 @@ func (r *RedisRTC) startWorkers(nWorkers int) error { |
| } |
| for i := 0; i < nWorkers; i++ { |
| + r.shutdownWg.Add(1) |
| go func() { |
| + var task *workerTask |
| + WorkerLoop: |
| for { |
| - task := <-workCh |
| + // Wait for a task or the signal to shut down. |
| + select { |
| + case task = <-workCh: |
| + case <-r.shutdownCh: |
| + break WorkerLoop |
| + } |
| + |
| r.writeWorkerResult(task.priority, task.id) |
| } |
| + r.shutdownWg.Done() |
| }() |
| } |
| return nil |
| } |
| +// shutdown terminates all go-routines started by this instance by closing |
| +// shutdownCh and waiting for them to exit. It must be called at most once, |
| +// since closing an already-closed channel panics. |
| +func (r *RedisRTC) shutdown() { |
| + close(r.shutdownCh) |
| + r.shutdownWg.Wait() |
| +} |
| + |
| // getWorkChannel returns a channel that sends tasks to be processed. |
| func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { |
| ret := make(chan *workerTask) |
| - subCh, err := r.getKeyEventChannel(r.queueSubChannel, false) |
| - if err != nil { |
| - return nil, err |
| - } |
| - |
| + r.shutdownWg.Add(1) |
| go func() { |
| - hasMore := true |
| + hasMore := false |
| + WorkLoop: |
| for { |
| if !hasMore { |
| - <-subCh |
| + hasMore = r.checkForWork() |
| + } |
| + |
| + // Check if we need to shutdown. |
| + select { |
| + case <-r.shutdownCh: |
| + break WorkLoop |
| + default: |
| + } |
| + |
| + if !hasMore { |
| + continue |
| } |
| // Get the next task. |
| @@ -172,6 +204,7 @@ func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { |
| ret <- workTask |
| } |
| } |
| + r.shutdownWg.Done() |
| }() |
| return ret, nil |
| @@ -183,7 +216,7 @@ func (r *RedisRTC) dequeue() (*workerTask, int, error) { |
| c := r.redisPool.Get() |
| defer util.Close(c) |
| - // TODO: this needs to be a transaction. |
| + // Remove the first element in the transaction. |
| util.LogErr(c.Send("MULTI")) |
| util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) |
| util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) |
| @@ -254,60 +287,27 @@ func (r *RedisRTC) writeWorkerResult(priority int64, id string) { |
| } |
| } |
| -// getKeyEventChannel returns a channel that sends events for key changes in |
| -// Redis. |
| -func (r *RedisRTC) getKeyEventChannel(channelOrPattern string, isPattern bool) (<-chan string, error) { |
| - // Listen for changes on the queue continously. |
| - psc := redis.PubSubConn{Conn: r.redisPool.Get()} |
| +// checkForWork waits until there is either a work indicator in the queue |
| +// or 1 second has passed. It will only return false if there was an error. |
| +func (r *RedisRTC) checkForWork() bool { |
| + c := r.redisPool.Get() |
| + defer util.Close(c) |
| + _, err := redis.Values(c.Do("BLPOP", r.workReadyKey, 1)) |
| - subscribe := func() error { |
| - if isPattern { |
| - return psc.PSubscribe(channelOrPattern) |
| - } else { |
| - return psc.Subscribe(channelOrPattern) |
| - } |
| + // If we timed out return true because there could be work and it will |
| + // cause the caller to poll the queue. |
|
dogben
2016/08/09 13:50:49
Why checkForWork() at all? Why not just poll the q
|
| + if err == redis.ErrNil { |
| + return true |
| } |
| - // Subscribe to the key events |
| - if err := subscribe(); err != nil { |
| - return nil, err |
| + // If there was an error. |
| + if err != nil { |
| + glog.Errorf("Error retrieving list: %s", err) |
| + return false |
| } |
| - readyCh := make(chan bool) |
| - ret := make(chan string) |
| - go func() { |
| - for { |
| - Loop: |
| - for { |
| - switch v := psc.Receive().(type) { |
| - case redis.PMessage: |
| - ret <- string(v.Data) |
| - case redis.Message: |
| - ret <- string(v.Data) |
| - case redis.Subscription: |
| - if readyCh != nil { |
| - readyCh <- true |
| - close(readyCh) |
| - } |
| - case error: |
| - glog.Errorf("Error waiting for key events: %s", v) |
| - glog.Infof("Reconnecting.") |
| - util.Close(psc) |
| - break Loop |
| - } |
| - } |
| - |
| - readyCh = nil |
| - psc = redis.PubSubConn{Conn: r.redisPool.Get()} |
| - if err := subscribe(); err != nil { |
| - glog.Errorf("Error re-connecting: %s", err) |
| - time.Sleep(time.Second) |
| - } |
| - } |
| - }() |
| - <-readyCh |
| - |
| - return ret, nil |
| + // Now we got something from the queue and should check for work. |
| + return true |
| } |
| // enqueue adds the given task and priority to the task queue. Updating the |
| @@ -342,7 +342,10 @@ func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { |
| } |
| if saveId { |
| - if _, err = c.Do("ZADD", r.queueKey, priority, id); err != nil { |
| + util.LogErr(c.Send("MULTI")) |
| + util.LogErr(c.Send("ZADD", r.queueKey, priority, id)) |
| + util.LogErr(c.Send("RPUSH", r.workReadyKey, []byte("W"))) |
| + if _, err = c.Do("EXEC"); err != nil { |
| return false, err |
| } |
| } |
| @@ -395,6 +398,7 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
| } |
| doneCh := make(chan *doneSub) |
| + r.shutdownWg.Add(1) |
| go func() { |
| watchIds := map[string][]chan bool{} |
| notifyChannels := func(id string) { |
| @@ -413,8 +417,11 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
| } |
| } |
| + WorkLoop: |
| for { |
| select { |
| + case <-r.shutdownCh: |
| + break WorkLoop |
| case subscription := <-doneCh: |
| watchIds[subscription.id] = append(watchIds[subscription.id], subscription.notifyCh) |
| if r.isFinished(subscription.id) { |
| @@ -429,6 +436,7 @@ func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
| } |
| } |
| } |
| + r.shutdownWg.Done() |
| }() |
| return doneCh, nil |