OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import copy | 6 import copy |
7 import functools | 7 import functools |
8 import logging | 8 import logging |
9 import operator | |
10 import random | 9 import random |
11 import threading | 10 import threading |
12 | 11 |
13 from google.appengine.api import memcache | 12 from google.appengine.api import memcache |
14 from google.appengine.api import modules | 13 from google.appengine.api import modules |
15 from google.appengine.ext import ndb | 14 from google.appengine.ext import ndb |
16 | 15 |
17 from infra_libs.ts_mon.common import errors | 16 from infra_libs.ts_mon.common import errors |
18 from infra_libs.ts_mon.common import metric_store | 17 from infra_libs.ts_mon.common import metric_store |
19 from infra_libs.ts_mon.common import metrics | 18 from infra_libs.ts_mon.common import metrics |
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
174 if key in shard_map: | 173 if key in shard_map: |
175 # This row is one shard of a sharded metric - put the shard number | 174 # This row is one shard of a sharded metric - put the shard number |
176 # in the task number. | 175 # in the task number. |
177 target.task_num, metric_name = shard_map[key] | 176 target.task_num, metric_name = shard_map[key] |
178 else: | 177 else: |
179 target.task_num, metric_name = 0, key | 178 target.task_num, metric_name = 0, key |
180 | 179 |
181 yield (copy.copy(target), entities[metric_name].metric, start_time, | 180 yield (copy.copy(target), entities[metric_name].metric, start_time, |
182 fields_values) | 181 fields_values) |
183 | 182 |
184 def _apply_all(self, callables, arg): | 183 def _apply_all(self, modifications, entry): |
185 ret = arg | 184 """Applies all the modifications, in order, to a memcache entry. |
186 for fn in callables: | |
187 ret = fn(ret) | |
188 return ret | |
189 | 185 |
190 def _compare_and_set(self, metric_modify_fns, namespace): | 186 All modifications must be for the same metric.""" |
| 187 |
| 188 if not modifications: # pragma: no cover |
| 189 return entry |
| 190 |
| 191 # All modifications should be for the same metric. |
| 192 name = modifications[0].name |
| 193 |
| 194 if entry is None: |
| 195 entry = (self._start_time(name), {}) |
| 196 |
| 197 _, targets = entry |
| 198 |
| 199 target_key = self._target_key() |
| 200 values = targets.setdefault(target_key, {}) |
| 201 |
| 202 for mod in modifications: |
| 203 value = values.get(mod.fields, 0) |
| 204 |
| 205 if mod.mod_type == 'set': |
| 206 new_value, enforce_ge = mod.args |
| 207 if enforce_ge and new_value < value: |
| 208 raise errors.MonitoringDecreasingValueError(name, value, new_value) |
| 209 value = new_value |
| 210 elif mod.mod_type == 'incr': |
| 211 delta, modify_fn = mod.args |
| 212 if modify_fn is None: |
| 213 modify_fn = metric_store.default_modify_fn(name) |
| 214 value = modify_fn(value, delta) |
| 215 else: |
| 216 raise errors.UnknownModificationTypeError(mod.mod_type) |
| 217 |
| 218 values[mod.fields] = value |
| 219 |
| 220 return entry |
| 221 |
| 222 def _compare_and_set(self, modifications, namespace): |
191 client = self._client() | 223 client = self._client() |
192 | 224 |
193 # Metrics that we haven't updated yet. Metrics are removed from this dict | 225 # Metrics that we haven't updated yet. Metrics are removed from this dict |
194 # when they're successfully updated - if there are any left they will be | 226 # when they're successfully updated - if there are any left they will be |
195 # retried 10 times until everything has been updated. | 227 # retried 10 times until everything has been updated. |
196 # We might have more than one modify_fn for a metric if different field | 228 # We might have more than one modification for a metric if different field |
197 # values were updated. | 229 # values were updated. |
198 metrics = {} | |
199 remaining = collections.defaultdict(list) | 230 remaining = collections.defaultdict(list) |
200 for metric, modify_fns in metric_modify_fns: | 231 for modification in modifications: |
201 metrics[metric.name] = metric | 232 remaining[modification.name].append(modification) |
202 remaining[metric.name].append(modify_fns) | |
203 | 233 |
204 failed_keys_count = 0 | 234 failed_keys_count = 0 |
205 | 235 |
206 for _ in xrange(self.CAS_RETRIES): | 236 for _ in xrange(self.CAS_RETRIES): |
207 keys = [] | 237 keys = [] |
208 key_map = {} # key -> metric name (for sharded metrics) | 238 key_map = {} # key -> metric name (for sharded metrics) |
209 | 239 |
210 for name in remaining: | 240 for name in remaining: |
211 # Pick one of the shards to modify. | 241 # Pick one of the shards to modify. |
212 key = self._random_shard(metrics[name]) | 242 key = self._random_shard(self._state.metrics[name]) |
213 keys.append(key) | 243 keys.append(key) |
214 key_map[key] = name | 244 key_map[key] = name |
215 | 245 |
216 # Get all existing entries. | 246 # Get all existing entries. |
217 cas_mapping = {} # key -> new value (for existing entries) | 247 cas_mapping = {} # key -> new value (for existing entries) |
218 add_mapping = {} # key -> new value (for new entries) | 248 add_mapping = {} # key -> new value (for new entries) |
219 | 249 |
220 entries = client.get_multi(keys, for_cas=True, namespace=namespace) | 250 entries = client.get_multi(keys, for_cas=True, namespace=namespace) |
221 for key, entry in entries.iteritems(): | 251 for key, entry in entries.iteritems(): |
222 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) | 252 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) |
(...skipping 23 matching lines...) Expand all Loading... |
246 remaining = still_remaining | 276 remaining = still_remaining |
247 else: | 277 else: |
248 logging.warning( | 278 logging.warning( |
249 'Memcache compare-and-set failed %d times for keys %s in ' | 279 'Memcache compare-and-set failed %d times for keys %s in ' |
250 'namespace %s', | 280 'namespace %s', |
251 self.CAS_RETRIES, remaining.keys(), namespace) | 281 self.CAS_RETRIES, remaining.keys(), namespace) |
252 | 282 |
253 # Update the cas_failures metric with the number of failed keys, but don't | 283 # Update the cas_failures metric with the number of failed keys, but don't |
254 # do so recursively. | 284 # do so recursively. |
255 if (failed_keys_count and | 285 if (failed_keys_count and |
256 any(metric.name != cas_failures.name | 286 any(modification.name != cas_failures.name |
257 for metric, _ in metric_modify_fns)): | 287 for modification in modifications)): |
258 cas_failures.increment_by(failed_keys_count) | 288 cas_failures.increment_by(failed_keys_count) |
259 | 289 |
260 def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta): | |
261 """Returns a function that modifies a memcache row value. | |
262 | |
263 Calls modify_value_fn(old_value, delta) and puts the result back in the | |
264 memcache row. | |
265 """ | |
266 | |
267 def modify_fn(entry): | |
268 if entry is None: | |
269 entry = (self._start_time(name), {}) | |
270 | |
271 _, targets = entry | |
272 | |
273 target_key = self._target_key() | |
274 if target_key not in targets: | |
275 targets[target_key] = {} | |
276 values = targets[target_key] | |
277 | |
278 values[fields] = modify_value_fn(values.get(fields, 0), delta) | |
279 | |
280 return entry | |
281 return modify_fn | |
282 | |
283 def _compare_and_set_metrics(self, modifications): | 290 def _compare_and_set_metrics(self, modifications): |
284 if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX | 291 if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
285 for name, fields, modify_value_fn, delta in modifications): | 292 for mod in modifications): |
286 raise errors.MonitoringError('Metric is magical, can\'t set it') | 293 raise errors.MonitoringError('Metric is magical, can\'t set it') |
287 | 294 |
288 metric_modify_fns = [ | 295 self._compare_and_set(modifications, self._namespace_for_job()) |
289 (self._state.metrics[name], | |
290 self._create_modify_metric_fn(name, fields, modify_value_fn, delta)) | |
291 for name, fields, modify_value_fn, delta in modifications] | |
292 self._compare_and_set(metric_modify_fns, self._namespace_for_job()) | |
293 | |
294 def _create_set_value_fn(self, name, value, enforce_ge): | |
295 def modify_fn(old_value, _delta): | |
296 if enforce_ge and old_value is not None and value < old_value: | |
297 raise errors.MonitoringDecreasingValueError(name, old_value, value) | |
298 return value | |
299 return modify_fn | |
300 | 296 |
301 def set(self, name, fields, value, enforce_ge=False): | 297 def set(self, name, fields, value, enforce_ge=False): |
302 modify_fn = self._create_set_value_fn(name, value, enforce_ge) | 298 self._compare_and_set_metrics([metric_store.Modification( |
303 self._compare_and_set_metrics([(name, fields, modify_fn, None)]) | 299 name, fields, 'set', (value, enforce_ge))]) |
304 | 300 |
305 def incr(self, name, fields, delta, modify_fn=operator.add): | 301 def incr(self, name, fields, delta, modify_fn=None): |
306 if delta < 0: | 302 self._compare_and_set_metrics([metric_store.Modification( |
307 raise errors.MonitoringDecreasingValueError(name, None, delta) | 303 name, fields, 'incr', (delta, modify_fn))]) |
308 self._compare_and_set_metrics([(name, fields, modify_fn, delta)]) | |
309 | 304 |
310 def modify_multi(self, modifications): | 305 def modify_multi(self, modifications): |
311 mods = [] | 306 self._compare_and_set_metrics(modifications) |
312 for mod in modifications: | |
313 if mod.mod_type == 'set': | |
314 value, enforce_ge = mod.args | |
315 modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge) | |
316 delta = None | |
317 elif mod.mod_type == 'incr': | |
318 delta, modify_fn = mod.args | |
319 if delta < 0: | |
320 raise errors.MonitoringDecreasingValueError(mod.name, None, delta) | |
321 else: | |
322 raise errors.UnknownModificationTypeError(mod.mod_type) | |
323 | |
324 mods.append((mod.name, mod.fields, modify_fn, delta)) | |
325 | |
326 self._compare_and_set_metrics(mods) | |
327 | 307 |
328 def reset_for_unittest(self, name=None): | 308 def reset_for_unittest(self, name=None): |
329 if name is None: | 309 if name is None: |
330 self._client().delete_multi(self._state.metrics.keys(), | 310 self._client().delete_multi(self._state.metrics.keys(), |
331 namespace=self._namespace_for_job()) | 311 namespace=self._namespace_for_job()) |
332 else: | 312 else: |
333 self._client().delete(name, namespace=self._namespace_for_job()) | 313 self._client().delete(name, namespace=self._namespace_for_job()) |
OLD | NEW |