Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 # Use of this source code is governed by the Apache v2.0 license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import collections | |
| 6 import contextlib | |
| 7 import os | |
| 8 import json | |
| 9 import socket | |
| 10 import sys | |
| 11 import threading | |
| 12 import types | |
| 13 | |
| 14 from client.libs.logdog import streamname, varint | |
| 15 | |
| 16 | |
| 17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', | |
| 18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) | |
| 19 | |
| 20 | |
| 21 class StreamParams(_StreamParamsBase): | |
|
martiniss
2016/05/10 22:12:38
This could be generated from a protobuf, right? Or
dnj (Google)
2016/05/12 17:54:02
Nope, this is a Go struct, so we need a Python ver
| |
| 22 """Defines the set of parameters to apply to a new stream. | |
| 23 """ | |
| 24 | |
| 25 # A text content stream. | |
| 26 TEXT = 'text' | |
| 27 # A binary content stream. | |
| 28 BINARY = 'binary' | |
| 29 # A datagram content stream. | |
| 30 DATAGRAM = 'datagram' | |
| 31 | |
| 32 # Tee parameter to tee this stream through the Butler's STDOUT. | |
| 33 TEE_STDOUT = 'stdout' | |
| 34 # Tee parameter to tee this stream through the Butler's STDERR. | |
| 35 TEE_STDERR = 'stderr' | |
| 36 | |
| 37 @classmethod | |
| 38 def make(cls, **kwargs): | |
| 39 """Returns (StreamParams): A new StreamParams instance with supplied values. | |
| 40 | |
| 41 Any parameter that isn't supplied will be set to None. | |
| 42 | |
| 43 Args: | |
| 44 kwargs (dict): Named parameters to apply. | |
| 45 """ | |
| 46 return cls(**{f: kwargs.get(f) for f in cls._fields}) | |
| 47 | |
| 48 def validate(self): | |
| 49 """Raises (ValueError): if the parameters are not valid.""" | |
| 50 streamname.validate_stream_name(self.name) | |
| 51 | |
| 52 if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM): | |
| 53 raise ValueError('Invalid type (%s)' % (self.type,)) | |
| 54 | |
| 55 if self.tags is not None: | |
| 56 if not isinstance(self.tags, collections.Mapping): | |
| 57 raise ValueError('Invalid tags type (%s)' % (self.tags,)) | |
| 58 for k, v in self.tags.iteritems(): | |
| 59 streamname.validate_tag(k, v) | |
| 60 | |
| 61 if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR): | |
| 62 raise ValueError('Invalid tee type (%s)' % (self.tee,)) | |
| 63 | |
| 64 if not isinstance(self.binary_file_extension, | |
| 65 (types.NoneType, types.StringTypes)): | |
| 66 raise ValueError('Invalid binary file extension type (%s)' % ( | |
| 67 self.binary_file_extension,)) | |
| 68 | |
| 69 def to_json(self): | |
| 70 """Returns (str): The JSON representation of the StreamParams. | |
| 71 | |
| 72 Converts stream parameters to JSON for Butler consumption. | |
| 73 | |
| 74 Raises: | |
| 75 ValueError: if these parameters are not valid. | |
| 76 """ | |
| 77 self.validate() | |
| 78 | |
| 79 obj = { | |
| 80 'name': self.name, | |
| 81 'type': self.type, | |
| 82 } | |
| 83 | |
| 84 def maybe_add(key, value): | |
| 85 if value is not None: | |
| 86 obj[key] = value | |
| 87 maybe_add('contentType', self.content_type) | |
| 88 maybe_add('tags', self.tags) | |
| 89 maybe_add('tee', self.tee) | |
| 90 maybe_add('binaryFileExtension', self.binary_file_extension) | |
| 91 | |
| 92 # Note that "dumps' will dump UTF-8 by default, which is what Butler wants. | |
| 93 return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None) | |
| 94 | |
| 95 | |
| 96 class StreamProtocolRegistry(object): | |
| 97 """Registry of streamserver URI protocols and their client classes. | |
| 98 """ | |
| 99 | |
| 100 def __init__(self): | |
| 101 self._registry = {} | |
| 102 | |
| 103 def register_protocol(self, protocol, client_cls): | |
| 104 assert issubclass(client_cls, StreamClient) | |
| 105 if self._registry.get(protocol) is not None: | |
| 106 raise KeyError('Duplicate protocol registered.') | |
| 107 self._registry[protocol] = client_cls | |
| 108 | |
| 109 def create(self, uri): | |
| 110 uri = uri.split(':', 1) | |
| 111 if len(uri) != 2: | |
| 112 raise ValueError('Invalid stream server URI [%s]' % (uri,)) | |
| 113 protocol, value = uri | |
| 114 | |
| 115 client_cls = self._registry.get(protocol) | |
| 116 if not client_cls: | |
| 117 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) | |
| 118 return client_cls._create(value) | |
| 119 | |
| 120 # Default (global) registry. | |
| 121 _default_registry = StreamProtocolRegistry() | |
| 122 | |
| 123 | |
| 124 def create(uri): | |
| 125 """Returns (StreamClient): A stream client for the specified URI. | |
| 126 | |
| 127 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
| 128 for the specified URI. | |
| 129 | |
| 130 Args: | |
| 131 uri: The streamserver URI. | |
| 132 | |
| 133 Raises: | |
| 134 ValueError if the supplied URI references an invalid or improperly | |
| 135 configured streamserver. | |
| 136 """ | |
| 137 return _default_registry.create(uri) | |
| 138 | |
| 139 | |
| 140 class StreamClient(object): | |
| 141 """Abstract base class for a streamserver client. | |
| 142 """ | |
| 143 | |
| 144 class _DatagramStream(object): | |
| 145 """Wraps a stream object to write length-prefixed datagrams.""" | |
| 146 | |
| 147 def __init__(self, fd): | |
| 148 self._fd = fd | |
| 149 | |
| 150 def send(self, data): | |
| 151 varint.write_uvarint(self._fd, len(data)) | |
| 152 self._fd.write(data) | |
| 153 | |
| 154 def close(self): | |
| 155 return self._fd.close() | |
| 156 | |
| 157 def __init__(self): | |
| 158 self._name_lock = threading.Lock() | |
| 159 self._names = set() | |
| 160 | |
| 161 def _register_new_stream(self, name): | |
| 162 """Registers a new stream name. | |
| 163 | |
| 164 The Butler will internally reject any duplicate stream names. However, there | |
| 165 isn't really feedback when this happens except a closed stream client. This | |
| 166 is a client-side check to provide a more user-friendly experience in the | |
| 167 event that a user attempts to register a duplicate stream name. | |
| 168 | |
| 169 Note that this is imperfect, as somethig else could register stream names | |
|
martiniss
2016/05/10 22:12:38
typo
dnj (Google)
2016/05/12 17:54:03
Done.
| |
| 170 with the same Butler instance and this library has no means of tracking. | |
| 171 This is a best-effort experience, not a reliable check. | |
| 172 | |
| 173 Args: | |
| 174 name (str): The name of the stream. | |
| 175 | |
| 176 Raises: | |
| 177 ValueError if the stream name has already been registered. | |
| 178 """ | |
| 179 with self._name_lock: | |
| 180 if name in self._names: | |
| 181 raise ValueError("Duplicate stream name [%s]" % (name,)) | |
| 182 self._names.add(name) | |
| 183 | |
| 184 @classmethod | |
| 185 def _create(cls, value): | |
| 186 """Returns (StreamClient): A new stream client connection. | |
| 187 | |
| 188 Validates the streamserver parameters and creates a new StreamClient | |
| 189 instance that connects to them. | |
| 190 | |
| 191 Implementing classes must override this. | |
| 192 """ | |
| 193 raise NotImplementedError() | |
| 194 | |
| 195 def _connect_raw(self): | |
| 196 """Returns (file): A new file-like stream. | |
| 197 | |
| 198 Creates a new raw connection to the streamserver. This connection MUST not | |
| 199 have any data written to it past initialization (if needed) when it has been | |
| 200 returned. | |
| 201 | |
| 202 The file-like object must implement `write` and `close`. | |
| 203 | |
| 204 Implementing classes must override this. | |
| 205 """ | |
| 206 raise NotImplementedError() | |
| 207 | |
| 208 def new_connection(self, params): | |
| 209 """Returns (file): A new configured stream. | |
| 210 | |
| 211 The returned object implements (minimally) `write` and `close`. | |
| 212 | |
| 213 Creates a new LogDog stream with the specified parameters. | |
| 214 | |
| 215 Raises: | |
| 216 ValueError if the stream name has already been used. | |
| 217 """ | |
| 218 self._register_new_stream(params.name) | |
| 219 params_json = params.to_json() | |
| 220 | |
| 221 fd = self._connect_raw() | |
| 222 varint.write_uvarint(fd, len(params_json)) | |
| 223 fd.write(params_json) | |
| 224 return fd | |
| 225 | |
| 226 @contextlib.contextmanager | |
| 227 def text(self, name, **kwargs): | |
| 228 """Context manager to create, use, and teardown a TEXT stream. | |
| 229 | |
| 230 This context manager creates a new butler TEXT stream with the specified | |
| 231 name and parameters, yields it, and closes it on teardown. | |
| 232 | |
| 233 Args: | |
| 234 name (str): the LogDog name of the stream. | |
| 235 kwargs (dict): Log stream parameters. These may be any keyword arguments | |
| 236 accepted by `open_text`. | |
| 237 | |
| 238 Returns (file): A file-like object to a Butler UTF-8 text stream supporting | |
| 239 `write`. | |
| 240 """ | |
| 241 fd = None | |
| 242 try: | |
| 243 fd = self.open_text(name, **kwargs) | |
| 244 yield fd | |
| 245 finally: | |
| 246 if fd is not None: | |
| 247 fd.close() | |
| 248 | |
| 249 def open_text(self, name, content_type=None, tags=None, tee=None, | |
| 250 binary_file_extension=None): | |
| 251 """Returns (file): A file-like object for a single text stream. | |
| 252 | |
| 253 This creates a new butler TEXT stream with the specified name and | |
| 254 parameters. | |
| 255 | |
| 256 Args: | |
| 257 name (str): the LogDog name of the stream. | |
| 258 content_type (str): The optional content type of the stream. If None, a | |
| 259 default content type will be chosen by the Butler. | |
| 260 tags (dict): An optional key/value dictionary pair of LogDog stream tags. | |
| 261 tee (str): Describes how stream data should be tee'd through the Butler. | |
| 262 One of StreamParams' TEE arguments. | |
| 263 binary_file_extension (str): A custom binary file extension. If not | |
| 264 provided, a default extension may be chosen or the binary stream may | |
| 265 not be emitted. | |
| 266 | |
| 267 Returns (file): A file-like object to a Butler text stream. This object can | |
| 268 have UTF-8 text content written to it with its `write` method, and must | |
| 269 be closed when finished using its `close` method. | |
| 270 """ | |
| 271 params = StreamParams.make( | |
| 272 name=name, | |
| 273 type=StreamParams.TEXT, | |
| 274 content_type=content_type, | |
| 275 tags=tags, | |
| 276 tee=tee, | |
| 277 binary_file_extension=binary_file_extension) | |
| 278 return self.new_connection(params) | |
| 279 | |
| 280 @contextlib.contextmanager | |
| 281 def binary(self, name, **kwargs): | |
| 282 """Context manager to create, use, and teardown a BINARY stream. | |
| 283 | |
| 284 This context manager creates a new butler BINARY stream with the specified | |
| 285 name and parameters, yields it, and closes it on teardown. | |
| 286 | |
| 287 Args: | |
| 288 name (str): the LogDog name of the stream. | |
| 289 kwargs (dict): Log stream parameters. These may be any keyword arguments | |
| 290 accepted by `open_binary`. | |
| 291 | |
| 292 Returns (file): A file-like object to a Butler binary stream supporting | |
| 293 `write`. | |
| 294 """ | |
| 295 fd = None | |
| 296 try: | |
| 297 fd = self.open_binary(name, **kwargs) | |
| 298 yield fd | |
| 299 finally: | |
| 300 if fd is not None: | |
| 301 fd.close() | |
| 302 | |
| 303 def open_binary(self, name, content_type=None, tags=None, tee=None, | |
| 304 binary_file_extension=None): | |
| 305 """Returns (file): A file-like object for a single binary stream. | |
| 306 | |
| 307 This creates a new butler BINARY stream with the specified name and | |
| 308 parameters. | |
| 309 | |
| 310 Args: | |
| 311 name (str): the LogDog name of the stream. | |
| 312 content_type (str): The optional content type of the stream. If None, a | |
| 313 default content type will be chosen by the Butler. | |
| 314 tags (dict): An optional key/value dictionary pair of LogDog stream tags. | |
| 315 tee (str): Describes how stream data should be tee'd through the Butler. | |
| 316 One of StreamParams' TEE arguments. | |
| 317 binary_file_extension (str): A custom binary file extension. If not | |
| 318 provided, a default extension may be chosen or the binary stream may | |
| 319 not be emitted. | |
| 320 | |
| 321 Returns (file): A file-like object to a Butler binary stream. This object | |
| 322 can have UTF-8 content written to it with its `write` method, and must | |
| 323 be closed when finished using its `close` method. | |
| 324 """ | |
| 325 params = StreamParams.make( | |
| 326 name=name, | |
| 327 type=StreamParams.BINARY, | |
| 328 content_type=content_type, | |
| 329 tags=tags, | |
| 330 tee=tee, | |
| 331 binary_file_extension=binary_file_extension) | |
| 332 return self.new_connection(params) | |
| 333 | |
| 334 @contextlib.contextmanager | |
| 335 def datagram(self, name, **kwargs): | |
| 336 """Context manager to create, use, and teardown a DATAGRAM stream. | |
| 337 | |
| 338 This context manager creates a new butler DATAAGRAM stream with the | |
| 339 specified name and parameters, yields it, and closes it on teardown. | |
| 340 | |
| 341 Args: | |
| 342 name (str): the LogDog name of the stream. | |
| 343 kwargs (dict): Log stream parameters. These may be any keyword arguments | |
| 344 accepted by `open_datagram`. | |
| 345 | |
| 346 Returns (_DatagramStream): A datagram stream object. Datagrams can be | |
| 347 written to it using its `send` method. | |
| 348 """ | |
| 349 fd = None | |
| 350 try: | |
| 351 fd = self.open_datagram(name, **kwargs) | |
| 352 yield fd | |
| 353 finally: | |
| 354 if fd is not None: | |
| 355 fd.close() | |
| 356 | |
| 357 def open_datagram(self, name, content_type=None, tags=None, tee=None, | |
| 358 binary_file_extension=None): | |
| 359 """Returns: (file): A file-like object for a single text stream. | |
|
martiniss
2016/05/10 22:12:38
Is this correct? I think this was copy pasted.
dnj (Google)
2016/05/12 17:54:02
Yep was copy/pasted, cleaned that up.
| |
| 360 | |
| 361 This creates a new butler TEXT stream with the specified name and | |
| 362 parameters. | |
| 363 | |
| 364 Args: | |
| 365 name (str): the LogDog name of the stream. | |
| 366 content_type (str): The optional content type of the stream. If None, a | |
| 367 default content type will be chosen by the Butler. | |
| 368 tags (dict): An optional key/value dictionary pair of LogDog stream tags. | |
| 369 tee (str): Describes how stream data should be tee'd through the Butler. | |
| 370 One of StreamParams' TEE arguments. | |
| 371 binary_file_extension (str): A custom binary file extension. If not | |
| 372 provided, a default extension may be chosen or the binary stream may | |
| 373 not be emitted. | |
| 374 | |
| 375 Returns (_DatagramStream): A datagram stream object. Datagrams can be | |
| 376 written to it using its `send` method. This object must be closed when | |
| 377 finished by using its `close` method. | |
| 378 """ | |
| 379 params = StreamParams.make( | |
| 380 name=name, | |
| 381 type=StreamParams.DATAGRAM, | |
| 382 content_type=content_type, | |
| 383 tags=tags, | |
| 384 tee=tee, | |
| 385 binary_file_extension=binary_file_extension) | |
| 386 return self._DatagramStream(self.new_connection(params)) | |
| 387 | |
| 388 | |
| 389 class _NamedPipeStreamClient(StreamClient): | |
| 390 """A StreamClient implementation that connects to a Windows named pipe. | |
|
martiniss
2016/05/10 22:12:38
Windows? Why does line 394 say UNIX? Bad copy-past
dnj (Google)
2016/05/12 17:54:02
Was copy/pasted.
| |
| 391 """ | |
| 392 | |
| 393 def __init__(self, name): | |
| 394 r"""Initializes a new UNIX domain socket stream client. | |
| 395 | |
| 396 Args: | |
| 397 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") | |
| 398 """ | |
| 399 super(_NamedPipeStreamClient, self).__init__() | |
| 400 self._name = name | |
| 401 | |
| 402 @classmethod | |
| 403 def _create(cls, value): | |
| 404 return cls(value) | |
| 405 | |
| 406 def _connect_raw(self): | |
| 407 return open(self._name, 'wb') | |
| 408 | |
| 409 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | |
| 410 | |
| 411 | |
| 412 class _UnixDomainSocketStreamClient(StreamClient): | |
| 413 """A StreamClient implementation that uses a UNIX domain socket. | |
| 414 """ | |
| 415 | |
| 416 def __init__(self, path): | |
| 417 """Initializes a new UNIX domain socket stream client. | |
| 418 | |
| 419 Args: | |
| 420 path (str): The path to the named UNIX domain socket. | |
| 421 """ | |
| 422 super(_UnixDomainSocketStreamClient, self).__init__() | |
| 423 self._path = path | |
| 424 | |
| 425 @classmethod | |
| 426 def _create(cls, value): | |
| 427 if not os.path.exists(value): | |
| 428 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | |
| 429 return cls(value) | |
| 430 | |
| 431 def _connect_raw(self): | |
| 432 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| 433 sock.connect(self._path) | |
| 434 return sock | |
| 435 | |
| 436 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | |
| OLD | NEW |