| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed by the Apache v2.0 license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import collections |
| 6 import contextlib |
| 7 import json |
| 8 import os |
| 9 import socket |
| 10 import sys |
| 11 import threading |
| 12 import types |
| 13 |
| 14 from libs.logdog import streamname, varint |
| 15 |
| 16 |
| 17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', |
| 18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) |
| 19 |
| 20 |
| 21 class StreamParams(_StreamParamsBase): |
| 22 """Defines the set of parameters to apply to a new stream.""" |
| 23 |
| 24 # A text content stream. |
| 25 TEXT = 'text' |
| 26 # A binary content stream. |
| 27 BINARY = 'binary' |
| 28 # A datagram content stream. |
| 29 DATAGRAM = 'datagram' |
| 30 |
| 31 # Tee parameter to tee this stream through the Butler's STDOUT. |
| 32 TEE_STDOUT = 'stdout' |
| 33 # Tee parameter to tee this stream through the Butler's STDERR. |
| 34 TEE_STDERR = 'stderr' |
| 35 |
| 36 @classmethod |
| 37 def make(cls, **kwargs): |
| 38 """Returns (StreamParams): A new StreamParams instance with supplied values. |
| 39 |
| 40 Any parameter that isn't supplied will be set to None. |
| 41 |
| 42 Args: |
| 43 kwargs (dict): Named parameters to apply. |
| 44 """ |
| 45 return cls(**{f: kwargs.get(f) for f in cls._fields}) |
| 46 |
| 47 def validate(self): |
| 48 """Raises (ValueError): if the parameters are not valid.""" |
| 49 streamname.validate_stream_name(self.name) |
| 50 |
| 51 if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM): |
| 52 raise ValueError('Invalid type (%s)' % (self.type,)) |
| 53 |
| 54 if self.tags is not None: |
| 55 if not isinstance(self.tags, collections.Mapping): |
| 56 raise ValueError('Invalid tags type (%s)' % (self.tags,)) |
| 57 for k, v in self.tags.iteritems(): |
| 58 streamname.validate_tag(k, v) |
| 59 |
| 60 if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR): |
| 61 raise ValueError('Invalid tee type (%s)' % (self.tee,)) |
| 62 |
| 63 if not isinstance(self.binary_file_extension, |
| 64 (types.NoneType, types.StringTypes)): |
| 65 raise ValueError('Invalid binary file extension type (%s)' % ( |
| 66 self.binary_file_extension,)) |
| 67 |
| 68 def to_json(self): |
| 69 """Returns (str): The JSON representation of the StreamParams. |
| 70 |
| 71 Converts stream parameters to JSON for Butler consumption. |
| 72 |
| 73 Raises: |
| 74 ValueError: if these parameters are not valid. |
| 75 """ |
| 76 self.validate() |
| 77 |
| 78 obj = { |
| 79 'name': self.name, |
| 80 'type': self.type, |
| 81 } |
| 82 |
| 83 def maybe_add(key, value): |
| 84 if value is not None: |
| 85 obj[key] = value |
| 86 maybe_add('contentType', self.content_type) |
| 87 maybe_add('tags', self.tags) |
| 88 maybe_add('tee', self.tee) |
| 89 maybe_add('binaryFileExtension', self.binary_file_extension) |
| 90 |
| 91 # Note that "dumps' will dump UTF-8 by default, which is what Butler wants. |
| 92 return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None) |
| 93 |
| 94 |
| 95 class StreamProtocolRegistry(object): |
| 96 """Registry of streamserver URI protocols and their client classes. |
| 97 """ |
| 98 |
| 99 def __init__(self): |
| 100 self._registry = {} |
| 101 |
| 102 def register_protocol(self, protocol, client_cls): |
| 103 assert issubclass(client_cls, StreamClient) |
| 104 if self._registry.get(protocol) is not None: |
| 105 raise KeyError('Duplicate protocol registered.') |
| 106 self._registry[protocol] = client_cls |
| 107 |
| 108 def create(self, uri): |
| 109 uri = uri.split(':', 1) |
| 110 if len(uri) != 2: |
| 111 raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
| 112 protocol, value = uri |
| 113 |
| 114 client_cls = self._registry.get(protocol) |
| 115 if not client_cls: |
| 116 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
| 117 return client_cls._create(value) |
| 118 |
| 119 # Default (global) registry. |
| 120 _default_registry = StreamProtocolRegistry() |
| 121 |
| 122 |
| 123 def create(uri): |
| 124 """Returns (StreamClient): A stream client for the specified URI. |
| 125 |
| 126 This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| 127 for the specified URI. |
| 128 |
| 129 Args: |
| 130 uri: The streamserver URI. |
| 131 |
| 132 Raises: |
| 133 ValueError if the supplied URI references an invalid or improperly |
| 134 configured streamserver. |
| 135 """ |
| 136 return _default_registry.create(uri) |
| 137 |
| 138 |
| 139 class StreamClient(object): |
| 140 """Abstract base class for a streamserver client. |
| 141 """ |
| 142 |
| 143 class _DatagramStream(object): |
| 144 """Wraps a stream object to write length-prefixed datagrams.""" |
| 145 |
| 146 def __init__(self, fd): |
| 147 self._fd = fd |
| 148 |
| 149 def send(self, data): |
| 150 varint.write_uvarint(self._fd, len(data)) |
| 151 self._fd.write(data) |
| 152 |
| 153 def close(self): |
| 154 return self._fd.close() |
| 155 |
| 156 def __init__(self): |
| 157 self._name_lock = threading.Lock() |
| 158 self._names = set() |
| 159 |
| 160 def _register_new_stream(self, name): |
| 161 """Registers a new stream name. |
| 162 |
| 163 The Butler will internally reject any duplicate stream names. However, there |
| 164 isn't really feedback when this happens except a closed stream client. This |
| 165 is a client-side check to provide a more user-friendly experience in the |
| 166 event that a user attempts to register a duplicate stream name. |
| 167 |
| 168 Note that this is imperfect, as something else could register stream names |
| 169 with the same Butler instance and this library has no means of tracking. |
| 170 This is a best-effort experience, not a reliable check. |
| 171 |
| 172 Args: |
| 173 name (str): The name of the stream. |
| 174 |
| 175 Raises: |
| 176 ValueError if the stream name has already been registered. |
| 177 """ |
| 178 with self._name_lock: |
| 179 if name in self._names: |
| 180 raise ValueError("Duplicate stream name [%s]" % (name,)) |
| 181 self._names.add(name) |
| 182 |
| 183 @classmethod |
| 184 def _create(cls, value): |
| 185 """Returns (StreamClient): A new stream client connection. |
| 186 |
| 187 Validates the streamserver parameters and creates a new StreamClient |
| 188 instance that connects to them. |
| 189 |
| 190 Implementing classes must override this. |
| 191 """ |
| 192 raise NotImplementedError() |
| 193 |
| 194 def _connect_raw(self): |
| 195 """Returns (file): A new file-like stream. |
| 196 |
| 197 Creates a new raw connection to the streamserver. This connection MUST not |
| 198 have any data written to it past initialization (if needed) when it has been |
| 199 returned. |
| 200 |
| 201 The file-like object must implement `write` and `close`. |
| 202 |
| 203 Implementing classes must override this. |
| 204 """ |
| 205 raise NotImplementedError() |
| 206 |
| 207 def new_connection(self, params): |
| 208 """Returns (file): A new configured stream. |
| 209 |
| 210 The returned object implements (minimally) `write` and `close`. |
| 211 |
| 212 Creates a new LogDog stream with the specified parameters. |
| 213 |
| 214 Args: |
| 215 params (StreamParams): The parameters to use with the new connection. |
| 216 |
| 217 Raises: |
| 218 ValueError if the stream name has already been used, or if the parameters |
| 219 are not valid. |
| 220 """ |
| 221 self._register_new_stream(params.name) |
| 222 params_json = params.to_json() |
| 223 |
| 224 fd = self._connect_raw() |
| 225 varint.write_uvarint(fd, len(params_json)) |
| 226 fd.write(params_json) |
| 227 return fd |
| 228 |
| 229 @contextlib.contextmanager |
| 230 def text(self, name, **kwargs): |
| 231 """Context manager to create, use, and teardown a TEXT stream. |
| 232 |
| 233 This context manager creates a new butler TEXT stream with the specified |
| 234 parameters, yields it, and closes it on teardown. |
| 235 |
| 236 Args: |
| 237 name (str): the LogDog name of the stream. |
| 238 kwargs (dict): Log stream parameters. These may be any keyword arguments |
| 239 accepted by `open_text`. |
| 240 |
| 241 Returns (file): A file-like object to a Butler UTF-8 text stream supporting |
| 242 `write`. |
| 243 """ |
| 244 fd = None |
| 245 try: |
| 246 fd = self.open_text(name, **kwargs) |
| 247 yield fd |
| 248 finally: |
| 249 if fd is not None: |
| 250 fd.close() |
| 251 |
| 252 def open_text(self, name, content_type=None, tags=None, tee=None, |
| 253 binary_file_extension=None): |
| 254 """Returns (file): A file-like object for a single text stream. |
| 255 |
| 256 This creates a new butler TEXT stream with the specified parameters. |
| 257 |
| 258 Args: |
| 259 name (str): the LogDog name of the stream. |
| 260 content_type (str): The optional content type of the stream. If None, a |
| 261 default content type will be chosen by the Butler. |
| 262 tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| 263 tee (str): Describes how stream data should be tee'd through the Butler. |
| 264 One of StreamParams' TEE arguments. |
| 265 binary_file_extension (str): A custom binary file extension. If not |
| 266 provided, a default extension may be chosen or the binary stream may |
| 267 not be emitted. |
| 268 |
| 269 Returns (file): A file-like object to a Butler text stream. This object can |
| 270 have UTF-8 text content written to it with its `write` method, and must |
| 271 be closed when finished using its `close` method. |
| 272 """ |
| 273 params = StreamParams.make( |
| 274 name=name, |
| 275 type=StreamParams.TEXT, |
| 276 content_type=content_type, |
| 277 tags=tags, |
| 278 tee=tee, |
| 279 binary_file_extension=binary_file_extension) |
| 280 return self.new_connection(params) |
| 281 |
| 282 @contextlib.contextmanager |
| 283 def binary(self, name, **kwargs): |
| 284 """Context manager to create, use, and teardown a BINARY stream. |
| 285 |
| 286 This context manager creates a new butler BINARY stream with the specified |
| 287 parameters, yields it, and closes it on teardown. |
| 288 |
| 289 Args: |
| 290 name (str): the LogDog name of the stream. |
| 291 kwargs (dict): Log stream parameters. These may be any keyword arguments |
| 292 accepted by `open_binary`. |
| 293 |
| 294 Returns (file): A file-like object to a Butler binary stream supporting |
| 295 `write`. |
| 296 """ |
| 297 fd = None |
| 298 try: |
| 299 fd = self.open_binary(name, **kwargs) |
| 300 yield fd |
| 301 finally: |
| 302 if fd is not None: |
| 303 fd.close() |
| 304 |
| 305 def open_binary(self, name, content_type=None, tags=None, tee=None, |
| 306 binary_file_extension=None): |
| 307 """Returns (file): A file-like object for a single binary stream. |
| 308 |
| 309 This creates a new butler BINARY stream with the specified parameters. |
| 310 |
| 311 Args: |
| 312 name (str): the LogDog name of the stream. |
| 313 content_type (str): The optional content type of the stream. If None, a |
| 314 default content type will be chosen by the Butler. |
| 315 tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| 316 tee (str): Describes how stream data should be tee'd through the Butler. |
| 317 One of StreamParams' TEE arguments. |
| 318 binary_file_extension (str): A custom binary file extension. If not |
| 319 provided, a default extension may be chosen or the binary stream may |
| 320 not be emitted. |
| 321 |
| 322 Returns (file): A file-like object to a Butler binary stream. This object |
| 323 can have UTF-8 content written to it with its `write` method, and must |
| 324 be closed when finished using its `close` method. |
| 325 """ |
| 326 params = StreamParams.make( |
| 327 name=name, |
| 328 type=StreamParams.BINARY, |
| 329 content_type=content_type, |
| 330 tags=tags, |
| 331 tee=tee, |
| 332 binary_file_extension=binary_file_extension) |
| 333 return self.new_connection(params) |
| 334 |
| 335 @contextlib.contextmanager |
| 336 def datagram(self, name, **kwargs): |
| 337 """Context manager to create, use, and teardown a DATAGRAM stream. |
| 338 |
| 339 This context manager creates a new butler DATAAGRAM stream with the |
| 340 specified parameters, yields it, and closes it on teardown. |
| 341 |
| 342 Args: |
| 343 name (str): the LogDog name of the stream. |
| 344 kwargs (dict): Log stream parameters. These may be any keyword arguments |
| 345 accepted by `open_datagram`. |
| 346 |
| 347 Returns (_DatagramStream): A datagram stream object. Datagrams can be |
| 348 written to it using its `send` method. |
| 349 """ |
| 350 fd = None |
| 351 try: |
| 352 fd = self.open_datagram(name, **kwargs) |
| 353 yield fd |
| 354 finally: |
| 355 if fd is not None: |
| 356 fd.close() |
| 357 |
| 358 def open_datagram(self, name, content_type=None, tags=None, tee=None, |
| 359 binary_file_extension=None): |
| 360 """Creates a new butler DATAGRAM stream with the specified parameters. |
| 361 |
| 362 Args: |
| 363 name (str): the LogDog name of the stream. |
| 364 content_type (str): The optional content type of the stream. If None, a |
| 365 default content type will be chosen by the Butler. |
| 366 tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| 367 tee (str): Describes how stream data should be tee'd through the Butler. |
| 368 One of StreamParams' TEE arguments. |
| 369 binary_file_extension (str): A custom binary file extension. If not |
| 370 provided, a default extension may be chosen or the binary stream may |
| 371 not be emitted. |
| 372 |
| 373 Returns (_DatagramStream): A datagram stream object. Datagrams can be |
| 374 written to it using its `send` method. This object must be closed when |
| 375 finished by using its `close` method. |
| 376 """ |
| 377 params = StreamParams.make( |
| 378 name=name, |
| 379 type=StreamParams.DATAGRAM, |
| 380 content_type=content_type, |
| 381 tags=tags, |
| 382 tee=tee, |
| 383 binary_file_extension=binary_file_extension) |
| 384 return self._DatagramStream(self.new_connection(params)) |
| 385 |
| 386 |
| 387 class _NamedPipeStreamClient(StreamClient): |
| 388 """A StreamClient implementation that connects to a Windows named pipe. |
| 389 """ |
| 390 |
| 391 def __init__(self, name): |
| 392 r"""Initializes a new Windows named pipe stream client. |
| 393 |
| 394 Args: |
| 395 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
| 396 """ |
| 397 super(_NamedPipeStreamClient, self).__init__() |
| 398 self._name = name |
| 399 |
| 400 @classmethod |
| 401 def _create(cls, value): |
| 402 return cls(value) |
| 403 |
| 404 def _connect_raw(self): |
| 405 return open(self._name, 'wb') |
| 406 |
| 407 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
| 408 |
| 409 |
| 410 class _UnixDomainSocketStreamClient(StreamClient): |
| 411 """A StreamClient implementation that uses a UNIX domain socket. |
| 412 """ |
| 413 |
| 414 def __init__(self, path): |
| 415 """Initializes a new UNIX domain socket stream client. |
| 416 |
| 417 Args: |
| 418 path (str): The path to the named UNIX domain socket. |
| 419 """ |
| 420 super(_UnixDomainSocketStreamClient, self).__init__() |
| 421 self._path = path |
| 422 |
| 423 @classmethod |
| 424 def _create(cls, value): |
| 425 if not os.path.exists(value): |
| 426 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| 427 return cls(value) |
| 428 |
| 429 def _connect_raw(self): |
| 430 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| 431 sock.connect(self._path) |
| 432 return sock |
| 433 |
| 434 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
| OLD | NEW |