| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package tsmon | 5 package tsmon |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "strings" | 10 "strings" |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/golang/protobuf/proto" | |
| 15 "github.com/julienschmidt/httprouter" | 14 "github.com/julienschmidt/httprouter" |
| 16 "github.com/luci/gae/service/datastore" | 15 "github.com/luci/gae/service/datastore" |
| 17 "github.com/luci/gae/service/info" | 16 "github.com/luci/gae/service/info" |
| 18 gaeauth "github.com/luci/luci-go/appengine/gaeauth/client" | 17 gaeauth "github.com/luci/luci-go/appengine/gaeauth/client" |
| 19 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| 20 "github.com/luci/luci-go/common/gcloud/pubsub" | 19 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 21 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 22 "github.com/luci/luci-go/common/tsmon" | 21 "github.com/luci/luci-go/common/tsmon" |
| 23 "github.com/luci/luci-go/common/tsmon/monitor" | 22 "github.com/luci/luci-go/common/tsmon/monitor" |
| 24 "github.com/luci/luci-go/common/tsmon/store" | 23 "github.com/luci/luci-go/common/tsmon/store" |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 66 | 65 |
| 67 var err error | 66 var err error |
| 68 mon, err = monitor.NewPubsubMonitor(client, pubsubProject, pubsu
bTopic) | 67 mon, err = monitor.NewPubsubMonitor(client, pubsubProject, pubsu
bTopic) |
| 69 if err != nil { | 68 if err != nil { |
| 70 return err | 69 return err |
| 71 } | 70 } |
| 72 } | 71 } |
| 73 | 72 |
| 74 // Create the target. | 73 // Create the target. |
| 75 tar := &target.Task{ | 74 tar := &target.Task{ |
| 76 » » DataCenter: proto.String(targetDataCenter), | 75 » » DataCenter: targetDataCenter, |
| 77 » » ServiceName: proto.String(i.AppID()), | 76 » » ServiceName: i.AppID(), |
| 78 » » JobName: proto.String(i.ModuleName()), | 77 » » JobName: i.ModuleName(), |
| 79 » » HostName: proto.String(strings.SplitN(i.VersionID(), ".", 2)[
0]), | 78 » » HostName: strings.SplitN(i.VersionID(), ".", 2)[0], |
| 80 » » TaskNum: proto.Int32(-1), | 79 » » TaskNum: -1, |
| 81 } | 80 } |
| 82 | 81 |
| 83 tsmon.Initialize(c, mon, store.NewInMemory(tar)) | 82 tsmon.Initialize(c, mon, store.NewInMemory(tar)) |
| 84 return nil | 83 return nil |
| 85 } | 84 } |
| 86 | 85 |
| 87 func flushIfNeeded(c context.Context) { | 86 func flushIfNeeded(c context.Context) { |
| 88 if !updateLastFlushed(c) { | 87 if !updateLastFlushed(c) { |
| 89 return | 88 return |
| 90 } | 89 } |
| (...skipping 24 matching lines...) Expand all Loading... |
| 115 if !ok { | 114 if !ok { |
| 116 // tsmon probably failed to initialize - just do nothing. | 115 // tsmon probably failed to initialize - just do nothing. |
| 117 return fmt.Errorf("default tsmon target is not a Task: %v", tsmo
n.Store(c).DefaultTarget()) | 116 return fmt.Errorf("default tsmon target is not a Task: %v", tsmo
n.Store(c).DefaultTarget()) |
| 118 } | 117 } |
| 119 | 118 |
| 120 logger := logging.Get(c) | 119 logger := logging.Get(c) |
| 121 entity := getOrCreateInstanceEntity(c) | 120 entity := getOrCreateInstanceEntity(c) |
| 122 now := clock.Now(c) | 121 now := clock.Now(c) |
| 123 | 122 |
| 124 if entity.TaskNum < 0 { | 123 if entity.TaskNum < 0 { |
| 125 » » if *task.TaskNum >= 0 { | 124 » » if task.TaskNum >= 0 { |
| 126 // We used to have a task number but we don't any more (
we were inactive | 125 // We used to have a task number but we don't any more (
we were inactive |
| 127 // for too long), so clear our state. | 126 // for too long), so clear our state. |
| 128 logging.Warningf(c, "Instance %s got purged from Datasto
re, but is still alive. "+ | 127 logging.Warningf(c, "Instance %s got purged from Datasto
re, but is still alive. "+ |
| 129 "Clearing cumulative metrics", info.Get(c).Insta
nceID()) | 128 "Clearing cumulative metrics", info.Get(c).Insta
nceID()) |
| 130 tsmon.ResetCumulativeMetrics(c) | 129 tsmon.ResetCumulativeMetrics(c) |
| 131 } | 130 } |
| 132 » » task.TaskNum = proto.Int32(-1) | 131 » » task.TaskNum = -1 |
| 133 lastFlushed.Time = entity.LastUpdated | 132 lastFlushed.Time = entity.LastUpdated |
| 134 | 133 |
| 135 // Start complaining if we haven't been given a task number afte
r some time. | 134 // Start complaining if we haven't been given a task number afte
r some time. |
| 136 shouldHaveTaskNumBy := entity.LastUpdated.Add(instanceExpectedTo
HaveTaskNum) | 135 shouldHaveTaskNumBy := entity.LastUpdated.Add(instanceExpectedTo
HaveTaskNum) |
| 137 if shouldHaveTaskNumBy.Before(now) { | 136 if shouldHaveTaskNumBy.Before(now) { |
| 138 logger.Warningf("Instance %s is %s old with no task_num.
", | 137 logger.Warningf("Instance %s is %s old with no task_num.
", |
| 139 info.Get(c).InstanceID(), now.Sub(shouldHaveTask
NumBy).String()) | 138 info.Get(c).InstanceID(), now.Sub(shouldHaveTask
NumBy).String()) |
| 140 } | 139 } |
| 141 return nil | 140 return nil |
| 142 } | 141 } |
| 143 | 142 |
| 144 » task.TaskNum = proto.Int32(int32(entity.TaskNum)) | 143 » task.TaskNum = int32(entity.TaskNum) |
| 145 tsmon.Store(c).SetDefaultTarget(task) | 144 tsmon.Store(c).SetDefaultTarget(task) |
| 146 | 145 |
| 147 // Update the instance entity and put it back in the datastore asynchron
ously. | 146 // Update the instance entity and put it back in the datastore asynchron
ously. |
| 148 entity.LastUpdated = now | 147 entity.LastUpdated = now |
| 149 putDone := make(chan struct{}) | 148 putDone := make(chan struct{}) |
| 150 go func() { | 149 go func() { |
| 151 defer close(putDone) | 150 defer close(putDone) |
| 152 if err := datastore.Get(c).Put(entity); err != nil { | 151 if err := datastore.Get(c).Put(entity); err != nil { |
| 153 logger.Errorf("Failed to update instance entity: %s", er
r) | 152 logger.Errorf("Failed to update instance entity: %s", er
r) |
| 154 } | 153 } |
| 155 }() | 154 }() |
| 156 | 155 |
| 157 ret := tsmon.Flush(c) | 156 ret := tsmon.Flush(c) |
| 158 resetGlobalCallbackMetrics(c) | 157 resetGlobalCallbackMetrics(c) |
| 159 | 158 |
| 160 <-putDone | 159 <-putDone |
| 161 return ret | 160 return ret |
| 162 } | 161 } |
| OLD | NEW |