Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: appengine/chromium_cq_status/shared/utils.py

Issue 2111713004: Teach CQ status app to check login status of users. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Review and tests. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2014 The Chromium Authors. All rights reserved. 1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import calendar 5 import calendar
6 from datetime import datetime 6 from datetime import datetime
7 import functools
7 import hashlib 8 import hashlib
8 import json 9 import json
9 import logging 10 import logging
11 import os
10 12
11 from google.appengine.api import memcache 13 from google.appengine.api import memcache
12 from google.appengine.api import users 14 from google.appengine.api import users
15 from google.appengine.api import app_identity
13 16
14 from shared.config import VALID_EMAIL_RE 17 from shared.config import HOST_ACLS
15 18
16 compressed_separators = (',', ':') 19 compressed_separators = (',', ':')
17 minutes_per_day = 24 * 60
18 20
19 def cronjob(cronjob_handler): 21 def cronjob(cronjob_handler):
20 def checked_cronjob_handler(self, *args): 22 def checked_cronjob_handler(self, *args):
21 assert (self.request.headers.get('X-AppEngine-Cron') or # pragma: no cover 23 assert (self.request.headers.get('X-AppEngine-Cron') or # pragma: no cover
22 users.is_current_user_admin()) 24 users.is_current_user_admin())
23 cronjob_handler(self, *args) # pragma: no cover 25 cronjob_handler(self, *args) # pragma: no cover
24 return checked_cronjob_handler 26 return checked_cronjob_handler
25 27
26 def cross_origin_json(handler): 28 def cross_origin_json(handler):
29 @functools.wraps(handler)
27 def headered_json_handler(self, *args): 30 def headered_json_handler(self, *args):
28 self.response.headers.add_header("Access-Control-Allow-Origin", "*") 31 self.response.headers.add_header("Access-Control-Allow-Origin", "*")
29 result = handler(self, *args) 32 result = handler(self, *args)
30 if result is not None: 33 if result is not None:
31 self.response.headers.add_header('Content-Type', 'application/json') 34 self.response.headers.add_header('Content-Type', 'application/json')
32 self.response.write(compressed_json_dumps(result)) 35 self.response.write(compressed_json_dumps(result))
33 return headered_json_handler 36 return headered_json_handler
34 37
35 def filter_dict(d, keys): 38 def filter_dict(d, keys):
36 return {key: d[key] for key in d if key in keys} 39 return {key: d[key] for key in d if key in keys}
37 40
38 def is_valid_user(): 41
42 def get_host_permissions(kind):
43 """Returns compiled regex of allowed user email or True if everyone is
44 allowed."""
45 assert kind in ('read', 'write')
46 if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'):
47 host = 'Development'
48 else:
49 host = app_identity.get_default_version_hostname()
50 return HOST_ACLS[host][kind]
51
52 def has_permission(kind):
39 if users.is_current_user_admin(): 53 if users.is_current_user_admin():
54 logging.info('user is admin')
55 return True
56 email_pattern = get_host_permissions(kind)
57 if email_pattern == 'everyone':
40 return True 58 return True
41 user = users.get_current_user() 59 user = users.get_current_user()
42 return user and VALID_EMAIL_RE.match(user.email()) 60 logging.info('user: %s %s', user, 'xx' if not user else user.email())
61 return user and bool(email_pattern.match(user.email()))
62
63
64 def read_access(handler):
65 """Decorator ensuring current user has read access to this host."""
66 @functools.wraps(handler)
67 def ensure(self, *args, **kwargs):
68 if not has_permission('read'):
69 self.redirect(users.create_login_url(self.request.url))
70 return
71 return handler(self, *args, **kwargs)
72 return ensure
73
74
75 def get_friendly_hostname():
76 host = app_identity.get_default_version_hostname()
77 # For a typical host 'xyz-cq-status.appspot.com', return 'Xyz'.
78 return host.split('-')[0].capitalize() if host else '(Development)'
79
43 80
44 def memcachize(cache_check): 81 def memcachize(cache_check):
45 def decorator(f): 82 def decorator(f):
46 def memcachized(**kwargs): 83 def memcachized(**kwargs):
47 key = '%s.%s(%s)' % ( 84 key = '%s.%s(%s)' % (
48 f.__module__, 85 f.__module__,
49 f.__name__, 86 f.__name__,
50 ', '.join('%s=%r' % i for i in sorted(kwargs.items())), 87 ', '.join('%s=%r' % i for i in sorted(kwargs.items())),
51 ) 88 )
52 cache = memcache.get(key) 89 cache = memcache.get(key)
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
103 140
104 141
105 def get_full_patchset_url(codereview_hostname, issue, patchset): 142 def get_full_patchset_url(codereview_hostname, issue, patchset):
106 if codereview_hostname.split('.')[0].split('-')[-1] == 'review': 143 if codereview_hostname.split('.')[0].split('-')[-1] == 'review':
107 # This is Gerrit, which has host-review.googlesource.com. 144 # This is Gerrit, which has host-review.googlesource.com.
108 templ = 'https://%s/#/c/%s/%s' 145 templ = 'https://%s/#/c/%s/%s'
109 else: 146 else:
110 # Rietveld. 147 # Rietveld.
111 templ = 'https://%s/%s/#ps%s' 148 templ = 'https://%s/%s/#ps%s'
112 return templ % (codereview_hostname, issue, patchset) 149 return templ % (codereview_hostname, issue, patchset)
OLDNEW
« no previous file with comments | « appengine/chromium_cq_status/shared/test/utils_test.py ('k') | appengine/chromium_cq_status/stats/test/stats_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698