Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(539)

Side by Side Diff: milo/buildsource/buildbot/pubsub.go

Issue 2955223002: Milo: Buildbucket PubSub ingestion outline (Closed)
Patch Set: rebase Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/api/config/settings.pb.go ('k') | milo/buildsource/buildbot/pubsub_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package buildbot 5 package buildbot
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "compress/gzip" 9 "compress/gzip"
10 "compress/zlib" 10 "compress/zlib"
11 "encoding/base64"
12 "encoding/json" 11 "encoding/json"
13 "fmt" 12 "fmt"
14 "net/http" 13 "net/http"
15 "strings" 14 "strings"
16 "time" 15 "time"
17 16
18 ds "github.com/luci/gae/service/datastore" 17 ds "github.com/luci/gae/service/datastore"
19 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/iotools" 19 "github.com/luci/luci-go/common/iotools"
21 "github.com/luci/luci-go/common/logging" 20 "github.com/luci/luci-go/common/logging"
(...skipping 22 matching lines...) Expand all
44 masterCounter = metric.NewCounter( 43 masterCounter = metric.NewCounter(
45 "luci/milo/buildbot_pubsub/masters", 44 "luci/milo/buildbot_pubsub/masters",
46 "The number of buildbot master jsons received by Milo from PubSu b", 45 "The number of buildbot master jsons received by Milo from PubSu b",
47 nil, 46 nil,
48 field.Bool("internal"), 47 field.Bool("internal"),
49 field.String("master"), 48 field.String("master"),
50 // Status can be one of 2 options. "success", "failure". 49 // Status can be one of 2 options. "success", "failure".
51 field.String("status")) 50 field.String("status"))
52 ) 51 )
53 52
54 type pubSubMessage struct {
55 Attributes map[string]string `json:"attributes"`
56 Data string `json:"data"`
57 MessageID string `json:"message_id"`
58 }
59
60 type pubSubSubscription struct {
61 Message pubSubMessage `json:"message"`
62 Subscription string `json:"subscription"`
63 }
64
65 type buildMasterMsg struct { 53 type buildMasterMsg struct {
66 Master *buildbotMaster `json:"master"` 54 Master *buildbotMaster `json:"master"`
67 Builds []*buildbotBuild `json:"builds"` 55 Builds []*buildbotBuild `json:"builds"`
68 } 56 }
69 57
70 // buildbotMasterEntry is a container for a marshaled and packed buildbot 58 // buildbotMasterEntry is a container for a marshaled and packed buildbot
71 // master json. 59 // master json.
72 type buildbotMasterEntry struct { 60 type buildbotMasterEntry struct {
73 // Name of the buildbot master. 61 // Name of the buildbot master.
74 Name string `gae:"$id"` 62 Name string `gae:"$id"`
(...skipping 30 matching lines...) Expand all
105 if err := e.Encode(master); err != nil { 93 if err := e.Encode(master); err != nil {
106 return err 94 return err
107 } 95 }
108 gsw.Close() 96 gsw.Close()
109 entry.Data = gzbs.Bytes() 97 entry.Data = gzbs.Bytes()
110 logging.Debugf(c, "Length of json data: %d", cw.Count) 98 logging.Debugf(c, "Length of json data: %d", cw.Count)
111 logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data)) 99 logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data))
112 return ds.Put(c, &entry) 100 return ds.Put(c, &entry)
113 } 101 }
114 102
115 // GetData returns the expanded form of Data (decoded from base64).
116 func (m *pubSubSubscription) GetData() ([]byte, error) {
117 return base64.StdEncoding.DecodeString(m.Message.Data)
118 }
119
120 // unmarshal a gzipped byte stream into a list of buildbot builds and masters. 103 // unmarshal a gzipped byte stream into a list of buildbot builds and masters.
121 func unmarshal( 104 func unmarshal(
122 c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error ) { 105 c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error ) {
123 bm := buildMasterMsg{} 106 bm := buildMasterMsg{}
124 if len(msg) == 0 { 107 if len(msg) == 0 {
125 return bm.Builds, bm.Master, nil 108 return bm.Builds, bm.Master, nil
126 } 109 }
127 reader, err := zlib.NewReader(bytes.NewReader(msg)) 110 reader, err := zlib.NewReader(bytes.NewReader(msg))
128 if err != nil { 111 if err != nil {
129 logging.WithError(err).Errorf(c, "gzip decompression error") 112 logging.WithError(err).Errorf(c, "gzip decompression error")
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
286 // PubSubHandler is a webhook that stores the builds coming in from pubsub. 269 // PubSubHandler is a webhook that stores the builds coming in from pubsub.
287 func PubSubHandler(ctx *router.Context) { 270 func PubSubHandler(ctx *router.Context) {
288 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request) 271 statusCode := pubSubHandlerImpl(ctx.Context, ctx.Request)
289 ctx.Writer.WriteHeader(statusCode) 272 ctx.Writer.WriteHeader(statusCode)
290 } 273 }
291 274
292 // This is the actual implementation of the pubsub handler. Returns 275 // This is the actual implementation of the pubsub handler. Returns
293 // a status code. StatusOK (200) for okay (ACK implied, don't retry). 276 // a status code. StatusOK (200) for okay (ACK implied, don't retry).
294 // Anything else will signal to pubsub to retry. 277 // Anything else will signal to pubsub to retry.
295 func pubSubHandlerImpl(c context.Context, r *http.Request) int { 278 func pubSubHandlerImpl(c context.Context, r *http.Request) int {
296 » msg := pubSubSubscription{} 279 » msg := common.PubSubSubscription{}
297 now := int(clock.Now(c).Unix()) 280 now := int(clock.Now(c).Unix())
298 defer r.Body.Close() 281 defer r.Body.Close()
299 dec := json.NewDecoder(r.Body) 282 dec := json.NewDecoder(r.Body)
300 if err := dec.Decode(&msg); err != nil { 283 if err := dec.Decode(&msg); err != nil {
301 logging.WithError(err).Errorf( 284 logging.WithError(err).Errorf(
302 c, "Could not decode message. %s", err) 285 c, "Could not decode message. %s", err)
303 return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry. 286 return http.StatusOK // This is a hard failure, we don't want Pu bSub to retry.
304 } 287 }
305 internal := true 288 internal := true
306 // Get the name of the subscription on luci-config 289 // Get the name of the subscription on luci-config
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
398 381
399 } 382 }
400 if master != nil { 383 if master != nil {
401 code := doMaster(c, master, internal) 384 code := doMaster(c, master, internal)
402 if code != 0 { 385 if code != 0 {
403 return code 386 return code
404 } 387 }
405 } 388 }
406 return http.StatusOK 389 return http.StatusOK
407 } 390 }
OLDNEW
« no previous file with comments | « milo/api/config/settings.pb.go ('k') | milo/buildsource/buildbot/pubsub_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698