OLD | NEW |
---|---|
1 package redisutil | 1 package redisutil |
2 | 2 |
3 import ( | 3 import ( |
4 "crypto/rand" | |
4 "fmt" | 5 "fmt" |
5 "runtime" | 6 "runtime" |
6 "strconv" | |
7 "sync" | 7 "sync" |
8 "testing" | 8 "testing" |
9 "time" | |
10 | |
11 assert "github.com/stretchr/testify/require" | |
9 | 12 |
10 "go.skia.org/infra/go/testutils" | 13 "go.skia.org/infra/go/testutils" |
11 | |
12 assert "github.com/stretchr/testify/require" | |
13 ) | 14 ) |
14 | 15 |
15 const ( | 16 const ( |
16 Q_NAME = "mytype" | 17 Q_NAME = "mytype" |
17 Q_NAME_PRIMITIVES = "mytestq" | 18 Q_NAME_PRIMITIVES = "mytestq" |
18 » N_TASKS = 10000 | 19 » N_TASKS = 1000 |
20 » PACKAGE_SIZE = 1024 * 512 | |
19 ) | 21 ) |
20 | 22 |
21 // TODO (stephana): Re-enable this test once we have a way to cleanly shutdown | 23 // BytesCodec for testing. |
22 // an instance of RedisRTC. | 24 type BytesCodec struct{} |
23 func xTestReadThroughCache(t *testing.T) { | 25 |
26 func (b BytesCodec) Encode(data interface{}) ([]byte, error) { | |
27 » // Make a copy to simulate the generic case. | |
28 » return append([]byte(nil), data.([]byte)...), nil | |
29 } | |
30 | |
31 func (b BytesCodec) Decode(byteData []byte) (interface{}, error) { | |
32 » // Make a copy to simulate the generic case. | |
33 » return append([]byte(nil), byteData...), nil | |
34 } | |
35 | |
36 func TestReadThroughCache(t *testing.T) { | |
24 testutils.SkipIfShort(t) | 37 testutils.SkipIfShort(t) |
25 | 38 |
26 » runtime.GOMAXPROCS(runtime.NumCPU() - 1) | 39 » rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE) |
40 » defer testutils.CloseInTest(t, rp) | |
27 | 41 |
28 rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE) | |
29 assert.NoError(t, rp.FlushDB()) | 42 assert.NoError(t, rp.FlushDB()) |
43 randBytes := make([]byte, PACKAGE_SIZE) | |
44 _, err := rand.Read(randBytes) | |
45 assert.NoError(t, err) | |
30 | 46 |
31 worker := func(priority int64, id string) (interface{}, error) { | 47 worker := func(priority int64, id string) (interface{}, error) { |
32 » » // Run a few calculations in a loop. | 48 » » // Create a unique version of the random array. |
33 » » result := 0 | 49 » » return []byte(id + string(randBytes)), nil |
34 » » for i := 0; i < 10; i++ { | |
35 » » » result += i | |
36 » » } | |
37 | |
38 » » // Do the work | |
39 » » return id + "-" + strconv.Itoa(result), nil | |
40 } | 50 } |
41 | 51 |
42 // create a worker queue for a given type | 52 // create a worker queue for a given type |
43 » codec := StringCodec{} | 53 » codec := BytesCodec{} |
 44 » qRet, err := NewReadThroughCache(rp, Q_NAME, nil, codec, runtime.NumCPU()-2) | 54 » qRet, err := NewReadThroughCache(rp, Q_NAME, worker, codec, runtime.NumCPU()-2) |
45 assert.NoError(t, err) | 55 assert.NoError(t, err) |
46 q := qRet.(*RedisRTC) | 56 q := qRet.(*RedisRTC) |
57 defer q.shutdown() | |
58 | |
 59 » // Wait for 1 second to make sure the wait-for-work function times out
dogben
2016/08/09 13:50:49
"2 seconds" vs. "1 * time.Second" below
| |
 60 » // at least once and queries the queue directly. | |
dogben
2016/08/09 13:50:49
typo: "queueu"
| |
61 time.Sleep(1 * time.Second) | |
47 | 62 |
48 // make sure all results arrive. | 63 // make sure all results arrive. |
49 var allDone sync.WaitGroup | 64 var allDone sync.WaitGroup |
50 retCh := make(chan interface{}, N_TASKS) | 65 retCh := make(chan interface{}, N_TASKS) |
51 errCh := make(chan error, N_TASKS) | 66 errCh := make(chan error, N_TASKS) |
52 | 67 |
53 for i := 0; i < N_TASKS; i++ { | 68 for i := 0; i < N_TASKS; i++ { |
54 allDone.Add(1) | 69 allDone.Add(1) |
55 go func(idx, priority int) { | 70 go func(idx, priority int) { |
56 » » » id := "id-" + strconv.Itoa(idx) | 71 » » » id := "id-" + fmt.Sprintf("%04d", idx) |
57 result, err := q.Get(int64(priority), false, id) | 72 result, err := q.Get(int64(priority), false, id) |
58 if err != nil { | 73 if err != nil { |
59 errCh <- err | 74 errCh <- err |
60 } else { | 75 } else { |
61 retCh <- result | 76 retCh <- result |
62 } | 77 } |
63 | 78 |
64 allDone.Done() | 79 allDone.Done() |
65 }(i, i) | 80 }(i, i) |
66 } | 81 } |
67 | |
68 q.worker = worker | |
69 assert.NoError(t, q.startWorkers(runtime.NumCPU()-2)) | |
70 allDone.Wait() | 82 allDone.Wait() |
71 | 83 |
72 close(errCh) | 84 close(errCh) |
73 close(retCh) | 85 close(retCh) |
74 | 86 |
75 if len(errCh) > 0 { | 87 if len(errCh) > 0 { |
76 for err := range errCh { | 88 for err := range errCh { |
77 fmt.Printf("Error: %s", err) | 89 fmt.Printf("Error: %s", err) |
78 } | 90 } |
79 » » assert.True(t, false) | 91 » » assert.Fail(t, "Received above error messages.") |
80 } | 92 } |
81 | 93 |
82 assert.Equal(t, 0, len(errCh)) | 94 assert.Equal(t, 0, len(errCh)) |
83 found := make(map[string]bool, N_TASKS) | 95 found := make(map[string]bool, N_TASKS) |
84 for ret := range retCh { | 96 for ret := range retCh { |
85 » » assert.IsType(t, "", ret) | 97 » » assert.IsType(t, []byte(""), ret) |
86 » » found[ret.(string)] = true | 98 |
 99 » » // Add the prefix size to PACKAGE_SIZE to account for prefix added above. | |
100 » » assert.Equal(t, PACKAGE_SIZE+7, len(ret.([]byte))) | |
101 » » found[string(ret.([]byte))] = true | |
87 } | 102 } |
88 | 103 |
104 // Make sure all strings are unique. | |
89 assert.Equal(t, N_TASKS, len(found)) | 105 assert.Equal(t, N_TASKS, len(found)) |
90 } | 106 } |
OLD | NEW |