OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package distributor |
| 6 |
| 7 import ( |
| 8 "encoding/json" |
| 9 "fmt" |
| 10 "net/http" |
| 11 "strconv" |
| 12 "time" |
| 13 |
| 14 "github.com/julienschmidt/httprouter" |
| 15 "github.com/luci/luci-go/appengine/tumble" |
| 16 "github.com/luci/luci-go/common/api/dm/service/v1" |
| 17 "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/server/tokens" |
| 19 "golang.org/x/net/context" |
| 20 ) |
| 21 |
| 22 const notifyTopicSuffix = "dm-distributor-notify" |
| 23 |
| 24 // PubsubReciever is the HTTP handler that processes incoming pubsub events |
| 25 // delivered to topics prepared with TaskDescription.PrepareTopic, and routes |
| 26 // them to the appropriate distributor implementation's HandleNotification |
| 27 // method. |
| 28 // |
| 29 // It requires that a Registry be installed in c via WithRegistry. |
| 30 func PubsubReciever(c context.Context, rw http.ResponseWriter, r *http.Request,
p httprouter.Params) { |
| 31 defer r.Body.Close() |
| 32 |
| 33 type PubsubMessage struct { |
| 34 Attributes map[string]string `json:"attributes"` |
| 35 Data []byte `json:"data"` |
| 36 MessageID string `json:"message_id"` |
| 37 } |
| 38 type PubsubPushMessage struct { |
| 39 Message PubsubMessage `json:"message"` |
| 40 Subscription string `json:"subscription"` |
| 41 } |
| 42 psm := &PubsubPushMessage{} |
| 43 |
| 44 if err := json.NewDecoder(r.Body).Decode(psm); err != nil { |
| 45 logging.WithError(err).Errorf(c, "Failed to parse pubsub message
") |
| 46 http.Error(rw, "Failed to parse pubsub message", http.StatusInte
rnalServerError) |
| 47 return |
| 48 } |
| 49 |
| 50 eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_tok
en"]) |
| 51 if err != nil { |
| 52 logging.WithError(err).Errorf(c, "bad auth_token") |
| 53 // Acknowledge this message, since it'll never be valid. |
| 54 rw.WriteHeader(http.StatusNoContent) |
| 55 return |
| 56 } |
| 57 |
| 58 // remove "auth_token" from Attributes to avoid having it pass to the |
| 59 // distributor. |
| 60 delete(psm.Message.Attributes, "auth_token") |
| 61 |
| 62 err = tumble.RunMutation(c, &NotifyExecution{ |
| 63 cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attrib
utes}, |
| 64 }) |
| 65 if err != nil { |
| 66 // TODO(riannucci): distinguish between transient/non-transient
failures. |
| 67 logging.WithError(err).Errorf(c, "failed to NotifyExecution") |
| 68 rw.WriteHeader(http.StatusInternalServerError) |
| 69 return |
| 70 } |
| 71 |
| 72 rw.WriteHeader(http.StatusNoContent) |
| 73 } |
| 74 |
| 75 // pubsubAuthToken describes how to generate HMAC protected tokens used to |
| 76 // authenticate PubSub messages. |
| 77 var pubsubAuthToken = tokens.TokenKind{ |
| 78 Algo: tokens.TokenAlgoHmacSHA256, |
| 79 Expiration: 48 * time.Hour, |
| 80 SecretKey: "pubsub_auth_token", |
| 81 Version: 1, |
| 82 } |
| 83 |
| 84 func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (s
tring, error) { |
| 85 return pubsubAuthToken.Generate(c, nil, map[string]string{ |
| 86 "quest": eid.Quest, |
| 87 "attempt": strconv.FormatUint(uint64(eid.Attempt), 10), |
| 88 "execution": strconv.FormatUint(uint64(eid.Id), 10), |
| 89 "cfgName": cfgName, |
| 90 }, 0) |
| 91 } |
| 92 |
| 93 func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID,
cfgName string, err error) { |
| 94 items, err := pubsubAuthToken.Validate(c, authToken, nil) |
| 95 if err != nil { |
| 96 return |
| 97 } |
| 98 quest, qok := items["quest"] |
| 99 attempt, aok := items["attempt"] |
| 100 execution, eok := items["execution"] |
| 101 if !qok || !aok || !eok { |
| 102 err = fmt.Errorf("missing keys: %v", items) |
| 103 return |
| 104 } |
| 105 attemptNum, err := strconv.ParseUint(attempt, 10, 32) |
| 106 if err != nil { |
| 107 return |
| 108 } |
| 109 executionNum, err := strconv.ParseUint(execution, 10, 32) |
| 110 if err != nil { |
| 111 return |
| 112 } |
| 113 eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum)) |
| 114 |
| 115 cfgName, ok := items["cfgName"] |
| 116 if !ok { |
| 117 err = fmt.Errorf("missing config name") |
| 118 } |
| 119 |
| 120 return |
| 121 } |
OLD | NEW |