Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(32)

Side by Side Diff: appengine/tsmon/middleware.go

Issue 1857643003: Revert of Migrate tsmon protos to proto3 (Closed) Base URL: git@github.com:luci/luci-go.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tsmon/global_callback_test.go ('k') | appengine/tsmon/middleware_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package tsmon 5 package tsmon
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "strings" 10 "strings"
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "github.com/golang/protobuf/proto"
14 "github.com/julienschmidt/httprouter" 15 "github.com/julienschmidt/httprouter"
15 "github.com/luci/gae/service/datastore" 16 "github.com/luci/gae/service/datastore"
16 "github.com/luci/gae/service/info" 17 "github.com/luci/gae/service/info"
17 gaeauth "github.com/luci/luci-go/appengine/gaeauth/client" 18 gaeauth "github.com/luci/luci-go/appengine/gaeauth/client"
18 "github.com/luci/luci-go/common/clock" 19 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/gcloud/pubsub" 20 "github.com/luci/luci-go/common/gcloud/pubsub"
20 "github.com/luci/luci-go/common/logging" 21 "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/tsmon" 22 "github.com/luci/luci-go/common/tsmon"
22 "github.com/luci/luci-go/common/tsmon/monitor" 23 "github.com/luci/luci-go/common/tsmon/monitor"
23 "github.com/luci/luci-go/common/tsmon/store" 24 "github.com/luci/luci-go/common/tsmon/store"
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
65 66
66 var err error 67 var err error
67 mon, err = monitor.NewPubsubMonitor(client, pubsubProject, pubsu bTopic) 68 mon, err = monitor.NewPubsubMonitor(client, pubsubProject, pubsu bTopic)
68 if err != nil { 69 if err != nil {
69 return err 70 return err
70 } 71 }
71 } 72 }
72 73
73 // Create the target. 74 // Create the target.
74 tar := &target.Task{ 75 tar := &target.Task{
75 » » DataCenter: targetDataCenter, 76 » » DataCenter: proto.String(targetDataCenter),
76 » » ServiceName: i.AppID(), 77 » » ServiceName: proto.String(i.AppID()),
77 » » JobName: i.ModuleName(), 78 » » JobName: proto.String(i.ModuleName()),
78 » » HostName: strings.SplitN(i.VersionID(), ".", 2)[0], 79 » » HostName: proto.String(strings.SplitN(i.VersionID(), ".", 2)[ 0]),
79 » » TaskNum: -1, 80 » » TaskNum: proto.Int32(-1),
80 } 81 }
81 82
82 tsmon.Initialize(c, mon, store.NewInMemory(tar)) 83 tsmon.Initialize(c, mon, store.NewInMemory(tar))
83 return nil 84 return nil
84 } 85 }
85 86
86 func flushIfNeeded(c context.Context) { 87 func flushIfNeeded(c context.Context) {
87 if !updateLastFlushed(c) { 88 if !updateLastFlushed(c) {
88 return 89 return
89 } 90 }
(...skipping 24 matching lines...) Expand all
114 if !ok { 115 if !ok {
115 // tsmon probably failed to initialize - just do nothing. 116 // tsmon probably failed to initialize - just do nothing.
116 return fmt.Errorf("default tsmon target is not a Task: %v", tsmo n.Store(c).DefaultTarget()) 117 return fmt.Errorf("default tsmon target is not a Task: %v", tsmo n.Store(c).DefaultTarget())
117 } 118 }
118 119
119 logger := logging.Get(c) 120 logger := logging.Get(c)
120 entity := getOrCreateInstanceEntity(c) 121 entity := getOrCreateInstanceEntity(c)
121 now := clock.Now(c) 122 now := clock.Now(c)
122 123
123 if entity.TaskNum < 0 { 124 if entity.TaskNum < 0 {
124 » » if task.TaskNum >= 0 { 125 » » if *task.TaskNum >= 0 {
125 // We used to have a task number but we don't any more ( we were inactive 126 // We used to have a task number but we don't any more ( we were inactive
126 // for too long), so clear our state. 127 // for too long), so clear our state.
127 logging.Warningf(c, "Instance %s got purged from Datasto re, but is still alive. "+ 128 logging.Warningf(c, "Instance %s got purged from Datasto re, but is still alive. "+
128 "Clearing cumulative metrics", info.Get(c).Insta nceID()) 129 "Clearing cumulative metrics", info.Get(c).Insta nceID())
129 tsmon.ResetCumulativeMetrics(c) 130 tsmon.ResetCumulativeMetrics(c)
130 } 131 }
131 » » task.TaskNum = -1 132 » » task.TaskNum = proto.Int32(-1)
132 lastFlushed.Time = entity.LastUpdated 133 lastFlushed.Time = entity.LastUpdated
133 134
134 // Start complaining if we haven't been given a task number afte r some time. 135 // Start complaining if we haven't been given a task number afte r some time.
135 shouldHaveTaskNumBy := entity.LastUpdated.Add(instanceExpectedTo HaveTaskNum) 136 shouldHaveTaskNumBy := entity.LastUpdated.Add(instanceExpectedTo HaveTaskNum)
136 if shouldHaveTaskNumBy.Before(now) { 137 if shouldHaveTaskNumBy.Before(now) {
137 logger.Warningf("Instance %s is %s old with no task_num. ", 138 logger.Warningf("Instance %s is %s old with no task_num. ",
138 info.Get(c).InstanceID(), now.Sub(shouldHaveTask NumBy).String()) 139 info.Get(c).InstanceID(), now.Sub(shouldHaveTask NumBy).String())
139 } 140 }
140 return nil 141 return nil
141 } 142 }
142 143
143 » task.TaskNum = int32(entity.TaskNum) 144 » task.TaskNum = proto.Int32(int32(entity.TaskNum))
144 tsmon.Store(c).SetDefaultTarget(task) 145 tsmon.Store(c).SetDefaultTarget(task)
145 146
146 // Update the instance entity and put it back in the datastore asynchron ously. 147 // Update the instance entity and put it back in the datastore asynchron ously.
147 entity.LastUpdated = now 148 entity.LastUpdated = now
148 putDone := make(chan struct{}) 149 putDone := make(chan struct{})
149 go func() { 150 go func() {
150 defer close(putDone) 151 defer close(putDone)
151 if err := datastore.Get(c).Put(entity); err != nil { 152 if err := datastore.Get(c).Put(entity); err != nil {
152 logger.Errorf("Failed to update instance entity: %s", er r) 153 logger.Errorf("Failed to update instance entity: %s", er r)
153 } 154 }
154 }() 155 }()
155 156
156 ret := tsmon.Flush(c) 157 ret := tsmon.Flush(c)
157 resetGlobalCallbackMetrics(c) 158 resetGlobalCallbackMetrics(c)
158 159
159 <-putDone 160 <-putDone
160 return ret 161 return ret
161 } 162 }
OLDNEW
« no previous file with comments | « appengine/tsmon/global_callback_test.go ('k') | appengine/tsmon/middleware_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698