Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package tumble | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "net/http" | |
| 10 "strings" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/julienschmidt/httprouter" | |
| 14 "github.com/luci/gae/impl/prod" | |
| 15 "github.com/luci/luci-go/appengine/gaelogger" | |
| 16 "golang.org/x/net/context" | |
| 17 ) | |
| 18 | |
| 19 // Config is the set of tweakable things for tumble. If you use something other | |
| 20 // than the defaults (e.g. unset values), you must ensure that all aspects of | |
| 21 // your application use the same config. | |
| 22 type Config struct { | |
| 23 // Name is the name of this service. This is the expected name of the | |
| 24 // configured taskqueue, as well as the prefix used for things like memc ache | |
| 25 // keys. | |
| 26 // | |
| 27 // It defaults to "tumble". It is illegal for the Name to contain '/', a nd | |
| 28 // Use will panic if it does. | |
| 29 Name string | |
| 30 | |
| 31 // URLPrefix is the prefix to append for all registered routes. It's | |
| 32 // normalized to begin and end with a '/'. So "wat" would register: | |
| 33 // "/wat/{Service.Name}/fire_all_tasks" | |
| 34 // "/wat/{Service.Name}/process_shard/:shard_id/at/:timestamp" | |
| 35 // | |
| 36 // This defaults to "internal" | |
| 37 URLPrefix string | |
| 38 | |
| 39 // NumShards is the number of tumble shards that will process concurrent ly. | |
| 40 // It defaults to 32. | |
| 41 NumShards uint64 | |
| 42 | |
| 43 // TemporalMinDelay is the minimum number of seconds to wait before the | |
| 44 // task queue entry for a given shard will run. It defaults to 1 second. | |
| 45 TemporalMinDelay time.Duration | |
| 46 | |
| 47 // TemporalRoundFactor is the number of seconds to batch together in tas k | |
| 48 // queue tasks. It defaults to 4 seconds. | |
| 49 TemporalRoundFactor time.Duration | |
| 50 | |
| 51 // NumGoroutines is the number of gorountines that will process in paral lel | |
| 52 // in a single shard. Each goroutine will process exactly one root entit y. | |
| 53 // It defaults to 16. | |
| 54 NumGoroutines uint64 | |
| 55 | |
| 56 // ProcessMaxBatchSize is the number of mutations that each processor go routine | |
| 57 // will attempt to include in each commit. | |
| 58 // | |
| 59 // It defaults to 128. A negative value means no limit. | |
| 60 ProcessMaxBatchSize int32 | |
| 61 } | |
| 62 | |
| 63 type key int | |
| 64 | |
| 65 var defaultConfig = Config{ | |
| 66 Name: "tumble", | |
| 67 URLPrefix: "/internal/", | |
| 68 NumShards: 32, | |
| 69 TemporalMinDelay: time.Second, | |
| 70 TemporalRoundFactor: 4 * time.Second, | |
| 71 NumGoroutines: 16, | |
| 72 ProcessMaxBatchSize: 128, | |
| 73 } | |
| 74 | |
| 75 // DefaultConfig returns a Config with all the default values populated. | |
| 76 func DefaultConfig() Config { | |
| 77 return defaultConfig | |
| 78 } | |
| 79 | |
| 80 // Use allows you to set a specific configuration in the context. This | |
| 81 // configuration can be obtained by calling GetConfig. Any zero-value fields | |
| 82 // in the Config will be replaced with its default value. | |
| 83 // | |
| 84 // This Config may be retrieved with GetConfig. | |
| 85 func Use(c context.Context, cfg Config) context.Context { | |
| 86 if cfg.Name == "" { | |
| 87 cfg.Name = defaultConfig.Name | |
| 88 } | |
| 89 if strings.Contains(cfg.Name, "/") { | |
| 90 panic(fmt.Errorf("tumble: name may not contain '/': %q", cfg.Nam e)) | |
| 91 } | |
| 92 if cfg.URLPrefix == "" { | |
| 93 cfg.URLPrefix = defaultConfig.URLPrefix | |
| 94 } | |
| 95 if !strings.HasPrefix(cfg.URLPrefix, "/") { | |
| 96 cfg.URLPrefix = "/" + cfg.URLPrefix | |
| 97 } | |
| 98 if !strings.HasSuffix(cfg.URLPrefix, "/") { | |
| 99 cfg.URLPrefix = cfg.URLPrefix + "/" | |
| 100 } | |
| 101 if cfg.NumShards == 0 { | |
| 102 cfg.NumShards = defaultConfig.NumShards | |
| 103 } | |
| 104 if cfg.TemporalMinDelay == 0 { | |
| 105 cfg.TemporalMinDelay = defaultConfig.TemporalMinDelay | |
| 106 } | |
| 107 if cfg.TemporalRoundFactor == 0 { | |
| 108 cfg.TemporalRoundFactor = defaultConfig.TemporalRoundFactor | |
| 109 } | |
| 110 if cfg.NumGoroutines == 0 { | |
| 111 cfg.NumGoroutines = defaultConfig.NumGoroutines | |
| 112 } | |
| 113 if cfg.ProcessMaxBatchSize == 0 { | |
| 114 cfg.ProcessMaxBatchSize = defaultConfig.ProcessMaxBatchSize | |
| 115 } | |
| 116 return context.WithValue(c, key(0), &cfg) | |
| 117 } | |
| 118 | |
| 119 // GetConfig retrieves the Config from the current context. If none has been set , | |
| 120 // this returns a Config which has all the defaults filled out. | |
| 121 func GetConfig(c context.Context) Config { | |
| 122 if cfg, ok := c.Value(key(0)).(*Config); ok { | |
| 123 return *cfg | |
| 124 } | |
| 125 return defaultConfig | |
| 126 } | |
| 127 | |
| 128 const processShardURLFormat = "/process_shard/:shard_id/at/:timestamp" | |
| 129 | |
| 130 // ProcessURLPattern returns the httprouter-style URL pattern for the taskqueue | |
| 131 // process handler. | |
| 132 func (c *Config) ProcessURLPattern() string { | |
| 133 return c.URLPrefix + c.Name + processShardURLFormat | |
| 134 } | |
| 135 | |
| 136 // ProcessURL creates a new url for a process shard taskqueue task, including | |
| 137 // the given timestamp and shard number. | |
| 138 func (c *Config) ProcessURL(ts time.Time, shard uint64) string { | |
| 139 return strings.NewReplacer( | |
| 140 ":shard_id", fmt.Sprint(shard), | |
| 141 ":timestamp", fmt.Sprint(ts.Unix())).Replace(c.ProcessURLPattern ()) | |
| 142 } | |
| 143 | |
| 144 // FireAllTasksURL returns the url intended to be hit by appengine cron to fire | |
| 145 // an instance of all the processing tasks. | |
| 146 func (c *Config) FireAllTasksURL() string { | |
| 147 return c.URLPrefix + c.Name + "/fire_all_tasks" | |
| 148 } | |
| 149 | |
| 150 // InstallHandlers installs http handlers | |
| 151 func (c *Config) InstallHandlers(r *httprouter.Router) { | |
| 152 // GET so that this can be invoked from cron | |
| 153 r.GET(c.FireAllTasksURL(), | |
|
Vadim Sh.
2015/10/12 20:05:14
check X-AppEngine-Cron header is set to "true" as
iannucci
2015/10/13 02:39:46
middlewaarrrereeerereerrere
| |
| 154 func(rw http.ResponseWriter, r *http.Request, _ httprouter.Param s) { | |
| 155 c := gaelogger.Use(prod.UseRequest(r)) | |
| 156 FireAllTasksHandler(c, rw, r) | |
| 157 }) | |
| 158 r.POST(c.ProcessURLPattern(), | |
|
Vadim Sh.
2015/10/12 20:05:14
same here (X-AppEngine-QueueName header).
Perhaps
iannucci
2015/10/13 02:39:46
да
| |
| 159 func(rw http.ResponseWriter, r *http.Request, p httprouter.Param s) { | |
| 160 c := gaelogger.Use(prod.UseRequest(r)) | |
| 161 ProcessShardHandler(c, rw, r, p) | |
| 162 }) | |
| 163 } | |
| OLD | NEW |