Index: appengine/chromium_cq_status/shared/utils.py |
diff --git a/appengine/chromium_cq_status/shared/utils.py b/appengine/chromium_cq_status/shared/utils.py |
index 7312725d4f3dd54789d0a6b8fd58ca80630b23af..6273803f4c4b5611438cd2a2a19cde6d96b659f7 100644 |
--- a/appengine/chromium_cq_status/shared/utils.py |
+++ b/appengine/chromium_cq_status/shared/utils.py |
@@ -4,17 +4,19 @@ |
import calendar |
from datetime import datetime |
+import functools |
import hashlib |
import json |
import logging |
+import os |
from google.appengine.api import memcache |
from google.appengine.api import users |
+from google.appengine.api import app_identity |
-from shared.config import VALID_EMAIL_RE |
+from shared.config import HOST_ACLS |
compressed_separators = (',', ':') |
-minutes_per_day = 24 * 60 |
def cronjob(cronjob_handler): |
def checked_cronjob_handler(self, *args): |
@@ -24,6 +26,7 @@ def cronjob(cronjob_handler): |
return checked_cronjob_handler |
def cross_origin_json(handler): |
+ @functools.wraps(handler) |
def headered_json_handler(self, *args): |
self.response.headers.add_header("Access-Control-Allow-Origin", "*") |
result = handler(self, *args) |
@@ -35,11 +38,45 @@ def cross_origin_json(handler): |
def filter_dict(d, keys): |
return {key: d[key] for key in d if key in keys} |
-def is_valid_user(): |
+ |
+def get_host_permissions(kind): |
+ """Returns compiled regex of allowed user email or True if everyone is |
+ allowed.""" |
+ assert kind in ('read', 'write') |
+ if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'): |
+ host = 'Development' |
+ else: |
+ host = app_identity.get_default_version_hostname() |
+ return HOST_ACLS[host][kind] |
+ |
+def has_permission(kind): |
if users.is_current_user_admin(): |
+ logging.info('user is admin') |
+ return True |
+ email_pattern = get_host_permissions(kind) |
+ if email_pattern == 'everyone': |
return True |
user = users.get_current_user() |
- return user and VALID_EMAIL_RE.match(user.email()) |
+ logging.info('user: %s %s', user, 'xx' if not user else user.email()) |
+ return user and bool(email_pattern.match(user.email())) |
+ |
+ |
+def read_access(handler): |
+ """Decorator ensuring current user has read access to this host.""" |
+ @functools.wraps(handler) |
+ def ensure(self, *args, **kwargs): |
+ if not has_permission('read'): |
+ self.redirect(users.create_login_url(self.request.url)) |
+ return |
+ return handler(self, *args, **kwargs) |
+ return ensure |
+ |
+ |
+def get_friendly_hostname(): |
+ host = app_identity.get_default_version_hostname() |
+ # For a typical host 'xyz-cq-status.appspot.com', return 'Xyz'. |
+ return host.split('-')[0].capitalize() if host else '(Development)' |
+ |
def memcachize(cache_check): |
def decorator(f): |