Index: client/libs/logdog/stream.py |
diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py |
index 3d60a6c6c948807e209f6819c53297ffa7f540ce..ea415e1ffc199315af53b5993373a200311476f4 100644 |
--- a/client/libs/logdog/stream.py |
+++ b/client/libs/logdog/stream.py |
@@ -112,7 +112,21 @@ class StreamProtocolRegistry(object): |
raise KeyError('Duplicate protocol registered.') |
self._registry[protocol] = client_cls |
- def create(self, uri): |
+ def create(self, uri, **kwargs): |
+ """Returns (StreamClient): A stream client for the specified URI. |
+ |
+ This uses the default StreamProtocolRegistry to instantiate a StreamClient |
+ for the specified URI. |
+ |
+ Args: |
+ uri (str): The streamserver URI. |
+ kwargs: keyword arguments to forward to the stream. See |
+ StreamClient.__init__. |
+ |
+ Raises: |
+ ValueError: if the supplied URI references an invalid or improperly |
+ configured streamserver. |
+ """ |
uri = uri.split(':', 1) |
if len(uri) != 2: |
raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
@@ -121,36 +135,74 @@ class StreamProtocolRegistry(object): |
client_cls = self._registry.get(protocol) |
if not client_cls: |
raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
- return client_cls._create(value) |
+ return client_cls._create(value, **kwargs) |
+ |
# Default (global) registry. |
_default_registry = StreamProtocolRegistry() |
+create = _default_registry.create |
-def create(uri): |
- """Returns (StreamClient): A stream client for the specified URI. |
+class StreamClient(object): |
+ """Abstract base class for a streamserver client. |
+ """ |
- This uses the default StreamProtocolRegistry to instantiate a StreamClient |
- for the specified URI. |
+ class _StreamBase(object): |
+ """ABC for StreamClient streams.""" |
- Args: |
- uri: The streamserver URI. |
+ def __init__(self, stream_client, params): |
+ self._stream_client = stream_client |
+ self._params = params |
- Raises: |
- ValueError if the supplied URI references an invalid or improperly |
- configured streamserver. |
- """ |
- return _default_registry.create(uri) |
+ @property |
+ def params(self): |
+ """Returns (StreamParams): The stream parameters.""" |
+ return self._params |
+ @property |
+ def path(self): |
+ """Returns (streamname.StreamPath): The stream path. |
-class StreamClient(object): |
- """Abstract base class for a streamserver client. |
- """ |
+ Raises: |
+ ValueError: if the stream path is invalid, or if the stream prefix is |
+ not defined in the client. |
+ """ |
+ return self._stream_client.get_stream_path(self._params.name) |
+ |
+ def get_viewer_url(self): |
+ """Returns (str): The viewer URL for this stream. |
+ |
+ Raises: |
+ KeyError: if information needed to construct the URL is missing. |
+ ValueError: if the stream prefix or name do not form a valid stream |
+ path. |
+ """ |
+ return self._stream_client.get_viewer_url(self._params.name) |
+ |
+ |
+ class _BasicStream(_StreamBase): |
+ """Wraps a basic file descriptor, offering "write" and "close".""" |
+ |
+ def __init__(self, stream_client, params, fd): |
+ super(StreamClient._BasicStream, self).__init__(stream_client, params) |
+ self._fd = fd |
+ |
+ @property |
+ def fd(self): |
+ return self._fd |
+ |
+ def write(self, data): |
+ return self._fd.write(data) |
+ |
+ def close(self): |
+ return self._fd.close() |
- class _DatagramStream(object): |
+ |
+ class _DatagramStream(_StreamBase): |
"""Wraps a stream object to write length-prefixed datagrams.""" |
- def __init__(self, fd): |
+ def __init__(self, stream_client, params, fd): |
+ super(StreamClient._DatagramStream, self).__init__(stream_client, params) |
self._fd = fd |
def send(self, data): |
@@ -160,10 +212,76 @@ class StreamClient(object): |
def close(self): |
return self._fd.close() |
- def __init__(self): |
+ |
+ def __init__(self, project=None, prefix=None, coordinator_host=None): |
+ """Constructs a new base StreamClient instance. |
+ |
+ Args: |
+ project (str or None): If not None, the name of the log stream project. |
+ prefix (str or None): If not None, the log stream session prefix. |
+ coordinator_host (str or None): If not None, the name of the Coordinator |
+ host that this stream client is bound to. This will be used to |
+ construct viewer URLs for generated streams. |
+ """ |
+ self._project = project |
+ self._prefix = prefix |
+ self._coordinator_host = coordinator_host |
+ |
self._name_lock = threading.Lock() |
self._names = set() |
+ @property |
+ def project(self): |
+ """Returns (str or None): The stream project, or None if not configured.""" |
+ return self._project |
+ |
+ @property |
+ def prefix(self): |
+ """Returns (str or None): The stream prefix, or None if not configured.""" |
+ return self._prefix |
+ |
+ @property |
+ def coordinator_host(self): |
+ """Returns (str or None): The coordinator host, or None if not configured. |
+ """ |
+ return self._coordinator_host |
+ |
+ def get_stream_path(self, name): |
+ """Returns (streamname.StreamPath): The stream path. |
+ |
+ Args: |
+ name (str): The name of the stream. |
+ |
+ Raises: |
+ KeyError: if information needed to construct the path is missing. |
+ ValueError: if the stream path is invalid, or if the stream prefix is |
+ not defined in the client. |
+ """ |
+ if not self._prefix: |
+ raise KeyError('Stream prefix is not configured') |
+ return streamname.StreamPath.make(self._prefix, name) |
+ |
+ def get_viewer_url(self, name): |
+ """Returns (str): The LogDog viewer URL for the named stream. |
+ |
+ Args: |
+ name (str): The name of the stream. This can also be a query glob. |
+ |
+ Raises: |
+ KeyError: if information needed to construct the URL is missing. |
+ ValueError: if the stream prefix or name do not form a valid stream |
+ path. |
+ """ |
+ if not self._coordinator_host: |
+ raise KeyError('Coordinator host is not configured') |
+ if not self._project: |
+ raise KeyError('Stream project is not configured') |
+ |
+ return streamname.get_logdog_viewer_url( |
+ self._coordinator_host, |
+ self._project, |
+ self.get_stream_path(name)) |
+ |
def _register_new_stream(self, name): |
"""Registers a new stream name. |
@@ -188,8 +306,8 @@ class StreamClient(object): |
self._names.add(name) |
@classmethod |
- def _create(cls, value): |
- """Returns (StreamClient): A new stream client connection. |
+ def _create(cls, value, **kwargs): |
+ """Returns (StreamClient): A new stream client instance. |
Validates the streamserver parameters and creates a new StreamClient |
instance that connects to them. |
@@ -285,7 +403,7 @@ class StreamClient(object): |
tags=tags, |
tee=tee, |
binary_file_extension=binary_file_extension) |
- return self.new_connection(params) |
+ return self._BasicStream(self, params, self.new_connection(params)) |
@contextlib.contextmanager |
def binary(self, name, **kwargs): |
@@ -338,7 +456,7 @@ class StreamClient(object): |
tags=tags, |
tee=tee, |
binary_file_extension=binary_file_extension) |
- return self.new_connection(params) |
+ return self._BasicStream(self, params, self.new_connection(params)) |
@contextlib.contextmanager |
def datagram(self, name, **kwargs): |
@@ -389,25 +507,25 @@ class StreamClient(object): |
tags=tags, |
tee=tee, |
binary_file_extension=binary_file_extension) |
- return self._DatagramStream(self.new_connection(params)) |
+ return self._DatagramStream(self, params, self.new_connection(params)) |
class _NamedPipeStreamClient(StreamClient): |
"""A StreamClient implementation that connects to a Windows named pipe. |
""" |
- def __init__(self, name): |
+ def __init__(self, name, **kwargs): |
r"""Initializes a new Windows named pipe stream client. |
Args: |
name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
""" |
- super(_NamedPipeStreamClient, self).__init__() |
+ super(_NamedPipeStreamClient, self).__init__(**kwargs) |
self._name = name |
@classmethod |
- def _create(cls, value): |
- return cls(value) |
+ def _create(cls, value, **kwargs): |
+ return cls(value, **kwargs) |
def _connect_raw(self): |
return open(self._name, 'wb') |
@@ -432,20 +550,20 @@ class _UnixDomainSocketStreamClient(StreamClient): |
self._fd.close() |
- def __init__(self, path): |
+ def __init__(self, path, **kwargs): |
"""Initializes a new UNIX domain socket stream client. |
Args: |
path (str): The path to the named UNIX domain socket. |
""" |
- super(_UnixDomainSocketStreamClient, self).__init__() |
+ super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
self._path = path |
@classmethod |
- def _create(cls, value): |
+ def _create(cls, value, **kwargs): |
if not os.path.exists(value): |
raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
- return cls(value) |
+ return cls(value, **kwargs) |
def _connect_raw(self): |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |