Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3443)

Unified Diff: appengine/cmd/dm/distributor/registry.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/cmd/dm/distributor/registry.go
diff --git a/appengine/cmd/dm/distributor/registry.go b/appengine/cmd/dm/distributor/registry.go
new file mode 100644
index 0000000000000000000000000000000000000000..a74879a3d3463bee67f89944ef5e5f109a7bfdaf
--- /dev/null
+++ b/appengine/cmd/dm/distributor/registry.go
@@ -0,0 +1,185 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package distributor
+
+import (
+ "fmt"
+ "reflect"
+ "strings"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/gae/service/info"
+ "github.com/luci/luci-go/appengine/tumble"
+ "github.com/luci/luci-go/common/api/dm/distributor"
+ "github.com/luci/luci-go/common/api/dm/service/v1"
+ "github.com/luci/luci-go/common/config"
+ "github.com/luci/luci-go/common/logging"
+ "golang.org/x/net/context"
+)
+
+var regKey = "holds a Registry"
dnj (Google) 2016/06/09 18:00:56 nit: "holds a DM Distributor Registry"
iannucci 2016/06/15 00:46:00 Done.
+
+// WithRegistry adds the registry to the Context.
+func WithRegistry(c context.Context, r Registry) context.Context {
+ if r == nil {
+ return c
dnj (Google) 2016/06/09 18:00:56 Perhaps this should panic? I think that this is cl
iannucci 2016/06/15 00:46:00 Done.
+ }
+ return context.WithValue(c, &regKey, r)
+}
+
+// GetRegistry gets the registry from the Context. This will return nil if the
+// Context does not contain a Registry.
+func GetRegistry(c context.Context) Registry {
+ ret, _ := c.Value(&regKey).(Registry)
+ return ret
+}
+
+// FinishExecutionFn is required to eliminate a circular dependency
+// between mutate <-> distributor. Essentially this just makes a new
+// mutate.FinishExecution.
+//
+// See mutate.FinishExecutionFn for the only actual implementation of this.
+type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error)
+
+// Registry holds a collection of all of the available distributor types.
+type Registry interface {
+ FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error)
+
+ // MakeDistributor builds a distributor instance that's configured with the
+ // provided config.
+ //
+ // The configuration for this distributor are obtained from luci-config at the
+ // time an Execution is started.
+ MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error)
+}
+
+// NewRegistry builds a new implementation of Registry configured to load
+// configuration data from luci-config.
+//
+// The mapping should hold nil-ptrs of various config protos -> respective
+// Factory. When loading from luci-config, when we see a given message type,
+// we'll construct the distributor instance using the provided Factory.
+func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Registry {
+ ret := &registry{fFn, make(map[reflect.Type]Factory, len(mapping))}
+ add := func(p proto.Message, factory Factory) {
+ if factory == nil {
+ panic("factory is nil")
+ }
+ if p == nil {
+ panic("proto.Message is nil")
+ }
+
+ typ := reflect.TypeOf(p)
+
+ if _, ok := ret.data[typ]; ok {
+ panic(fmt.Errorf("trying to register %q twice", typ))
+ }
+ ret.data[typ] = factory
+ }
+ for p, f := range mapping {
+ add(p, f)
+ }
+ return ret
+}
+
+type registry struct {
+ finishExecutionImpl FinishExecutionFn
+ data map[reflect.Type]Factory
+}
+
+var _ Registry = (*registry)(nil)
+
+func (r *registry) SetFinishExecutionImpl(i func(context.Context, *dm.Execution_ID, *TaskResult) ([]tumble.Mutation, error)) {
+ r.finishExecutionImpl = i
dnj (Google) 2016/06/09 18:00:55 This seems like it would need locking. But it's no
iannucci 2016/06/15 00:46:00 it was a bogus extra method
+}
+
+func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) {
+ return r.finishExecutionImpl(c, eid, rslt)
+}
+
+func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) {
+ cfg, err := loadConfig(c, cfgName)
+ if err != nil {
+ logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to load config")
+ return
+ }
+
+ ver = cfg.Version
+
+ typ := reflect.TypeOf(cfg.Content)
+
+ fn, ok := r.data[typ]
+ if !ok {
+ return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Content)
+ }
+
+ d, err = fn(c, cfg)
+ return
+}
+
+func getTrimmedAppID(c context.Context) string {
dnj (Google) 2016/06/09 18:00:56 Maybe this should be built into gae/info?
iannucci 2016/06/15 00:46:00 yeah probably. Done: https://chromiumcodereview.ap
+ // custom domains show up as "foo.com:appid"
+ toks := strings.Split(info.Get(c).AppID(), ":")
+ return toks[len(toks)-1]
+}
+
+// loadConfig loads the named distributor configurtaion from luci-config,
dnj (Google) 2016/06/09 18:00:55 nit: "configuration"
iannucci 2016/06/15 00:46:00 derp
+// possibly using the in-memory or memcache version.
+func loadConfig(c context.Context, cfgName string) (ret *Config, err error) {
+ aid := getTrimmedAppID(c)
+ cfgSvc := config.Get(c)
dnj (Google) 2016/06/09 18:00:56 Note: You allow a nil config service in "init()",
iannucci 2016/06/15 00:46:00 was missing a fallthrough there, added.
+ distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "distributors.cfg", false)
+ if err != nil {
+ return
+ }
+
+ cfgVersion := distCfgObj.Revision
+ distCfg := &distributor.Config{}
+ if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil {
+ return
+ }
+
+ cfg, ok := distCfg.DistributorConfigs[cfgName]
+ if !ok {
+ err = fmt.Errorf("unknown distributor configuration: %q", cfgName)
+ return
+ }
+ if alias := cfg.GetAlias(); alias != nil {
+ cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig]
+ if !ok {
+ err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig)
+ return
+ }
+ if cfg.GetAlias() != nil {
+ err = fmt.Errorf("too many levels of indirection for alias %q (points to alias %q)", cfgName, alias.OtherConfig)
+ return
+ }
+ }
+
+ dt := cfg.DistributorType
+ if dt == nil {
+ err = fmt.Errorf("blank or unrecognized distributor_type")
+ return
+ }
+ dVal := reflect.ValueOf(dt)
+
+ // All non-nil DistributorType's have a single field which is the actual oneof
+ // value.
+ implConfig := dVal.Elem().Field(0).Interface().(proto.Message)
+
+ inf := info.Get(c)
+ scheme := "https"
+ if inf.IsDevAppServer() {
+ scheme = "http"
+ }
+
+ ret = &Config{
+ scheme + "://" + inf.DefaultVersionHostname() + "/",
dnj (Google) 2016/06/09 18:00:55 I think you should use a net.URL here, then call i
iannucci 2016/06/15 00:46:00 better, changed Config to have a url.URL natively.
+ cfgName,
+ cfgVersion,
+ implConfig,
+ }
+ return
+}

Powered by Google App Engine
This is Rietveld 408576698