Index: go/redisutil/rtc_test.go |
diff --git a/go/redisutil/rtc_test.go b/go/redisutil/rtc_test.go |
index c3e0494cb7f64d025bdf6c3fcacb3a1885923b4a..0abee09b6a0d5a62dccd1b5eab8e6893e28c9f95 100644 |
--- a/go/redisutil/rtc_test.go |
+++ b/go/redisutil/rtc_test.go |
@@ -1,49 +1,64 @@ |
package redisutil |
import ( |
+ "crypto/rand" |
"fmt" |
"runtime" |
- "strconv" |
"sync" |
"testing" |
- |
- "go.skia.org/infra/go/testutils" |
+ "time" |
assert "github.com/stretchr/testify/require" |
+ |
+ "go.skia.org/infra/go/testutils" |
) |
const ( |
Q_NAME = "mytype" |
Q_NAME_PRIMITIVES = "mytestq" |
- N_TASKS = 10000 |
+ N_TASKS = 1000 |
+ PACKAGE_SIZE = 1024 * 512 |
) |
-// TODO (stephana): Re-enable this test once we have a way to cleanly shutdown |
-// an instance of RedisRTC. |
-func xTestReadThroughCache(t *testing.T) { |
- testutils.SkipIfShort(t) |
+// BytesCodec for testing. |
+type BytesCodec struct{} |
- runtime.GOMAXPROCS(runtime.NumCPU() - 1) |
+func (b BytesCodec) Encode(data interface{}) ([]byte, error) { |
+ // Make a copy to simulate the generic case. |
+ return append([]byte(nil), data.([]byte)...), nil |
+} |
+ |
+func (b BytesCodec) Decode(byteData []byte) (interface{}, error) { |
+ // Make a copy to simulate the generic case. |
+ return append([]byte(nil), byteData...), nil |
+} |
+ |
+func TestReadThroughCache(t *testing.T) { |
+ testutils.SkipIfShort(t) |
rp := NewRedisPool(REDIS_SERVER_ADDRESS, REDIS_DB_RTCACHE) |
+ defer testutils.CloseInTest(t, rp) |
+ |
assert.NoError(t, rp.FlushDB()) |
+ randBytes := make([]byte, PACKAGE_SIZE) |
+ _, err := rand.Read(randBytes) |
+ assert.NoError(t, err) |
worker := func(priority int64, id string) (interface{}, error) { |
- // Run a few calculations in a loop. |
- result := 0 |
- for i := 0; i < 10; i++ { |
- result += i |
- } |
- |
- // Do the work |
- return id + "-" + strconv.Itoa(result), nil |
+ // Create a unique version of the random array. |
+ return []byte(id + string(randBytes)), nil |
} |
// create a worker queue for a given type |
- codec := StringCodec{} |
- qRet, err := NewReadThroughCache(rp, Q_NAME, nil, codec, runtime.NumCPU()-2) |
+ codec := BytesCodec{} |
+ qRet, err := NewReadThroughCache(rp, Q_NAME, worker, codec, runtime.NumCPU()-2) |
assert.NoError(t, err) |
q := qRet.(*RedisRTC) |
+ defer q.shutdown() |
+ |
+ // Wait for 2 seconds to make sure wait for work function times out |
dogben
2016/08/09 13:50:49
"2 seconds" vs. "1 * time.Second" below
|
+ // at least once and queries the queueu directly. |
dogben
2016/08/09 13:50:49
typo: "queueu"
|
+ time.Sleep(1 * time.Second) |
// make sure all results arrive. |
var allDone sync.WaitGroup |
@@ -53,7 +68,7 @@ func xTestReadThroughCache(t *testing.T) { |
for i := 0; i < N_TASKS; i++ { |
allDone.Add(1) |
go func(idx, priority int) { |
- id := "id-" + strconv.Itoa(idx) |
+ id := "id-" + fmt.Sprintf("%04d", idx) |
result, err := q.Get(int64(priority), false, id) |
if err != nil { |
errCh <- err |
@@ -64,9 +79,6 @@ func xTestReadThroughCache(t *testing.T) { |
allDone.Done() |
}(i, i) |
} |
- |
- q.worker = worker |
- assert.NoError(t, q.startWorkers(runtime.NumCPU()-2)) |
allDone.Wait() |
close(errCh) |
@@ -76,15 +88,19 @@ func xTestReadThroughCache(t *testing.T) { |
for err := range errCh { |
fmt.Printf("Error: %s", err) |
} |
- assert.True(t, false) |
+ assert.Fail(t, "Received above error messages.") |
} |
assert.Equal(t, 0, len(errCh)) |
found := make(map[string]bool, N_TASKS) |
for ret := range retCh { |
- assert.IsType(t, "", ret) |
- found[ret.(string)] = true |
+ assert.IsType(t, []byte(""), ret) |
+ |
+ // Add the prefix size to PACKAGE_SIZE to account for prefix added above. |
+ assert.Equal(t, PACKAGE_SIZE+7, len(ret.([]byte))) |
+ found[string(ret.([]byte))] = true |
} |
+ // Make sure all strings are unique. |
assert.Equal(t, N_TASKS, len(found)) |
} |