Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package redisutil | 1 package redisutil |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "fmt" | 4 "fmt" |
| 5 » "time" | 5 » "sync" |
| 6 | 6 |
| 7 "github.com/garyburd/redigo/redis" | 7 "github.com/garyburd/redigo/redis" |
| 8 "github.com/skia-dev/glog" | 8 "github.com/skia-dev/glog" |
| 9 | |
| 9 "go.skia.org/infra/go/rtcache" | 10 "go.skia.org/infra/go/rtcache" |
| 10 "go.skia.org/infra/go/util" | 11 "go.skia.org/infra/go/util" |
| 11 ) | 12 ) |
| 12 | 13 |
| 13 const ( | 14 const ( |
| 14 // QUEUE_PREFIX is the prefix for the task queue which is a sorted set i n Redis. | 15 // QUEUE_PREFIX is the prefix for the task queue which is a sorted set i n Redis. |
| 15 QUEUE_PREFIX = "rc:" | 16 QUEUE_PREFIX = "rc:" |
| 16 ) | 17 ) |
| 17 | 18 |
| 18 // RedisRTC implements a rtcache.ReadThroughCache with a Redis backend. | 19 // RedisRTC implements a rtcache.ReadThroughCache with a Redis backend. |
| 19 type RedisRTC struct { | 20 type RedisRTC struct { |
| 20 // redisPool is the connection to redis this read-through-cache uses. | 21 // redisPool is the connection to redis this read-through-cache uses. |
| 21 redisPool *RedisPool | 22 redisPool *RedisPool |
| 22 | 23 |
| 23 // queueKey is the key of the work queue (sorted set) in Redis. | 24 // queueKey is the key of the work queue (sorted set) in Redis. |
| 24 queueKey string | 25 queueKey string |
| 25 | 26 |
| 26 // inProgressKey is the key of the set of current tasks in progress. | 27 // inProgressKey is the key of the set of current tasks in progress. |
| 27 inProgressKey string | 28 inProgressKey string |
| 28 | 29 |
| 29 // keyPrefix is the prefix of the results that are cached in redis. | 30 // keyPrefix is the prefix of the results that are cached in redis. |
| 30 keyPrefix string | 31 keyPrefix string |
| 31 | 32 |
| 32 // errKeyPrefix is the prefix of the error message if the task failed to | 33 // errKeyPrefix is the prefix of the error message if the task failed to |
| 33 // produce the desired resul. | 34 // produce the desired resul. |
| 34 errKeyPrefix string | 35 errKeyPrefix string |
| 35 | 36 |
| 36 » // queueSubChannel is the channel where keyspace notifications are gener ated | 37 » // workReadyKey is the name of the queue that indicates that there is wo rk |
| 37 » // when a new item is added to the task queue. | 38 » // available. |
| 38 » queueSubChannel string | 39 » workReadyKey string |
| 39 | 40 |
| 40 // finishedChannel is the key of the PubSub channel where finished taske d are | 41 // finishedChannel is the key of the PubSub channel where finished taske d are |
| 41 // announced. | 42 // announced. |
| 42 finishedChannel string | 43 finishedChannel string |
| 43 | 44 |
| 44 // codec is used to serialize and desirialize the cached items. | 45 // codec is used to serialize and desirialize the cached items. |
| 45 codec util.LRUCodec | 46 codec util.LRUCodec |
| 46 | 47 |
| 47 // worker is the function that is called to produce an item when it is n ot | 48 // worker is the function that is called to produce an item when it is n ot |
| 48 // cached. | 49 // cached. |
| 49 worker rtcache.ReadThroughFunc | 50 worker rtcache.ReadThroughFunc |
| 50 | 51 |
| 51 // doneCh is a channel to subscribe to be notified when a task is done. | 52 // doneCh is a channel to subscribe to be notified when a task is done. |
| 52 doneCh chan<- *doneSub | 53 doneCh chan<- *doneSub |
| 54 | |
| 55 // shutdownCh is used to signal by closing it that all go-routines need to shut down. | |
| 56 shutdownCh chan bool | |
| 57 | |
| 58 // shutdownWg is the WaitGroup used to synchronize the shutdown. | |
| 59 shutdownWg sync.WaitGroup | |
| 53 } | 60 } |
| 54 | 61 |
| 55 // workerTask is an auxiliary struct that contains the ID of a task | 62 // workerTask is an auxiliary struct that contains the ID of a task |
| 56 // and it's priority. | 63 // and it's priority. |
| 57 type workerTask struct { | 64 type workerTask struct { |
| 58 id string | 65 id string |
| 59 priority int64 | 66 priority int64 |
| 60 } | 67 } |
| 61 | 68 |
| 62 // doneSub is an auxiliary struct that is used by tasks to subscribe to be | 69 // doneSub is an auxiliary struct that is used by tasks to subscribe to be |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 81 } | 88 } |
| 82 | 89 |
| 83 queueKey := QUEUE_PREFIX + queueName | 90 queueKey := QUEUE_PREFIX + queueName |
| 84 keyPrefix := queueKey + ":k:" | 91 keyPrefix := queueKey + ":k:" |
| 85 errKeyPrefix := queueKey + ":er:" | 92 errKeyPrefix := queueKey + ":er:" |
| 86 finishedChannel := queueKey + ":ch" | 93 finishedChannel := queueKey + ":ch" |
| 87 | 94 |
| 88 ret := &RedisRTC{ | 95 ret := &RedisRTC{ |
| 89 redisPool: redisPool, | 96 redisPool: redisPool, |
| 90 queueKey: queueKey, | 97 queueKey: queueKey, |
| 98 workReadyKey: queueKey + ":wr", | |
| 91 inProgressKey: queueKey + ":inp", | 99 inProgressKey: queueKey + ":inp", |
| 92 finishedChannel: finishedChannel, | 100 finishedChannel: finishedChannel, |
| 93 keyPrefix: keyPrefix, | 101 keyPrefix: keyPrefix, |
| 94 errKeyPrefix: errKeyPrefix, | 102 errKeyPrefix: errKeyPrefix, |
| 95 queueSubChannel: fmt.Sprintf("__keyspace@%d__:%s", redisPool.db, queueKey), | |
| 96 codec: codec, | 103 codec: codec, |
| 97 worker: worker, | 104 worker: worker, |
| 105 shutdownCh: make(chan bool), | |
| 98 } | 106 } |
| 99 | 107 |
| 100 // Start the feeder process. | 108 // Start the feeder process. |
| 101 var err error | 109 var err error |
| 102 if ret.doneCh, err = ret.startWorkScheduler(); err != nil { | 110 if ret.doneCh, err = ret.startWorkScheduler(); err != nil { |
| 103 return nil, err | 111 return nil, err |
| 104 } | 112 } |
| 105 | 113 |
| 106 // Start the workers if we have a worker specified. | 114 // Start the workers if we have a worker specified. |
| 107 if worker != nil { | 115 if worker != nil { |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 126 return r.waitFor(id, priority, returnBytes) | 134 return r.waitFor(id, priority, returnBytes) |
| 127 } | 135 } |
| 128 | 136 |
| 129 func (r *RedisRTC) startWorkers(nWorkers int) error { | 137 func (r *RedisRTC) startWorkers(nWorkers int) error { |
| 130 workCh, err := r.getWorkChannel() | 138 workCh, err := r.getWorkChannel() |
| 131 if err != nil { | 139 if err != nil { |
| 132 return err | 140 return err |
| 133 } | 141 } |
| 134 | 142 |
| 135 for i := 0; i < nWorkers; i++ { | 143 for i := 0; i < nWorkers; i++ { |
| 144 r.shutdownWg.Add(1) | |
| 136 go func() { | 145 go func() { |
| 146 var task *workerTask | |
| 147 WorkerLoop: | |
| 137 for { | 148 for { |
| 138 » » » » task := <-workCh | 149 » » » » // Wait for a task or the signal to shut down. |
| 150 » » » » select { | |
| 151 » » » » case task = <-workCh: | |
| 152 » » » » case <-r.shutdownCh: | |
| 153 » » » » » break WorkerLoop | |
| 154 » » » » } | |
| 155 | |
| 139 r.writeWorkerResult(task.priority, task.id) | 156 r.writeWorkerResult(task.priority, task.id) |
| 140 } | 157 } |
| 158 r.shutdownWg.Done() | |
| 141 }() | 159 }() |
| 142 } | 160 } |
| 143 | 161 |
| 144 return nil | 162 return nil |
| 145 } | 163 } |
| 146 | 164 |
| 165 // shutdown terminates all go-routines started by this instance. | |
| 166 func (r *RedisRTC) shutdown() { | |
| 167 close(r.shutdownCh) | |
| 168 r.shutdownWg.Wait() | |
| 169 } | |
| 170 | |
| 147 // getWorkChannel returns a channel that sends tasks to be processed. | 171 // getWorkChannel returns a channel that sends tasks to be processed. |
| 148 func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { | 172 func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { |
| 149 ret := make(chan *workerTask) | 173 ret := make(chan *workerTask) |
| 150 | 174 |
| 151 » subCh, err := r.getKeyEventChannel(r.queueSubChannel, false) | 175 » r.shutdownWg.Add(1) |
| 152 » if err != nil { | |
| 153 » » return nil, err | |
| 154 » } | |
| 155 | |
| 156 go func() { | 176 go func() { |
| 157 » » hasMore := true | 177 » » hasMore := false |
| 178 » WorkLoop: | |
| 158 for { | 179 for { |
| 159 if !hasMore { | 180 if !hasMore { |
| 160 » » » » <-subCh | 181 » » » » hasMore = r.checkForWork() |
| 182 » » » } | |
| 183 | |
| 184 » » » // Check if we need to shutdown. | |
| 185 » » » select { | |
| 186 » » » case <-r.shutdownCh: | |
| 187 » » » » break WorkLoop | |
| 188 » » » default: | |
| 189 » » » } | |
| 190 | |
| 191 » » » if !hasMore { | |
| 192 » » » » continue | |
| 161 } | 193 } |
| 162 | 194 |
| 163 // Get the next task. | 195 // Get the next task. |
| 164 workTask, itemsLeft, err := r.dequeue() | 196 workTask, itemsLeft, err := r.dequeue() |
| 165 if err != nil { | 197 if err != nil { |
| 166 glog.Errorf("Error moving work ids: %s", err) | 198 glog.Errorf("Error moving work ids: %s", err) |
| 167 continue | 199 continue |
| 168 } | 200 } |
| 169 | 201 |
| 170 hasMore = itemsLeft > 0 | 202 hasMore = itemsLeft > 0 |
| 171 if workTask != nil { | 203 if workTask != nil { |
| 172 ret <- workTask | 204 ret <- workTask |
| 173 } | 205 } |
| 174 } | 206 } |
| 207 r.shutdownWg.Done() | |
| 175 }() | 208 }() |
| 176 | 209 |
| 177 return ret, nil | 210 return ret, nil |
| 178 } | 211 } |
| 179 | 212 |
| 180 // dequeue returns a task to be performed by the worker. | 213 // dequeue returns a task to be performed by the worker. |
| 181 // It returns a triple: workerTask, itemsLeft_in_the_task_queue, error. | 214 // It returns a triple: workerTask, itemsLeft_in_the_task_queue, error. |
| 182 func (r *RedisRTC) dequeue() (*workerTask, int, error) { | 215 func (r *RedisRTC) dequeue() (*workerTask, int, error) { |
| 183 c := r.redisPool.Get() | 216 c := r.redisPool.Get() |
| 184 defer util.Close(c) | 217 defer util.Close(c) |
| 185 | 218 |
| 186 » // TODO: this needs to be a transaction. | 219 » // Remove the first element in the transaction. |
| 187 util.LogErr(c.Send("MULTI")) | 220 util.LogErr(c.Send("MULTI")) |
| 188 util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) | 221 util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) |
| 189 util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) | 222 util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) |
| 190 util.LogErr(c.Send("ZCARD", r.queueKey)) | 223 util.LogErr(c.Send("ZCARD", r.queueKey)) |
| 191 combinedVals, err := redis.Values(c.Do("EXEC")) | 224 combinedVals, err := redis.Values(c.Do("EXEC")) |
| 192 | 225 |
| 193 if err != nil { | 226 if err != nil { |
| 194 return nil, 0, err | 227 return nil, 0, err |
| 195 } | 228 } |
| 196 | 229 |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 247 // whether we need to retry. | 280 // whether we need to retry. |
| 248 if err != nil { | 281 if err != nil { |
| 249 util.LogErr(c.Send("EXPIRE", writeKey, 10)) | 282 util.LogErr(c.Send("EXPIRE", writeKey, 10)) |
| 250 } | 283 } |
| 251 util.LogErr(c.Send("PUBLISH", r.finishedChannel, id)) | 284 util.LogErr(c.Send("PUBLISH", r.finishedChannel, id)) |
| 252 if _, err = c.Do("EXEC"); err != nil { | 285 if _, err = c.Do("EXEC"); err != nil { |
| 253 glog.Errorf("Error writing result to redis: %s", err) | 286 glog.Errorf("Error writing result to redis: %s", err) |
| 254 } | 287 } |
| 255 } | 288 } |
| 256 | 289 |
| 257 // getKeyEventChannel returns a channel that sends events for key changes in | 290 // waitForWork waits until there is either a work indicator in the queue |
| 258 // Redis. | 291 // or 1 second has passed. It will only return false if there was an error. |
| 259 func (r *RedisRTC) getKeyEventChannel(channelOrPattern string, isPattern bool) ( <-chan string, error) { | 292 func (r *RedisRTC) checkForWork() bool { |
| 260 » // Listen for changes on the queue continously. | 293 » c := r.redisPool.Get() |
| 261 » psc := redis.PubSubConn{Conn: r.redisPool.Get()} | 294 » defer util.Close(c) |
| 295 » _, err := redis.Values(c.Do("BLPOP", r.workReadyKey, 1)) | |
| 262 | 296 |
| 263 » subscribe := func() error { | 297 » // If we timed out return true because there could be work and it will |
| 264 » » if isPattern { | 298 » // cause the caller to poll the queue. |
|
dogben
2016/08/09 13:50:49
Why checkForWork() at all? Why not just poll the q
| |
| 265 » » » return psc.PSubscribe(channelOrPattern) | 299 » if err == redis.ErrNil { |
| 266 » » } else { | 300 » » return true |
| 267 » » » return psc.Subscribe(channelOrPattern) | |
| 268 » » } | |
| 269 } | 301 } |
| 270 | 302 |
| 271 » // Subscribe to the key events | 303 » // If there was an error. |
| 272 » if err := subscribe(); err != nil { | 304 » if err != nil { |
| 273 » » return nil, err | 305 » » glog.Errorf("Error retrieving list: %s", err) |
| 306 » » return false | |
| 274 } | 307 } |
| 275 | 308 |
| 276 » readyCh := make(chan bool) | 309 » // Now we got something from the queue and should check for work. |
| 277 » ret := make(chan string) | 310 » return true |
| 278 » go func() { | |
| 279 » » for { | |
| 280 » » Loop: | |
| 281 » » » for { | |
| 282 » » » » switch v := psc.Receive().(type) { | |
| 283 » » » » case redis.PMessage: | |
| 284 » » » » » ret <- string(v.Data) | |
| 285 » » » » case redis.Message: | |
| 286 » » » » » ret <- string(v.Data) | |
| 287 » » » » case redis.Subscription: | |
| 288 » » » » » if readyCh != nil { | |
| 289 » » » » » » readyCh <- true | |
| 290 » » » » » » close(readyCh) | |
| 291 » » » » » } | |
| 292 » » » » case error: | |
| 293 » » » » » glog.Errorf("Error waiting for key event s: %s", v) | |
| 294 » » » » » glog.Infof("Reconnecting.") | |
| 295 » » » » » util.Close(psc) | |
| 296 » » » » » break Loop | |
| 297 » » » » } | |
| 298 » » » } | |
| 299 | |
| 300 » » » readyCh = nil | |
| 301 » » » psc = redis.PubSubConn{Conn: r.redisPool.Get()} | |
| 302 » » » if err := subscribe(); err != nil { | |
| 303 » » » » glog.Errorf("Error re-connecting: %s", err) | |
| 304 » » » » time.Sleep(time.Second) | |
| 305 » » » } | |
| 306 » » } | |
| 307 » }() | |
| 308 » <-readyCh | |
| 309 | |
| 310 » return ret, nil | |
| 311 } | 311 } |
| 312 | 312 |
| 313 // enqueue adds the given task and priority to the task queue. Updating the | 313 // enqueue adds the given task and priority to the task queue. Updating the |
| 314 // priority if necessary. | 314 // priority if necessary. |
| 315 func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { | 315 func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { |
| 316 c := r.redisPool.Get() | 316 c := r.redisPool.Get() |
| 317 defer util.Close(c) | 317 defer util.Close(c) |
| 318 | 318 |
| 319 util.LogErr(c.Send("MULTI")) | 319 util.LogErr(c.Send("MULTI")) |
| 320 util.LogErr(c.Send("ZSCORE", r.queueKey, id)) | 320 util.LogErr(c.Send("ZSCORE", r.queueKey, id)) |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 335 if !isInProgress && !found { | 335 if !isInProgress && !found { |
| 336 saveId := true | 336 saveId := true |
| 337 | 337 |
| 338 // Only update the queue if this has a lower score. | 338 // Only update the queue if this has a lower score. |
| 339 if inQueueScore != nil { | 339 if inQueueScore != nil { |
| 340 oldPriority, _ := redis.Int64(inQueueScore, nil) | 340 oldPriority, _ := redis.Int64(inQueueScore, nil) |
| 341 saveId = priority < oldPriority | 341 saveId = priority < oldPriority |
| 342 } | 342 } |
| 343 | 343 |
| 344 if saveId { | 344 if saveId { |
| 345 » » » if _, err = c.Do("ZADD", r.queueKey, priority, id); err != nil { | 345 » » » util.LogErr(c.Send("MULTI")) |
| 346 » » » util.LogErr(c.Send("ZADD", r.queueKey, priority, id)) | |
| 347 » » » util.LogErr(c.Send("RPUSH", r.workReadyKey, []byte("W")) ) | |
| 348 » » » if _, err = c.Do("EXEC"); err != nil { | |
| 346 return false, err | 349 return false, err |
| 347 } | 350 } |
| 348 } | 351 } |
| 349 } | 352 } |
| 350 | 353 |
| 351 return found, nil | 354 return found, nil |
| 352 } | 355 } |
| 353 | 356 |
| 354 // inQueue returns up to 'maxElements' ids that are currently in the work | 357 // inQueue returns up to 'maxElements' ids that are currently in the work |
| 355 // queue. This is primarily for testing. | 358 // queue. This is primarily for testing. |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 388 | 391 |
| 389 // startWorkScheduler starts the background progress that takes tasks | 392 // startWorkScheduler starts the background progress that takes tasks |
| 390 // from the doneCh and adds them to the Redis priority queue. | 393 // from the doneCh and adds them to the Redis priority queue. |
| 391 func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { | 394 func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
| 392 finishedCh, err := r.redisPool.subscribeToChannel(r.finishedChannel) | 395 finishedCh, err := r.redisPool.subscribeToChannel(r.finishedChannel) |
| 393 if err != nil { | 396 if err != nil { |
| 394 return nil, err | 397 return nil, err |
| 395 } | 398 } |
| 396 | 399 |
| 397 doneCh := make(chan *doneSub) | 400 doneCh := make(chan *doneSub) |
| 401 r.shutdownWg.Add(1) | |
| 398 go func() { | 402 go func() { |
| 399 watchIds := map[string][]chan bool{} | 403 watchIds := map[string][]chan bool{} |
| 400 notifyChannels := func(id string) { | 404 notifyChannels := func(id string) { |
| 401 for _, ch := range watchIds[id] { | 405 for _, ch := range watchIds[id] { |
| 402 ch <- true | 406 ch <- true |
| 403 close(ch) | 407 close(ch) |
| 404 } | 408 } |
| 405 delete(watchIds, id) | 409 delete(watchIds, id) |
| 406 } | 410 } |
| 407 | 411 |
| 408 notifyAll := func() { | 412 notifyAll := func() { |
| 409 for id := range watchIds { | 413 for id := range watchIds { |
| 410 if r.isFinished(id) { | 414 if r.isFinished(id) { |
| 411 notifyChannels(id) | 415 notifyChannels(id) |
| 412 } | 416 } |
| 413 } | 417 } |
| 414 } | 418 } |
| 415 | 419 |
| 420 WorkLoop: | |
| 416 for { | 421 for { |
| 417 select { | 422 select { |
| 423 case <-r.shutdownCh: | |
| 424 break WorkLoop | |
| 418 case subscription := <-doneCh: | 425 case subscription := <-doneCh: |
| 419 watchIds[subscription.id] = append(watchIds[subs cription.id], subscription.notifyCh) | 426 watchIds[subscription.id] = append(watchIds[subs cription.id], subscription.notifyCh) |
| 420 if r.isFinished(subscription.id) { | 427 if r.isFinished(subscription.id) { |
| 421 notifyChannels(subscription.id) | 428 notifyChannels(subscription.id) |
| 422 } | 429 } |
| 423 case finishedId := <-finishedCh: | 430 case finishedId := <-finishedCh: |
| 424 // An emtpy string indicates that we (re)connect ed. | 431 // An emtpy string indicates that we (re)connect ed. |
| 425 if string(finishedId) == "" { | 432 if string(finishedId) == "" { |
| 426 notifyAll() | 433 notifyAll() |
| 427 } else { | 434 } else { |
| 428 notifyChannels(string(finishedId)) | 435 notifyChannels(string(finishedId)) |
| 429 } | 436 } |
| 430 } | 437 } |
| 431 } | 438 } |
| 439 r.shutdownWg.Done() | |
| 432 }() | 440 }() |
| 433 | 441 |
| 434 return doneCh, nil | 442 return doneCh, nil |
| 435 } | 443 } |
| 436 | 444 |
| 437 // waitFor blocks until the key identified by id is available in Redis. | 445 // waitFor blocks until the key identified by id is available in Redis. |
| 438 func (r *RedisRTC) waitFor(id string, priority int64, returnBytes bool) (interfa ce{}, error) { | 446 func (r *RedisRTC) waitFor(id string, priority int64, returnBytes bool) (interfa ce{}, error) { |
| 439 var found bool | 447 var found bool |
| 440 var err error | 448 var err error |
| 441 if found, err = r.enqueue(id, priority); err != nil { | 449 if found, err = r.enqueue(id, priority); err != nil { |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 492 // key returns the Redis key for the given id. | 500 // key returns the Redis key for the given id. |
| 493 func (r *RedisRTC) key(id string) string { | 501 func (r *RedisRTC) key(id string) string { |
| 494 return r.keyPrefix + id | 502 return r.keyPrefix + id |
| 495 } | 503 } |
| 496 | 504 |
| 497 // errorKey returns the key of the error message for the given ID if the | 505 // errorKey returns the key of the error message for the given ID if the |
| 498 // the worker call failed. | 506 // the worker call failed. |
| 499 func (r *RedisRTC) errorKey(id string) string { | 507 func (r *RedisRTC) errorKey(id string) string { |
| 500 return r.errKeyPrefix + id | 508 return r.errKeyPrefix + id |
| 501 } | 509 } |
| OLD | NEW |