Chromium Code Reviews| Index: appengine/sheriff_o_matic/utils.py |
| diff --git a/appengine/sheriff_o_matic/utils.py b/appengine/sheriff_o_matic/utils.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..fe03d6c8e70d1483e8d8c61e205a804a6945711c |
| --- /dev/null |
| +++ b/appengine/sheriff_o_matic/utils.py |
| @@ -0,0 +1,90 @@ |
| +#!/usr/bin/env python |
|
agable
2015/08/12 22:18:08
I'd sort the functions in this file into
a) Auth s
|
| +# Copyright (c) 2015 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +"""Utility functions for Sheriff-o-matic.""" |
| + |
| +import calendar |
| +import datetime |
| +import hashlib |
| +import json |
| +import logging |
| +import time |
| +import urllib2 |
| +import webapp2 |
| + |
| +from datetime import datetime as dt |
| +from google.appengine.api import app_identity |
| +from google.appengine.api import urlfetch |
| +from google.appengine.api import users |
| + |
| +from components import auth |
| + |
| + |
| +class DateTimeEncoder(json.JSONEncoder): |
| + |
| + def default(self, obj): # pylint: disable=E0202 |
| + if isinstance(obj, datetime.datetime): |
| + return calendar.timegm(obj.timetuple()) |
| + # Let the base class default method raise the TypeError. |
| + return json.JSONEncoder.default(self, obj) |
| + |
| + |
| +def increment_monarch(endpoint): |
| + base_url = app_identity.get_default_version_hostname() |
| + url = 'http://%s-dot-%s/monitoring/%s' % ('monitoring', base_url, endpoint) |
| + return urlfetch.fetch(url=url, method=urlfetch.GET) |
| + |
| + |
| +def is_googler(): |
| + user = users.get_current_user() |
| + if user: |
| + email = user.email() |
| + return email.endswith('@google.com') and '+' not in email |
| + return False |
| + |
| + |
| +def is_trooper_or_admin(): |
| + return (auth.is_group_member("mdb/chrome-troopers") or |
| + users.is_current_user_admin()) |
| + |
| + |
| +def convert_to_secs(duration_str): |
| + duration_str = duration_str.strip() |
| + if duration_str[-1] == 's': |
| + return int(duration_str[:-1]) |
| + elif duration_str[-1] == 'm': |
| + return 60 * int(duration_str[:-1]) |
| + elif duration_str[-1] == 'h': |
| + return 3600 * int(duration_str[:-1]) |
| + elif duration_str[-1] == 'd': |
| + return 24 * 3600 * int(duration_str[:-1]) |
| + elif duration_str[-1] == 'w': |
| + return 7 * 24 * 3600 * int(duration_str[:-1]) |
| + else: |
| + raise Exception('Invalid duration_str ' + duration_str[-1]) |
| + |
| + |
| +def secs_ago(time_string, time_now=None): |
| + try: |
| + time_sent = dt.strptime(time_string, '%Y-%m-%d %H:%M:%S %Z') |
| + except ValueError: |
| + time_sent = dt.strptime(time_string, '%Y-%m-%d %H:%M:%S') |
| + time_now = time_now or int(dt.utcnow().strftime('%s')) |
| + latency = int(time_now) - int(time_sent.strftime('%s')) |
| + return latency |
| + |
| + |
| +def hash_string(input_str): |
| + return hashlib.sha1(input_str).hexdigest() |
| + |
| + |
| +def generate_json_dump(alerts, human_readable=True): |
| + if human_readable: |
| + return json.dumps(alerts, cls=DateTimeEncoder, |
| + indent=2, |
| + separators=(',', ': ')) |
| + return json.dumps(alerts, cls=DateTimeEncoder, |
| + indent=None, |
| + separators=(',', ':')) |