Chromium Code Reviews| Index: appengine/cmd/dm/distributor/registry.go |
| diff --git a/appengine/cmd/dm/distributor/registry.go b/appengine/cmd/dm/distributor/registry.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a74879a3d3463bee67f89944ef5e5f109a7bfdaf |
| --- /dev/null |
| +++ b/appengine/cmd/dm/distributor/registry.go |
| @@ -0,0 +1,185 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package distributor |
| + |
| +import ( |
| + "fmt" |
| + "reflect" |
| + "strings" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/luci/gae/service/info" |
| + "github.com/luci/luci-go/appengine/tumble" |
| + "github.com/luci/luci-go/common/api/dm/distributor" |
| + "github.com/luci/luci-go/common/api/dm/service/v1" |
| + "github.com/luci/luci-go/common/config" |
| + "github.com/luci/luci-go/common/logging" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +var regKey = "holds a Registry" |
|
dnj (Google)
2016/06/09 18:00:56
nit: "holds a DM Distributor Registry"
iannucci
2016/06/15 00:46:00
Done.
|
| + |
| +// WithRegistry adds the registry to the Context. |
| +func WithRegistry(c context.Context, r Registry) context.Context { |
| + if r == nil { |
| + return c |
|
dnj (Google)
2016/06/09 18:00:56
Perhaps this should panic? I think that this is cl
iannucci
2016/06/15 00:46:00
Done.
|
| + } |
| + return context.WithValue(c, ®Key, r) |
| +} |
| + |
| +// GetRegistry gets the registry from the Context. This will return nil if the |
| +// Context does not contain a Registry. |
| +func GetRegistry(c context.Context) Registry { |
| + ret, _ := c.Value(®Key).(Registry) |
| + return ret |
| +} |
| + |
| +// FinishExecutionFn is required to eliminate a circular dependency |
| +// between mutate <-> distributor. Essentially this just makes a new |
| +// mutate.FinishExecution. |
| +// |
| +// See mutate.FinishExecutionFn for the only actual implementation of this. |
| +type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) |
| + |
| +// Registry holds a collection of all of the available distributor types. |
| +type Registry interface { |
| + FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) |
| + |
| + // MakeDistributor builds a distributor instance that's configured with the |
| + // provided config. |
| + // |
| + // The configuration for this distributor are obtained from luci-config at the |
| + // time an Execution is started. |
| + MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) |
| +} |
| + |
| +// NewRegistry builds a new implementation of Registry configured to load |
| +// configuration data from luci-config. |
| +// |
| +// The mapping should hold nil-ptrs of various config protos -> respective |
| +// Factory. When loading from luci-config, when we see a given message type, |
| +// we'll construct the distributor instance using the provided Factory. |
| +func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Registry { |
| + ret := ®istry{fFn, make(map[reflect.Type]Factory, len(mapping))} |
| + add := func(p proto.Message, factory Factory) { |
| + if factory == nil { |
| + panic("factory is nil") |
| + } |
| + if p == nil { |
| + panic("proto.Message is nil") |
| + } |
| + |
| + typ := reflect.TypeOf(p) |
| + |
| + if _, ok := ret.data[typ]; ok { |
| + panic(fmt.Errorf("trying to register %q twice", typ)) |
| + } |
| + ret.data[typ] = factory |
| + } |
| + for p, f := range mapping { |
| + add(p, f) |
| + } |
| + return ret |
| +} |
| + |
| +type registry struct { |
| + finishExecutionImpl FinishExecutionFn |
| + data map[reflect.Type]Factory |
| +} |
| + |
| +var _ Registry = (*registry)(nil) |
| + |
| +func (r *registry) SetFinishExecutionImpl(i func(context.Context, *dm.Execution_ID, *TaskResult) ([]tumble.Mutation, error)) { |
| + r.finishExecutionImpl = i |
|
dnj (Google)
2016/06/09 18:00:55
This seems like it would need locking. But it's no
iannucci
2016/06/15 00:46:00
it was a bogus extra method
|
| +} |
| + |
| +func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) { |
| + return r.finishExecutionImpl(c, eid, rslt) |
| +} |
| + |
| +func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) { |
| + cfg, err := loadConfig(c, cfgName) |
| + if err != nil { |
| + logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to load config") |
| + return |
| + } |
| + |
| + ver = cfg.Version |
| + |
| + typ := reflect.TypeOf(cfg.Content) |
| + |
| + fn, ok := r.data[typ] |
| + if !ok { |
| + return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Content) |
| + } |
| + |
| + d, err = fn(c, cfg) |
| + return |
| +} |
| + |
| +func getTrimmedAppID(c context.Context) string { |
|
dnj (Google)
2016/06/09 18:00:56
Maybe this should be built into gae/info?
iannucci
2016/06/15 00:46:00
yeah probably. Done: https://chromiumcodereview.ap
|
| + // custom domains show up as "foo.com:appid" |
| + toks := strings.Split(info.Get(c).AppID(), ":") |
| + return toks[len(toks)-1] |
| +} |
| + |
| +// loadConfig loads the named distributor configurtaion from luci-config, |
|
dnj (Google)
2016/06/09 18:00:55
nit: "configuration"
iannucci
2016/06/15 00:46:00
derp
|
| +// possibly using the in-memory or memcache version. |
| +func loadConfig(c context.Context, cfgName string) (ret *Config, err error) { |
| + aid := getTrimmedAppID(c) |
| + cfgSvc := config.Get(c) |
|
dnj (Google)
2016/06/09 18:00:56
Note: You allow a nil config service in "init()",
iannucci
2016/06/15 00:46:00
was missing a fallthrough there, added.
|
| + distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "distributors.cfg", false) |
| + if err != nil { |
| + return |
| + } |
| + |
| + cfgVersion := distCfgObj.Revision |
| + distCfg := &distributor.Config{} |
| + if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil { |
| + return |
| + } |
| + |
| + cfg, ok := distCfg.DistributorConfigs[cfgName] |
| + if !ok { |
| + err = fmt.Errorf("unknown distributor configuration: %q", cfgName) |
| + return |
| + } |
| + if alias := cfg.GetAlias(); alias != nil { |
| + cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig] |
| + if !ok { |
| + err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig) |
| + return |
| + } |
| + if cfg.GetAlias() != nil { |
| + err = fmt.Errorf("too many levels of indirection for alias %q (points to alias %q)", cfgName, alias.OtherConfig) |
| + return |
| + } |
| + } |
| + |
| + dt := cfg.DistributorType |
| + if dt == nil { |
| + err = fmt.Errorf("blank or unrecognized distributor_type") |
| + return |
| + } |
| + dVal := reflect.ValueOf(dt) |
| + |
| + // All non-nil DistributorType's have a single field which is the actual oneof |
| + // value. |
| + implConfig := dVal.Elem().Field(0).Interface().(proto.Message) |
| + |
| + inf := info.Get(c) |
| + scheme := "https" |
| + if inf.IsDevAppServer() { |
| + scheme = "http" |
| + } |
| + |
| + ret = &Config{ |
| + scheme + "://" + inf.DefaultVersionHostname() + "/", |
|
dnj (Google)
2016/06/09 18:00:55
I think you should use a net.URL here, then call i
iannucci
2016/06/15 00:46:00
better, changed Config to have a url.URL natively.
|
| + cfgName, |
| + cfgVersion, |
| + implConfig, |
| + } |
| + return |
| +} |