Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(371)

Side by Side Diff: go/redisutil/rtc.go

Issue 2200833004: Fixing RedisRTC implementation and test (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Incorporated Feedback Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | go/redisutil/rtc_test.go » ('j') | go/redisutil/rtc_test.go » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package redisutil 1 package redisutil
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 » "time" 5 » "sync"
6 6
7 "github.com/garyburd/redigo/redis" 7 "github.com/garyburd/redigo/redis"
8 "github.com/skia-dev/glog" 8 "github.com/skia-dev/glog"
9
9 "go.skia.org/infra/go/rtcache" 10 "go.skia.org/infra/go/rtcache"
10 "go.skia.org/infra/go/util" 11 "go.skia.org/infra/go/util"
11 ) 12 )
12 13
13 const ( 14 const (
14 // QUEUE_PREFIX is the prefix for the task queue which is a sorted set i n Redis. 15 // QUEUE_PREFIX is the prefix for the task queue which is a sorted set i n Redis.
15 QUEUE_PREFIX = "rc:" 16 QUEUE_PREFIX = "rc:"
16 ) 17 )
17 18
18 // RedisRTC implements a rtcache.ReadThroughCache with a Redis backend. 19 // RedisRTC implements a rtcache.ReadThroughCache with a Redis backend.
19 type RedisRTC struct { 20 type RedisRTC struct {
20 // redisPool is the connection to redis this read-through-cache uses. 21 // redisPool is the connection to redis this read-through-cache uses.
21 redisPool *RedisPool 22 redisPool *RedisPool
22 23
23 // queueKey is the key of the work queue (sorted set) in Redis. 24 // queueKey is the key of the work queue (sorted set) in Redis.
24 queueKey string 25 queueKey string
25 26
26 // inProgressKey is the key of the set of current tasks in progress. 27 // inProgressKey is the key of the set of current tasks in progress.
27 inProgressKey string 28 inProgressKey string
28 29
29 // keyPrefix is the prefix of the results that are cached in redis. 30 // keyPrefix is the prefix of the results that are cached in redis.
30 keyPrefix string 31 keyPrefix string
31 32
32 // errKeyPrefix is the prefix of the error message if the task failed to 33 // errKeyPrefix is the prefix of the error message if the task failed to
33 // produce the desired resul. 34 // produce the desired resul.
34 errKeyPrefix string 35 errKeyPrefix string
35 36
36 » // queueSubChannel is the channel where keyspace notifications are gener ated 37 » // workReadyKey is the name of the queue that indicates that there is wo rk
37 » // when a new item is added to the task queue. 38 » // available.
38 » queueSubChannel string 39 » workReadyKey string
39 40
40 // finishedChannel is the key of the PubSub channel where finished taske d are 41 // finishedChannel is the key of the PubSub channel where finished taske d are
41 // announced. 42 // announced.
42 finishedChannel string 43 finishedChannel string
43 44
44 // codec is used to serialize and desirialize the cached items. 45 // codec is used to serialize and desirialize the cached items.
45 codec util.LRUCodec 46 codec util.LRUCodec
46 47
47 // worker is the function that is called to produce an item when it is n ot 48 // worker is the function that is called to produce an item when it is n ot
48 // cached. 49 // cached.
49 worker rtcache.ReadThroughFunc 50 worker rtcache.ReadThroughFunc
50 51
51 // doneCh is a channel to subscribe to be notified when a task is done. 52 // doneCh is a channel to subscribe to be notified when a task is done.
52 doneCh chan<- *doneSub 53 doneCh chan<- *doneSub
54
55 // shutdownCh is used to signal by closing it that all go-routines need to shut down.
56 shutdownCh chan bool
57
58 // shutdownWg is the WaitGroup used to synchronize the shutdown.
59 shutdownWg sync.WaitGroup
53 } 60 }
54 61
55 // workerTask is an auxiliary struct that contains the ID of a task 62 // workerTask is an auxiliary struct that contains the ID of a task
56 // and it's priority. 63 // and it's priority.
57 type workerTask struct { 64 type workerTask struct {
58 id string 65 id string
59 priority int64 66 priority int64
60 } 67 }
61 68
62 // doneSub is an auxiliary struct that is used by tasks to subscribe to be 69 // doneSub is an auxiliary struct that is used by tasks to subscribe to be
(...skipping 18 matching lines...) Expand all
81 } 88 }
82 89
83 queueKey := QUEUE_PREFIX + queueName 90 queueKey := QUEUE_PREFIX + queueName
84 keyPrefix := queueKey + ":k:" 91 keyPrefix := queueKey + ":k:"
85 errKeyPrefix := queueKey + ":er:" 92 errKeyPrefix := queueKey + ":er:"
86 finishedChannel := queueKey + ":ch" 93 finishedChannel := queueKey + ":ch"
87 94
88 ret := &RedisRTC{ 95 ret := &RedisRTC{
89 redisPool: redisPool, 96 redisPool: redisPool,
90 queueKey: queueKey, 97 queueKey: queueKey,
98 workReadyKey: queueKey + ":wr",
91 inProgressKey: queueKey + ":inp", 99 inProgressKey: queueKey + ":inp",
92 finishedChannel: finishedChannel, 100 finishedChannel: finishedChannel,
93 keyPrefix: keyPrefix, 101 keyPrefix: keyPrefix,
94 errKeyPrefix: errKeyPrefix, 102 errKeyPrefix: errKeyPrefix,
95 queueSubChannel: fmt.Sprintf("__keyspace@%d__:%s", redisPool.db, queueKey),
96 codec: codec, 103 codec: codec,
97 worker: worker, 104 worker: worker,
105 shutdownCh: make(chan bool),
98 } 106 }
99 107
100 // Start the feeder process. 108 // Start the feeder process.
101 var err error 109 var err error
102 if ret.doneCh, err = ret.startWorkScheduler(); err != nil { 110 if ret.doneCh, err = ret.startWorkScheduler(); err != nil {
103 return nil, err 111 return nil, err
104 } 112 }
105 113
106 // Start the workers if we have a worker specified. 114 // Start the workers if we have a worker specified.
107 if worker != nil { 115 if worker != nil {
(...skipping 18 matching lines...) Expand all
126 return r.waitFor(id, priority, returnBytes) 134 return r.waitFor(id, priority, returnBytes)
127 } 135 }
128 136
129 func (r *RedisRTC) startWorkers(nWorkers int) error { 137 func (r *RedisRTC) startWorkers(nWorkers int) error {
130 workCh, err := r.getWorkChannel() 138 workCh, err := r.getWorkChannel()
131 if err != nil { 139 if err != nil {
132 return err 140 return err
133 } 141 }
134 142
135 for i := 0; i < nWorkers; i++ { 143 for i := 0; i < nWorkers; i++ {
144 r.shutdownWg.Add(1)
136 go func() { 145 go func() {
146 var task *workerTask
147 WorkerLoop:
137 for { 148 for {
138 » » » » task := <-workCh 149 » » » » // Wait for a task or the signal to shut down.
150 » » » » select {
151 » » » » case task = <-workCh:
152 » » » » case <-r.shutdownCh:
153 » » » » » break WorkerLoop
154 » » » » }
155
139 r.writeWorkerResult(task.priority, task.id) 156 r.writeWorkerResult(task.priority, task.id)
140 } 157 }
158 r.shutdownWg.Done()
141 }() 159 }()
142 } 160 }
143 161
144 return nil 162 return nil
145 } 163 }
146 164
165 // shutdown terminates all go-routines started by this instance.
166 func (r *RedisRTC) shutdown() {
167 close(r.shutdownCh)
168 r.shutdownWg.Wait()
169 }
170
147 // getWorkChannel returns a channel that sends tasks to be processed. 171 // getWorkChannel returns a channel that sends tasks to be processed.
148 func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { 172 func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) {
149 ret := make(chan *workerTask) 173 ret := make(chan *workerTask)
150 174
151 » subCh, err := r.getKeyEventChannel(r.queueSubChannel, false) 175 » r.shutdownWg.Add(1)
152 » if err != nil {
153 » » return nil, err
154 » }
155
156 go func() { 176 go func() {
157 » » hasMore := true 177 » » hasMore := false
178 » WorkLoop:
158 for { 179 for {
159 if !hasMore { 180 if !hasMore {
160 » » » » <-subCh 181 » » » » hasMore = r.checkForWork()
182 » » » }
183
184 » » » // Check if we need to shutdown.
185 » » » select {
186 » » » case <-r.shutdownCh:
187 » » » » break WorkLoop
188 » » » default:
189 » » » }
190
191 » » » if !hasMore {
192 » » » » continue
161 } 193 }
162 194
163 // Get the next task. 195 // Get the next task.
164 workTask, itemsLeft, err := r.dequeue() 196 workTask, itemsLeft, err := r.dequeue()
165 if err != nil { 197 if err != nil {
166 glog.Errorf("Error moving work ids: %s", err) 198 glog.Errorf("Error moving work ids: %s", err)
167 continue 199 continue
168 } 200 }
169 201
170 hasMore = itemsLeft > 0 202 hasMore = itemsLeft > 0
171 if workTask != nil { 203 if workTask != nil {
172 ret <- workTask 204 ret <- workTask
173 } 205 }
174 } 206 }
207 r.shutdownWg.Done()
175 }() 208 }()
176 209
177 return ret, nil 210 return ret, nil
178 } 211 }
179 212
180 // dequeue returns a task to be performed by the worker. 213 // dequeue returns a task to be performed by the worker.
181 // It returns a triple: workerTask, itemsLeft_in_the_task_queue, error. 214 // It returns a triple: workerTask, itemsLeft_in_the_task_queue, error.
182 func (r *RedisRTC) dequeue() (*workerTask, int, error) { 215 func (r *RedisRTC) dequeue() (*workerTask, int, error) {
183 c := r.redisPool.Get() 216 c := r.redisPool.Get()
184 defer util.Close(c) 217 defer util.Close(c)
185 218
186 » // TODO: this needs to be a transaction. 219 » // Remove the first element in the transaction.
187 util.LogErr(c.Send("MULTI")) 220 util.LogErr(c.Send("MULTI"))
188 util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) 221 util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES"))
189 util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) 222 util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0))
190 util.LogErr(c.Send("ZCARD", r.queueKey)) 223 util.LogErr(c.Send("ZCARD", r.queueKey))
191 combinedVals, err := redis.Values(c.Do("EXEC")) 224 combinedVals, err := redis.Values(c.Do("EXEC"))
192 225
193 if err != nil { 226 if err != nil {
194 return nil, 0, err 227 return nil, 0, err
195 } 228 }
196 229
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
247 // whether we need to retry. 280 // whether we need to retry.
248 if err != nil { 281 if err != nil {
249 util.LogErr(c.Send("EXPIRE", writeKey, 10)) 282 util.LogErr(c.Send("EXPIRE", writeKey, 10))
250 } 283 }
251 util.LogErr(c.Send("PUBLISH", r.finishedChannel, id)) 284 util.LogErr(c.Send("PUBLISH", r.finishedChannel, id))
252 if _, err = c.Do("EXEC"); err != nil { 285 if _, err = c.Do("EXEC"); err != nil {
253 glog.Errorf("Error writing result to redis: %s", err) 286 glog.Errorf("Error writing result to redis: %s", err)
254 } 287 }
255 } 288 }
256 289
257 // getKeyEventChannel returns a channel that sends events for key changes in 290 // waitForWork waits until there is either a work indicator in the queue
258 // Redis. 291 // or 1 second has passed. It will only return false if there was an error.
259 func (r *RedisRTC) getKeyEventChannel(channelOrPattern string, isPattern bool) ( <-chan string, error) { 292 func (r *RedisRTC) checkForWork() bool {
260 » // Listen for changes on the queue continously. 293 » c := r.redisPool.Get()
261 » psc := redis.PubSubConn{Conn: r.redisPool.Get()} 294 » defer util.Close(c)
295 » _, err := redis.Values(c.Do("BLPOP", r.workReadyKey, 1))
262 296
263 » subscribe := func() error { 297 » // If we timed out return true because there could be work and it will
264 » » if isPattern { 298 » // cause the caller to poll the queue.
dogben 2016/08/09 13:50:49 Why checkForWork() at all? Why not just poll the q
265 » » » return psc.PSubscribe(channelOrPattern) 299 » if err == redis.ErrNil {
266 » » } else { 300 » » return true
267 » » » return psc.Subscribe(channelOrPattern)
268 » » }
269 } 301 }
270 302
271 » // Subscribe to the key events 303 » // If there was an error.
272 » if err := subscribe(); err != nil { 304 » if err != nil {
273 » » return nil, err 305 » » glog.Errorf("Error retrieving list: %s", err)
306 » » return false
274 } 307 }
275 308
276 » readyCh := make(chan bool) 309 » // Now we got something from the queue and should check for work.
277 » ret := make(chan string) 310 » return true
278 » go func() {
279 » » for {
280 » » Loop:
281 » » » for {
282 » » » » switch v := psc.Receive().(type) {
283 » » » » case redis.PMessage:
284 » » » » » ret <- string(v.Data)
285 » » » » case redis.Message:
286 » » » » » ret <- string(v.Data)
287 » » » » case redis.Subscription:
288 » » » » » if readyCh != nil {
289 » » » » » » readyCh <- true
290 » » » » » » close(readyCh)
291 » » » » » }
292 » » » » case error:
293 » » » » » glog.Errorf("Error waiting for key event s: %s", v)
294 » » » » » glog.Infof("Reconnecting.")
295 » » » » » util.Close(psc)
296 » » » » » break Loop
297 » » » » }
298 » » » }
299
300 » » » readyCh = nil
301 » » » psc = redis.PubSubConn{Conn: r.redisPool.Get()}
302 » » » if err := subscribe(); err != nil {
303 » » » » glog.Errorf("Error re-connecting: %s", err)
304 » » » » time.Sleep(time.Second)
305 » » » }
306 » » }
307 » }()
308 » <-readyCh
309
310 » return ret, nil
311 } 311 }
312 312
313 // enqueue adds the given task and priority to the task queue. Updating the 313 // enqueue adds the given task and priority to the task queue. Updating the
314 // priority if necessary. 314 // priority if necessary.
315 func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { 315 func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) {
316 c := r.redisPool.Get() 316 c := r.redisPool.Get()
317 defer util.Close(c) 317 defer util.Close(c)
318 318
319 util.LogErr(c.Send("MULTI")) 319 util.LogErr(c.Send("MULTI"))
320 util.LogErr(c.Send("ZSCORE", r.queueKey, id)) 320 util.LogErr(c.Send("ZSCORE", r.queueKey, id))
(...skipping 14 matching lines...) Expand all
335 if !isInProgress && !found { 335 if !isInProgress && !found {
336 saveId := true 336 saveId := true
337 337
338 // Only update the queue if this has a lower score. 338 // Only update the queue if this has a lower score.
339 if inQueueScore != nil { 339 if inQueueScore != nil {
340 oldPriority, _ := redis.Int64(inQueueScore, nil) 340 oldPriority, _ := redis.Int64(inQueueScore, nil)
341 saveId = priority < oldPriority 341 saveId = priority < oldPriority
342 } 342 }
343 343
344 if saveId { 344 if saveId {
345 » » » if _, err = c.Do("ZADD", r.queueKey, priority, id); err != nil { 345 » » » util.LogErr(c.Send("MULTI"))
346 » » » util.LogErr(c.Send("ZADD", r.queueKey, priority, id))
347 » » » util.LogErr(c.Send("RPUSH", r.workReadyKey, []byte("W")) )
348 » » » if _, err = c.Do("EXEC"); err != nil {
346 return false, err 349 return false, err
347 } 350 }
348 } 351 }
349 } 352 }
350 353
351 return found, nil 354 return found, nil
352 } 355 }
353 356
354 // inQueue returns up to 'maxElements' ids that are currently in the work 357 // inQueue returns up to 'maxElements' ids that are currently in the work
355 // queue. This is primarily for testing. 358 // queue. This is primarily for testing.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
388 391
389 // startWorkScheduler starts the background progress that takes tasks 392 // startWorkScheduler starts the background progress that takes tasks
390 // from the doneCh and adds them to the Redis priority queue. 393 // from the doneCh and adds them to the Redis priority queue.
391 func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { 394 func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) {
392 finishedCh, err := r.redisPool.subscribeToChannel(r.finishedChannel) 395 finishedCh, err := r.redisPool.subscribeToChannel(r.finishedChannel)
393 if err != nil { 396 if err != nil {
394 return nil, err 397 return nil, err
395 } 398 }
396 399
397 doneCh := make(chan *doneSub) 400 doneCh := make(chan *doneSub)
401 r.shutdownWg.Add(1)
398 go func() { 402 go func() {
399 watchIds := map[string][]chan bool{} 403 watchIds := map[string][]chan bool{}
400 notifyChannels := func(id string) { 404 notifyChannels := func(id string) {
401 for _, ch := range watchIds[id] { 405 for _, ch := range watchIds[id] {
402 ch <- true 406 ch <- true
403 close(ch) 407 close(ch)
404 } 408 }
405 delete(watchIds, id) 409 delete(watchIds, id)
406 } 410 }
407 411
408 notifyAll := func() { 412 notifyAll := func() {
409 for id := range watchIds { 413 for id := range watchIds {
410 if r.isFinished(id) { 414 if r.isFinished(id) {
411 notifyChannels(id) 415 notifyChannels(id)
412 } 416 }
413 } 417 }
414 } 418 }
415 419
420 WorkLoop:
416 for { 421 for {
417 select { 422 select {
423 case <-r.shutdownCh:
424 break WorkLoop
418 case subscription := <-doneCh: 425 case subscription := <-doneCh:
419 watchIds[subscription.id] = append(watchIds[subs cription.id], subscription.notifyCh) 426 watchIds[subscription.id] = append(watchIds[subs cription.id], subscription.notifyCh)
420 if r.isFinished(subscription.id) { 427 if r.isFinished(subscription.id) {
421 notifyChannels(subscription.id) 428 notifyChannels(subscription.id)
422 } 429 }
423 case finishedId := <-finishedCh: 430 case finishedId := <-finishedCh:
424 // An emtpy string indicates that we (re)connect ed. 431 // An emtpy string indicates that we (re)connect ed.
425 if string(finishedId) == "" { 432 if string(finishedId) == "" {
426 notifyAll() 433 notifyAll()
427 } else { 434 } else {
428 notifyChannels(string(finishedId)) 435 notifyChannels(string(finishedId))
429 } 436 }
430 } 437 }
431 } 438 }
439 r.shutdownWg.Done()
432 }() 440 }()
433 441
434 return doneCh, nil 442 return doneCh, nil
435 } 443 }
436 444
437 // waitFor blocks until the key identified by id is available in Redis. 445 // waitFor blocks until the key identified by id is available in Redis.
438 func (r *RedisRTC) waitFor(id string, priority int64, returnBytes bool) (interfa ce{}, error) { 446 func (r *RedisRTC) waitFor(id string, priority int64, returnBytes bool) (interfa ce{}, error) {
439 var found bool 447 var found bool
440 var err error 448 var err error
441 if found, err = r.enqueue(id, priority); err != nil { 449 if found, err = r.enqueue(id, priority); err != nil {
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
492 // key returns the Redis key for the given id. 500 // key returns the Redis key for the given id.
493 func (r *RedisRTC) key(id string) string { 501 func (r *RedisRTC) key(id string) string {
494 return r.keyPrefix + id 502 return r.keyPrefix + id
495 } 503 }
496 504
497 // errorKey returns the key of the error message for the given ID if the 505 // errorKey returns the key of the error message for the given ID if the
498 // the worker call failed. 506 // the worker call failed.
499 func (r *RedisRTC) errorKey(id string) string { 507 func (r *RedisRTC) errorKey(id string) string {
500 return r.errKeyPrefix + id 508 return r.errKeyPrefix + id
501 } 509 }
OLDNEW
« no previous file with comments | « no previous file | go/redisutil/rtc_test.go » ('j') | go/redisutil/rtc_test.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698