Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Unified Diff: client/libs/logdog/stream.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Remove "client." from package name, make pylint happy. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/libs/logdog/stream.py
diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py
new file mode 100644
index 0000000000000000000000000000000000000000..6221165457216c1a0971d00ff66213ddb1d6a5c9
--- /dev/null
+++ b/client/libs/logdog/stream.py
@@ -0,0 +1,434 @@
+# Copyright 2016 The LUCI Authors. All rights reserved.
+# Use of this source code is governed by the Apache v2.0 license that can be
+# found in the LICENSE file.
+
+import collections
+import contextlib
+import json
+import os
+import socket
+import sys
+import threading
+import types
+
+from libs.logdog import streamname, varint
+
+
+_StreamParamsBase = collections.namedtuple('_StreamParamsBase',
+ ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
+
+
+class StreamParams(_StreamParamsBase):
+ """Defines the set of parameters to apply to a new stream."""
+
+ # A text content stream.
+ TEXT = 'text'
+ # A binary content stream.
+ BINARY = 'binary'
+ # A datagram content stream.
+ DATAGRAM = 'datagram'
+
+ # Tee parameter to tee this stream through the Butler's STDOUT.
+ TEE_STDOUT = 'stdout'
+ # Tee parameter to tee this stream through the Butler's STDERR.
+ TEE_STDERR = 'stderr'
+
+ @classmethod
+ def make(cls, **kwargs):
+ """Returns (StreamParams): A new StreamParams instance with supplied values.
+
+ Any parameter that isn't supplied will be set to None.
+
+ Args:
+ kwargs (dict): Named parameters to apply.
+ """
+ return cls(**{f: kwargs.get(f) for f in cls._fields})
+
+ def validate(self):
+ """Raises (ValueError): if the parameters are not valid."""
+ streamname.validate_stream_name(self.name)
+
+ if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
+ raise ValueError('Invalid type (%s)' % (self.type,))
+
+ if self.tags is not None:
+ if not isinstance(self.tags, collections.Mapping):
+ raise ValueError('Invalid tags type (%s)' % (self.tags,))
+ for k, v in self.tags.iteritems():
+ streamname.validate_tag(k, v)
+
+ if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR):
+ raise ValueError('Invalid tee type (%s)' % (self.tee,))
+
+ if not isinstance(self.binary_file_extension,
+ (types.NoneType, types.StringTypes)):
+ raise ValueError('Invalid binary file extension type (%s)' % (
+ self.binary_file_extension,))
+
+ def to_json(self):
+ """Returns (str): The JSON representation of the StreamParams.
+
+ Converts stream parameters to JSON for Butler consumption.
+
+ Raises:
+ ValueError: if these parameters are not valid.
+ """
+ self.validate()
+
+ obj = {
+ 'name': self.name,
+ 'type': self.type,
+ }
+
+ def maybe_add(key, value):
+ if value is not None:
+ obj[key] = value
+ maybe_add('contentType', self.content_type)
+ maybe_add('tags', self.tags)
+ maybe_add('tee', self.tee)
+ maybe_add('binaryFileExtension', self.binary_file_extension)
+
+ # Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
+ return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
+
+
+class StreamProtocolRegistry(object):
+ """Registry of streamserver URI protocols and their client classes.
+ """
+
+ def __init__(self):
+ self._registry = {}
+
+ def register_protocol(self, protocol, client_cls):
+ assert issubclass(client_cls, StreamClient)
+ if self._registry.get(protocol) is not None:
+ raise KeyError('Duplicate protocol registered.')
+ self._registry[protocol] = client_cls
+
+ def create(self, uri):
+ uri = uri.split(':', 1)
+ if len(uri) != 2:
+ raise ValueError('Invalid stream server URI [%s]' % (uri,))
+ protocol, value = uri
+
+ client_cls = self._registry.get(protocol)
+ if not client_cls:
+ raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
+ return client_cls._create(value)
+
+# Default (global) registry.
+_default_registry = StreamProtocolRegistry()
+
+
+def create(uri):
+ """Returns (StreamClient): A stream client for the specified URI.
+
+ This uses the default StreamProtocolRegistry to instantiate a StreamClient
+ for the specified URI.
+
+ Args:
+ uri: The streamserver URI.
+
+ Raises:
+ ValueError if the supplied URI references an invalid or improperly
+ configured streamserver.
+ """
+ return _default_registry.create(uri)
+
+
+class StreamClient(object):
+ """Abstract base class for a streamserver client.
+ """
+
+ class _DatagramStream(object):
+ """Wraps a stream object to write length-prefixed datagrams."""
+
+ def __init__(self, fd):
+ self._fd = fd
+
+ def send(self, data):
+ varint.write_uvarint(self._fd, len(data))
+ self._fd.write(data)
+
+ def close(self):
+ return self._fd.close()
+
+ def __init__(self):
+ self._name_lock = threading.Lock()
+ self._names = set()
+
+ def _register_new_stream(self, name):
+ """Registers a new stream name.
+
+ The Butler will internally reject any duplicate stream names. However, there
+ isn't really feedback when this happens except a closed stream client. This
+ is a client-side check to provide a more user-friendly experience in the
+ event that a user attempts to register a duplicate stream name.
+
+ Note that this is imperfect, as something else could register stream names
+ with the same Butler instance and this library has no means of tracking.
+ This is a best-effort experience, not a reliable check.
+
+ Args:
+ name (str): The name of the stream.
+
+ Raises:
+ ValueError if the stream name has already been registered.
+ """
+ with self._name_lock:
+ if name in self._names:
+ raise ValueError("Duplicate stream name [%s]" % (name,))
+ self._names.add(name)
+
+ @classmethod
+ def _create(cls, value):
+ """Returns (StreamClient): A new stream client connection.
+
+ Validates the streamserver parameters and creates a new StreamClient
+ instance that connects to them.
+
+ Implementing classes must override this.
+ """
+ raise NotImplementedError()
+
+ def _connect_raw(self):
+ """Returns (file): A new file-like stream.
+
+ Creates a new raw connection to the streamserver. This connection MUST not
+ have any data written to it past initialization (if needed) when it has been
+ returned.
+
+ The file-like object must implement `write` and `close`.
+
+ Implementing classes must override this.
+ """
+ raise NotImplementedError()
+
+ def new_connection(self, params):
+ """Returns (file): A new configured stream.
+
+ The returned object implements (minimally) `write` and `close`.
+
+ Creates a new LogDog stream with the specified parameters.
+
+ Args:
+ params (StreamParams): The parameters to use with the new connection.
+
+ Raises:
+ ValueError if the stream name has already been used, or if the parameters
+ are not valid.
+ """
+ self._register_new_stream(params.name)
+ params_json = params.to_json()
+
+ fd = self._connect_raw()
+ varint.write_uvarint(fd, len(params_json))
+ fd.write(params_json)
+ return fd
+
+ @contextlib.contextmanager
+ def text(self, name, **kwargs):
+ """Context manager to create, use, and teardown a TEXT stream.
+
+ This context manager creates a new butler TEXT stream with the specified
+ parameters, yields it, and closes it on teardown.
+
+ Args:
+ name (str): the LogDog name of the stream.
+ kwargs (dict): Log stream parameters. These may be any keyword arguments
+ accepted by `open_text`.
+
+ Returns (file): A file-like object to a Butler UTF-8 text stream supporting
+ `write`.
+ """
+ fd = None
+ try:
+ fd = self.open_text(name, **kwargs)
+ yield fd
+ finally:
+ if fd is not None:
+ fd.close()
+
+ def open_text(self, name, content_type=None, tags=None, tee=None,
+ binary_file_extension=None):
+ """Returns (file): A file-like object for a single text stream.
+
+ This creates a new butler TEXT stream with the specified parameters.
+
+ Args:
+ name (str): the LogDog name of the stream.
+ content_type (str): The optional content type of the stream. If None, a
+ default content type will be chosen by the Butler.
+ tags (dict): An optional key/value dictionary pair of LogDog stream tags.
+ tee (str): Describes how stream data should be tee'd through the Butler.
+ One of StreamParams' TEE arguments.
+ binary_file_extension (str): A custom binary file extension. If not
+ provided, a default extension may be chosen or the binary stream may
+ not be emitted.
+
+ Returns (file): A file-like object to a Butler text stream. This object can
+ have UTF-8 text content written to it with its `write` method, and must
+ be closed when finished using its `close` method.
+ """
+ params = StreamParams.make(
+ name=name,
+ type=StreamParams.TEXT,
+ content_type=content_type,
+ tags=tags,
+ tee=tee,
+ binary_file_extension=binary_file_extension)
+ return self.new_connection(params)
+
+ @contextlib.contextmanager
+ def binary(self, name, **kwargs):
+ """Context manager to create, use, and teardown a BINARY stream.
+
+ This context manager creates a new butler BINARY stream with the specified
+ parameters, yields it, and closes it on teardown.
+
+ Args:
+ name (str): the LogDog name of the stream.
+ kwargs (dict): Log stream parameters. These may be any keyword arguments
+ accepted by `open_binary`.
+
+ Returns (file): A file-like object to a Butler binary stream supporting
+ `write`.
+ """
+ fd = None
+ try:
+ fd = self.open_binary(name, **kwargs)
+ yield fd
+ finally:
+ if fd is not None:
+ fd.close()
+
+ def open_binary(self, name, content_type=None, tags=None, tee=None,
+ binary_file_extension=None):
+ """Returns (file): A file-like object for a single binary stream.
+
+ This creates a new butler BINARY stream with the specified parameters.
+
+ Args:
+ name (str): the LogDog name of the stream.
+ content_type (str): The optional content type of the stream. If None, a
+ default content type will be chosen by the Butler.
+ tags (dict): An optional key/value dictionary pair of LogDog stream tags.
+ tee (str): Describes how stream data should be tee'd through the Butler.
+ One of StreamParams' TEE arguments.
+ binary_file_extension (str): A custom binary file extension. If not
+ provided, a default extension may be chosen or the binary stream may
+ not be emitted.
+
+ Returns (file): A file-like object to a Butler binary stream. This object
+ can have UTF-8 content written to it with its `write` method, and must
+ be closed when finished using its `close` method.
+ """
+ params = StreamParams.make(
+ name=name,
+ type=StreamParams.BINARY,
+ content_type=content_type,
+ tags=tags,
+ tee=tee,
+ binary_file_extension=binary_file_extension)
+ return self.new_connection(params)
+
+ @contextlib.contextmanager
+ def datagram(self, name, **kwargs):
+ """Context manager to create, use, and teardown a DATAGRAM stream.
+
+ This context manager creates a new butler DATAAGRAM stream with the
+ specified parameters, yields it, and closes it on teardown.
+
+ Args:
+ name (str): the LogDog name of the stream.
+ kwargs (dict): Log stream parameters. These may be any keyword arguments
+ accepted by `open_datagram`.
+
+ Returns (_DatagramStream): A datagram stream object. Datagrams can be
+ written to it using its `send` method.
+ """
+ fd = None
+ try:
+ fd = self.open_datagram(name, **kwargs)
+ yield fd
+ finally:
+ if fd is not None:
+ fd.close()
+
+ def open_datagram(self, name, content_type=None, tags=None, tee=None,
+ binary_file_extension=None):
+ """Creates a new butler DATAGRAM stream with the specified parameters.
+
+ Args:
+ name (str): the LogDog name of the stream.
+ content_type (str): The optional content type of the stream. If None, a
+ default content type will be chosen by the Butler.
+ tags (dict): An optional key/value dictionary pair of LogDog stream tags.
+ tee (str): Describes how stream data should be tee'd through the Butler.
+ One of StreamParams' TEE arguments.
+ binary_file_extension (str): A custom binary file extension. If not
+ provided, a default extension may be chosen or the binary stream may
+ not be emitted.
+
+ Returns (_DatagramStream): A datagram stream object. Datagrams can be
+ written to it using its `send` method. This object must be closed when
+ finished by using its `close` method.
+ """
+ params = StreamParams.make(
+ name=name,
+ type=StreamParams.DATAGRAM,
+ content_type=content_type,
+ tags=tags,
+ tee=tee,
+ binary_file_extension=binary_file_extension)
+ return self._DatagramStream(self.new_connection(params))
+
+
+class _NamedPipeStreamClient(StreamClient):
+ """A StreamClient implementation that connects to a Windows named pipe.
+ """
+
+ def __init__(self, name):
+ r"""Initializes a new Windows named pipe stream client.
+
+ Args:
+ name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
+ """
+ super(_NamedPipeStreamClient, self).__init__()
+ self._name = name
+
+ @classmethod
+ def _create(cls, value):
+ return cls(value)
+
+ def _connect_raw(self):
+ return open(self._name, 'wb')
+
+_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
+
+
+class _UnixDomainSocketStreamClient(StreamClient):
+ """A StreamClient implementation that uses a UNIX domain socket.
+ """
+
+ def __init__(self, path):
+ """Initializes a new UNIX domain socket stream client.
+
+ Args:
+ path (str): The path to the named UNIX domain socket.
+ """
+ super(_UnixDomainSocketStreamClient, self).__init__()
+ self._path = path
+
+ @classmethod
+ def _create(cls, value):
+ if not os.path.exists(value):
+ raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
+ return cls(value)
+
+ def _connect_raw(self):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(self._path)
+ return sock
+
+_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698