| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import collections | 5 import collections |
| 6 import copy | 6 import copy |
| 7 import functools | 7 import functools |
| 8 import logging | 8 import logging |
| 9 import operator | |
| 10 import random | 9 import random |
| 11 import threading | 10 import threading |
| 12 | 11 |
| 13 from google.appengine.api import memcache | 12 from google.appengine.api import memcache |
| 14 from google.appengine.api import modules | 13 from google.appengine.api import modules |
| 15 from google.appengine.ext import ndb | 14 from google.appengine.ext import ndb |
| 16 | 15 |
| 17 from infra_libs.ts_mon.common import errors | 16 from infra_libs.ts_mon.common import errors |
| 18 from infra_libs.ts_mon.common import metric_store | 17 from infra_libs.ts_mon.common import metric_store |
| 19 from infra_libs.ts_mon.common import metrics | 18 from infra_libs.ts_mon.common import metrics |
| (...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 174 if key in shard_map: | 173 if key in shard_map: |
| 175 # This row is one shard of a sharded metric - put the shard number | 174 # This row is one shard of a sharded metric - put the shard number |
| 176 # in the task number. | 175 # in the task number. |
| 177 target.task_num, metric_name = shard_map[key] | 176 target.task_num, metric_name = shard_map[key] |
| 178 else: | 177 else: |
| 179 target.task_num, metric_name = 0, key | 178 target.task_num, metric_name = 0, key |
| 180 | 179 |
| 181 yield (copy.copy(target), entities[metric_name].metric, start_time, | 180 yield (copy.copy(target), entities[metric_name].metric, start_time, |
| 182 fields_values) | 181 fields_values) |
| 183 | 182 |
| 184 def _apply_all(self, callables, arg): | 183 def _apply_all(self, modifications, entry): |
| 185 ret = arg | 184 """Applies all the modifications, in order, to a memcache entry. |
| 186 for fn in callables: | |
| 187 ret = fn(ret) | |
| 188 return ret | |
| 189 | 185 |
| 190 def _compare_and_set(self, metric_modify_fns, namespace): | 186 All modifications must be for the same metric.""" |
| 187 |
| 188 if not modifications: # pragma: no cover |
| 189 return entry |
| 190 |
| 191 # All modifications should be for the same metric. |
| 192 name = modifications[0].name |
| 193 |
| 194 if entry is None: |
| 195 entry = (self._start_time(name), {}) |
| 196 |
| 197 _, targets = entry |
| 198 |
| 199 target_key = self._target_key() |
| 200 values = targets.setdefault(target_key, {}) |
| 201 |
| 202 for mod in modifications: |
| 203 value = values.get(mod.fields, 0) |
| 204 |
| 205 if mod.mod_type == 'set': |
| 206 new_value, enforce_ge = mod.args |
| 207 if enforce_ge and new_value < value: |
| 208 raise errors.MonitoringDecreasingValueError(name, value, new_value) |
| 209 value = new_value |
| 210 elif mod.mod_type == 'incr': |
| 211 delta, modify_fn = mod.args |
| 212 if modify_fn is None: |
| 213 modify_fn = metric_store.default_modify_fn(name) |
| 214 value = modify_fn(value, delta) |
| 215 else: |
| 216 raise errors.UnknownModificationTypeError(mod.mod_type) |
| 217 |
| 218 values[mod.fields] = value |
| 219 |
| 220 return entry |
| 221 |
| 222 def _compare_and_set(self, modifications, namespace): |
| 191 client = self._client() | 223 client = self._client() |
| 192 | 224 |
| 193 # Metrics that we haven't updated yet. Metrics are removed from this dict | 225 # Metrics that we haven't updated yet. Metrics are removed from this dict |
| 194 # when they're successfully updated - if there are any left they will be | 226 # when they're successfully updated - if there are any left they will be |
| 195 # retried 10 times until everything has been updated. | 227 # retried 10 times until everything has been updated. |
| 196 # We might have more than one modify_fn for a metric if different field | 228 # We might have more than one modification for a metric if different field |
| 197 # values were updated. | 229 # values were updated. |
| 198 metrics = {} | |
| 199 remaining = collections.defaultdict(list) | 230 remaining = collections.defaultdict(list) |
| 200 for metric, modify_fns in metric_modify_fns: | 231 for modification in modifications: |
| 201 metrics[metric.name] = metric | 232 remaining[modification.name].append(modification) |
| 202 remaining[metric.name].append(modify_fns) | |
| 203 | 233 |
| 204 failed_keys_count = 0 | 234 failed_keys_count = 0 |
| 205 | 235 |
| 206 for _ in xrange(self.CAS_RETRIES): | 236 for _ in xrange(self.CAS_RETRIES): |
| 207 keys = [] | 237 keys = [] |
| 208 key_map = {} # key -> metric name (for sharded metrics) | 238 key_map = {} # key -> metric name (for sharded metrics) |
| 209 | 239 |
| 210 for name in remaining: | 240 for name in remaining: |
| 211 # Pick one of the shards to modify. | 241 # Pick one of the shards to modify. |
| 212 key = self._random_shard(metrics[name]) | 242 key = self._random_shard(self._state.metrics[name]) |
| 213 keys.append(key) | 243 keys.append(key) |
| 214 key_map[key] = name | 244 key_map[key] = name |
| 215 | 245 |
| 216 # Get all existing entries. | 246 # Get all existing entries. |
| 217 cas_mapping = {} # key -> new value (for existing entries) | 247 cas_mapping = {} # key -> new value (for existing entries) |
| 218 add_mapping = {} # key -> new value (for new entries) | 248 add_mapping = {} # key -> new value (for new entries) |
| 219 | 249 |
| 220 entries = client.get_multi(keys, for_cas=True, namespace=namespace) | 250 entries = client.get_multi(keys, for_cas=True, namespace=namespace) |
| 221 for key, entry in entries.iteritems(): | 251 for key, entry in entries.iteritems(): |
| 222 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) | 252 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) |
| (...skipping 23 matching lines...) Expand all Loading... |
| 246 remaining = still_remaining | 276 remaining = still_remaining |
| 247 else: | 277 else: |
| 248 logging.warning( | 278 logging.warning( |
| 249 'Memcache compare-and-set failed %d times for keys %s in ' | 279 'Memcache compare-and-set failed %d times for keys %s in ' |
| 250 'namespace %s', | 280 'namespace %s', |
| 251 self.CAS_RETRIES, remaining.keys(), namespace) | 281 self.CAS_RETRIES, remaining.keys(), namespace) |
| 252 | 282 |
| 253 # Update the cas_failures metric with the number of failed keys, but don't | 283 # Update the cas_failures metric with the number of failed keys, but don't |
| 254 # do so recursively. | 284 # do so recursively. |
| 255 if (failed_keys_count and | 285 if (failed_keys_count and |
| 256 any(metric.name != cas_failures.name | 286 any(modification.name != cas_failures.name |
| 257 for metric, _ in metric_modify_fns)): | 287 for modification in modifications)): |
| 258 cas_failures.increment_by(failed_keys_count) | 288 cas_failures.increment_by(failed_keys_count) |
| 259 | 289 |
| 260 def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta): | |
| 261 """Returns a function that modifies a memcache row value. | |
| 262 | |
| 263 Calls modify_value_fn(old_value, delta) and puts the result back in the | |
| 264 memcache row. | |
| 265 """ | |
| 266 | |
| 267 def modify_fn(entry): | |
| 268 if entry is None: | |
| 269 entry = (self._start_time(name), {}) | |
| 270 | |
| 271 _, targets = entry | |
| 272 | |
| 273 target_key = self._target_key() | |
| 274 if target_key not in targets: | |
| 275 targets[target_key] = {} | |
| 276 values = targets[target_key] | |
| 277 | |
| 278 values[fields] = modify_value_fn(values.get(fields, 0), delta) | |
| 279 | |
| 280 return entry | |
| 281 return modify_fn | |
| 282 | |
| 283 def _compare_and_set_metrics(self, modifications): | 290 def _compare_and_set_metrics(self, modifications): |
| 284 if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX | 291 if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
| 285 for name, fields, modify_value_fn, delta in modifications): | 292 for mod in modifications): |
| 286 raise errors.MonitoringError('Metric is magical, can\'t set it') | 293 raise errors.MonitoringError('Metric is magical, can\'t set it') |
| 287 | 294 |
| 288 metric_modify_fns = [ | 295 self._compare_and_set(modifications, self._namespace_for_job()) |
| 289 (self._state.metrics[name], | |
| 290 self._create_modify_metric_fn(name, fields, modify_value_fn, delta)) | |
| 291 for name, fields, modify_value_fn, delta in modifications] | |
| 292 self._compare_and_set(metric_modify_fns, self._namespace_for_job()) | |
| 293 | |
| 294 def _create_set_value_fn(self, name, value, enforce_ge): | |
| 295 def modify_fn(old_value, _delta): | |
| 296 if enforce_ge and old_value is not None and value < old_value: | |
| 297 raise errors.MonitoringDecreasingValueError(name, old_value, value) | |
| 298 return value | |
| 299 return modify_fn | |
| 300 | 296 |
| 301 def set(self, name, fields, value, enforce_ge=False): | 297 def set(self, name, fields, value, enforce_ge=False): |
| 302 modify_fn = self._create_set_value_fn(name, value, enforce_ge) | 298 self._compare_and_set_metrics([metric_store.Modification( |
| 303 self._compare_and_set_metrics([(name, fields, modify_fn, None)]) | 299 name, fields, 'set', (value, enforce_ge))]) |
| 304 | 300 |
| 305 def incr(self, name, fields, delta, modify_fn=operator.add): | 301 def incr(self, name, fields, delta, modify_fn=None): |
| 306 if delta < 0: | 302 self._compare_and_set_metrics([metric_store.Modification( |
| 307 raise errors.MonitoringDecreasingValueError(name, None, delta) | 303 name, fields, 'incr', (delta, modify_fn))]) |
| 308 self._compare_and_set_metrics([(name, fields, modify_fn, delta)]) | |
| 309 | 304 |
| 310 def modify_multi(self, modifications): | 305 def modify_multi(self, modifications): |
| 311 mods = [] | 306 self._compare_and_set_metrics(modifications) |
| 312 for mod in modifications: | |
| 313 if mod.mod_type == 'set': | |
| 314 value, enforce_ge = mod.args | |
| 315 modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge) | |
| 316 delta = None | |
| 317 elif mod.mod_type == 'incr': | |
| 318 delta, modify_fn = mod.args | |
| 319 if delta < 0: | |
| 320 raise errors.MonitoringDecreasingValueError(mod.name, None, delta) | |
| 321 else: | |
| 322 raise errors.UnknownModificationTypeError(mod.mod_type) | |
| 323 | |
| 324 mods.append((mod.name, mod.fields, modify_fn, delta)) | |
| 325 | |
| 326 self._compare_and_set_metrics(mods) | |
| 327 | 307 |
| 328 def reset_for_unittest(self, name=None): | 308 def reset_for_unittest(self, name=None): |
| 329 if name is None: | 309 if name is None: |
| 330 self._client().delete_multi(self._state.metrics.keys(), | 310 self._client().delete_multi(self._state.metrics.keys(), |
| 331 namespace=self._namespace_for_job()) | 311 namespace=self._namespace_for_job()) |
| 332 else: | 312 else: |
| 333 self._client().delete(name, namespace=self._namespace_for_job()) | 313 self._client().delete(name, namespace=self._namespace_for_job()) |
| OLD | NEW |