| Index: appengine/cmd/dm/distributor/registry.go
|
| diff --git a/appengine/cmd/dm/distributor/registry.go b/appengine/cmd/dm/distributor/registry.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..86a4a18216dd73dca7797e3967bb53bab2a893cf
|
| --- /dev/null
|
| +++ b/appengine/cmd/dm/distributor/registry.go
|
| @@ -0,0 +1,179 @@
|
| +// Copyright 2016 The LUCI Authors. All rights reserved.
|
| +// Use of this source code is governed under the Apache License, Version 2.0
|
| +// that can be found in the LICENSE file.
|
| +
|
| +package distributor
|
| +
|
| +import (
|
| + "errors"
|
| + "fmt"
|
| + "net/url"
|
| + "reflect"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| + "github.com/luci/gae/service/info"
|
| + "github.com/luci/luci-go/appengine/tumble"
|
| + "github.com/luci/luci-go/common/api/dm/distributor"
|
| + "github.com/luci/luci-go/common/api/dm/service/v1"
|
| + "github.com/luci/luci-go/common/config"
|
| + "github.com/luci/luci-go/common/logging"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +var regKey = "holds a DM Distributor Registry"
|
| +
|
| +// WithRegistry adds the registry to the Context.
|
| +func WithRegistry(c context.Context, r Registry) context.Context {
|
| + if r == nil {
|
| + panic(errors.New("you may not use WithRegistry on a nil Registry"))
|
| + }
|
| + return context.WithValue(c, ®Key, r)
|
| +}
|
| +
|
| +// GetRegistry gets the registry from the Context. This will return nil if the
|
| +// Context does not contain a Registry.
|
| +func GetRegistry(c context.Context) Registry {
|
| + ret, _ := c.Value(®Key).(Registry)
|
| + return ret
|
| +}
|
| +
|
| +// FinishExecutionFn is required to eliminate a circular dependency
|
| +// between mutate <-> distributor. Essentially this just makes a new
|
| +// mutate.FinishExecution.
|
| +//
|
| +// See mutate.FinishExecutionFn for the only actual implementation of this.
|
| +type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error)
|
| +
|
| +// Registry holds a collection of all of the available distributor types.
|
| +type Registry interface {
|
| + FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error)
|
| +
|
| + // MakeDistributor builds a distributor instance that's configured with the
|
| + // provided config.
|
| + //
|
| + // The configuration for this distributor are obtained from luci-config at the
|
| + // time an Execution is started.
|
| + MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error)
|
| +}
|
| +
|
| +// NewRegistry builds a new implementation of Registry configured to load
|
| +// configuration data from luci-config.
|
| +//
|
| +// The mapping should hold nil-ptrs of various config protos -> respective
|
| +// Factory. When loading from luci-config, when we see a given message type,
|
| +// we'll construct the distributor instance using the provided Factory.
|
| +func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Registry {
|
| + ret := ®istry{fFn, make(map[reflect.Type]Factory, len(mapping))}
|
| + add := func(p proto.Message, factory Factory) {
|
| + if factory == nil {
|
| + panic("factory is nil")
|
| + }
|
| + if p == nil {
|
| + panic("proto.Message is nil")
|
| + }
|
| +
|
| + typ := reflect.TypeOf(p)
|
| +
|
| + if _, ok := ret.data[typ]; ok {
|
| + panic(fmt.Errorf("trying to register %q twice", typ))
|
| + }
|
| + ret.data[typ] = factory
|
| + }
|
| + for p, f := range mapping {
|
| + add(p, f)
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +type registry struct {
|
| + finishExecutionImpl FinishExecutionFn
|
| + data map[reflect.Type]Factory
|
| +}
|
| +
|
| +var _ Registry = (*registry)(nil)
|
| +
|
| +func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) {
|
| + return r.finishExecutionImpl(c, eid, rslt)
|
| +}
|
| +
|
| +func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) {
|
| + cfg, err := loadConfig(c, cfgName)
|
| + if err != nil {
|
| + logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to load config")
|
| + return
|
| + }
|
| +
|
| + ver = cfg.Version
|
| +
|
| + typ := reflect.TypeOf(cfg.Content)
|
| +
|
| + fn, ok := r.data[typ]
|
| + if !ok {
|
| + return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Content)
|
| + }
|
| +
|
| + d, err = fn(c, cfg)
|
| + return
|
| +}
|
| +
|
| +// loadConfig loads the named distributor configuration from luci-config,
|
| +// possibly using the in-memory or memcache version.
|
| +func loadConfig(c context.Context, cfgName string) (ret *Config, err error) {
|
| + aid := info.Get(c).TrimmedAppID()
|
| + cfgSvc := config.Get(c)
|
| + distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "distributors.cfg", false)
|
| + if err != nil {
|
| + return
|
| + }
|
| +
|
| + cfgVersion := distCfgObj.Revision
|
| + distCfg := &distributor.Config{}
|
| + if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil {
|
| + return
|
| + }
|
| +
|
| + cfg, ok := distCfg.DistributorConfigs[cfgName]
|
| + if !ok {
|
| + err = fmt.Errorf("unknown distributor configuration: %q", cfgName)
|
| + return
|
| + }
|
| + if alias := cfg.GetAlias(); alias != nil {
|
| + cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig]
|
| + if !ok {
|
| + err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig)
|
| + return
|
| + }
|
| + if cfg.GetAlias() != nil {
|
| + err = fmt.Errorf("too many levels of indirection for alias %q (points to alias %q)", cfgName, alias.OtherConfig)
|
| + return
|
| + }
|
| + }
|
| +
|
| + dt := cfg.DistributorType
|
| + if dt == nil {
|
| + err = fmt.Errorf("blank or unrecognized distributor_type")
|
| + return
|
| + }
|
| + dVal := reflect.ValueOf(dt)
|
| +
|
| + // All non-nil DistributorType's have a single field which is the actual oneof
|
| + // value.
|
| + implConfig := dVal.Elem().Field(0).Interface().(proto.Message)
|
| +
|
| + inf := info.Get(c)
|
| + scheme := "https"
|
| + if inf.IsDevAppServer() {
|
| + scheme = "http"
|
| + }
|
| +
|
| + ret = &Config{
|
| + &url.URL{
|
| + Scheme: scheme,
|
| + Host: inf.DefaultVersionHostname(),
|
| + },
|
| + cfgName,
|
| + cfgVersion,
|
| + implConfig,
|
| + }
|
| + return
|
| +}
|
|
|