| Index: logdog/appengine/coordinator/coordinatorTest/context.go
|
| diff --git a/logdog/appengine/coordinator/coordinatorTest/context.go b/logdog/appengine/coordinator/coordinatorTest/context.go
|
| index 7d227ee8ce41459cc9d6a20b8e5b7ee5be7e585e..f44ea749fb6472f87f1e1d38916e03b7817de2cc 100644
|
| --- a/logdog/appengine/coordinator/coordinatorTest/context.go
|
| +++ b/logdog/appengine/coordinator/coordinatorTest/context.go
|
| @@ -16,6 +16,7 @@ package coordinatorTest
|
|
|
| import (
|
| "fmt"
|
| + "path/filepath"
|
| "strings"
|
| "time"
|
|
|
| @@ -31,6 +32,7 @@ import (
|
| "github.com/luci/luci-go/logdog/api/config/svcconfig"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator"
|
| "github.com/luci/luci-go/logdog/appengine/coordinator/config"
|
| + "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
|
| "github.com/luci/luci-go/logdog/common/storage/archive"
|
| "github.com/luci/luci-go/logdog/common/storage/bigtable"
|
| "github.com/luci/luci-go/luci_config/common/cfgtypes"
|
| @@ -40,11 +42,13 @@ import (
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| "github.com/luci/luci-go/server/auth/identity"
|
| + "github.com/luci/luci-go/server/router"
|
| "github.com/luci/luci-go/server/settings"
|
| "github.com/luci/luci-go/tumble"
|
|
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/gae/service/info"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
|
|
| "github.com/golang/protobuf/proto"
|
| "golang.org/x/net/context"
|
| @@ -157,6 +161,20 @@ func (e *Environment) addConfigEntry(configSet cfgtypes.ConfigSet, path, content
|
| cset[path] = content
|
| }
|
|
|
| +// RunTaskQueues processes all tasks in all task queues if they are available.
|
| +func (e *Environment) RunTaskQueues(c context.Context, tls *TestStream) {
|
| + r := router.New()
|
| +
|
| + tls.WithProjectNamespace(c, func(c context.Context) {
|
| + tasks.InstallHandlers(r, router.NewMiddlewareChain(func(ctx *router.Context, next router.Handler) {
|
| + ctx.Context = c
|
| + next(ctx)
|
| + }))
|
| +
|
| + drainTaskQueues(c, r)
|
| + })
|
| +}
|
| +
|
| // Install creates a testing Context and installs common test facilities into
|
| // it, returning the Environment to which they're bound.
|
| func Install() (context.Context, *Environment) {
|
| @@ -180,35 +198,16 @@ func Install() (context.Context, *Environment) {
|
| Cache: &e.StorageCache,
|
| })
|
|
|
| - // Add indexes. These should match the indexes defined in the application's
|
| - // "index.yaml".
|
| - indexDefs := [][]string{
|
| - {"Prefix", "-Created"},
|
| - {"Name", "-Created"},
|
| - {"State", "-Created"},
|
| - {"Purged", "-Created"},
|
| - {"ProtoVersion", "-Created"},
|
| - {"ContentType", "-Created"},
|
| - {"StreamType", "-Created"},
|
| - {"Timestamp", "-Created"},
|
| - {"_C", "-Created"},
|
| - {"_Tags", "-Created"},
|
| - {"_Terminated", "-Created"},
|
| - {"_Archived", "-Created"},
|
| - }
|
| - indexes := make([]*ds.IndexDefinition, len(indexDefs))
|
| - for i, id := range indexDefs {
|
| - cols := make([]ds.IndexColumn, len(id))
|
| - for j, ic := range id {
|
| - var err error
|
| - cols[j], err = ds.ParseIndexColumn(ic)
|
| - if err != nil {
|
| - panic(fmt.Errorf("failed to parse index %q: %s", ic, err))
|
| - }
|
| - }
|
| - indexes[i] = &ds.IndexDefinition{Kind: "LogStream", SortBy: cols}
|
| + // Register our task queues.
|
| + tq.GetTestable(c).CreateQueue(tasks.ArchivalTaskQueue)
|
| +
|
| + // Load indexes from "index.yaml".
|
| + mainServicePath := filepath.Join("..", "..", "..", "cmd", "coordinator", "vmuser")
|
| + indexDefs, err := ds.FindAndParseIndexYAML(mainServicePath)
|
| + if err != nil {
|
| + panic(fmt.Errorf("failed to load 'index.yaml': %s", err))
|
| }
|
| - ds.GetTestable(c).AddIndexes(indexes...)
|
| + ds.GetTestable(c).AddIndexes(indexDefs...)
|
|
|
| // Setup clock.
|
| e.Clock = clock.Get(c).(testclock.TestClock)
|
|
|