OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. |
| 4 |
| 5 """Implements a client library for reading and writing LUCI_CONTEXT compatible |
| 6 files. |
| 7 |
| 8 Due to arcane details of the UNIX process model and environment variables, this |
| 9 library is unfortunately NOT threadsafe; there's no way to have multiple |
| 10 LUCI_CONTEXTS live in a process safely at the same time. As such, this library |
| 11 will raise an exception if any attempt is made to use it improperly (for example |
| 12 by having multiple threads call 'write' at the same time). |
| 13 |
| 14 See ../LUCI_CONTEXT.md for details on the LUCI_CONTEXT concept/protocol.""" |
| 15 |
| 16 import contextlib |
| 17 import copy |
| 18 import json |
| 19 import logging |
| 20 import os |
| 21 import sys |
| 22 import tempfile |
| 23 import threading |
| 24 |
| 25 _LOGGER = logging.getLogger(__name__) |
| 26 |
| 27 # _ENV_KEY is the environment variable that we look for to find out where the |
| 28 # LUCI context file is. |
| 29 _ENV_KEY = 'LUCI_CONTEXT' |
| 30 |
| 31 # _CUR_CONTEXT contains the cached LUCI Context that is currently available to |
| 32 # read. A value of None indicates that the value has not yet been populated. |
| 33 _CUR_CONTEXT = None |
| 34 _CUR_CONTEXT_LOCK = threading.Lock() |
| 35 |
| 36 # Write lock is a recursive mutex which is taken when using the write() method. |
| 37 # This allows the same thread to |
| 38 _WRITE_LOCK = threading.RLock() |
| 39 |
| 40 |
| 41 @contextlib.contextmanager |
| 42 def _tf(data, data_raw=False, workdir=None): |
| 43 tf = tempfile.NamedTemporaryFile(prefix='luci_ctx.', suffix='.json', |
| 44 delete=False, dir=workdir) |
| 45 _LOGGER.debug('Writing LUCI_CONTEXT file %r', tf.name) |
| 46 try: |
| 47 if not data_raw: |
| 48 json.dump(data, tf) |
| 49 else: |
| 50 # for testing, allows malformed json |
| 51 tf.write(data) |
| 52 tf.close() # close it so that winders subprocesses can read it. |
| 53 yield tf.name |
| 54 finally: |
| 55 try: |
| 56 os.unlink(tf.name) |
| 57 except OSError as ex: |
| 58 _LOGGER.error( |
| 59 'Failed to delete written LUCI_CONTEXT file %r: %s', tf.name, ex) |
| 60 |
| 61 |
| 62 def _to_utf8(obj): |
| 63 if isinstance(obj, dict): |
| 64 return {_to_utf8(key): _to_utf8(value) for key, value in obj.iteritems()} |
| 65 if isinstance(obj, list): |
| 66 return [_to_utf8(item) for item in obj] |
| 67 if isinstance(obj, unicode): |
| 68 return obj.encode('utf-8') |
| 69 return obj |
| 70 |
| 71 |
| 72 class MultipleLUCIContextException(Exception): |
| 73 def __init__(self): |
| 74 super(MultipleLUCIContextException, self).__init__( |
| 75 'Attempted to write LUCI_CONTEXT in multiple threads') |
| 76 |
| 77 |
| 78 def _check_ok(data): |
| 79 if not isinstance(data, dict): |
| 80 _LOGGER.error( |
| 81 'LUCI_CONTEXT does not contain a dict: %s', type(data).__name__) |
| 82 return False |
| 83 |
| 84 bad = False |
| 85 for k, v in data.iteritems(): |
| 86 if not isinstance(v, dict): |
| 87 bad = True |
| 88 _LOGGER.error( |
| 89 'LUCI_CONTEXT[%r] is not a dict: %s', k, type(v).__name__) |
| 90 |
| 91 return not bad |
| 92 |
| 93 |
| 94 # this is a separate function from _read_full for testing purposes. |
| 95 def _initial_load(): |
| 96 global _CUR_CONTEXT |
| 97 to_assign = {} |
| 98 |
| 99 ctx_path = os.environ.get(_ENV_KEY) |
| 100 if ctx_path: |
| 101 ctx_path = ctx_path.decode(sys.getfilesystemencoding()) |
| 102 _LOGGER.debug('Loading LUCI_CONTEXT: %r', ctx_path) |
| 103 try: |
| 104 with open(ctx_path, 'r') as f: |
| 105 loaded = _to_utf8(json.load(f)) |
| 106 if _check_ok(loaded): |
| 107 to_assign = loaded |
| 108 except OSError as ex: |
| 109 _LOGGER.error('LUCI_CONTEXT failed to open: %s', ex) |
| 110 except IOError as ex: |
| 111 _LOGGER.error('LUCI_CONTEXT failed to read: %s', ex) |
| 112 except ValueError as ex: |
| 113 _LOGGER.error('LUCI_CONTEXT failed to decode: %s', ex) |
| 114 |
| 115 _CUR_CONTEXT = to_assign |
| 116 |
| 117 |
| 118 def _read_full(): |
| 119 # double-check because I'm a hopeless diehard. |
| 120 if _CUR_CONTEXT is None: |
| 121 with _CUR_CONTEXT_LOCK: |
| 122 if _CUR_CONTEXT is None: |
| 123 _initial_load() |
| 124 return _CUR_CONTEXT |
| 125 |
| 126 |
| 127 def _mutate(section_values): |
| 128 new_val = read_full() |
| 129 for section, value in section_values.iteritems(): |
| 130 if value is None: |
| 131 new_val.pop(section, None) |
| 132 elif isinstance(value, dict): |
| 133 new_val[section] = value |
| 134 else: |
| 135 raise ValueError( |
| 136 'Bad type for LUCI_CONTEXT[%r]: %s', section, type(value).__name__) |
| 137 return new_val |
| 138 |
| 139 |
| 140 def read_full(): |
| 141 """Returns a copy of the entire current contents of the LUCI_CONTEXT as |
| 142 a dict. |
| 143 """ |
| 144 return copy.deepcopy(_read_full()) |
| 145 |
| 146 |
| 147 def read(section_key): |
| 148 """Reads from the given section key. Returns the data in the section or None |
| 149 if the data doesn't exist. |
| 150 |
| 151 Args: |
| 152 section_key (str) - The top-level key to read from the LUCI_CONTEXT. |
| 153 |
| 154 Returns: |
| 155 A copy of the requested section data (as a dict), or None if the section was |
| 156 not present. |
| 157 |
| 158 Example: |
| 159 Given a LUCI_CONTEXT of: |
| 160 { |
| 161 "swarming": { |
| 162 "secret_bytes": <bytes> |
| 163 }, |
| 164 "other_service": { |
| 165 "nested": { |
| 166 "key": "something" |
| 167 } |
| 168 } |
| 169 } |
| 170 |
| 171 read('swarming') -> {'secret_bytes': <bytes>} |
| 172 read('doesnt_exist') -> None |
| 173 """ |
| 174 return copy.deepcopy(_read_full().get(section_key, None)) |
| 175 |
| 176 |
| 177 @contextlib.contextmanager |
| 178 def write(_tmpdir=None, **section_values): |
| 179 """Write is a contextmanager which will write all of the provided section |
| 180 details to a new context, copying over the values from any unmentioned |
| 181 sections. The new context file will be set in os.environ. When the |
| 182 contextmanager exits, it will attempt to delete the context file. |
| 183 |
| 184 Since each call to write produces a new context file on disk, it's beneficial |
| 185 to group edits together into a single call to write when possible. |
| 186 |
| 187 Calls to read*() within the context of a call to write will read from the |
| 188 written value. This written value is stored on a per-thread basis. |
| 189 |
| 190 NOTE: Because environment variables are per-process and not per-thread, it is |
| 191 an error to call write() from multiple threads simultaneously. If this is |
| 192 done, this function raises an exception. |
| 193 |
| 194 Args: |
| 195 _tmpdir (str) - an optional directory to use for the newly written |
| 196 LUCI_CONTEXT file. |
| 197 section_values (str -> value) - A mapping of section_key to the new value |
| 198 for that section. A value of None will remove that section. Non-None |
| 199 values must be of the type 'dict', and must be json serializable. |
| 200 |
| 201 Raises: |
| 202 MultipleLUCIContextException if called from multiple threads |
| 203 simulataneously. |
| 204 |
| 205 Example: |
| 206 Given a LUCI_CONTEXT of: |
| 207 { |
| 208 "swarming": { |
| 209 "secret_bytes": <bytes> |
| 210 }, |
| 211 "other_service": { |
| 212 ... |
| 213 } |
| 214 } |
| 215 |
| 216 with write(swarming=None): ... # deletes 'swarming' |
| 217 with write(something={...}): ... # sets 'something' section to {...} |
| 218 """ |
| 219 # If there are no edits, just pass-through |
| 220 if not section_values: |
| 221 yield |
| 222 return |
| 223 |
| 224 new_val = _mutate(section_values) |
| 225 |
| 226 global _CUR_CONTEXT |
| 227 got_lock = _WRITE_LOCK.acquire(blocking=False) |
| 228 if not got_lock: |
| 229 raise MultipleLUCIContextException() |
| 230 try: |
| 231 with _tf(new_val, workdir=_tmpdir) as name: |
| 232 try: |
| 233 old_value = _CUR_CONTEXT |
| 234 old_envvar = os.environ.get(_ENV_KEY, None) |
| 235 |
| 236 os.environ[_ENV_KEY] = name.encode(sys.getfilesystemencoding()) |
| 237 _CUR_CONTEXT = new_val |
| 238 yield |
| 239 finally: |
| 240 _CUR_CONTEXT = old_value |
| 241 if old_envvar is None: |
| 242 del os.environ[_ENV_KEY] |
| 243 else: |
| 244 os.environ[_ENV_KEY] = old_envvar |
| 245 finally: |
| 246 _WRITE_LOCK.release() |
| 247 |
| 248 |
| 249 @contextlib.contextmanager |
| 250 def stage(_tmpdir=None, **section_values): |
| 251 """Prepares and writes new LUCI_CONTEXT file, but doesn't replace the env var. |
| 252 |
| 253 This is useful when launching new process asynchronously in new LUCI_CONTEXT |
| 254 environment. In this case, modifying the environment of the current process |
| 255 (like 'write' does) may be harmful. |
| 256 |
| 257 Calls the body with a path to the new LUCI_CONTEXT file or None if |
| 258 'section_values' is empty (meaning, no changes have been made). |
| 259 """ |
| 260 if not section_values: |
| 261 yield None |
| 262 return |
| 263 with _tf(_mutate(section_values), workdir=_tmpdir) as name: |
| 264 yield name |
OLD | NEW |