OLD | NEW |
---|---|
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import copy | 6 import copy |
7 import functools | 7 import functools |
8 import logging | 8 import logging |
9 import operator | 9 import operator |
10 import random | 10 import random |
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
174 if key in shard_map: | 174 if key in shard_map: |
175 # This row is one shard of a sharded metric - put the shard number | 175 # This row is one shard of a sharded metric - put the shard number |
176 # in the task number. | 176 # in the task number. |
177 target.task_num, metric_name = shard_map[key] | 177 target.task_num, metric_name = shard_map[key] |
178 else: | 178 else: |
179 target.task_num, metric_name = 0, key | 179 target.task_num, metric_name = 0, key |
180 | 180 |
181 yield (copy.copy(target), entities[metric_name].metric, start_time, | 181 yield (copy.copy(target), entities[metric_name].metric, start_time, |
182 fields_values) | 182 fields_values) |
183 | 183 |
184 def _apply_all(self, callables, arg): | 184 def _apply_all(self, modifications, entry): |
185 ret = arg | 185 """Applies all the modifications, in order, to a memcache entry. |
186 for fn in callables: | |
187 ret = fn(ret) | |
188 return ret | |
189 | 186 |
190 def _compare_and_set(self, metric_modify_fns, namespace): | 187 All modifications must be for the same metric.""" |
188 | |
189 assert len(modifications) > 0 | |
Sergey Berezin
2015/12/17 22:28:50
While useful for debugging, I'm generally afraid o
dsansome
2015/12/18 00:54:33
Done.
| |
190 | |
191 # All modifications should be for the same metric. | |
192 name = modifications[0].name | |
193 | |
194 if entry is None: | |
195 entry = (self._start_time(name), {}) | |
196 | |
197 _, targets = entry | |
198 | |
199 target_key = self._target_key() | |
200 if target_key not in targets: | |
201 targets[target_key] = {} | |
202 values = targets[target_key] | |
Sergey Berezin
2015/12/17 22:28:50
nit: values = targets.setdefault(target_key, {})
dsansome
2015/12/18 00:54:33
Done.
| |
203 | |
204 for mod in modifications: | |
205 value = values.get(mod.fields, 0) | |
206 | |
207 if mod.mod_type == 'set': | |
208 new_value, enforce_ge = mod.args | |
209 if enforce_ge and new_value < value: | |
210 raise errors.MonitoringDecreasingValueError(name, value, new_value) | |
211 value = new_value | |
212 elif mod.mod_type == 'incr': | |
213 delta, modify_fn = mod.args | |
214 if delta < 0: | |
Sergey Berezin
2015/12/17 22:28:50
Technically, you can add a negative number to a di
dsansome
2015/12/18 00:54:33
Done.
| |
215 raise errors.MonitoringDecreasingValueError(name, None, delta) | |
216 value = modify_fn(value, delta) | |
217 else: | |
218 raise errors.UnknownModificationTypeError(mod.mod_type) | |
219 | |
220 values[mod.fields] = value | |
221 | |
222 return entry | |
223 | |
224 def _compare_and_set(self, modifications, namespace): | |
191 client = self._client() | 225 client = self._client() |
192 | 226 |
193 # Metrics that we haven't updated yet. Metrics are removed from this dict | 227 # Metrics that we haven't updated yet. Metrics are removed from this dict |
194 # when they're successfully updated - if there are any left they will be | 228 # when they're successfully updated - if there are any left they will be |
195 # retried 10 times until everything has been updated. | 229 # retried 10 times until everything has been updated. |
196 # We might have more than one modify_fn for a metric if different field | 230 # We might have more than one modification for a metric if different field |
197 # values were updated. | 231 # values were updated. |
198 metrics = {} | |
199 remaining = collections.defaultdict(list) | 232 remaining = collections.defaultdict(list) |
200 for metric, modify_fns in metric_modify_fns: | 233 for modification in modifications: |
201 metrics[metric.name] = metric | 234 remaining[modification.name].append(modification) |
202 remaining[metric.name].append(modify_fns) | |
203 | 235 |
204 failed_keys_count = 0 | 236 failed_keys_count = 0 |
205 | 237 |
206 for _ in xrange(self.CAS_RETRIES): | 238 for _ in xrange(self.CAS_RETRIES): |
207 keys = [] | 239 keys = [] |
208 key_map = {} # key -> metric name (for sharded metrics) | 240 key_map = {} # key -> metric name (for sharded metrics) |
209 | 241 |
210 for name in remaining: | 242 for name in remaining: |
211 # Pick one of the shards to modify. | 243 # Pick one of the shards to modify. |
212 key = self._random_shard(metrics[name]) | 244 key = self._random_shard(self._state.metrics[name]) |
213 keys.append(key) | 245 keys.append(key) |
214 key_map[key] = name | 246 key_map[key] = name |
215 | 247 |
216 # Get all existing entries. | 248 # Get all existing entries. |
217 cas_mapping = {} # key -> new value (for existing entries) | 249 cas_mapping = {} # key -> new value (for existing entries) |
218 add_mapping = {} # key -> new value (for new entries) | 250 add_mapping = {} # key -> new value (for new entries) |
219 | 251 |
220 entries = client.get_multi(keys, for_cas=True, namespace=namespace) | 252 entries = client.get_multi(keys, for_cas=True, namespace=namespace) |
221 for key, entry in entries.iteritems(): | 253 for key, entry in entries.iteritems(): |
222 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) | 254 cas_mapping[key] = self._apply_all(remaining[key_map[key]], entry) |
(...skipping 23 matching lines...) Expand all Loading... | |
246 remaining = still_remaining | 278 remaining = still_remaining |
247 else: | 279 else: |
248 logging.warning( | 280 logging.warning( |
249 'Memcache compare-and-set failed %d times for keys %s in ' | 281 'Memcache compare-and-set failed %d times for keys %s in ' |
250 'namespace %s', | 282 'namespace %s', |
251 self.CAS_RETRIES, remaining.keys(), namespace) | 283 self.CAS_RETRIES, remaining.keys(), namespace) |
252 | 284 |
253 # Update the cas_failures metric with the number of failed keys, but don't | 285 # Update the cas_failures metric with the number of failed keys, but don't |
254 # do so recursively. | 286 # do so recursively. |
255 if (failed_keys_count and | 287 if (failed_keys_count and |
256 any(metric.name != cas_failures.name | 288 any(modification.name != cas_failures.name |
257 for metric, _ in metric_modify_fns)): | 289 for modification in modifications)): |
258 cas_failures.increment_by(failed_keys_count) | 290 cas_failures.increment_by(failed_keys_count) |
259 | 291 |
260 def _create_modify_metric_fn(self, name, fields, modify_value_fn, delta): | |
261 """Returns a function that modifies a memcache row value. | |
262 | |
263 Calls modify_value_fn(old_value, delta) and puts the result back in the | |
264 memcache row. | |
265 """ | |
266 | |
267 def modify_fn(entry): | |
268 if entry is None: | |
269 entry = (self._start_time(name), {}) | |
270 | |
271 _, targets = entry | |
272 | |
273 target_key = self._target_key() | |
274 if target_key not in targets: | |
275 targets[target_key] = {} | |
276 values = targets[target_key] | |
277 | |
278 values[fields] = modify_value_fn(values.get(fields, 0), delta) | |
279 | |
280 return entry | |
281 return modify_fn | |
282 | |
283 def _compare_and_set_metrics(self, modifications): | 292 def _compare_and_set_metrics(self, modifications): |
284 if any(name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX | 293 if any(mod.name in self.METRIC_NAMES_EXCLUDED_FROM_INDEX |
285 for name, fields, modify_value_fn, delta in modifications): | 294 for mod in modifications): |
286 raise errors.MonitoringError('Metric is magical, can\'t set it') | 295 raise errors.MonitoringError('Metric is magical, can\'t set it') |
287 | 296 |
288 metric_modify_fns = [ | 297 self._compare_and_set(modifications, self._namespace_for_job()) |
289 (self._state.metrics[name], | |
290 self._create_modify_metric_fn(name, fields, modify_value_fn, delta)) | |
291 for name, fields, modify_value_fn, delta in modifications] | |
292 self._compare_and_set(metric_modify_fns, self._namespace_for_job()) | |
293 | |
294 def _create_set_value_fn(self, name, value, enforce_ge): | |
295 def modify_fn(old_value, _delta): | |
296 if enforce_ge and old_value is not None and value < old_value: | |
297 raise errors.MonitoringDecreasingValueError(name, old_value, value) | |
298 return value | |
299 return modify_fn | |
300 | 298 |
301 def set(self, name, fields, value, enforce_ge=False): | 299 def set(self, name, fields, value, enforce_ge=False): |
302 modify_fn = self._create_set_value_fn(name, value, enforce_ge) | 300 self._compare_and_set_metrics([metric_store.Modification( |
303 self._compare_and_set_metrics([(name, fields, modify_fn, None)]) | 301 name, fields, 'set', (value, enforce_ge))]) |
304 | 302 |
305 def incr(self, name, fields, delta, modify_fn=operator.add): | 303 def incr(self, name, fields, delta, modify_fn=operator.add): |
306 if delta < 0: | 304 self._compare_and_set_metrics([metric_store.Modification( |
307 raise errors.MonitoringDecreasingValueError(name, None, delta) | 305 name, fields, 'incr', (delta, modify_fn))]) |
308 self._compare_and_set_metrics([(name, fields, modify_fn, delta)]) | |
309 | 306 |
310 def modify_multi(self, modifications): | 307 def modify_multi(self, modifications): |
311 mods = [] | 308 self._compare_and_set_metrics(modifications) |
312 for mod in modifications: | |
313 if mod.mod_type == 'set': | |
314 value, enforce_ge = mod.args | |
315 modify_fn = self._create_set_value_fn(mod.name, value, enforce_ge) | |
316 delta = None | |
317 elif mod.mod_type == 'incr': | |
318 delta, modify_fn = mod.args | |
319 if delta < 0: | |
320 raise errors.MonitoringDecreasingValueError(mod.name, None, delta) | |
321 else: | |
322 raise errors.UnknownModificationTypeError(mod.mod_type) | |
323 | |
324 mods.append((mod.name, mod.fields, modify_fn, delta)) | |
325 | |
326 self._compare_and_set_metrics(mods) | |
327 | 309 |
328 def reset_for_unittest(self, name=None): | 310 def reset_for_unittest(self, name=None): |
329 if name is None: | 311 if name is None: |
330 self._client().delete_multi(self._state.metrics.keys(), | 312 self._client().delete_multi(self._state.metrics.keys(), |
331 namespace=self._namespace_for_job()) | 313 namespace=self._namespace_for_job()) |
332 else: | 314 else: |
333 self._client().delete(name, namespace=self._namespace_for_job()) | 315 self._client().delete(name, namespace=self._namespace_for_job()) |
OLD | NEW |