| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package tsmon | 5 package tsmon |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "strings" | 10 "strings" |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/golang/protobuf/proto" |
| 14 "github.com/julienschmidt/httprouter" | 15 "github.com/julienschmidt/httprouter" |
| 15 "github.com/luci/gae/service/datastore" | 16 "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/gae/service/info" | 17 "github.com/luci/gae/service/info" |
| 17 gaeauth "github.com/luci/luci-go/appengine/gaeauth/client" | 18 gaeauth "github.com/luci/luci-go/appengine/gaeauth/client" |
| 18 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/gcloud/pubsub" | 20 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 20 "github.com/luci/luci-go/common/logging" | 21 "github.com/luci/luci-go/common/logging" |
| 21 "github.com/luci/luci-go/common/tsmon" | 22 "github.com/luci/luci-go/common/tsmon" |
| 22 "github.com/luci/luci-go/common/tsmon/monitor" | 23 "github.com/luci/luci-go/common/tsmon/monitor" |
| 23 "github.com/luci/luci-go/common/tsmon/store" | 24 "github.com/luci/luci-go/common/tsmon/store" |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 65 | 66 |
| 66 var err error | 67 var err error |
| 67 mon, err = monitor.NewPubsubMonitor(client, pubsubProject, pubsu
bTopic) | 68 mon, err = monitor.NewPubsubMonitor(client, pubsubProject, pubsu
bTopic) |
| 68 if err != nil { | 69 if err != nil { |
| 69 return err | 70 return err |
| 70 } | 71 } |
| 71 } | 72 } |
| 72 | 73 |
| 73 // Create the target. | 74 // Create the target. |
| 74 tar := &target.Task{ | 75 tar := &target.Task{ |
| 75 » » DataCenter: targetDataCenter, | 76 » » DataCenter: proto.String(targetDataCenter), |
| 76 » » ServiceName: i.AppID(), | 77 » » ServiceName: proto.String(i.AppID()), |
| 77 » » JobName: i.ModuleName(), | 78 » » JobName: proto.String(i.ModuleName()), |
| 78 » » HostName: strings.SplitN(i.VersionID(), ".", 2)[0], | 79 » » HostName: proto.String(strings.SplitN(i.VersionID(), ".", 2)[
0]), |
| 79 » » TaskNum: -1, | 80 » » TaskNum: proto.Int32(-1), |
| 80 } | 81 } |
| 81 | 82 |
| 82 tsmon.Initialize(c, mon, store.NewInMemory(tar)) | 83 tsmon.Initialize(c, mon, store.NewInMemory(tar)) |
| 83 return nil | 84 return nil |
| 84 } | 85 } |
| 85 | 86 |
| 86 func flushIfNeeded(c context.Context) { | 87 func flushIfNeeded(c context.Context) { |
| 87 if !updateLastFlushed(c) { | 88 if !updateLastFlushed(c) { |
| 88 return | 89 return |
| 89 } | 90 } |
| (...skipping 24 matching lines...) Expand all Loading... |
| 114 if !ok { | 115 if !ok { |
| 115 // tsmon probably failed to initialize - just do nothing. | 116 // tsmon probably failed to initialize - just do nothing. |
| 116 return fmt.Errorf("default tsmon target is not a Task: %v", tsmo
n.Store(c).DefaultTarget()) | 117 return fmt.Errorf("default tsmon target is not a Task: %v", tsmo
n.Store(c).DefaultTarget()) |
| 117 } | 118 } |
| 118 | 119 |
| 119 logger := logging.Get(c) | 120 logger := logging.Get(c) |
| 120 entity := getOrCreateInstanceEntity(c) | 121 entity := getOrCreateInstanceEntity(c) |
| 121 now := clock.Now(c) | 122 now := clock.Now(c) |
| 122 | 123 |
| 123 if entity.TaskNum < 0 { | 124 if entity.TaskNum < 0 { |
| 124 » » if task.TaskNum >= 0 { | 125 » » if *task.TaskNum >= 0 { |
| 125 // We used to have a task number but we don't any more (
we were inactive | 126 // We used to have a task number but we don't any more (
we were inactive |
| 126 // for too long), so clear our state. | 127 // for too long), so clear our state. |
| 127 logging.Warningf(c, "Instance %s got purged from Datasto
re, but is still alive. "+ | 128 logging.Warningf(c, "Instance %s got purged from Datasto
re, but is still alive. "+ |
| 128 "Clearing cumulative metrics", info.Get(c).Insta
nceID()) | 129 "Clearing cumulative metrics", info.Get(c).Insta
nceID()) |
| 129 tsmon.ResetCumulativeMetrics(c) | 130 tsmon.ResetCumulativeMetrics(c) |
| 130 } | 131 } |
| 131 » » task.TaskNum = -1 | 132 » » task.TaskNum = proto.Int32(-1) |
| 132 lastFlushed.Time = entity.LastUpdated | 133 lastFlushed.Time = entity.LastUpdated |
| 133 | 134 |
| 134 // Start complaining if we haven't been given a task number afte
r some time. | 135 // Start complaining if we haven't been given a task number afte
r some time. |
| 135 shouldHaveTaskNumBy := entity.LastUpdated.Add(instanceExpectedTo
HaveTaskNum) | 136 shouldHaveTaskNumBy := entity.LastUpdated.Add(instanceExpectedTo
HaveTaskNum) |
| 136 if shouldHaveTaskNumBy.Before(now) { | 137 if shouldHaveTaskNumBy.Before(now) { |
| 137 logger.Warningf("Instance %s is %s old with no task_num.
", | 138 logger.Warningf("Instance %s is %s old with no task_num.
", |
| 138 info.Get(c).InstanceID(), now.Sub(shouldHaveTask
NumBy).String()) | 139 info.Get(c).InstanceID(), now.Sub(shouldHaveTask
NumBy).String()) |
| 139 } | 140 } |
| 140 return nil | 141 return nil |
| 141 } | 142 } |
| 142 | 143 |
| 143 » task.TaskNum = int32(entity.TaskNum) | 144 » task.TaskNum = proto.Int32(int32(entity.TaskNum)) |
| 144 tsmon.Store(c).SetDefaultTarget(task) | 145 tsmon.Store(c).SetDefaultTarget(task) |
| 145 | 146 |
| 146 // Update the instance entity and put it back in the datastore asynchron
ously. | 147 // Update the instance entity and put it back in the datastore asynchron
ously. |
| 147 entity.LastUpdated = now | 148 entity.LastUpdated = now |
| 148 putDone := make(chan struct{}) | 149 putDone := make(chan struct{}) |
| 149 go func() { | 150 go func() { |
| 150 defer close(putDone) | 151 defer close(putDone) |
| 151 if err := datastore.Get(c).Put(entity); err != nil { | 152 if err := datastore.Get(c).Put(entity); err != nil { |
| 152 logger.Errorf("Failed to update instance entity: %s", er
r) | 153 logger.Errorf("Failed to update instance entity: %s", er
r) |
| 153 } | 154 } |
| 154 }() | 155 }() |
| 155 | 156 |
| 156 ret := tsmon.Flush(c) | 157 ret := tsmon.Flush(c) |
| 157 resetGlobalCallbackMetrics(c) | 158 resetGlobalCallbackMetrics(c) |
| 158 | 159 |
| 159 <-putDone | 160 <-putDone |
| 160 return ret | 161 return ret |
| 161 } | 162 } |
| OLD | NEW |