Chromium Code Reviews| Index: client/libs/logdog/stream.py |
| diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..bac54bc59cecad40bd7de7f442db6482441d9296 |
| --- /dev/null |
| +++ b/client/libs/logdog/stream.py |
| @@ -0,0 +1,436 @@ |
| +# Copyright 2016 The LUCI Authors. All rights reserved. |
| +# Use of this source code is governed by the Apache v2.0 license that can be |
| +# found in the LICENSE file. |
| + |
| +import collections |
| +import contextlib |
| +import os |
| +import json |
| +import socket |
| +import sys |
| +import threading |
| +import types |
| + |
| +from client.libs.logdog import streamname, varint |
| + |
| + |
| +_StreamParamsBase = collections.namedtuple('_StreamParamsBase', |
| + ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) |
| + |
| + |
| +class StreamParams(_StreamParamsBase): |
|
martiniss
2016/05/10 22:12:38
This could be generated from a protobuf, right? Or
dnj (Google)
2016/05/12 17:54:02
Nope, this is a Go struct, so we need a Python ver
|
| + """Defines the set of parameters to apply to a new stream. |
| + """ |
| + |
| + # A text content stream. |
| + TEXT = 'text' |
| + # A binary content stream. |
| + BINARY = 'binary' |
| + # A datagram content stream. |
| + DATAGRAM = 'datagram' |
| + |
| + # Tee parameter to tee this stream through the Butler's STDOUT. |
| + TEE_STDOUT = 'stdout' |
| + # Tee parameter to tee this stream through the Butler's STDERR. |
| + TEE_STDERR = 'stderr' |
| + |
| + @classmethod |
| + def make(cls, **kwargs): |
| + """Returns (StreamParams): A new StreamParams instance with supplied values. |
| + |
| + Any parameter that isn't supplied will be set to None. |
| + |
| + Args: |
| + kwargs (dict): Named parameters to apply. |
| + """ |
| + return cls(**{f: kwargs.get(f) for f in cls._fields}) |
| + |
| + def validate(self): |
| + """Raises (ValueError): if the parameters are not valid.""" |
| + streamname.validate_stream_name(self.name) |
| + |
| + if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM): |
| + raise ValueError('Invalid type (%s)' % (self.type,)) |
| + |
| + if self.tags is not None: |
| + if not isinstance(self.tags, collections.Mapping): |
| + raise ValueError('Invalid tags type (%s)' % (self.tags,)) |
| + for k, v in self.tags.iteritems(): |
| + streamname.validate_tag(k, v) |
| + |
| + if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR): |
| + raise ValueError('Invalid tee type (%s)' % (self.tee,)) |
| + |
| + if not isinstance(self.binary_file_extension, |
| + (types.NoneType, types.StringTypes)): |
| + raise ValueError('Invalid binary file extension type (%s)' % ( |
| + self.binary_file_extension,)) |
| + |
| + def to_json(self): |
| + """Returns (str): The JSON representation of the StreamParams. |
| + |
| + Converts stream parameters to JSON for Butler consumption. |
| + |
| + Raises: |
| + ValueError: if these parameters are not valid. |
| + """ |
| + self.validate() |
| + |
| + obj = { |
| + 'name': self.name, |
| + 'type': self.type, |
| + } |
| + |
| + def maybe_add(key, value): |
| + if value is not None: |
| + obj[key] = value |
| + maybe_add('contentType', self.content_type) |
| + maybe_add('tags', self.tags) |
| + maybe_add('tee', self.tee) |
| + maybe_add('binaryFileExtension', self.binary_file_extension) |
| + |
| + # Note that "dumps' will dump UTF-8 by default, which is what Butler wants. |
| + return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None) |
| + |
| + |
| +class StreamProtocolRegistry(object): |
| + """Registry of streamserver URI protocols and their client classes. |
| + """ |
| + |
| + def __init__(self): |
| + self._registry = {} |
| + |
| + def register_protocol(self, protocol, client_cls): |
| + assert issubclass(client_cls, StreamClient) |
| + if self._registry.get(protocol) is not None: |
| + raise KeyError('Duplicate protocol registered.') |
| + self._registry[protocol] = client_cls |
| + |
| + def create(self, uri): |
| + uri = uri.split(':', 1) |
| + if len(uri) != 2: |
| + raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
| + protocol, value = uri |
| + |
| + client_cls = self._registry.get(protocol) |
| + if not client_cls: |
| + raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
| + return client_cls._create(value) |
| + |
| +# Default (global) registry. |
| +_default_registry = StreamProtocolRegistry() |
| + |
| + |
| +def create(uri): |
| + """Returns (StreamClient): A stream client for the specified URI. |
| + |
| + This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| + for the specified URI. |
| + |
| + Args: |
| + uri: The streamserver URI. |
| + |
| + Raises: |
| + ValueError if the supplied URI references an invalid or improperly |
| + configured streamserver. |
| + """ |
| + return _default_registry.create(uri) |
| + |
| + |
| +class StreamClient(object): |
| + """Abstract base class for a streamserver client. |
| + """ |
| + |
| + class _DatagramStream(object): |
| + """Wraps a stream object to write length-prefixed datagrams.""" |
| + |
| + def __init__(self, fd): |
| + self._fd = fd |
| + |
| + def send(self, data): |
| + varint.write_uvarint(self._fd, len(data)) |
| + self._fd.write(data) |
| + |
| + def close(self): |
| + return self._fd.close() |
| + |
| + def __init__(self): |
| + self._name_lock = threading.Lock() |
| + self._names = set() |
| + |
| + def _register_new_stream(self, name): |
| + """Registers a new stream name. |
| + |
| + The Butler will internally reject any duplicate stream names. However, there |
| + isn't really feedback when this happens except a closed stream client. This |
| + is a client-side check to provide a more user-friendly experience in the |
| + event that a user attempts to register a duplicate stream name. |
| + |
| + Note that this is imperfect, as somethig else could register stream names |
|
martiniss
2016/05/10 22:12:38
typo
dnj (Google)
2016/05/12 17:54:03
Done.
|
| + with the same Butler instance and this library has no means of tracking. |
| + This is a best-effort experience, not a reliable check. |
| + |
| + Args: |
| + name (str): The name of the stream. |
| + |
| + Raises: |
| + ValueError if the stream name has already been registered. |
| + """ |
| + with self._name_lock: |
| + if name in self._names: |
| + raise ValueError("Duplicate stream name [%s]" % (name,)) |
| + self._names.add(name) |
| + |
| + @classmethod |
| + def _create(cls, value): |
| + """Returns (StreamClient): A new stream client connection. |
| + |
| + Validates the streamserver parameters and creates a new StreamClient |
| + instance that connects to them. |
| + |
| + Implementing classes must override this. |
| + """ |
| + raise NotImplementedError() |
| + |
| + def _connect_raw(self): |
| + """Returns (file): A new file-like stream. |
| + |
| + Creates a new raw connection to the streamserver. This connection MUST not |
| + have any data written to it past initialization (if needed) when it has been |
| + returned. |
| + |
| + The file-like object must implement `write` and `close`. |
| + |
| + Implementing classes must override this. |
| + """ |
| + raise NotImplementedError() |
| + |
| + def new_connection(self, params): |
| + """Returns (file): A new configured stream. |
| + |
| + The returned object implements (minimally) `write` and `close`. |
| + |
| + Creates a new LogDog stream with the specified parameters. |
| + |
| + Raises: |
| + ValueError if the stream name has already been used. |
| + """ |
| + self._register_new_stream(params.name) |
| + params_json = params.to_json() |
| + |
| + fd = self._connect_raw() |
| + varint.write_uvarint(fd, len(params_json)) |
| + fd.write(params_json) |
| + return fd |
| + |
| + @contextlib.contextmanager |
| + def text(self, name, **kwargs): |
| + """Context manager to create, use, and teardown a TEXT stream. |
| + |
| + This context manager creates a new butler TEXT stream with the specified |
| + name and parameters, yields it, and closes it on teardown. |
| + |
| + Args: |
| + name (str): the LogDog name of the stream. |
| + kwargs (dict): Log stream parameters. These may be any keyword arguments |
| + accepted by `open_text`. |
| + |
| + Returns (file): A file-like object to a Butler UTF-8 text stream supporting |
| + `write`. |
| + """ |
| + fd = None |
| + try: |
| + fd = self.open_text(name, **kwargs) |
| + yield fd |
| + finally: |
| + if fd is not None: |
| + fd.close() |
| + |
| + def open_text(self, name, content_type=None, tags=None, tee=None, |
| + binary_file_extension=None): |
| + """Returns (file): A file-like object for a single text stream. |
| + |
| + This creates a new butler TEXT stream with the specified name and |
| + parameters. |
| + |
| + Args: |
| + name (str): the LogDog name of the stream. |
| + content_type (str): The optional content type of the stream. If None, a |
| + default content type will be chosen by the Butler. |
| + tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| + tee (str): Describes how stream data should be tee'd through the Butler. |
| + One of StreamParams' TEE arguments. |
| + binary_file_extension (str): A custom binary file extension. If not |
| + provided, a default extension may be chosen or the binary stream may |
| + not be emitted. |
| + |
| + Returns (file): A file-like object to a Butler text stream. This object can |
| + have UTF-8 text content written to it with its `write` method, and must |
| + be closed when finished using its `close` method. |
| + """ |
| + params = StreamParams.make( |
| + name=name, |
| + type=StreamParams.TEXT, |
| + content_type=content_type, |
| + tags=tags, |
| + tee=tee, |
| + binary_file_extension=binary_file_extension) |
| + return self.new_connection(params) |
| + |
| + @contextlib.contextmanager |
| + def binary(self, name, **kwargs): |
| + """Context manager to create, use, and teardown a BINARY stream. |
| + |
| + This context manager creates a new butler BINARY stream with the specified |
| + name and parameters, yields it, and closes it on teardown. |
| + |
| + Args: |
| + name (str): the LogDog name of the stream. |
| + kwargs (dict): Log stream parameters. These may be any keyword arguments |
| + accepted by `open_binary`. |
| + |
| + Returns (file): A file-like object to a Butler binary stream supporting |
| + `write`. |
| + """ |
| + fd = None |
| + try: |
| + fd = self.open_binary(name, **kwargs) |
| + yield fd |
| + finally: |
| + if fd is not None: |
| + fd.close() |
| + |
| + def open_binary(self, name, content_type=None, tags=None, tee=None, |
| + binary_file_extension=None): |
| + """Returns (file): A file-like object for a single binary stream. |
| + |
| + This creates a new butler BINARY stream with the specified name and |
| + parameters. |
| + |
| + Args: |
| + name (str): the LogDog name of the stream. |
| + content_type (str): The optional content type of the stream. If None, a |
| + default content type will be chosen by the Butler. |
| + tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| + tee (str): Describes how stream data should be tee'd through the Butler. |
| + One of StreamParams' TEE arguments. |
| + binary_file_extension (str): A custom binary file extension. If not |
| + provided, a default extension may be chosen or the binary stream may |
| + not be emitted. |
| + |
| + Returns (file): A file-like object to a Butler binary stream. This object |
| + can have UTF-8 content written to it with its `write` method, and must |
| + be closed when finished using its `close` method. |
| + """ |
| + params = StreamParams.make( |
| + name=name, |
| + type=StreamParams.BINARY, |
| + content_type=content_type, |
| + tags=tags, |
| + tee=tee, |
| + binary_file_extension=binary_file_extension) |
| + return self.new_connection(params) |
| + |
| + @contextlib.contextmanager |
| + def datagram(self, name, **kwargs): |
| + """Context manager to create, use, and teardown a DATAGRAM stream. |
| + |
| + This context manager creates a new butler DATAAGRAM stream with the |
| + specified name and parameters, yields it, and closes it on teardown. |
| + |
| + Args: |
| + name (str): the LogDog name of the stream. |
| + kwargs (dict): Log stream parameters. These may be any keyword arguments |
| + accepted by `open_datagram`. |
| + |
| + Returns (_DatagramStream): A datagram stream object. Datagrams can be |
| + written to it using its `send` method. |
| + """ |
| + fd = None |
| + try: |
| + fd = self.open_datagram(name, **kwargs) |
| + yield fd |
| + finally: |
| + if fd is not None: |
| + fd.close() |
| + |
| + def open_datagram(self, name, content_type=None, tags=None, tee=None, |
| + binary_file_extension=None): |
| + """Returns: (file): A file-like object for a single text stream. |
|
martiniss
2016/05/10 22:12:38
Is this correct? I think this was copy pasted.
dnj (Google)
2016/05/12 17:54:02
Yep was copy/pasted, cleaned that up.
|
| + |
| + This creates a new butler TEXT stream with the specified name and |
| + parameters. |
| + |
| + Args: |
| + name (str): the LogDog name of the stream. |
| + content_type (str): The optional content type of the stream. If None, a |
| + default content type will be chosen by the Butler. |
| + tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| + tee (str): Describes how stream data should be tee'd through the Butler. |
| + One of StreamParams' TEE arguments. |
| + binary_file_extension (str): A custom binary file extension. If not |
| + provided, a default extension may be chosen or the binary stream may |
| + not be emitted. |
| + |
| + Returns (_DatagramStream): A datagram stream object. Datagrams can be |
| + written to it using its `send` method. This object must be closed when |
| + finished by using its `close` method. |
| + """ |
| + params = StreamParams.make( |
| + name=name, |
| + type=StreamParams.DATAGRAM, |
| + content_type=content_type, |
| + tags=tags, |
| + tee=tee, |
| + binary_file_extension=binary_file_extension) |
| + return self._DatagramStream(self.new_connection(params)) |
| + |
| + |
| +class _NamedPipeStreamClient(StreamClient): |
| + """A StreamClient implementation that connects to a Windows named pipe. |
|
martiniss
2016/05/10 22:12:38
Windows? Why does line 394 say UNIX? Bad copy-past
dnj (Google)
2016/05/12 17:54:02
Was copy/pasted.
|
| + """ |
| + |
| + def __init__(self, name): |
| + r"""Initializes a new UNIX domain socket stream client. |
| + |
| + Args: |
| + name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
| + """ |
| + super(_NamedPipeStreamClient, self).__init__() |
| + self._name = name |
| + |
| + @classmethod |
| + def _create(cls, value): |
| + return cls(value) |
| + |
| + def _connect_raw(self): |
| + return open(self._name, 'wb') |
| + |
| +_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
| + |
| + |
| +class _UnixDomainSocketStreamClient(StreamClient): |
| + """A StreamClient implementation that uses a UNIX domain socket. |
| + """ |
| + |
| + def __init__(self, path): |
| + """Initializes a new UNIX domain socket stream client. |
| + |
| + Args: |
| + path (str): The path to the named UNIX domain socket. |
| + """ |
| + super(_UnixDomainSocketStreamClient, self).__init__() |
| + self._path = path |
| + |
| + @classmethod |
| + def _create(cls, value): |
| + if not os.path.exists(value): |
| + raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| + return cls(value) |
| + |
| + def _connect_raw(self): |
| + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| + sock.connect(self._path) |
| + return sock |
| + |
| +_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |