Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 // Package tq implements simple routing layer for task queue tasks. | |
| 16 package tq | |
| 17 | |
| 18 import ( | |
| 19 "bytes" | |
| 20 "encoding/json" | |
| 21 "fmt" | |
| 22 "io/ioutil" | |
| 23 "net/http" | |
| 24 "reflect" | |
| 25 "strconv" | |
| 26 "strings" | |
| 27 "sync" | |
| 28 "time" | |
| 29 | |
| 30 "github.com/golang/protobuf/jsonpb" | |
| 31 "github.com/golang/protobuf/proto" | |
| 32 "golang.org/x/net/context" | |
| 33 | |
| 34 "github.com/luci/gae/service/taskqueue" | |
| 35 | |
| 36 "github.com/luci/luci-go/appengine/gaemiddleware" | |
| 37 "github.com/luci/luci-go/common/data/stringset" | |
| 38 "github.com/luci/luci-go/common/logging" | |
| 39 "github.com/luci/luci-go/common/retry/transient" | |
| 40 "github.com/luci/luci-go/server/router" | |
| 41 ) | |
| 42 | |
| 43 // Dispatcher submits and handles task queue tasks. | |
| 44 type Dispatcher struct { | |
| 45 BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default | |
| 46 | |
| 47 mu sync.RWMutex | |
| 48 handlers map[string]handler // the key is proto message type name | |
| 49 } | |
| 50 | |
| 51 // Task contains task body and additional parameters that influence how it is | |
| 52 // routed. | |
| 53 type Task struct { | |
| 54 // Payload is task's payload as well as indicator of its type. | |
| 55 // | |
| 56 // Tasks are routed based on type of the payload message, see RegisterTa sk. | |
| 57 Payload proto.Message | |
| 58 | |
| 59 // Title is optional string that identifies the task in HTTP logs. | |
| 60 // | |
| 61 // It will show up as a suffix in task handler URL. It exists exclusivel y to | |
| 62 // simplify reading HTTP logs. It serves no other purpose! In particular , | |
| 63 // it is NOT a task name. | |
| 64 // | |
| 65 // Handlers won't ever see it. Pass all information through the task bod y. | |
| 66 Title string | |
| 67 | |
| 68 // Delay specifies the duration the task queue service must wait before | |
| 69 // executing the task. | |
| 70 // | |
| 71 // Either Delay or ETA may be set, but not both. | |
| 72 Delay time.Duration | |
| 73 | |
| 74 // ETA specifies the earliest time a task may be executed. | |
| 75 // | |
| 76 // Either Delay or ETA may be set, but not both. | |
| 77 ETA time.Time | |
| 78 | |
| 79 // Retry options for this task. | |
| 80 // | |
| 81 // If given, overrides default options set when this task was registered . | |
| 82 RetryOptions *taskqueue.RetryOptions | |
| 83 } | |
| 84 | |
| 85 // Handler is called to handle one enqueued task. | |
| 86 // | |
| 87 // The passed context is produced by a middleware chain installed with | |
| 88 // InstallHandlers. | |
| 89 // | |
| 90 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is | |
| 91 // 1 on first execution attempt, 2 on a retry, and so on. | |
| 92 // | |
| 93 // May return transient errors. In this case, task queue may attempt to | |
| 94 // redeliver the task (depending on RetryOptions). | |
| 95 // | |
| 96 // A fatal error (or success) mark the task as "done", it won't be retried. | |
| 97 type Handler func(c context.Context, payload proto.Message, execCount int) error | |
| 98 | |
| 99 // RegisterTask tells the dispatcher that tasks of given proto type should be | |
| 100 // handled by the given handler and routed through the given task queue. | |
| 101 // | |
| 102 // 'prototype' should be a pointer to some concrete proto message. It will be | |
| 103 // used only for its type signature. | |
| 104 // | |
| 105 // Intended to be called during process startup. Panics if such message has | |
| 106 // already been registered. | |
| 107 func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str ing, opts *taskqueue.RetryOptions) { | |
| 108 if queue == "" { | |
| 109 queue = "default" // default GAE task queue name, always exists | |
| 110 } | |
| 111 | |
| 112 name := proto.MessageName(prototype) | |
| 113 if name == "" { | |
| 114 panic(fmt.Sprintf("unregistered proto message type %T", prototyp e)) | |
| 115 } | |
| 116 | |
| 117 d.mu.Lock() | |
| 118 defer d.mu.Unlock() | |
| 119 | |
| 120 if _, ok := d.handlers[name]; ok { | |
| 121 panic(fmt.Sprintf("handler for %q has already been registered", name)) | |
| 122 } | |
| 123 | |
| 124 if d.handlers == nil { | |
| 125 d.handlers = make(map[string]handler) | |
| 126 } | |
| 127 | |
| 128 d.handlers[name] = handler{cb: cb, queue: queue, retryOpts: opts} | |
| 129 } | |
| 130 | |
| 131 // AddTask submits the given task to an appropriate task queue. | |
| 132 // | |
| 133 // It means, add some later time in some other GAE process, the callback | |
| 134 // registered as a handler for corresponding proto type will be called. | |
| 135 // | |
| 136 // If the given context is transactional, inherits the transaction. | |
| 137 // | |
| 138 // May return transient errors. | |
| 139 func (d *Dispatcher) AddTask(c context.Context, task Task) error { | |
| 140 handler, err := d.handler(task.Payload) | |
| 141 if err != nil { | |
| 142 return err | |
| 143 } | |
| 144 | |
| 145 blob, err := serializePayload(task.Payload) | |
| 146 if err != nil { | |
| 147 return err | |
| 148 } | |
| 149 | |
| 150 title := "-" | |
| 151 if task.Title != "" { | |
| 152 title = task.Title | |
| 153 } | |
| 154 | |
| 155 retryOpts := handler.retryOpts | |
| 156 if task.RetryOptions != nil { | |
| 157 retryOpts = task.RetryOptions | |
| 158 } | |
| 159 | |
| 160 t := taskqueue.Task{ | |
| 161 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), | |
| 162 Method: "POST", | |
| 163 Payload: blob, | |
| 164 ETA: task.ETA, | |
| 165 Delay: task.Delay, | |
| 166 RetryOptions: retryOpts, | |
| 167 } | |
| 168 if err := taskqueue.Add(c, handler.queue, &t); err != nil { | |
| 169 return transient.Tag.Apply(err) | |
| 170 } | |
| 171 | |
| 172 return nil | |
| 173 } | |
| 174 | |
| 175 // InstallRoutes installs appropriate HTTP routes in the router. | |
| 176 // | |
| 177 // Must be called only after all task handlers are registered! | |
| 178 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) { | |
| 179 queues := stringset.New(0) | |
| 180 | |
| 181 d.mu.RLock() | |
| 182 for _, h := range d.handlers { | |
| 183 queues.Add(h.queue) | |
| 184 } | |
| 185 d.mu.RUnlock() | |
| 186 | |
| 187 for _, q := range queues.ToSlice() { | |
| 188 r.POST( | |
| 189 fmt.Sprintf("%s%s/:Title", d.baseURL(), q), | |
| 190 mw.Extend(gaemiddleware.RequireTaskQueue(q)), | |
| 191 d.processHTTPRequest) | |
| 192 } | |
| 193 } | |
| 194 | |
| 195 //////////////////////////////////////////////////////////////////////////////// | |
| 196 | |
| 197 type handler struct { | |
| 198 cb Handler // the actual handler | |
| 199 queue string // name of the task queue | |
| 200 retryOpts *taskqueue.RetryOptions // default retry options | |
| 201 } | |
| 202 | |
| 203 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. | |
| 204 // | |
| 205 // It ends with '/'. | |
| 206 func (d *Dispatcher) baseURL() string { | |
| 207 switch { | |
| 208 case d.BaseURL != "" && strings.HasSuffix(d.BaseURL, "/"): | |
| 209 return d.BaseURL | |
| 210 case d.BaseURL != "": | |
| 211 return d.BaseURL + "/" | |
| 212 default: | |
| 213 return "/internal/tasks/" | |
| 214 } | |
| 215 } | |
| 216 | |
| 217 // handler returns a handler struct registered with Register. | |
| 218 func (d *Dispatcher) handler(payload proto.Message) (handler, error) { | |
| 219 name := proto.MessageName(payload) | |
| 220 | |
| 221 d.mu.RLock() | |
| 222 defer d.mu.RUnlock() | |
| 223 | |
| 224 handler, registered := d.handlers[name] | |
| 225 if !registered { | |
| 226 return handler, fmt.Errorf("handler for %q is not registered", n ame) | |
| 227 } | |
| 228 return handler, nil | |
| 229 } | |
| 230 | |
| 231 // processHTTPRequest is invoked on each HTTP POST. | |
| 232 // | |
| 233 // It deserializes the task and invokes an appropriate callback. | |
| 234 func (d *Dispatcher) processHTTPRequest(c *router.Context) { | |
| 235 body, err := ioutil.ReadAll(c.Request.Body) | |
| 236 if err != nil { | |
| 237 httpReply(c, false, 500, "Failed to read request body: %s", err) | |
| 238 return | |
| 239 } | |
| 240 | |
| 241 payload, err := deserializePayload(body) | |
| 242 if err != nil { | |
| 243 httpReply(c, false, 400, "Bad payload, can't deserialize: %s", e rr) | |
| 244 return | |
| 245 } | |
| 246 | |
| 247 h, err := d.handler(payload) | |
| 248 if err != nil { | |
| 249 httpReply(c, false, 404, "Bad task: %s", err) | |
| 250 return | |
| 251 } | |
| 252 | |
| 253 execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecu tionCount")) | |
| 254 switch h.cb(c.Context, payload, execCount); { | |
| 255 case err == nil: | |
| 256 httpReply(c, true, 200, "OK") | |
| 257 case transient.Tag.In(err): | |
| 258 httpReply(c, false, 500, "Transient error: %s", err) | |
| 259 default: | |
| 260 httpReply(c, false, 200, "Fatal error: %s", err) // return 200 t o stop retries | |
| 261 } | |
| 262 } | |
| 263 | |
| 264 func httpReply(c *router.Context, ok bool, code int, msg string, args ...interfa ce{}) { | |
| 265 body := fmt.Sprintf(msg, args...) | |
| 266 if !ok { | |
| 267 logging.Errorf(c.Context, "%s", body) | |
| 268 } | |
| 269 http.Error(c.Writer, body, code) | |
| 270 } | |
| 271 | |
| 272 //////////////////////////////////////////////////////////////////////////////// | |
| 273 | |
| 274 var marshaller = jsonpb.Marshaler{} | |
| 275 | |
| 276 type envelope struct { | |
| 277 Type string `json:"type"` | |
| 278 Body *json.RawMessage `json:"body"` | |
| 279 } | |
| 280 | |
| 281 func serializePayload(task proto.Message) ([]byte, error) { | |
| 282 var buf bytes.Buffer | |
| 283 if err := marshaller.Marshal(&buf, task); err != nil { | |
|
Vadim Sh.
2017/07/15 01:16:29
Using JSON to be able to see the task bodies in Cl
| |
| 284 return nil, err | |
| 285 } | |
| 286 raw := json.RawMessage(buf.Bytes()) | |
| 287 return json.Marshal(envelope{ | |
| 288 Type: proto.MessageName(task), | |
| 289 Body: &raw, | |
| 290 }) | |
| 291 } | |
| 292 | |
| 293 func deserializePayload(blob []byte) (proto.Message, error) { | |
| 294 env := envelope{} | |
| 295 if err := json.Unmarshal(blob, &env); err != nil { | |
| 296 return nil, err | |
| 297 } | |
| 298 | |
| 299 tp := proto.MessageType(env.Type) // this is **ConcreteStruct{} | |
| 300 if tp == nil { | |
| 301 return nil, fmt.Errorf("unregistered proto message name %q", env .Type) | |
| 302 } | |
| 303 if env.Body == nil { | |
| 304 return nil, fmt.Errorf("no task body given") | |
| 305 } | |
| 306 | |
| 307 task := reflect.New(tp.Elem()).Interface().(proto.Message) | |
| 308 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil { | |
| 309 return nil, err | |
| 310 } | |
| 311 | |
| 312 return task, nil | |
| 313 } | |
| OLD | NEW |