OLD | NEW |
---|---|
1 package redisutil | 1 package redisutil |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 » "time" | 5 » "sync" |
6 | 6 |
7 "github.com/garyburd/redigo/redis" | 7 "github.com/garyburd/redigo/redis" |
8 "github.com/skia-dev/glog" | 8 "github.com/skia-dev/glog" |
9 | |
9 "go.skia.org/infra/go/rtcache" | 10 "go.skia.org/infra/go/rtcache" |
10 "go.skia.org/infra/go/util" | 11 "go.skia.org/infra/go/util" |
11 ) | 12 ) |
12 | 13 |
13 const ( | 14 const ( |
14 // QUEUE_PREFIX is the prefix for the task queue which is a sorted set in Redis. | 15 // QUEUE_PREFIX is the prefix for the task queue which is a sorted set in Redis. |
15 QUEUE_PREFIX = "rc:" | 16 QUEUE_PREFIX = "rc:" |
16 ) | 17 ) |
17 | 18 |
18 // RedisRTC implements a rtcache.ReadThroughCache with a Redis backend. | 19 // RedisRTC implements a rtcache.ReadThroughCache with a Redis backend. |
19 type RedisRTC struct { | 20 type RedisRTC struct { |
20 // redisPool is the connection to redis this read-through-cache uses. | 21 // redisPool is the connection to redis this read-through-cache uses. |
21 redisPool *RedisPool | 22 redisPool *RedisPool |
22 | 23 |
23 // queueKey is the key of the work queue (sorted set) in Redis. | 24 // queueKey is the key of the work queue (sorted set) in Redis. |
24 queueKey string | 25 queueKey string |
25 | 26 |
26 // inProgressKey is the key of the set of current tasks in progress. | 27 // inProgressKey is the key of the set of current tasks in progress. |
27 inProgressKey string | 28 inProgressKey string |
28 | 29 |
29 // keyPrefix is the prefix of the results that are cached in redis. | 30 // keyPrefix is the prefix of the results that are cached in redis. |
30 keyPrefix string | 31 keyPrefix string |
31 | 32 |
32 // errKeyPrefix is the prefix of the error message if the task failed to | 33 // errKeyPrefix is the prefix of the error message if the task failed to |
33 // produce the desired result. | 34 // produce the desired result. |
34 errKeyPrefix string | 35 errKeyPrefix string |
35 | 36 |
36 » // queueSubChannel is the channel where keyspace notifications are generated | 37 » // workReadyKey is the name of the queue that indicates that there is work |
37 » // when a new item is added to the task queue. | 38 » // available. |
38 » queueSubChannel string | 39 » workReadyKey string |
39 | 40 |
40 // finishedChannel is the key of the PubSub channel where finished tasks are | 41 // finishedChannel is the key of the PubSub channel where finished tasks are |
41 // announced. | 42 // announced. |
42 finishedChannel string | 43 finishedChannel string |
43 | 44 |
44 // codec is used to serialize and deserialize the cached items. | 45 // codec is used to serialize and deserialize the cached items. |
45 codec util.LRUCodec | 46 codec util.LRUCodec |
46 | 47 |
47 // worker is the function that is called to produce an item when it is not | 48 // worker is the function that is called to produce an item when it is not |
48 // cached. | 49 // cached. |
49 worker rtcache.ReadThroughFunc | 50 worker rtcache.ReadThroughFunc |
50 | 51 |
51 // doneCh is a channel to subscribe to be notified when a task is done. | 52 // doneCh is a channel to subscribe to be notified when a task is done. |
52 doneCh chan<- *doneSub | 53 doneCh chan<- *doneSub |
54 | |
55 // shutdownCh is closed to signal that all go-routines need to shut down. |
56 shutdownCh chan bool | |
57 | |
58 // shutdownWg is the WaitGroup used to synchronize the shutdown. | |
59 shutdownWg sync.WaitGroup | |
53 } | 60 } |
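
For orientation, the sketch below spells out the Redis key layout these fields imply for a hypothetical queue named "example" (the name is illustrative; the prefixes and suffixes come from QUEUE_PREFIX above and from the constructor further down):

    // Key layout for a hypothetical queue named "example".
    const (
        queueKey        = "rc:example"     // QUEUE_PREFIX + queueName: sorted set of pending task ids
        inProgressKey   = "rc:example:inp" // set of ids currently being worked on
        keyPrefix       = "rc:example:k:"  // cached results live at "rc:example:k:<id>"
        errKeyPrefix    = "rc:example:er:" // error message for a failed task id
        workReadyKey    = "rc:example:wr"  // list used to signal that work has been enqueued
        finishedChannel = "rc:example:ch"  // PubSub channel announcing finished ids
    )
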
54 | 61 |
55 // workerTask is an auxiliary struct that contains the ID of a task | 62 // workerTask is an auxiliary struct that contains the ID of a task |
56 // and its priority. | 63 // and its priority. |
57 type workerTask struct { | 64 type workerTask struct { |
58 id string | 65 id string |
59 priority int64 | 66 priority int64 |
60 } | 67 } |
61 | 68 |
62 // doneSub is an auxiliary struct that is used by tasks to subscribe to be | 69 // doneSub is an auxiliary struct that is used by tasks to subscribe to be |
(...skipping 18 matching lines...) | |
81 } | 88 } |
82 | 89 |
83 queueKey := QUEUE_PREFIX + queueName | 90 queueKey := QUEUE_PREFIX + queueName |
84 keyPrefix := queueKey + ":k:" | 91 keyPrefix := queueKey + ":k:" |
85 errKeyPrefix := queueKey + ":er:" | 92 errKeyPrefix := queueKey + ":er:" |
86 finishedChannel := queueKey + ":ch" | 93 finishedChannel := queueKey + ":ch" |
87 | 94 |
88 ret := &RedisRTC{ | 95 ret := &RedisRTC{ |
89 redisPool: redisPool, | 96 redisPool: redisPool, |
90 queueKey: queueKey, | 97 queueKey: queueKey, |
98 workReadyKey: queueKey + ":wr", | |
91 inProgressKey: queueKey + ":inp", | 99 inProgressKey: queueKey + ":inp", |
92 finishedChannel: finishedChannel, | 100 finishedChannel: finishedChannel, |
93 keyPrefix: keyPrefix, | 101 keyPrefix: keyPrefix, |
94 errKeyPrefix: errKeyPrefix, | 102 errKeyPrefix: errKeyPrefix, |
95 queueSubChannel: fmt.Sprintf("__keyspace@%d__:%s", redisPool.db, queueKey), | |
96 codec: codec, | 103 codec: codec, |
97 worker: worker, | 104 worker: worker, |
105 shutdownCh: make(chan bool), | |
98 } | 106 } |
99 | 107 |
100 // Start the feeder process. | 108 // Start the feeder process. |
101 var err error | 109 var err error |
102 if ret.doneCh, err = ret.startWorkScheduler(); err != nil { | 110 if ret.doneCh, err = ret.startWorkScheduler(); err != nil { |
103 return nil, err | 111 return nil, err |
104 } | 112 } |
105 | 113 |
106 // Start the workers if we have a worker specified. | 114 // Start the workers if we have a worker specified. |
107 if worker != nil { | 115 if worker != nil { |
(...skipping 18 matching lines...) | |
126 return r.waitFor(id, priority, returnBytes) | 134 return r.waitFor(id, priority, returnBytes) |
127 } | 135 } |
128 | 136 |
129 func (r *RedisRTC) startWorkers(nWorkers int) error { | 137 func (r *RedisRTC) startWorkers(nWorkers int) error { |
130 workCh, err := r.getWorkChannel() | 138 workCh, err := r.getWorkChannel() |
131 if err != nil { | 139 if err != nil { |
132 return err | 140 return err |
133 } | 141 } |
134 | 142 |
135 for i := 0; i < nWorkers; i++ { | 143 for i := 0; i < nWorkers; i++ { |
144 r.shutdownWg.Add(1) | |
136 go func() { | 145 go func() { |
146 var task *workerTask | |
147 WorkerLoop: | |
137 for { | 148 for { |
138 » » » » task := <-workCh | 149 » » » » // Wait for a task or the signal to shut down. |
150 » » » » select { | |
151 » » » » case task = <-workCh: | |
152 » » » » case <-r.shutdownCh: | |
153 » » » » » break WorkerLoop | |
154 » » » » } | |
155 | |
139 r.writeWorkerResult(task.priority, task.id) | 156 r.writeWorkerResult(task.priority, task.id) |
140 } | 157 } |
158 r.shutdownWg.Done() | |
141 }() | 159 }() |
142 } | 160 } |
143 | 161 |
144 return nil | 162 return nil |
145 } | 163 } |
146 | 164 |
165 // shutdown terminates all go-routines started by this instance. | |
166 func (r *RedisRTC) shutdown() { | |
167 close(r.shutdownCh) | |
168 r.shutdownWg.Wait() | |
169 } | |
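
The shutdown plumbing above is the usual close-a-channel-plus-WaitGroup idiom: every goroutine calls shutdownWg.Add(1) before it starts, selects on shutdownCh, and calls Done() on exit. A minimal standalone sketch of the same pattern, with illustrative names that are not part of this package:

    package example

    import "sync"

    type looper struct {
        shutdownCh chan bool
        shutdownWg sync.WaitGroup
    }

    // start launches a goroutine that drains work until shutdownCh is closed.
    func (l *looper) start(work <-chan int) {
        l.shutdownWg.Add(1)
        go func() {
            defer l.shutdownWg.Done()
            for {
                select {
                case <-l.shutdownCh: // a closed channel is always ready to receive
                    return
                case item := <-work:
                    _ = item // process the item
                }
            }
        }()
    }

    // shutdown unblocks every goroutine selecting on shutdownCh and waits for them.
    func (l *looper) shutdown() {
        close(l.shutdownCh)
        l.shutdownWg.Wait()
    }
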
170 | |
147 // getWorkChannel returns a channel that sends tasks to be processed. | 171 // getWorkChannel returns a channel that sends tasks to be processed. |
148 func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { | 172 func (r *RedisRTC) getWorkChannel() (<-chan *workerTask, error) { |
149 ret := make(chan *workerTask) | 173 ret := make(chan *workerTask) |
150 | 174 |
151 » subCh, err := r.getKeyEventChannel(r.queueSubChannel, false) | 175 » r.shutdownWg.Add(1) |
152 » if err != nil { | |
153 » » return nil, err | |
154 » } | |
155 | |
156 go func() { | 176 go func() { |
157 » » hasMore := true | 177 » » hasMore := false |
178 » WorkLoop: | |
158 for { | 179 for { |
159 if !hasMore { | 180 if !hasMore { |
160 » » » » <-subCh | 181 » » » » hasMore = r.checkForWork() |
182 » » » } | |
183 | |
184 » » » // Check if we need to shut down. | |
185 » » » select { | |
186 » » » case <-r.shutdownCh: | |
187 » » » » break WorkLoop | |
188 » » » default: | |
189 » » » } | |
190 | |
191 » » » if !hasMore { | |
192 » » » » continue | |
161 } | 193 } |
162 | 194 |
163 // Get the next task. | 195 // Get the next task. |
164 workTask, itemsLeft, err := r.dequeue() | 196 workTask, itemsLeft, err := r.dequeue() |
165 if err != nil { | 197 if err != nil { |
166 glog.Errorf("Error moving work ids: %s", err) | 198 glog.Errorf("Error moving work ids: %s", err) |
167 continue | 199 continue |
168 } | 200 } |
169 | 201 |
170 hasMore = itemsLeft > 0 | 202 hasMore = itemsLeft > 0 |
171 if workTask != nil { | 203 if workTask != nil { |
172 ret <- workTask | 204 ret <- workTask |
173 } | 205 } |
174 } | 206 } |
207 r.shutdownWg.Done() | |
175 }() | 208 }() |
176 | 209 |
177 return ret, nil | 210 return ret, nil |
178 } | 211 } |
179 | 212 |
180 // dequeue returns a task to be performed by the worker. | 213 // dequeue returns a task to be performed by the worker. |
181 // It returns a triple: the workerTask, the number of items left in the queue, and an error. | 214 // It returns a triple: the workerTask, the number of items left in the queue, and an error. |
182 func (r *RedisRTC) dequeue() (*workerTask, int, error) { | 215 func (r *RedisRTC) dequeue() (*workerTask, int, error) { |
183 c := r.redisPool.Get() | 216 c := r.redisPool.Get() |
184 defer util.Close(c) | 217 defer util.Close(c) |
185 | 218 |
186 » // TODO: this needs to be a transaction. | 219 » // Remove the first element in the transaction. |
187 util.LogErr(c.Send("MULTI")) | 220 util.LogErr(c.Send("MULTI")) |
188 util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) | 221 util.LogErr(c.Send("ZRANGE", r.queueKey, 0, 0, "WITHSCORES")) |
189 util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) | 222 util.LogErr(c.Send("ZREMRANGEBYRANK", r.queueKey, 0, 0)) |
190 util.LogErr(c.Send("ZCARD", r.queueKey)) | 223 util.LogErr(c.Send("ZCARD", r.queueKey)) |
191 combinedVals, err := redis.Values(c.Do("EXEC")) | 224 combinedVals, err := redis.Values(c.Do("EXEC")) |
192 | 225 |
193 if err != nil { | 226 if err != nil { |
194 return nil, 0, err | 227 return nil, 0, err |
195 } | 228 } |
196 | 229 |
(...skipping 50 matching lines...) | |
247 // whether we need to retry. | 280 // whether we need to retry. |
248 if err != nil { | 281 if err != nil { |
249 util.LogErr(c.Send("EXPIRE", writeKey, 10)) | 282 util.LogErr(c.Send("EXPIRE", writeKey, 10)) |
250 } | 283 } |
251 util.LogErr(c.Send("PUBLISH", r.finishedChannel, id)) | 284 util.LogErr(c.Send("PUBLISH", r.finishedChannel, id)) |
252 if _, err = c.Do("EXEC"); err != nil { | 285 if _, err = c.Do("EXEC"); err != nil { |
253 glog.Errorf("Error writing result to redis: %s", err) | 286 glog.Errorf("Error writing result to redis: %s", err) |
254 } | 287 } |
255 } | 288 } |
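
Both dequeue and writeWorkerResult drive MULTI/EXEC through redigo's Send/Do pipelining. A minimal sketch of that pattern, loosely mirroring the dequeue transaction above (the pool, key, and function name are illustrative, and error handling is abbreviated):

    package example

    import "github.com/garyburd/redigo/redis"

    // atomicPop reads and removes the lowest-scored member of the sorted set
    // queueKey and reports how many members remain, all in one transaction.
    func atomicPop(pool *redis.Pool, queueKey string) ([]interface{}, int, error) {
        c := pool.Get()
        defer c.Close()

        // Send only queues commands client-side; nothing executes until EXEC.
        c.Send("MULTI")
        c.Send("ZRANGE", queueKey, 0, 0, "WITHSCORES")
        c.Send("ZREMRANGEBYRANK", queueKey, 0, 0)
        c.Send("ZCARD", queueKey)

        // EXEC returns one reply per queued command.
        replies, err := redis.Values(c.Do("EXEC"))
        if err != nil {
            return nil, 0, err
        }
        member, err := redis.Values(replies[0], nil) // [id, score] or empty
        if err != nil {
            return nil, 0, err
        }
        itemsLeft, err := redis.Int(replies[2], nil)
        return member, itemsLeft, err
    }
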
256 | 289 |
257 // getKeyEventChannel returns a channel that sends events for key changes in | 290 // checkForWork waits until there is either a work indicator in the queue |
258 // Redis. | 291 // or 1 second has passed. It will only return false if there was an error. |
259 func (r *RedisRTC) getKeyEventChannel(channelOrPattern string, isPattern bool) (<-chan string, error) { | 292 func (r *RedisRTC) checkForWork() bool { |
260 » // Listen for changes on the queue continuously. | 293 » c := r.redisPool.Get() |
261 » psc := redis.PubSubConn{Conn: r.redisPool.Get()} | 294 » defer util.Close(c) |
295 » _, err := redis.Values(c.Do("BLPOP", r.workReadyKey, 1)) | |
262 | 296 |
263 » subscribe := func() error { | 297 » // If we timed out return true because there could be work and it will |
264 » » if isPattern { | 298 » // cause the caller to poll the queue. |
dogben 2016/08/09 13:50:49: Why checkForWork() at all? Why not just poll the q
265 » » » return psc.PSubscribe(channelOrPattern) | 299 » if err == redis.ErrNil { |
266 » » } else { | 300 » » return true |
267 » » » return psc.Subscribe(channelOrPattern) | |
268 » » } | |
269 } | 301 } |
270 | 302 |
271 » // Subscribe to the key events | 303 » // If there was an error. |
272 » if err := subscribe(); err != nil { | 304 » if err != nil { |
273 » » return nil, err | 305 » » glog.Errorf("Error retrieving list: %s", err) |
306 » » return false | |
274 } | 307 } |
275 | 308 |
276 » readyCh := make(chan bool) | 309 » // Now we got something from the queue and should check for work. |
277 » ret := make(chan string) | 310 » return true |
278 » go func() { | |
279 » » for { | |
280 » » Loop: | |
281 » » » for { | |
282 » » » » switch v := psc.Receive().(type) { | |
283 » » » » case redis.PMessage: | |
284 » » » » » ret <- string(v.Data) | |
285 » » » » case redis.Message: | |
286 » » » » » ret <- string(v.Data) | |
287 » » » » case redis.Subscription: | |
288 » » » » » if readyCh != nil { | |
289 » » » » » » readyCh <- true | |
290 » » » » » » close(readyCh) | |
291 » » » » » } | |
292 » » » » case error: | |
293 » » » » » glog.Errorf("Error waiting for key events: %s", v) | |
294 » » » » » glog.Infof("Reconnecting.") | |
295 » » » » » util.Close(psc) | |
296 » » » » » break Loop | |
297 » » » » } | |
298 » » » } | |
299 | |
300 » » » readyCh = nil | |
301 » » » psc = redis.PubSubConn{Conn: r.redisPool.Get()} | |
302 » » » if err := subscribe(); err != nil { | |
303 » » » » glog.Errorf("Error re-connecting: %s", err) | |
304 » » » » time.Sleep(time.Second) | |
305 » » » } | |
306 » » } | |
307 » }() | |
308 » <-readyCh | |
309 | |
310 » return ret, nil | |
311 } | 311 } |
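
The workReadyKey list is what replaces the old keyspace-notification subscription: enqueue pushes a one-byte marker and checkForWork blocks on BLPOP with a one-second timeout, treating a timeout as "there may still be work" rather than as an error. A small self-contained sketch of both sides of that handshake (pool, key, and function names are illustrative):

    package example

    import "github.com/garyburd/redigo/redis"

    // signalWork appends a marker so a consumer blocked in BLPOP wakes up.
    func signalWork(pool *redis.Pool, workReadyKey string) error {
        c := pool.Get()
        defer c.Close()
        _, err := c.Do("RPUSH", workReadyKey, "W")
        return err
    }

    // awaitWork blocks for up to timeoutSec seconds. Like checkForWork above,
    // it reports true when a marker was popped or the call timed out (there may
    // still be work) and false only on a real error.
    func awaitWork(pool *redis.Pool, workReadyKey string, timeoutSec int) bool {
        c := pool.Get()
        defer c.Close()
        _, err := redis.Values(c.Do("BLPOP", workReadyKey, timeoutSec))
        if err == redis.ErrNil { // BLPOP timed out with nothing to pop
            return true
        }
        return err == nil
    }
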
312 | 312 |
313 // enqueue adds the given task and priority to the task queue, updating the | 313 // enqueue adds the given task and priority to the task queue, updating the |
314 // priority if necessary. | 314 // priority if necessary. |
315 func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { | 315 func (r *RedisRTC) enqueue(id string, priority int64) (bool, error) { |
316 c := r.redisPool.Get() | 316 c := r.redisPool.Get() |
317 defer util.Close(c) | 317 defer util.Close(c) |
318 | 318 |
319 util.LogErr(c.Send("MULTI")) | 319 util.LogErr(c.Send("MULTI")) |
320 util.LogErr(c.Send("ZSCORE", r.queueKey, id)) | 320 util.LogErr(c.Send("ZSCORE", r.queueKey, id)) |
(...skipping 14 matching lines...) | |
335 if !isInProgress && !found { | 335 if !isInProgress && !found { |
336 saveId := true | 336 saveId := true |
337 | 337 |
338 // Only update the queue if this has a lower score. | 338 // Only update the queue if this has a lower score. |
339 if inQueueScore != nil { | 339 if inQueueScore != nil { |
340 oldPriority, _ := redis.Int64(inQueueScore, nil) | 340 oldPriority, _ := redis.Int64(inQueueScore, nil) |
341 saveId = priority < oldPriority | 341 saveId = priority < oldPriority |
342 } | 342 } |
343 | 343 |
344 if saveId { | 344 if saveId { |
345 » » » if _, err = c.Do("ZADD", r.queueKey, priority, id); err != nil { | 345 » » » util.LogErr(c.Send("MULTI")) |
346 » » » util.LogErr(c.Send("ZADD", r.queueKey, priority, id)) | |
347 » » » util.LogErr(c.Send("RPUSH", r.workReadyKey, []byte("W")) ) | |
348 » » » if _, err = c.Do("EXEC"); err != nil { | |
346 return false, err | 349 return false, err |
347 } | 350 } |
348 } | 351 } |
349 } | 352 } |
350 | 353 |
351 return found, nil | 354 return found, nil |
352 } | 355 } |
353 | 356 |
354 // inQueue returns up to 'maxElements' ids that are currently in the work | 357 // inQueue returns up to 'maxElements' ids that are currently in the work |
355 // queue. This is primarily for testing. | 358 // queue. This is primarily for testing. |
(...skipping 32 matching lines...) | |
388 | 391 |
389 // startWorkScheduler starts the background process that takes subscriptions | 392 // startWorkScheduler starts the background process that takes subscriptions |
390 // from the doneCh and notifies them when their tasks have finished. | 393 // from the doneCh and notifies them when their tasks have finished. |
391 func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { | 394 func (r *RedisRTC) startWorkScheduler() (chan<- *doneSub, error) { |
392 finishedCh, err := r.redisPool.subscribeToChannel(r.finishedChannel) | 395 finishedCh, err := r.redisPool.subscribeToChannel(r.finishedChannel) |
393 if err != nil { | 396 if err != nil { |
394 return nil, err | 397 return nil, err |
395 } | 398 } |
396 | 399 |
397 doneCh := make(chan *doneSub) | 400 doneCh := make(chan *doneSub) |
401 r.shutdownWg.Add(1) | |
398 go func() { | 402 go func() { |
399 watchIds := map[string][]chan bool{} | 403 watchIds := map[string][]chan bool{} |
400 notifyChannels := func(id string) { | 404 notifyChannels := func(id string) { |
401 for _, ch := range watchIds[id] { | 405 for _, ch := range watchIds[id] { |
402 ch <- true | 406 ch <- true |
403 close(ch) | 407 close(ch) |
404 } | 408 } |
405 delete(watchIds, id) | 409 delete(watchIds, id) |
406 } | 410 } |
407 | 411 |
408 notifyAll := func() { | 412 notifyAll := func() { |
409 for id := range watchIds { | 413 for id := range watchIds { |
410 if r.isFinished(id) { | 414 if r.isFinished(id) { |
411 notifyChannels(id) | 415 notifyChannels(id) |
412 } | 416 } |
413 } | 417 } |
414 } | 418 } |
415 | 419 |
420 WorkLoop: | |
416 for { | 421 for { |
417 select { | 422 select { |
423 case <-r.shutdownCh: | |
424 break WorkLoop | |
418 case subscription := <-doneCh: | 425 case subscription := <-doneCh: |
419 watchIds[subscription.id] = append(watchIds[subscription.id], subscription.notifyCh) | 426 watchIds[subscription.id] = append(watchIds[subscription.id], subscription.notifyCh) |
420 if r.isFinished(subscription.id) { | 427 if r.isFinished(subscription.id) { |
421 notifyChannels(subscription.id) | 428 notifyChannels(subscription.id) |
422 } | 429 } |
423 case finishedId := <-finishedCh: | 430 case finishedId := <-finishedCh: |
424 // An empty string indicates that we (re)connected. | 431 // An empty string indicates that we (re)connected. |
425 if string(finishedId) == "" { | 432 if string(finishedId) == "" { |
426 notifyAll() | 433 notifyAll() |
427 } else { | 434 } else { |
428 notifyChannels(string(finishedId)) | 435 notifyChannels(string(finishedId)) |
429 } | 436 } |
430 } | 437 } |
431 } | 438 } |
439 r.shutdownWg.Done() | |
432 }() | 440 }() |
433 | 441 |
434 return doneCh, nil | 442 return doneCh, nil |
435 } | 443 } |
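
For context, the caller side of this scheduler (waitFor, below) presumably registers interest and then blocks roughly as sketched here; doneSub's field names are inferred from their use in the loop above, so treat this as an assumption rather than the actual implementation:

    // subscribeAndWait registers interest in id on doneCh and blocks until the
    // scheduler sends true and closes the channel. doneSub's id/notifyCh fields
    // are inferred from the scheduler loop above.
    func subscribeAndWait(doneCh chan<- *doneSub, id string) {
        notifyCh := make(chan bool)
        doneCh <- &doneSub{id: id, notifyCh: notifyCh}
        <-notifyCh
    }
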
436 | 444 |
437 // waitFor blocks until the key identified by id is available in Redis. | 445 // waitFor blocks until the key identified by id is available in Redis. |
438 func (r *RedisRTC) waitFor(id string, priority int64, returnBytes bool) (interface{}, error) { | 446 func (r *RedisRTC) waitFor(id string, priority int64, returnBytes bool) (interface{}, error) {
439 var found bool | 447 var found bool |
440 var err error | 448 var err error |
441 if found, err = r.enqueue(id, priority); err != nil { | 449 if found, err = r.enqueue(id, priority); err != nil { |
(...skipping 50 matching lines...) | |
492 // key returns the Redis key for the given id. | 500 // key returns the Redis key for the given id. |
493 func (r *RedisRTC) key(id string) string { | 501 func (r *RedisRTC) key(id string) string { |
494 return r.keyPrefix + id | 502 return r.keyPrefix + id |
495 } | 503 } |
496 | 504 |
497 // errorKey returns the key of the error message for the given ID if the | 505 // errorKey returns the key of the error message for the given ID if the |
498 // worker call failed. | 506 // worker call failed. |
499 func (r *RedisRTC) errorKey(id string) string { | 507 func (r *RedisRTC) errorKey(id string) string { |
500 return r.errKeyPrefix + id | 508 return r.errKeyPrefix + id |
501 } | 509 } |