| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package tsmon | 5 package tsmon |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 | 9 |
| 10 "golang.org/x/net/context" | 10 "golang.org/x/net/context" |
| 11 | 11 |
| 12 "github.com/luci/gae/filter/dsQueryBatch" | |
| 13 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/info" | 13 "github.com/luci/gae/service/info" |
| 15 | 14 |
| 16 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/logging" | 16 "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/common/tsmon" | 17 "github.com/luci/luci-go/common/tsmon" |
| 19 "github.com/luci/luci-go/server/router" | 18 "github.com/luci/luci-go/server/router" |
| 20 ) | 19 ) |
| 21 | 20 |
| 22 const housekeepingInstanceBatchSize = 200 | |
| 23 | |
| 24 // InstallHandlers installs HTTP handlers for tsmon routes. | 21 // InstallHandlers installs HTTP handlers for tsmon routes. |
| 25 func InstallHandlers(r *router.Router, base router.MiddlewareChain) { | 22 func InstallHandlers(r *router.Router, base router.MiddlewareChain) { |
| 26 r.GET("/internal/cron/ts_mon/housekeeping", base, housekeepingHandler) | 23 r.GET("/internal/cron/ts_mon/housekeeping", base, housekeepingHandler) |
| 27 } | 24 } |
| 28 | 25 |
| 29 // housekeepingHandler is an HTTP handler that should be run every minute by | 26 // housekeepingHandler is an HTTP handler that should be run every minute by |
| 30 // cron on App Engine. It assigns task numbers to datastore entries, and runs | 27 // cron on App Engine. It assigns task numbers to datastore entries, and runs |
| 31 // any global metric callbacks. | 28 // any global metric callbacks. |
| 32 func housekeepingHandler(c *router.Context) { | 29 func housekeepingHandler(c *router.Context) { |
| 33 if !info.IsDevAppServer(c.Context) && c.Request.Header.Get("X-Appengine-
Cron") != "true" { | 30 if !info.IsDevAppServer(c.Context) && c.Request.Header.Get("X-Appengine-
Cron") != "true" { |
| (...skipping 14 matching lines...) Expand all Loading... |
| 48 // and expiring old entities. | 45 // and expiring old entities. |
| 49 func assignTaskNumbers(c context.Context) error { | 46 func assignTaskNumbers(c context.Context) error { |
| 50 c = info.MustNamespace(c, instanceNamespace) | 47 c = info.MustNamespace(c, instanceNamespace) |
| 51 | 48 |
| 52 now := clock.Now(c) | 49 now := clock.Now(c) |
| 53 expiredTime := now.Add(-instanceExpirationTimeout) | 50 expiredTime := now.Add(-instanceExpirationTimeout) |
| 54 | 51 |
| 55 usedTaskNums := map[int]struct{}{} | 52 usedTaskNums := map[int]struct{}{} |
| 56 totalExpired := 0 | 53 totalExpired := 0 |
| 57 | 54 |
| 58 » expiredKeys := make([]*ds.Key, 0, housekeepingInstanceBatchSize) | 55 » expiredKeys := make([]*ds.Key, 0, ds.Raw(c).Constraints().QueryBatchSize
) |
| 59 var unassigned []*instance | 56 var unassigned []*instance |
| 60 | 57 |
| 61 // expireInstanceBatch processes the set of instances in "expiredKeys", | 58 // expireInstanceBatch processes the set of instances in "expiredKeys", |
| 62 // deletes them, and clears the list for the next iteration. | 59 // deletes them, and clears the list for the next iteration. |
| 63 // | 60 // |
| 64 // We do this in batches to handle large numbers without inflating memor
y | 61 // We do this in batches to handle large numbers without inflating memor
y |
| 65 // requirements. If there are any timeouts or problems, this will also e
nable | 62 // requirements. If there are any timeouts or problems, this will also e
nable |
| 66 // us to iteratively chip away at the problem. | 63 // us to iteratively chip away at the problem. |
| 67 expireInstanceBatch := func(c context.Context) error { | 64 expireInstanceBatch := func(c context.Context) error { |
| 68 if len(expiredKeys) == 0 { | 65 if len(expiredKeys) == 0 { |
| 69 return nil | 66 return nil |
| 70 } | 67 } |
| 71 | 68 |
| 72 logging.Debugf(c, "Expiring %d instance(s)", len(expiredKeys)) | 69 logging.Debugf(c, "Expiring %d instance(s)", len(expiredKeys)) |
| 73 if err := ds.Delete(c, expiredKeys); err != nil { | 70 if err := ds.Delete(c, expiredKeys); err != nil { |
| 74 logging.WithError(err).Errorf(c, "Failed to expire insta
nces.") | 71 logging.WithError(err).Errorf(c, "Failed to expire insta
nces.") |
| 75 return err | 72 return err |
| 76 } | 73 } |
| 77 | 74 |
| 78 // Clear the instances list for next round. | 75 // Clear the instances list for next round. |
| 79 totalExpired += len(expiredKeys) | 76 totalExpired += len(expiredKeys) |
| 80 expiredKeys = expiredKeys[:0] | 77 expiredKeys = expiredKeys[:0] |
| 81 return nil | 78 return nil |
| 82 } | 79 } |
| 83 | 80 |
| 84 // Query all instances from datastore. | 81 // Query all instances from datastore. |
| 85 q := ds.NewQuery("Instance") | 82 q := ds.NewQuery("Instance") |
| 86 » if err := ds.Run(dsQueryBatch.BatchQueries(c, housekeepingInstanceBatchS
ize, expireInstanceBatch), q, func(i *instance) { | 83 |
| 84 » b := ds.Batcher{ |
| 85 » » Callback: expireInstanceBatch, |
| 86 » } |
| 87 » if err := b.Run(c, q, func(i *instance) { |
| 87 if i.TaskNum >= 0 { | 88 if i.TaskNum >= 0 { |
| 88 usedTaskNums[i.TaskNum] = struct{}{} | 89 usedTaskNums[i.TaskNum] = struct{}{} |
| 89 } | 90 } |
| 90 if i.LastUpdated.Before(expiredTime) { | 91 if i.LastUpdated.Before(expiredTime) { |
| 91 expiredKeys = append(expiredKeys, ds.NewKey(c, "Instance
", i.ID, 0, nil)) | 92 expiredKeys = append(expiredKeys, ds.NewKey(c, "Instance
", i.ID, 0, nil)) |
| 92 logging.Debugf(c, "Expiring %s task_num %d, inactive sin
ce %s", | 93 logging.Debugf(c, "Expiring %s task_num %d, inactive sin
ce %s", |
| 93 i.ID, i.TaskNum, i.LastUpdated.String()) | 94 i.ID, i.TaskNum, i.LastUpdated.String()) |
| 94 } else if i.TaskNum < 0 { | 95 } else if i.TaskNum < 0 { |
| 95 unassigned = append(unassigned, i) | 96 unassigned = append(unassigned, i) |
| 96 } | 97 } |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 129 for { | 130 for { |
| 130 n := next | 131 n := next |
| 131 next++ | 132 next++ |
| 132 _, has := used[n] | 133 _, has := used[n] |
| 133 if !has { | 134 if !has { |
| 134 return n | 135 return n |
| 135 } | 136 } |
| 136 } | 137 } |
| 137 } | 138 } |
| 138 } | 139 } |
| OLD | NEW |