Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2574)

Unified Diff: appengine/cmd/dm/distributor/pubsub.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/cmd/dm/distributor/pubsub.go
diff --git a/appengine/cmd/dm/distributor/pubsub.go b/appengine/cmd/dm/distributor/pubsub.go
new file mode 100644
index 0000000000000000000000000000000000000000..0a4f74a9e6c547716394c3106d5f0c7b99898eba
--- /dev/null
+++ b/appengine/cmd/dm/distributor/pubsub.go
@@ -0,0 +1,126 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package distributor
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/julienschmidt/httprouter"
+ "github.com/luci/luci-go/appengine/tumble"
+ "github.com/luci/luci-go/common/api/dm/service/v1"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/server/tokens"
+ "golang.org/x/net/context"
+)
+
+const notifyTopicSuffix = "dm-distributor-notify"
+
+func pubsubTopic(c context.Context) string {
iannucci 2016/06/08 02:54:24 note that all of this is roughly untested. This wi
+ return fmt.Sprintf("projects/%s/topics/%s", getTrimmedAppID(c), notifyTopicSuffix)
dnj (Google) 2016/06/09 18:00:55 I would recommend using "common/gcloud/pubsub" her
iannucci 2016/06/15 00:46:00 Done.
+}
+
+// PubsubReciever is the http handler that processes incoming pubsub events
dnj (Google) 2016/06/09 18:00:55 nit: HTTP
iannucci 2016/06/15 00:46:00 Done.
+// delivered to topics prepared with TaskDescription.PrepareTopic, and routes
+// them to the appropriate distributor implementation's HandleNotification
+// method.
+//
+// It requires that a Registry be installed in c via WithRegistry.
+func PubsubReciever(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
dnj (Google) 2016/06/09 18:00:55 nit: it might be cleaner to wrap a more specific m
iannucci 2016/06/15 00:46:00 meh.... all this handler stuff is already function
+ defer r.Body.Close()
+
+ type PubsubMessage struct {
+ Attributes map[string]string `json:"attributes"`
+ Data []byte `json:"data"`
+ MessageID string `json:"message_id"`
+ }
+ type PubsubPushMessage struct {
+ Message PubsubMessage `json:"message"`
+ Subscription string `json:"subscription"`
+ }
+ psm := &PubsubPushMessage{}
+
+ err := json.NewDecoder(r.Body).Decode(psm)
dnj (Google) 2016/06/09 18:00:55 Why use JSON here and not protobuf?
iannucci 2016/06/15 00:46:00 this is just the pubsub message envelope thingy
+ if err != nil {
dnj (Google) 2016/06/09 18:00:55 nit: if err := json.NewDecoder...
iannucci 2016/06/15 00:46:00 Done.
+ logging.Fields{"error": err}.Errorf(c, "Failed to parse pubsub message")
+ http.Error(rw, "Failed to parse pubsub message", http.StatusInternalServerError)
+ return
+ }
+
+ eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_token"])
+ if err != nil {
+ logging.WithError(err).Errorf(c, "bad auth_token")
+ // Acknowledge this message, since it'll never be valid.
+ rw.WriteHeader(http.StatusNoContent)
+ return
+ }
+
+ // remove "auth_token" from Attributes to avoid having it pass to the
+ // distributor.
+ delete(psm.Message.Attributes, "auth_token")
+
+ err = tumble.RunMutation(c, &NotifyExecution{
+ cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attributes},
+ })
+ if err != nil {
+ // TODO(riannucci): distinguish between transient/non-transient failures.
+ logging.WithError(err).Errorf(c, "failed to NotifyExecution")
+ rw.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ rw.WriteHeader(http.StatusNoContent)
+}
+
+// pubsubAuthToken describes how to generate HMAC protected tokens used to
+// authenticate PubSub messages.
+var pubsubAuthToken = tokens.TokenKind{
+ Algo: tokens.TokenAlgoHmacSHA256,
+ Expiration: 48 * time.Hour,
+ SecretKey: "pubsub_auth_token",
+ Version: 1,
+}
+
+func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (string, error) {
+ return pubsubAuthToken.Generate(c, nil, map[string]string{
+ "quest": eid.Quest,
+ "attempt": strconv.FormatUint(uint64(eid.Attempt), 10),
+ "execution": strconv.FormatUint(uint64(eid.Id), 10),
+ "cfgName": cfgName,
+ }, 0)
+}
+
+func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID, cfgName string, err error) {
+ items, err := pubsubAuthToken.Validate(c, authToken, nil)
+ if err != nil {
+ return
+ }
+ quest, qok := items["quest"]
+ attempt, aok := items["attempt"]
+ execution, eok := items["execution"]
+ if !qok || !aok || !eok {
+ err = fmt.Errorf("missing keys: %v", items)
+ return
+ }
+ attemptNum, err := strconv.ParseUint(attempt, 10, 32)
+ if err != nil {
+ return
+ }
+ executionNum, err := strconv.ParseUint(execution, 10, 32)
+ if err != nil {
+ return
+ }
+ eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum))
+
+ cfgName, ok := items["cfgName"]
+ if !ok {
+ err = fmt.Errorf("missing config name")
+ }
+
+ return
+}

Powered by Google App Engine
This is Rietveld 408576698