Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(303)

Unified Diff: common/gcloud/pubsub/subscriber/subscriber_test.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/tsmon/iface.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/gcloud/pubsub/subscriber/subscriber_test.go
diff --git a/common/gcloud/pubsub/subscriber/subscriber_test.go b/common/gcloud/pubsub/subscriber/subscriber_test.go
deleted file mode 100644
index 0eb773ad6a6fee9989ff5ac637dc4865a343ee0f..0000000000000000000000000000000000000000
--- a/common/gcloud/pubsub/subscriber/subscriber_test.go
+++ /dev/null
@@ -1,215 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package subscriber
-
-import (
- "fmt"
- "sort"
- "sync"
- "sync/atomic"
- "testing"
- "time"
-
- "github.com/luci/luci-go/common/clock"
- "github.com/luci/luci-go/common/clock/testclock"
- "github.com/luci/luci-go/common/errors"
- "github.com/luci/luci-go/common/gcloud/pubsub"
- "golang.org/x/net/context"
-
- . "github.com/smartystreets/goconvey/convey"
-)
-
-type event struct {
- msg *pubsub.Message
- err error
-}
-
-type testSource struct {
- sub pubsub.Subscription
- eventC chan event
-}
-
-func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, error) {
- var e event
-
- select {
- case <-c.Done():
- // Enforce determinism, preferring events.
- select {
- case e = <-s.eventC:
- break
- default:
- return nil, c.Err()
- }
-
- case e = <-s.eventC:
- break
- }
-
- switch {
- case e.err != nil:
- return nil, e.err
-
- case e.msg != nil:
- return []*pubsub.Message{e.msg}, nil
-
- default:
- return nil, nil
- }
-}
-
-func (s *testSource) message(id ...string) {
- for _, v := range id {
- if v != "" {
- s.eventC <- event{msg: &pubsub.Message{
- ID: v,
- AckID: v,
- Data: []byte(v),
- }}
- } else {
- s.eventC <- event{}
- }
- }
-}
-
-func (s *testSource) error(err error) {
- s.eventC <- event{err: err}
-}
-
-type testACK struct {
- sync.Mutex
-
- acks map[string]struct{}
-}
-
-func (a *testACK) Ack(id string) {
- a.Lock()
- defer a.Unlock()
-
- if a.acks == nil {
- a.acks = make(map[string]struct{})
- }
- a.acks[id] = struct{}{}
-}
-
-func (a *testACK) getACKs() []string {
- a.Lock()
- defer a.Unlock()
- return dumpStringSet(a.acks)
-}
-
-func dumpStringSet(s map[string]struct{}) []string {
- v := make([]string, 0, len(s))
- for a := range s {
- v = append(v, a)
- }
- sort.Strings(v)
- return v
-}
-
-func TestSubscriber(t *testing.T) {
- t.Parallel()
-
- Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() {
- c := context.Background()
- c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
-
- c, cancelFunc := context.WithCancel(c)
- defer cancelFunc()
-
- src := &testSource{
- eventC: make(chan event),
- }
- ack := &testACK{}
- s := Subscriber{
- S: src,
- A: ack,
- Workers: 8,
- }
-
- // If not nil, processing goroutines will block on reads from this
- // channel, one per message.
- var pullC chan string
-
- var seenMu sync.Mutex
- seen := map[string]struct{}{}
- blacklist := map[string]struct{}{}
- runWith := func(cb func()) {
- doneC := make(chan struct{})
- go func() {
- defer close(doneC)
- s.Run(c, func(msg *pubsub.Message) bool {
- if pullC != nil {
- pullC <- msg.ID
- }
-
- seenMu.Lock()
- defer seenMu.Unlock()
- seen[msg.ID] = struct{}{}
-
- _, ok := blacklist[msg.ID]
- return !ok
- })
- }()
-
- cb()
- cancelFunc()
- <-doneC
- }
-
- Convey(`A Subscriber can pull and ACK messages.`, func() {
- var msgs []string
- pullC = make(chan string)
- runWith(func() {
- for i := 0; i < 1024; i++ {
- v := fmt.Sprintf("%08x", i)
- msgs = append(msgs, v)
- src.message(v)
-
- <-pullC
- }
- })
-
- So(dumpStringSet(seen), ShouldResemble, msgs)
- So(ack.getACKs(), ShouldResemble, msgs)
- })
-
- Convey(`A Subscriber that encounters an empty message set will sleep and try again.`, func() {
- var count int32
- tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
- if atomic.AddInt32(&count, 1) > 1 {
- panic("should not have this many callbacks")
- }
- tc.Add(d)
- })
-
- runWith(func() {
- src.message("a", "b", "", "c", "d")
- })
-
- So(dumpStringSet(seen), ShouldResemble, []string{"a", "b", "c", "d"})
- So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c", "d"})
- })
-
- Convey(`A Subscriber that encounters a Source error will sleep and try again.`, func() {
- var count int32
- tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
- if atomic.AddInt32(&count, 1) > 1 {
- panic("should not have this many callbacks")
- }
- tc.Add(d)
- })
-
- runWith(func() {
- src.message("a", "b")
- src.error(errors.New("test error"))
- src.message("c", "d")
- })
-
- So(dumpStringSet(seen), ShouldResemble, []string{"a", "b", "c", "d"})
- So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c", "d"})
- })
- })
-}
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/tsmon/iface.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698