Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package redisutil | 1 package redisutil |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "crypto/rand" | |
| 4 "fmt" | 5 "fmt" |
| 5 "runtime" | 6 "runtime" |
| 6 "strconv" | |
| 7 "sync" | 7 "sync" |
| 8 "testing" | 8 "testing" |
| 9 | 9 |
| 10 assert "github.com/stretchr/testify/require" | |
| 11 | |
| 10 "go.skia.org/infra/go/testutils" | 12 "go.skia.org/infra/go/testutils" |
| 11 | |
| 12 assert "github.com/stretchr/testify/require" | |
| 13 ) | 13 ) |
| 14 | 14 |
| 15 const ( | 15 const ( |
| 16 Q_NAME = "mytype" | 16 Q_NAME = "mytype" |
| 17 Q_NAME_PRIMITIVES = "mytestq" | 17 Q_NAME_PRIMITIVES = "mytestq" |
| 18 » N_TASKS = 10000 | 18 » N_TASKS = 1 |
|
dogben
2016/08/08 21:18:24
Did you intend to test only one task? Seems like t
stephana
2016/08/08 21:46:55
Good catch. That was for debugging. Changed to 100
| |
| 19 » PACKAGE_SIZE = 1024 * 512 | |
| 19 ) | 20 ) |
| 20 | 21 |
| 22 // BytesCodec for testing. | |
| 23 type BytesCodec struct{} | |
| 24 | |
| 25 func (b BytesCodec) Encode(data interface{}) ([]byte, error) { | |
| 26 // Make a copy to simulate the generic case. | |
| 27 return append([]byte(nil), data.([]byte)...), nil | |
| 28 } | |
| 29 | |
| 30 func (b BytesCodec) Decode(byteData []byte) (interface{}, error) { | |
| 31 // Make a copy to simulate the generic case. | |
| 32 return append([]byte(nil), byteData...), nil | |
| 33 } | |
| 34 | |
| 21 // TODO (stephana): Re-enable this test once we have a way to cleanly shutdown | 35 // TODO (stephana): Re-enable this test once we have a way to cleanly shutdown |
|
dogben
2016/08/08 21:18:24
delete?
stephana
2016/08/08 21:46:55
Done.
| |
| 22 // an instance of RedisRTC. | 36 // an instance of RedisRTC. |
| 23 func xTestReadThroughCache(t *testing.T) { | 37 func TestReadThroughCache(t *testing.T) { |
| 24 testutils.SkipIfShort(t) | 38 testutils.SkipIfShort(t) |
| 25 | 39 |
| 26 » runtime.GOMAXPROCS(runtime.NumCPU() - 1) | 40 » rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE) |
| 41 » defer testutils.CloseInTest(t, rp) | |
| 27 | 42 |
| 28 rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE) | |
| 29 assert.NoError(t, rp.FlushDB()) | 43 assert.NoError(t, rp.FlushDB()) |
| 44 randBytes := make([]byte, PACKAGE_SIZE) | |
| 45 _, err := rand.Read(randBytes) | |
| 30 | 46 |
|
dogben
2016/08/08 21:18:24
Do you want assert.NoError here, and not return er
stephana
2016/08/08 21:46:55
Done.
| |
| 31 worker := func(priority int64, id string) (interface{}, error) { | 47 worker := func(priority int64, id string) (interface{}, error) { |
| 32 » » // Run a few calculations in a loop. | 48 » » // Create a unique version of the random array. |
| 33 » » result := 0 | 49 » » return []byte(id + string(randBytes)), err |
| 34 » » for i := 0; i < 10; i++ { | |
| 35 » » » result += i | |
| 36 » » } | |
| 37 | |
| 38 » » // Do the work | |
| 39 » » return id + "-" + strconv.Itoa(result), nil | |
| 40 } | 50 } |
| 41 | 51 |
| 42 // create a worker queue for a given type | 52 // create a worker queue for a given type |
| 43 » codec := StringCodec{} | 53 » // codec := StringCodec{} |
|
dogben
2016/08/08 21:18:24
nit: delete
stephana
2016/08/08 21:46:55
Done.
| |
| 44 » qRet, err := NewReadThroughCache(rp, Q_NAME, nil, codec, runtime.NumCPU( )-2) | 54 » codec := BytesCodec{} |
| 55 » qRet, err := NewReadThroughCache(rp, Q_NAME, worker, codec, runtime.NumC PU()-2) | |
| 45 assert.NoError(t, err) | 56 assert.NoError(t, err) |
| 46 q := qRet.(*RedisRTC) | 57 q := qRet.(*RedisRTC) |
| 58 defer q.shutdown() | |
| 47 | 59 |
| 48 // make sure all results arrive. | 60 // make sure all results arrive. |
| 49 var allDone sync.WaitGroup | 61 var allDone sync.WaitGroup |
| 50 retCh := make(chan interface{}, N_TASKS) | 62 retCh := make(chan interface{}, N_TASKS) |
| 51 errCh := make(chan error, N_TASKS) | 63 errCh := make(chan error, N_TASKS) |
| 52 | 64 |
| 53 for i := 0; i < N_TASKS; i++ { | 65 for i := 0; i < N_TASKS; i++ { |
| 54 allDone.Add(1) | 66 allDone.Add(1) |
| 55 go func(idx, priority int) { | 67 go func(idx, priority int) { |
| 56 » » » id := "id-" + strconv.Itoa(idx) | 68 » » » id := "id-" + fmt.Sprintf("%04d", idx) |
| 57 result, err := q.Get(int64(priority), false, id) | 69 result, err := q.Get(int64(priority), false, id) |
| 58 if err != nil { | 70 if err != nil { |
| 59 errCh <- err | 71 errCh <- err |
| 60 } else { | 72 } else { |
| 61 retCh <- result | 73 retCh <- result |
| 62 } | 74 } |
| 63 | 75 |
| 64 allDone.Done() | 76 allDone.Done() |
| 65 }(i, i) | 77 }(i, i) |
| 66 } | 78 } |
| 67 | |
| 68 q.worker = worker | |
| 69 assert.NoError(t, q.startWorkers(runtime.NumCPU()-2)) | |
| 70 allDone.Wait() | 79 allDone.Wait() |
| 71 | 80 |
| 72 close(errCh) | 81 close(errCh) |
| 73 close(retCh) | 82 close(retCh) |
| 74 | 83 |
| 75 if len(errCh) > 0 { | 84 if len(errCh) > 0 { |
| 76 for err := range errCh { | 85 for err := range errCh { |
| 77 fmt.Printf("Error: %s", err) | 86 fmt.Printf("Error: %s", err) |
| 78 } | 87 } |
| 79 » » assert.True(t, false) | 88 » » assert.Fail(t, "Received above error messages.") |
| 80 } | 89 } |
| 81 | 90 |
| 82 assert.Equal(t, 0, len(errCh)) | 91 assert.Equal(t, 0, len(errCh)) |
| 83 found := make(map[string]bool, N_TASKS) | 92 found := make(map[string]bool, N_TASKS) |
| 84 for ret := range retCh { | 93 for ret := range retCh { |
| 85 » » assert.IsType(t, "", ret) | 94 » » assert.IsType(t, []byte(""), ret) |
| 86 » » found[ret.(string)] = true | 95 » » assert.Equal(t, 1024*512+7, len(ret.([]byte))) |
|
dogben
2016/08/08 21:18:24
nit: maybe add a comment to explain this value
stephana
2016/08/08 21:46:55
Done.
| |
| 96 » » found[string(ret.([]byte))] = true | |
| 87 } | 97 } |
| 88 | 98 |
| 99 // Make sure all strings are unique. | |
| 89 assert.Equal(t, N_TASKS, len(found)) | 100 assert.Equal(t, N_TASKS, len(found)) |
| 90 } | 101 } |
| OLD | NEW |