Chromium Code Reviews| Index: appengine/cmd/dm/distributor/pubsub.go |
| diff --git a/appengine/cmd/dm/distributor/pubsub.go b/appengine/cmd/dm/distributor/pubsub.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..0a4f74a9e6c547716394c3106d5f0c7b99898eba |
| --- /dev/null |
| +++ b/appengine/cmd/dm/distributor/pubsub.go |
| @@ -0,0 +1,126 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package distributor |
| + |
| +import ( |
| + "encoding/json" |
| + "fmt" |
| + "net/http" |
| + "strconv" |
| + "time" |
| + |
| + "github.com/julienschmidt/httprouter" |
| + "github.com/luci/luci-go/appengine/tumble" |
| + "github.com/luci/luci-go/common/api/dm/service/v1" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/server/tokens" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +const notifyTopicSuffix = "dm-distributor-notify" |
| + |
| +func pubsubTopic(c context.Context) string { |
|
iannucci
2016/06/08 02:54:24
note that all of this is roughly untested. This wi
|
| + return fmt.Sprintf("projects/%s/topics/%s", getTrimmedAppID(c), notifyTopicSuffix) |
|
dnj (Google)
2016/06/09 18:00:55
I would recommend using "common/gcloud/pubsub" her
iannucci
2016/06/15 00:46:00
Done.
|
| +} |
| + |
| +// PubsubReciever is the http handler that processes incoming pubsub events |
|
dnj (Google)
2016/06/09 18:00:55
nit: HTTP
iannucci
2016/06/15 00:46:00
Done.
|
| +// delivered to topics prepared with TaskDescription.PrepareTopic, and routes |
| +// them to the appropriate distributor implementation's HandleNotification |
| +// method. |
| +// |
| +// It requires that a Registry be installed in c via WithRegistry. |
| +func PubsubReciever(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { |
|
dnj (Google)
2016/06/09 18:00:55
nit: it might be cleaner to wrap a more specific m
iannucci
2016/06/15 00:46:00
meh.... all this handler stuff is already function
|
| + defer r.Body.Close() |
| + |
| + type PubsubMessage struct { |
| + Attributes map[string]string `json:"attributes"` |
| + Data []byte `json:"data"` |
| + MessageID string `json:"message_id"` |
| + } |
| + type PubsubPushMessage struct { |
| + Message PubsubMessage `json:"message"` |
| + Subscription string `json:"subscription"` |
| + } |
| + psm := &PubsubPushMessage{} |
| + |
| + err := json.NewDecoder(r.Body).Decode(psm) |
|
dnj (Google)
2016/06/09 18:00:55
Why use JSON here and not protobuf?
iannucci
2016/06/15 00:46:00
this is just the pubsub message envelope thingy
|
| + if err != nil { |
|
dnj (Google)
2016/06/09 18:00:55
nit: if err := json.NewDecoder...
iannucci
2016/06/15 00:46:00
Done.
|
| + logging.Fields{"error": err}.Errorf(c, "Failed to parse pubsub message") |
| + http.Error(rw, "Failed to parse pubsub message", http.StatusInternalServerError) |
| + return |
| + } |
| + |
| + eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_token"]) |
| + if err != nil { |
| + logging.WithError(err).Errorf(c, "bad auth_token") |
| + // Acknowledge this message, since it'll never be valid. |
| + rw.WriteHeader(http.StatusNoContent) |
| + return |
| + } |
| + |
| + // remove "auth_token" from Attributes to avoid having it pass to the |
| + // distributor. |
| + delete(psm.Message.Attributes, "auth_token") |
| + |
| + err = tumble.RunMutation(c, &NotifyExecution{ |
| + cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attributes}, |
| + }) |
| + if err != nil { |
| + // TODO(riannucci): distinguish between transient/non-transient failures. |
| + logging.WithError(err).Errorf(c, "failed to NotifyExecution") |
| + rw.WriteHeader(http.StatusInternalServerError) |
| + return |
| + } |
| + |
| + rw.WriteHeader(http.StatusNoContent) |
| +} |
| + |
| +// pubsubAuthToken describes how to generate HMAC protected tokens used to |
| +// authenticate PubSub messages. |
| +var pubsubAuthToken = tokens.TokenKind{ |
| + Algo: tokens.TokenAlgoHmacSHA256, |
| + Expiration: 48 * time.Hour, |
| + SecretKey: "pubsub_auth_token", |
| + Version: 1, |
| +} |
| + |
| +func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (string, error) { |
| + return pubsubAuthToken.Generate(c, nil, map[string]string{ |
| + "quest": eid.Quest, |
| + "attempt": strconv.FormatUint(uint64(eid.Attempt), 10), |
| + "execution": strconv.FormatUint(uint64(eid.Id), 10), |
| + "cfgName": cfgName, |
| + }, 0) |
| +} |
| + |
| +func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID, cfgName string, err error) { |
| + items, err := pubsubAuthToken.Validate(c, authToken, nil) |
| + if err != nil { |
| + return |
| + } |
| + quest, qok := items["quest"] |
| + attempt, aok := items["attempt"] |
| + execution, eok := items["execution"] |
| + if !qok || !aok || !eok { |
| + err = fmt.Errorf("missing keys: %v", items) |
| + return |
| + } |
| + attemptNum, err := strconv.ParseUint(attempt, 10, 32) |
| + if err != nil { |
| + return |
| + } |
| + executionNum, err := strconv.ParseUint(execution, 10, 32) |
| + if err != nil { |
| + return |
| + } |
| + eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum)) |
| + |
| + cfgName, ok := items["cfgName"] |
| + if !ok { |
| + err = fmt.Errorf("missing config name") |
| + } |
| + |
| + return |
| +} |