| Index: appengine/cmd/dm/distributor/tq_handler.go
|
| diff --git a/appengine/cmd/dm/distributor/tq_handler.go b/appengine/cmd/dm/distributor/tq_handler.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..059a57369f6a43947dc56d3cd530acae1daa0956
|
| --- /dev/null
|
| +++ b/appengine/cmd/dm/distributor/tq_handler.go
|
| @@ -0,0 +1,60 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package distributor
|
| +
|
| +import (
|
| + "net/http"
|
| + "net/url"
|
| + "strings"
|
| +
|
| + "github.com/julienschmidt/httprouter"
|
| + "github.com/luci/luci-go/appengine/tumble"
|
| + "github.com/luci/luci-go/common/logging"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +const handlerPattern = "/tq/distributor/:cfgName"
|
| +
|
| +func handlerPath(cfgName string) string {
|
| + return strings.Replace(handlerPattern, ":cfgName", url.QueryEscape(cfgName), 1)
|
| +}
|
| +
|
| +// TaskqueueHandler is the http handler that routes taskqueue tasks made with
|
| +// Config.EnqueueTask to a distributor's HandleTaskQueueTask method.
|
| +//
|
| +// This requires that c already have a Registry installed via the WithRegistry
|
| +// method.
|
| +func TaskqueueHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| + defer r.Body.Close()
|
| +
|
| + cfgName := p.ByName("cfgName")
|
| + dist, _, err := GetRegistry(c).MakeDistributor(c, cfgName)
|
| + if err != nil {
|
| + logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to make distributor")
|
| + http.Error(rw, "bad distributor", http.StatusBadRequest)
|
| + return
|
| + }
|
| + notifications, err := dist.HandleTaskQueueTask(r)
|
| + if err != nil {
|
| + logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to handle taskqueue task")
|
| + http.Error(rw, "failure to execute handler", http.StatusInternalServerError)
|
| + return
|
| + }
|
| + if len(notifications) > 0 {
|
| + muts := make([]tumble.Mutation, 0, len(notifications))
|
| + for _, notify := range notifications {
|
| + if notify != nil {
|
| + muts = append(muts, &NotifyExecution{cfgName, notify})
|
| + }
|
| + }
|
| + err = tumble.AddToJournal(c, muts...)
|
| + if err != nil {
|
| + logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to handle notifications")
|
| + http.Error(rw, "failure to handle notifications", http.StatusInternalServerError)
|
| + return
|
| + }
|
| + }
|
| + rw.WriteHeader(http.StatusOK)
|
| +}
|
|
|