Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import collections | 5 import collections |
| 6 import copy | 6 import copy |
| 7 import functools | 7 import functools |
| 8 import logging | 8 import logging |
| 9 import operator | 9 import operator |
| 10 import random | 10 import random |
| (...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 174 if key in shard_map: | 174 if key in shard_map: |
| 175 # This row is one shard of a sharded metric - put the shard number | 175 # This row is one shard of a sharded metric - put the shard number |
| 176 # in the task number. | 176 # in the task number. |
| 177 target.task_num, metric_name = shard_map[key] | 177 target.task_num, metric_name = shard_map[key] |
| 178 else: | 178 else: |
| 179 target.task_num, metric_name = 0, key | 179 target.task_num, metric_name = 0, key |
| 180 | 180 |
| 181 yield (copy.copy(target), entities[metric_name].metric, start_time, | 181 yield (copy.copy(target), entities[metric_name].metric, start_time, |
| 182 fields_values) | 182 fields_values) |
| 183 | 183 |
| 184 def _apply_all(self, callables, arg): | 184 def _apply_all(self, modifications, entry): |
| 185 ret = arg | 185 """Applies all the modifications, in order, to a memcache entry. |
| 186 for fn in callables: | |
| 187 ret = fn(ret) | |
| 188 return ret | |
| 189 | 186 |
| 190 def _compare_and_set(self, metric_modify_fns, namespace): | 187 All modifications must be for the same metric.""" |
| 188 | |
| 189 assert len(modifications) > 0 | |
|
Sergey Berezin
2015/12/17 22:28:50
While useful for debugging, I'm generally afraid o
dsansome
2015/12/18 00:54:33
Done.
| |
| 190 | |
| 191 # All modifications should be for the same metric. | |
| 192 name = modifications[0].name | |
| 193 | |
| 194 if entry is None: | |
| 195 entry = (self._start_time(name), {}) | |
| 196 | |
| 197 _, targets = entry | |
| 198 | |
| 199 target_key = self._target_key() | |
| 200 if target_key not in targets: | |
| 201 targets[target_key] = {} | |
| 202 values = targets[target_key] | |
|
Sergey Berezin
2015/12/17 22:28:50
nit: values = targets.setdefault(target_key, {})
dsansome
2015/12/18 00:54:33
Done.
| |
| 203 | |
| 204 for mod in modifications: | |
| 205 value = values.get(mod.fields, 0) | |
| 206 | |
| 207 if mod.mod_type == 'set': | |
| 208 new_value, enforce_ge = mod.args | |
| 209 if enforce_ge and new_value < value: | |
| 210 raise errors.MonitoringDecreasingValueError(name, value, new_value) | |
| 211 value = new_value | |
| 212 elif mod.mod_type == 'incr': | |
| 213 delta, modify_fn = mod.args | |
| 214 if delta < 0: | |
|
Sergey Berezin
2015/12/17 22:28:50
Technically, you can add a negative number to a di
dsansome
2015/12/18 00:54:33
Done.
| |
| 215 raise errors.MonitoringDecreasingValueError(name, None, delta) | |
| 216 value = modify_fn(value, delta) | |
| 217 else: | |
| 218 raise errors.UnknownModificationTypeError(mod.mod_type) | |
| 219 | |
| 220 values[mod.fields] = value | |
| 221 | |
| 222 return entry | |
| 223 | |
| 224 def _compare_and_set(self, modifications, namespace): | |
| 191 client = self._client() | 225 client = self._client() |
| 192 | 226 |
| 193 # Metrics that we haven't updated yet. Metrics are removed from this dict | 227 # Metrics that we haven't updated yet. Metrics are removed from this dict |
| 194 # when they're successfully updated - if there are any left they will be | 228 # when they're successfully updated - if there are any left they will be |
| 195 # retried 10 times until everything has been updated. | 229 # retried 10 times until everything has been updated. |
| 196 # We might have more than one modify_fn for a metric if different field | 230 # We might have more than one modification for a metric if different field |
| 197 # values were updated. | 231 # values were updated. |
| 198 metrics = {} | |
| 199 remaining = collections.defaultdict(list) | 232 remaining = collections.defaultdict(list) |
| 200 for metric, modify_fns in metric_modify_fns: | 233 for modification in modifications: |
| 201 metrics[metric.name] = metric | 234 remaining[modification.name].append(modification) |
| 202 remaining[metric.name].append(modify_fns) | |
| 203 | 235 |
| 204 failed_keys_count = 0 | 236 failed_keys_count = 0 |
| 205 | 237 |
| 206 for _ in xrange(self.CAS_RETRIES): | 238 for _ in xrange(self.CAS_RETRIES): |
| 207 keys = [] | 239 keys = [] |
| 208 key_map = {} # key -> metric name (for sharded metrics) | 240 key_map = {} # key -> metric name (for sharded metrics) |
| 209 | 241 |
| 210 for name in remaining: | 242 for name in remaining: |
| 211 # Pick one of the shards to modify. | 243 # Pick one of the shards to modify. |
| 212 key = self._random_shard(metrics[name]) | 244 key = self._random_shard(self._state.metrics[name]) |
| 213 keys.append(key) | 245 keys.append(key) |
| 214 key_map[key] = name | 246 key_map[key] = name |
| 215 | 247 |
| 216 # Get all existing entries. | 248 # Get all existing entries. |
| 217 cas_mapping = {} # key -> new value (for existing entries) | 249 cas_mapping = {} # key -> new value (for existing entries) |
| 218 add_mapping = {} # key -> new value (for new entries) | 250 add_mapping = {} # key -> new value (for new entries) |
| 219 | 251 |
| 220 entries = client.get_multi(keys, for_cas=True, namespace=namespace) | 252 entries = client.get_multi(keys, for_cas=True, namespace=namespace) |
| 221 for key, entry in entries.iteritems(): | 253 for key, entry in entries.iteritems(): |
| 222 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) | 254 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 246 remaining = still_remaining | 278 remaining = still_remaining |
| 247 else: | 279 else: |
| 248 logging.warning( | 280 logging.warning( |
| 249 'Memcache compare-and-set failed %d times for keys %s in ' | 281 'Memcache compare-and-set failed %d times for keys %s in ' |
| 250 'namespace %s', | 282 'namespace %s', |
| 251 self.CAS_RETRIES, remaining.keys(), namespace) | 283 self.CAS_RETRIES, remaining.keys(), namespace) |
| 252 | 284 |
| 253 # Update the cas_failures metric with the number of failed keys, but don't | 285 # Update the cas_failures metric with the number of failed keys, but don't |
| 254 # do so recursively. | 286 # do so recursively. |
| 255 if (failed_keys_count and | 287 if (failed_keys_count and |
| 256 any(metric.name != cas_failures.name | 288 any(modification.name != cas_failures.name |
| 257 for metric, _ in metric_modify_fns)): | 289 for modification in modifications)): |
| 258 cas_failures.increment_by(failed_keys_count) | 290 cas_failures.increment_by(failed_keys_count) |
| 259 | 291 |
| 260 def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta): | |
| 261 """Returns a function that modifies a memcache row value. | |
| 262 | |
| 263 Calls modify_value_fn(old_value, delta) and puts the result back in the | |
| 264 memcache row. | |
| 265 """ | |
| 266 | |
| 267 def modify_fn(entry): | |
| 268 if entry is None: | |
| 269 entry = (self._start_time(name), {}) | |
| 270 | |
| 271 _, targets = entry | |
| 272 | |
| 273 target_key = self._target_key() | |
| 274 if target_key not in targets: | |
| 275 targets[target_key] = {} | |
| 276 values = targets[target_key] | |
| 277 | |
| 278 values[fields] = modify_value_fn(values.get(fields, 0), delta) | |
| 279 | |
| 280 return entry | |
| 281 return modify_fn | |
| 282 | |
| 283 def _compare_and_set_metrics(self, modifications): | 292 def _compare_and_set_metrics(self, modifications): |
| 284 if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX | 293 if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
| 285 for name, fields, modify_value_fn, delta in modifications): | 294 for mod in modifications): |
| 286 raise errors.MonitoringError('Metric is magical, can\'t set it') | 295 raise errors.MonitoringError('Metric is magical, can\'t set it') |
| 287 | 296 |
| 288 metric_modify_fns = [ | 297 self._compare_and_set(modifications, self._namespace_for_job()) |
| 289 (self._state.metrics[name], | |
| 290 self._create_modify_metric_fn(name, fields, modify_value_fn, delta)) | |
| 291 for name, fields, modify_value_fn, delta in modifications] | |
| 292 self._compare_and_set(metric_modify_fns, self._namespace_for_job()) | |
| 293 | |
| 294 def _create_set_value_fn(self, name, value, enforce_ge): | |
| 295 def modify_fn(old_value, _delta): | |
| 296 if enforce_ge and old_value is not None and value < old_value: | |
| 297 raise errors.MonitoringDecreasingValueError(name, old_value, value) | |
| 298 return value | |
| 299 return modify_fn | |
| 300 | 298 |
| 301 def set(self, name, fields, value, enforce_ge=False): | 299 def set(self, name, fields, value, enforce_ge=False): |
| 302 modify_fn = self._create_set_value_fn(name, value, enforce_ge) | 300 self._compare_and_set_metrics([metric_store.Modification( |
| 303 self._compare_and_set_metrics([(name, fields, modify_fn, None)]) | 301 name, fields, 'set', (value, enforce_ge))]) |
| 304 | 302 |
| 305 def incr(self, name, fields, delta, modify_fn=operator.add): | 303 def incr(self, name, fields, delta, modify_fn=operator.add): |
| 306 if delta < 0: | 304 self._compare_and_set_metrics([metric_store.Modification( |
| 307 raise errors.MonitoringDecreasingValueError(name, None, delta) | 305 name, fields, 'incr', (delta, modify_fn))]) |
| 308 self._compare_and_set_metrics([(name, fields, modify_fn, delta)]) | |
| 309 | 306 |
| 310 def modify_multi(self, modifications): | 307 def modify_multi(self, modifications): |
| 311 mods = [] | 308 self._compare_and_set_metrics(modifications) |
| 312 for mod in modifications: | |
| 313 if mod.mod_type == 'set': | |
| 314 value, enforce_ge = mod.args | |
| 315 modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge) | |
| 316 delta = None | |
| 317 elif mod.mod_type == 'incr': | |
| 318 delta, modify_fn = mod.args | |
| 319 if delta < 0: | |
| 320 raise errors.MonitoringDecreasingValueError(mod.name, None, delta) | |
| 321 else: | |
| 322 raise errors.UnknownModificationTypeError(mod.mod_type) | |
| 323 | |
| 324 mods.append((mod.name, mod.fields, modify_fn, delta)) | |
| 325 | |
| 326 self._compare_and_set_metrics(mods) | |
| 327 | 309 |
| 328 def reset_for_unittest(self, name=None): | 310 def reset_for_unittest(self, name=None): |
| 329 if name is None: | 311 if name is None: |
| 330 self._client().delete_multi(self._state.metrics.keys(), | 312 self._client().delete_multi(self._state.metrics.keys(), |
| 331 namespace=self._namespace_for_job()) | 313 namespace=self._namespace_for_job()) |
| 332 else: | 314 else: |
| 333 self._client().delete(name, namespace=self._namespace_for_job()) | 315 self._client().delete(name, namespace=self._namespace_for_job()) |
| OLD | NEW |