| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package tsmon | 5 package tsmon |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "sync" | 9 "sync" |
| 10 | 10 |
| 11 "golang.org/x/net/context" | 11 "golang.org/x/net/context" |
| 12 | 12 |
| 13 "github.com/luci/luci-go/common/logging" | 13 "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/tsmon/monitor" | 14 "github.com/luci/luci-go/common/tsmon/monitor" |
| 15 "github.com/luci/luci-go/common/tsmon/store" | 15 "github.com/luci/luci-go/common/tsmon/store" |
| 16 "github.com/luci/luci-go/common/tsmon/target" | 16 "github.com/luci/luci-go/common/tsmon/target" |
| 17 "github.com/luci/luci-go/common/tsmon/types" | 17 "github.com/luci/luci-go/common/tsmon/types" |
| 18 ) | 18 ) |
| 19 | 19 |
| 20 // State holds the configuration of the tsmon library. There is one global | 20 // State holds the configuration of the tsmon library. There is one global |
| 21 // instance of State, but it can be overridden in a Context by tests. | 21 // instance of State, but it can be overridden in a Context by tests. |
| 22 type State struct { | 22 type State struct { |
| 23 S store.Store | 23 S store.Store |
| 24 M monitor.Monitor | 24 M monitor.Monitor |
| 25 Flusher *autoFlusher | 25 Flusher *autoFlusher |
| 26 | 26 |
| 27 RegisteredMetrics map[string]types.Metric | 27 RegisteredMetrics map[string]types.Metric |
| 28 RegisteredMetricsLock sync.RWMutex | 28 RegisteredMetricsLock sync.RWMutex |
| 29 | 29 |
| 30 » CallbacksMutex sync.RWMutex | 30 » CallbacksMutex sync.RWMutex |
| 31 » Callbacks []Callback | 31 » Callbacks []Callback |
| 32 » GlobalCallbacks []GlobalCallback | 32 » GlobalCallbacks []GlobalCallback |
| 33 » InvokeGlobalCallbacksOnFlush bool |
| 33 } | 34 } |
| 34 | 35 |
| 35 // SetStore changes the metric store. All metrics that were registered with | 36 // SetStore changes the metric store. All metrics that were registered with |
| 36 // the old store will be re-registered on the new store. | 37 // the old store will be re-registered on the new store. |
| 37 func (state *State) SetStore(s store.Store) { | 38 func (state *State) SetStore(s store.Store) { |
| 38 oldStore := state.S | 39 oldStore := state.S |
| 39 if s == oldStore { | 40 if s == oldStore { |
| 40 return | 41 return |
| 41 } | 42 } |
| 42 | 43 |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 76 // See RegisterGlobalCallback for more info. | 77 // See RegisterGlobalCallback for more info. |
| 77 func (state *State) RunGlobalCallbacks(c context.Context) { | 78 func (state *State) RunGlobalCallbacks(c context.Context) { |
| 78 state.CallbacksMutex.RLock() | 79 state.CallbacksMutex.RLock() |
| 79 defer state.CallbacksMutex.RUnlock() | 80 defer state.CallbacksMutex.RUnlock() |
| 80 | 81 |
| 81 for _, gcp := range state.GlobalCallbacks { | 82 for _, gcp := range state.GlobalCallbacks { |
| 82 gcp.Callback(c) | 83 gcp.Callback(c) |
| 83 } | 84 } |
| 84 } | 85 } |
| 85 | 86 |
| 86 // ResetGlobalCallbackMetrics resets metrics produced by global callbacks. | 87 // resetGlobalCallbackMetrics resets metrics produced by global callbacks. |
| 87 // | 88 // |
| 88 // See RegisterGlobalCallback for more info. | 89 // See RegisterGlobalCallback for more info. |
| 89 func (state *State) ResetGlobalCallbackMetrics(c context.Context) { | 90 func (state *State) resetGlobalCallbackMetrics(c context.Context) { |
| 90 state.CallbacksMutex.RLock() | 91 state.CallbacksMutex.RLock() |
| 91 defer state.CallbacksMutex.RUnlock() | 92 defer state.CallbacksMutex.RUnlock() |
| 92 | 93 |
| 93 for _, gcp := range state.GlobalCallbacks { | 94 for _, gcp := range state.GlobalCallbacks { |
| 94 for _, m := range gcp.Metrics { | 95 for _, m := range gcp.Metrics { |
| 95 state.S.Reset(c, m) | 96 state.S.Reset(c, m) |
| 96 } | 97 } |
| 97 } | 98 } |
| 98 } | 99 } |
| 99 | 100 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 111 if mon == nil { | 112 if mon == nil { |
| 112 mon = state.M | 113 mon = state.M |
| 113 } | 114 } |
| 114 if mon == nil { | 115 if mon == nil { |
| 115 return errors.New("no tsmon Monitor is configured") | 116 return errors.New("no tsmon Monitor is configured") |
| 116 } | 117 } |
| 117 | 118 |
| 118 // Run any callbacks that have been registered to populate values in cal
lback | 119 // Run any callbacks that have been registered to populate values in cal
lback |
| 119 // metrics. | 120 // metrics. |
| 120 state.runCallbacks(c) | 121 state.runCallbacks(c) |
| 122 if state.InvokeGlobalCallbacksOnFlush { |
| 123 state.RunGlobalCallbacks(c) |
| 124 } |
| 121 | 125 |
| 122 cells := state.S.GetAll(c) | 126 cells := state.S.GetAll(c) |
| 123 if len(cells) == 0 { | 127 if len(cells) == 0 { |
| 124 return nil | 128 return nil |
| 125 } | 129 } |
| 126 | 130 |
| 127 logging.Debugf(c, "Starting tsmon flush: %d cells", len(cells)) | 131 logging.Debugf(c, "Starting tsmon flush: %d cells", len(cells)) |
| 128 defer logging.Debugf(c, "Tsmon flush finished") | 132 defer logging.Debugf(c, "Tsmon flush finished") |
| 129 | 133 |
| 130 // Split up the payload into chunks if there are too many cells. | 134 // Split up the payload into chunks if there are too many cells. |
| 131 chunkSize := mon.ChunkSize() | 135 chunkSize := mon.ChunkSize() |
| 132 if chunkSize == 0 { | 136 if chunkSize == 0 { |
| 133 chunkSize = len(cells) | 137 chunkSize = len(cells) |
| 134 } | 138 } |
| 135 | 139 |
| 136 sent := 0 | 140 sent := 0 |
| 137 var lastErr error | 141 var lastErr error |
| 138 for len(cells) > 0 { | 142 for len(cells) > 0 { |
| 139 count := minInt(chunkSize, len(cells)) | 143 count := minInt(chunkSize, len(cells)) |
| 140 if err := mon.Send(c, cells[:count]); err != nil { | 144 if err := mon.Send(c, cells[:count]); err != nil { |
| 141 logging.Errorf(c, "Failed to send %d cells: %v", count,
err) | 145 logging.Errorf(c, "Failed to send %d cells: %v", count,
err) |
| 142 lastErr = err | 146 lastErr = err |
| 143 // Continue anyway. | 147 // Continue anyway. |
| 144 } | 148 } |
| 145 cells = cells[count:] | 149 cells = cells[count:] |
| 146 sent += count | 150 sent += count |
| 147 } | 151 } |
| 152 state.resetGlobalCallbackMetrics(c) |
| 148 return lastErr | 153 return lastErr |
| 149 } | 154 } |
| 150 | 155 |
| 151 // runCallbacks runs any callbacks that have been registered to populate values | 156 // runCallbacks runs any callbacks that have been registered to populate values |
| 152 // in callback metrics. | 157 // in callback metrics. |
| 153 func (state *State) runCallbacks(c context.Context) { | 158 func (state *State) runCallbacks(c context.Context) { |
| 154 state.CallbacksMutex.RLock() | 159 state.CallbacksMutex.RLock() |
| 155 defer state.CallbacksMutex.RUnlock() | 160 defer state.CallbacksMutex.RUnlock() |
| 156 | 161 |
| 157 for _, f := range state.Callbacks { | 162 for _, f := range state.Callbacks { |
| (...skipping 14 matching lines...) Expand all Loading... |
| 172 func WithState(c context.Context, s *State) context.Context { | 177 func WithState(c context.Context, s *State) context.Context { |
| 173 return context.WithValue(c, stateKey, s) | 178 return context.WithValue(c, stateKey, s) |
| 174 } | 179 } |
| 175 | 180 |
| 176 // WithFakes returns a new context holding a new State with a fake store and a | 181 // WithFakes returns a new context holding a new State with a fake store and a |
| 177 // fake monitor. | 182 // fake monitor. |
| 178 func WithFakes(c context.Context) (context.Context, *store.Fake, *monitor.Fake)
{ | 183 func WithFakes(c context.Context) (context.Context, *store.Fake, *monitor.Fake)
{ |
| 179 s := &store.Fake{} | 184 s := &store.Fake{} |
| 180 m := &monitor.Fake{} | 185 m := &monitor.Fake{} |
| 181 return WithState(c, &State{ | 186 return WithState(c, &State{ |
| 182 » » S: s, | 187 » » S: s, |
| 183 » » M: m, | 188 » » M: m, |
| 184 » » RegisteredMetrics: map[string]types.Metric{}, | 189 » » RegisteredMetrics: map[string]types.Metric{}, |
| 190 » » InvokeGlobalCallbacksOnFlush: true, |
| 185 }), s, m | 191 }), s, m |
| 186 } | 192 } |
| 187 | 193 |
| 188 // WithDummyInMemory returns a new context holding a new State with a new in- | 194 // WithDummyInMemory returns a new context holding a new State with a new in- |
| 189 // memory store and a fake monitor. | 195 // memory store and a fake monitor. |
| 190 func WithDummyInMemory(c context.Context) (context.Context, *monitor.Fake) { | 196 func WithDummyInMemory(c context.Context) (context.Context, *monitor.Fake) { |
| 191 m := &monitor.Fake{} | 197 m := &monitor.Fake{} |
| 192 return WithState(c, &State{ | 198 return WithState(c, &State{ |
| 193 » » S: store.NewInMemory(&target.Task{}), | 199 » » S: store.NewInMemory(&target.Task{}), |
| 194 » » M: m, | 200 » » M: m, |
| 195 » » RegisteredMetrics: map[string]types.Metric{}, | 201 » » RegisteredMetrics: map[string]types.Metric{}, |
| 202 » » InvokeGlobalCallbacksOnFlush: true, |
| 196 }), m | 203 }), m |
| 197 } | 204 } |
| 198 | 205 |
| 199 type key int | 206 type key int |
| 200 | 207 |
| 201 var ( | 208 var ( |
| 202 globalState = NewState() | 209 globalState = NewState() |
| 203 stateKey key = 1 | 210 stateKey key = 1 |
| 204 ) | 211 ) |
| 205 | 212 |
| 206 // NewState returns a new default State, configured with a nil Store and | 213 // NewState returns a new default State, configured with a nil Store and |
| 207 // Monitor. | 214 // Monitor. |
| 208 func NewState() *State { | 215 func NewState() *State { |
| 209 return &State{ | 216 return &State{ |
| 210 » » S: store.NewNilStore(), | 217 » » S: store.NewNilStore(), |
| 211 » » M: monitor.NewNilMonitor(), | 218 » » M: monitor.NewNilMonitor(), |
| 212 » » RegisteredMetrics: map[string]types.Metric{}, | 219 » » RegisteredMetrics: map[string]types.Metric{}, |
| 220 » » InvokeGlobalCallbacksOnFlush: true, |
| 213 } | 221 } |
| 214 } | 222 } |
| OLD | NEW |