| Index: scheduler/appengine/engine/tq/tq.go
|
| diff --git a/scheduler/appengine/engine/tq/tq.go b/scheduler/appengine/engine/tq/tq.go
|
| deleted file mode 100644
|
| index 6f974ddf8d03bde018da16042fa4c11f7f526bec..0000000000000000000000000000000000000000
|
| --- a/scheduler/appengine/engine/tq/tq.go
|
| +++ /dev/null
|
| @@ -1,416 +0,0 @@
|
| -// Copyright 2017 The LUCI Authors.
|
| -//
|
| -// Licensed under the Apache License, Version 2.0 (the "License");
|
| -// you may not use this file except in compliance with the License.
|
| -// You may obtain a copy of the License at
|
| -//
|
| -// http://www.apache.org/licenses/LICENSE-2.0
|
| -//
|
| -// Unless required by applicable law or agreed to in writing, software
|
| -// distributed under the License is distributed on an "AS IS" BASIS,
|
| -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -// See the License for the specific language governing permissions and
|
| -// limitations under the License.
|
| -
|
| -// Package tq implements simple routing layer for task queue tasks.
|
| -package tq
|
| -
|
| -import (
|
| - "bytes"
|
| - "crypto/sha256"
|
| - "encoding/hex"
|
| - "encoding/json"
|
| - "fmt"
|
| - "io/ioutil"
|
| - "net/http"
|
| - "reflect"
|
| - "strconv"
|
| - "strings"
|
| - "sync"
|
| - "time"
|
| -
|
| - "github.com/golang/protobuf/jsonpb"
|
| - "github.com/golang/protobuf/proto"
|
| - "golang.org/x/net/context"
|
| -
|
| - "github.com/luci/gae/service/taskqueue"
|
| -
|
| - "github.com/luci/luci-go/appengine/gaemiddleware"
|
| - "github.com/luci/luci-go/common/data/stringset"
|
| - "github.com/luci/luci-go/common/errors"
|
| - "github.com/luci/luci-go/common/logging"
|
| - "github.com/luci/luci-go/common/retry/transient"
|
| - "github.com/luci/luci-go/server/router"
|
| -)
|
| -
|
| -// Dispatcher submits and handles task queue tasks.
|
| -type Dispatcher struct {
|
| - BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default
|
| -
|
| - mu sync.RWMutex
|
| - handlers map[string]handler // the key is proto message type name
|
| -}
|
| -
|
| -// Task contains task body and additional parameters that influence how it is
|
| -// routed.
|
| -type Task struct {
|
| - // Payload is task's payload as well as indicator of its type.
|
| - //
|
| - // Tasks are routed based on type of the payload message, see RegisterTask.
|
| - Payload proto.Message
|
| -
|
| - // DeduplicationKey is optional unique key of the task.
|
| - //
|
| - // If a task of a given proto type with a given key has already been enqueued
|
| - // recently, this task will be silently ignored.
|
| - //
|
| - // Such tasks can only be used outside of transactions.
|
| - DeduplicationKey string
|
| -
|
| - // Title is optional string that identifies the task in HTTP logs.
|
| - //
|
| - // It will show up as a suffix in task handler URL. It exists exclusively to
|
| - // simplify reading HTTP logs. It serves no other purpose! In particular,
|
| - // it is NOT a task name.
|
| - //
|
| - // Handlers won't ever see it. Pass all information through the task body.
|
| - Title string
|
| -
|
| - // Delay specifies the duration the task queue service must wait before
|
| - // executing the task.
|
| - //
|
| - // Either Delay or ETA may be set, but not both.
|
| - Delay time.Duration
|
| -
|
| - // ETA specifies the earliest time a task may be executed.
|
| - //
|
| - // Either Delay or ETA may be set, but not both.
|
| - ETA time.Time
|
| -
|
| - // Retry options for this task.
|
| - //
|
| - // If given, overrides default options set when this task was registered.
|
| - RetryOptions *taskqueue.RetryOptions
|
| -}
|
| -
|
| -// Handler is called to handle one enqueued task.
|
| -//
|
| -// The passed context is produced by a middleware chain installed with
|
| -// InstallHandlers.
|
| -//
|
| -// execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is
|
| -// 1 on first execution attempt, 2 on a retry, and so on.
|
| -//
|
| -// May return transient errors. In this case, task queue may attempt to
|
| -// redeliver the task (depending on RetryOptions).
|
| -//
|
| -// A fatal error (or success) mark the task as "done", it won't be retried.
|
| -type Handler func(c context.Context, payload proto.Message, execCount int) error
|
| -
|
| -// RegisterTask tells the dispatcher that tasks of given proto type should be
|
| -// handled by the given handler and routed through the given task queue.
|
| -//
|
| -// 'prototype' should be a pointer to some concrete proto message. It will be
|
| -// used only for its type signature.
|
| -//
|
| -// Intended to be called during process startup. Panics if such message has
|
| -// already been registered.
|
| -func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue string, opts *taskqueue.RetryOptions) {
|
| - if queue == "" {
|
| - queue = "default" // default GAE task queue name, always exists
|
| - }
|
| -
|
| - name := proto.MessageName(prototype)
|
| - if name == "" {
|
| - panic(fmt.Sprintf("unregistered proto message type %T", prototype))
|
| - }
|
| -
|
| - d.mu.Lock()
|
| - defer d.mu.Unlock()
|
| -
|
| - if _, ok := d.handlers[name]; ok {
|
| - panic(fmt.Sprintf("handler for %q has already been registered", name))
|
| - }
|
| -
|
| - if d.handlers == nil {
|
| - d.handlers = make(map[string]handler)
|
| - }
|
| -
|
| - d.handlers[name] = handler{
|
| - cb: cb,
|
| - typeName: name,
|
| - queue: queue,
|
| - retryOpts: opts,
|
| - }
|
| -}
|
| -
|
| -// AddTask submits given tasks to an appropriate task queue.
|
| -//
|
| -// It means, at some later time in some other GAE process, callbacks registered
|
| -// as handlers for corresponding proto types will be called.
|
| -//
|
| -// If the given context is transactional, inherits the transaction. Note if
|
| -// running outside of a transaction and multiple tasks are passed, the operation
|
| -// is not atomic: it returns an error if at least one enqueue operation failed
|
| -// (there's no way to figure out which one exactly).
|
| -//
|
| -// Returns only transient errors. Unlike regular Task Queue's Add,
|
| -// ErrTaskAlreadyAdded is not considered an error.
|
| -func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
|
| - if len(tasks) == 0 {
|
| - return nil
|
| - }
|
| -
|
| - // Handle the most common case of one task in a more efficient way.
|
| - if len(tasks) == 1 {
|
| - t, queue, err := d.tqTask(tasks[0])
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if err := taskqueue.Add(c, queue, t); err != nil {
|
| - if err == taskqueue.ErrTaskAlreadyAdded {
|
| - return nil
|
| - }
|
| - return transient.Tag.Apply(err)
|
| - }
|
| - return nil
|
| - }
|
| -
|
| - perQueue := map[string][]*taskqueue.Task{}
|
| - for _, task := range tasks {
|
| - t, queue, err := d.tqTask(task)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - perQueue[queue] = append(perQueue[queue], t)
|
| - }
|
| -
|
| - // Enqueue in parallel, per-queue, split into batches based on Task Queue
|
| - // RPC limits (100 tasks per batch).
|
| - errs := make(chan error)
|
| - ops := 0
|
| - for q, tasks := range perQueue {
|
| - for len(tasks) > 0 {
|
| - count := 100
|
| - if count > len(tasks) {
|
| - count = len(tasks)
|
| - }
|
| - go func(q string, batch []*taskqueue.Task) {
|
| - errs <- taskqueue.Add(c, q, batch...)
|
| - }(q, tasks[:count])
|
| - tasks = tasks[count:]
|
| - ops++
|
| - }
|
| - }
|
| -
|
| - // Gather all errors throwing away ErrTaskAlreadyAdded.
|
| - var all errors.MultiError
|
| - for i := 0; i < ops; i++ {
|
| - err := <-errs
|
| - if merr, yep := err.(errors.MultiError); yep {
|
| - for _, e := range merr {
|
| - if e != nil && e != taskqueue.ErrTaskAlreadyAdded {
|
| - all = append(all, e)
|
| - }
|
| - }
|
| - } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
|
| - all = append(all, err)
|
| - }
|
| - }
|
| -
|
| - if len(all) == 0 {
|
| - return nil
|
| - }
|
| -
|
| - return transient.Tag.Apply(all)
|
| -}
|
| -
|
| -// InstallRoutes installs appropriate HTTP routes in the router.
|
| -//
|
| -// Must be called only after all task handlers are registered!
|
| -func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) {
|
| - queues := stringset.New(0)
|
| -
|
| - d.mu.RLock()
|
| - for _, h := range d.handlers {
|
| - queues.Add(h.queue)
|
| - }
|
| - d.mu.RUnlock()
|
| -
|
| - for _, q := range queues.ToSlice() {
|
| - r.POST(
|
| - fmt.Sprintf("%s%s/*title", d.baseURL(), q),
|
| - mw.Extend(gaemiddleware.RequireTaskQueue(q)),
|
| - d.processHTTPRequest)
|
| - }
|
| -}
|
| -
|
| -////////////////////////////////////////////////////////////////////////////////
|
| -
|
| -type handler struct {
|
| - cb Handler // the actual handler
|
| - typeName string // name of the proto type it handles
|
| - queue string // name of the task queue
|
| - retryOpts *taskqueue.RetryOptions // default retry options
|
| -}
|
| -
|
| -// tqTask constructs task queue task struct.
|
| -func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) {
|
| - handler, err := d.handler(task.Payload)
|
| - if err != nil {
|
| - return nil, "", err
|
| - }
|
| -
|
| - blob, err := serializePayload(task.Payload)
|
| - if err != nil {
|
| - return nil, "", err
|
| - }
|
| -
|
| - title := handler.typeName
|
| - if task.Title != "" {
|
| - title = task.Title
|
| - }
|
| -
|
| - retryOpts := handler.retryOpts
|
| - if task.RetryOptions != nil {
|
| - retryOpts = task.RetryOptions
|
| - }
|
| -
|
| - // There's some weird restrictions on what characters are allowed inside task
|
| - // names. Lexicographically close names also cause hot spot problems in the
|
| - // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
|
| - // as task names. Also each task kind owns its own namespace of deduplication
|
| - // keys, so add task type to the digest as well.
|
| - name := ""
|
| - if task.DeduplicationKey != "" {
|
| - h := sha256.New()
|
| - h.Write([]byte(handler.typeName))
|
| - h.Write([]byte{0})
|
| - h.Write([]byte(task.DeduplicationKey))
|
| - name = hex.EncodeToString(h.Sum(nil))
|
| - }
|
| -
|
| - return &taskqueue.Task{
|
| - Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
|
| - Name: name,
|
| - Method: "POST",
|
| - Payload: blob,
|
| - ETA: task.ETA,
|
| - Delay: task.Delay,
|
| - RetryOptions: retryOpts,
|
| - }, handler.queue, nil
|
| -}
|
| -
|
| -// baseURL returns a URL prefix for all HTTP routes used by Dispatcher.
|
| -//
|
| -// It ends with '/'.
|
| -func (d *Dispatcher) baseURL() string {
|
| - switch {
|
| - case d.BaseURL != "" && strings.HasSuffix(d.BaseURL, "/"):
|
| - return d.BaseURL
|
| - case d.BaseURL != "":
|
| - return d.BaseURL + "/"
|
| - default:
|
| - return "/internal/tasks/"
|
| - }
|
| -}
|
| -
|
| -// handler returns a handler struct registered with Register.
|
| -func (d *Dispatcher) handler(payload proto.Message) (handler, error) {
|
| - name := proto.MessageName(payload)
|
| -
|
| - d.mu.RLock()
|
| - defer d.mu.RUnlock()
|
| -
|
| - handler, registered := d.handlers[name]
|
| - if !registered {
|
| - return handler, fmt.Errorf("handler for %q is not registered", name)
|
| - }
|
| - return handler, nil
|
| -}
|
| -
|
| -// processHTTPRequest is invoked on each HTTP POST.
|
| -//
|
| -// It deserializes the task and invokes an appropriate callback. Finishes the
|
| -// request with status 202 in case of a fatal error (to stop retries).
|
| -func (d *Dispatcher) processHTTPRequest(c *router.Context) {
|
| - body, err := ioutil.ReadAll(c.Request.Body)
|
| - if err != nil {
|
| - httpReply(c, false, 500, "Failed to read request body: %s", err)
|
| - return
|
| - }
|
| - logging.Debugf(c.Context, "Received task: %s", body)
|
| -
|
| - payload, err := deserializePayload(body)
|
| - if err != nil {
|
| - httpReply(c, false, 202, "Bad payload, can't deserialize: %s", err)
|
| - return
|
| - }
|
| -
|
| - h, err := d.handler(payload)
|
| - if err != nil {
|
| - httpReply(c, false, 202, "Bad task: %s", err)
|
| - return
|
| - }
|
| -
|
| - execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecutionCount"))
|
| - switch err = h.cb(c.Context, payload, execCount); {
|
| - case err == nil:
|
| - httpReply(c, true, 200, "OK")
|
| - case transient.Tag.In(err):
|
| - httpReply(c, false, 500, "Transient error: %s", err)
|
| - default:
|
| - httpReply(c, false, 202, "Fatal error: %s", err)
|
| - }
|
| -}
|
| -
|
| -func httpReply(c *router.Context, ok bool, code int, msg string, args ...interface{}) {
|
| - body := fmt.Sprintf(msg, args...)
|
| - if !ok {
|
| - logging.Errorf(c.Context, "%s", body)
|
| - }
|
| - http.Error(c.Writer, body, code)
|
| -}
|
| -
|
| -////////////////////////////////////////////////////////////////////////////////
|
| -
|
| -var marshaller = jsonpb.Marshaler{}
|
| -
|
| -type envelope struct {
|
| - Type string `json:"type"`
|
| - Body *json.RawMessage `json:"body"`
|
| -}
|
| -
|
| -func serializePayload(task proto.Message) ([]byte, error) {
|
| - var buf bytes.Buffer
|
| - if err := marshaller.Marshal(&buf, task); err != nil {
|
| - return nil, err
|
| - }
|
| - raw := json.RawMessage(buf.Bytes())
|
| - return json.Marshal(envelope{
|
| - Type: proto.MessageName(task),
|
| - Body: &raw,
|
| - })
|
| -}
|
| -
|
| -func deserializePayload(blob []byte) (proto.Message, error) {
|
| - env := envelope{}
|
| - if err := json.Unmarshal(blob, &env); err != nil {
|
| - return nil, err
|
| - }
|
| -
|
| - tp := proto.MessageType(env.Type) // this is **ConcreteStruct{}
|
| - if tp == nil {
|
| - return nil, fmt.Errorf("unregistered proto message name %q", env.Type)
|
| - }
|
| - if env.Body == nil {
|
| - return nil, fmt.Errorf("no task body given")
|
| - }
|
| -
|
| - task := reflect.New(tp.Elem()).Interface().(proto.Message)
|
| - if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil {
|
| - return nil, err
|
| - }
|
| -
|
| - return task, nil
|
| -}
|
|
|