Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(273)

Side by Side Diff: go/redisutil/rtc_test.go

Issue 2200833004: Fixing RedisRTC implementation and test (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Incorporated Feedback Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« go/redisutil/rtc.go ('K') | « go/redisutil/rtc.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package redisutil 1 package redisutil
2 2
3 import ( 3 import (
4 "crypto/rand"
4 "fmt" 5 "fmt"
5 "runtime" 6 "runtime"
6 "strconv"
7 "sync" 7 "sync"
8 "testing" 8 "testing"
9 "time"
10
11 assert "github.com/stretchr/testify/require"
9 12
10 "go.skia.org/infra/go/testutils" 13 "go.skia.org/infra/go/testutils"
11
12 assert "github.com/stretchr/testify/require"
13 ) 14 )
14 15
15 const ( 16 const (
16 Q_NAME = "mytype" 17 Q_NAME = "mytype"
17 Q_NAME_PRIMITIVES = "mytestq" 18 Q_NAME_PRIMITIVES = "mytestq"
18 » N_TASKS = 10000 19 » N_TASKS = 1000
20 » PACKAGE_SIZE = 1024 * 512
19 ) 21 )
20 22
21 // TODO (stephana): Re-enable this test once we have a way to cleanly shutdown 23 // BytesCodec for testing.
22 // an instance of RedisRTC. 24 type BytesCodec struct{}
23 func xTestReadThroughCache(t *testing.T) { 25
26 func (b BytesCodec) Encode(data interface{}) ([]byte, error) {
27 » // Make a copy to simulate the generic case.
28 » return append([]byte(nil), data.([]byte)...), nil
29 }
30
31 func (b BytesCodec) Decode(byteData []byte) (interface{}, error) {
32 » // Make a copy to simulate the generic case.
33 » return append([]byte(nil), byteData...), nil
34 }
35
36 func TestReadThroughCache(t *testing.T) {
24 testutils.SkipIfShort(t) 37 testutils.SkipIfShort(t)
25 38
26 » runtime.GOMAXPROCS(runtime.NumCPU() - 1) 39 » rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE)
40 » defer testutils.CloseInTest(t, rp)
27 41
28 rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE)
29 assert.NoError(t, rp.FlushDB()) 42 assert.NoError(t, rp.FlushDB())
43 randBytes := make([]byte, PACKAGE_SIZE)
44 _, err := rand.Read(randBytes)
45 assert.NoError(t, err)
30 46
31 worker := func(priority int64, id string) (interface{}, error) { 47 worker := func(priority int64, id string) (interface{}, error) {
32 » » // Run a few calculations in a loop. 48 » » // Create a unique version of the random array.
33 » » result := 0 49 » » return []byte(id + string(randBytes)), nil
34 » » for i := 0; i < 10; i++ {
35 » » » result += i
36 » » }
37
38 » » // Do the work
39 » » return id + "-" + strconv.Itoa(result), nil
40 } 50 }
41 51
42 // create a worker queue for a given type 52 // create a worker queue for a given type
43 » codec := StringCodec{} 53 » codec := BytesCodec{}
44 » qRet, err := NewReadThroughCache(rp, Q_NAME, nil, codec, runtime.NumCPU( )-2) 54 » qRet, err := NewReadThroughCache(rp, Q_NAME, worker, codec, runtime.NumC PU()-2)
45 assert.NoError(t, err) 55 assert.NoError(t, err)
46 q := qRet.(*RedisRTC) 56 q := qRet.(*RedisRTC)
57 defer q.shutdown()
58
59 // Wait for 2 seconds to make sure wait for work function times out
dogben 2016/08/09 13:50:49 "2 seconds" vs. "1 * time.Second" below
60 // at least once and queries the queueu directly.
dogben 2016/08/09 13:50:49 typo: "queueu"
61 time.Sleep(1 * time.Second)
47 62
48 // make sure all results arrive. 63 // make sure all results arrive.
49 var allDone sync.WaitGroup 64 var allDone sync.WaitGroup
50 retCh := make(chan interface{}, N_TASKS) 65 retCh := make(chan interface{}, N_TASKS)
51 errCh := make(chan error, N_TASKS) 66 errCh := make(chan error, N_TASKS)
52 67
53 for i := 0; i < N_TASKS; i++ { 68 for i := 0; i < N_TASKS; i++ {
54 allDone.Add(1) 69 allDone.Add(1)
55 go func(idx, priority int) { 70 go func(idx, priority int) {
56 » » » id := "id-" + strconv.Itoa(idx) 71 » » » id := "id-" + fmt.Sprintf("%04d", idx)
57 result, err := q.Get(int64(priority), false, id) 72 result, err := q.Get(int64(priority), false, id)
58 if err != nil { 73 if err != nil {
59 errCh <- err 74 errCh <- err
60 } else { 75 } else {
61 retCh <- result 76 retCh <- result
62 } 77 }
63 78
64 allDone.Done() 79 allDone.Done()
65 }(i, i) 80 }(i, i)
66 } 81 }
67
68 q.worker = worker
69 assert.NoError(t, q.startWorkers(runtime.NumCPU()-2))
70 allDone.Wait() 82 allDone.Wait()
71 83
72 close(errCh) 84 close(errCh)
73 close(retCh) 85 close(retCh)
74 86
75 if len(errCh) > 0 { 87 if len(errCh) > 0 {
76 for err := range errCh { 88 for err := range errCh {
77 fmt.Printf("Error: %s", err) 89 fmt.Printf("Error: %s", err)
78 } 90 }
79 » » assert.True(t, false) 91 » » assert.Fail(t, "Received above error messages.")
80 } 92 }
81 93
82 assert.Equal(t, 0, len(errCh)) 94 assert.Equal(t, 0, len(errCh))
83 found := make(map[string]bool, N_TASKS) 95 found := make(map[string]bool, N_TASKS)
84 for ret := range retCh { 96 for ret := range retCh {
85 » » assert.IsType(t, "", ret) 97 » » assert.IsType(t, []byte(""), ret)
86 » » found[ret.(string)] = true 98
99 » » // Add the prefix size to PACKAGE_SIZE to account for prefix add ed above.
100 » » assert.Equal(t, PACKAGE_SIZE+7, len(ret.([]byte)))
101 » » found[string(ret.([]byte))] = true
87 } 102 }
88 103
104 // Make sure all strings are unique.
89 assert.Equal(t, N_TASKS, len(found)) 105 assert.Equal(t, N_TASKS, len(found))
90 } 106 }
OLDNEW
« go/redisutil/rtc.go ('K') | « go/redisutil/rtc.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698