Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5532)

Unified Diff: client/libs/logdog/stream.py

Issue 2453273002: Update LogDog client library to generate URLs. (Closed)
Patch Set: Forgot project, oops. Addressed nits. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/libs/logdog/stream.py
diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py
index 3d60a6c6c948807e209f6819c53297ffa7f540ce..ea415e1ffc199315af53b5993373a200311476f4 100644
--- a/client/libs/logdog/stream.py
+++ b/client/libs/logdog/stream.py
@@ -112,7 +112,21 @@ class StreamProtocolRegistry(object):
raise KeyError('Duplicate protocol registered.')
self._registry[protocol] = client_cls
- def create(self, uri):
+ def create(self, uri, **kwargs):
+ """Returns (StreamClient): A stream client for the specified URI.
+
+ This uses the default StreamProtocolRegistry to instantiate a StreamClient
+ for the specified URI.
+
+ Args:
+ uri (str): The streamserver URI.
+ kwargs: keyword arguments to forward to the stream. See
+ StreamClient.__init__.
+
+ Raises:
+ ValueError: if the supplied URI references an invalid or improperly
+ configured streamserver.
+ """
uri = uri.split(':', 1)
if len(uri) != 2:
raise ValueError('Invalid stream server URI [%s]' % (uri,))
@@ -121,36 +135,74 @@ class StreamProtocolRegistry(object):
client_cls = self._registry.get(protocol)
if not client_cls:
raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
- return client_cls._create(value)
+ return client_cls._create(value, **kwargs)
+
# Default (global) registry.
_default_registry = StreamProtocolRegistry()
+create = _default_registry.create
-def create(uri):
- """Returns (StreamClient): A stream client for the specified URI.
+class StreamClient(object):
+ """Abstract base class for a streamserver client.
+ """
- This uses the default StreamProtocolRegistry to instantiate a StreamClient
- for the specified URI.
+ class _StreamBase(object):
+ """ABC for StreamClient streams."""
- Args:
- uri: The streamserver URI.
+ def __init__(self, stream_client, params):
+ self._stream_client = stream_client
+ self._params = params
- Raises:
- ValueError if the supplied URI references an invalid or improperly
- configured streamserver.
- """
- return _default_registry.create(uri)
+ @property
+ def params(self):
+ """Returns (StreamParams): The stream parameters."""
+ return self._params
+ @property
+ def path(self):
+ """Returns (streamname.StreamPath): The stream path.
-class StreamClient(object):
- """Abstract base class for a streamserver client.
- """
+ Raises:
+ ValueError: if the stream path is invalid, or if the stream prefix is
+ not defined in the client.
+ """
+ return self._stream_client.get_stream_path(self._params.name)
+
+ def get_viewer_url(self):
+ """Returns (str): The viewer URL for this stream.
+
+ Raises:
+ KeyError: if information needed to construct the URL is missing.
+ ValueError: if the stream prefix or name do not form a valid stream
+ path.
+ """
+ return self._stream_client.get_viewer_url(self._params.name)
+
+
+ class _BasicStream(_StreamBase):
+ """Wraps a basic file descriptor, offering "write" and "close"."""
+
+ def __init__(self, stream_client, params, fd):
+ super(StreamClient._BasicStream, self).__init__(stream_client, params)
+ self._fd = fd
+
+ @property
+ def fd(self):
+ return self._fd
+
+ def write(self, data):
+ return self._fd.write(data)
+
+ def close(self):
+ return self._fd.close()
- class _DatagramStream(object):
+
+ class _DatagramStream(_StreamBase):
"""Wraps a stream object to write length-prefixed datagrams."""
- def __init__(self, fd):
+ def __init__(self, stream_client, params, fd):
+ super(StreamClient._DatagramStream, self).__init__(stream_client, params)
self._fd = fd
def send(self, data):
@@ -160,10 +212,76 @@ class StreamClient(object):
def close(self):
return self._fd.close()
- def __init__(self):
+
+ def __init__(self, project=None, prefix=None, coordinator_host=None):
+ """Constructs a new base StreamClient instance.
+
+ Args:
+ project (str or None): If not None, the name of the log stream project.
+ prefix (str or None): If not None, the log stream session prefix.
+ coordinator_host (str or None): If not None, the name of the Coordinator
+ host that this stream client is bound to. This will be used to
+ construct viewer URLs for generated streams.
+ """
+ self._project = project
+ self._prefix = prefix
+ self._coordinator_host = coordinator_host
+
self._name_lock = threading.Lock()
self._names = set()
+ @property
+ def project(self):
+ """Returns (str or None): The stream project, or None if not configured."""
+ return self._project
+
+ @property
+ def prefix(self):
+ """Returns (str or None): The stream prefix, or None if not configured."""
+ return self._prefix
+
+ @property
+ def coordinator_host(self):
+ """Returns (str or None): The coordinator host, or None if not configured.
+ """
+ return self._coordinator_host
+
+ def get_stream_path(self, name):
+ """Returns (streamname.StreamPath): The stream path.
+
+ Args:
+ name (str): The name of the stream.
+
+ Raises:
+ KeyError: if information needed to construct the path is missing.
+ ValueError: if the stream path is invalid, or if the stream prefix is
+ not defined in the client.
+ """
+ if not self._prefix:
+ raise KeyError('Stream prefix is not configured')
+ return streamname.StreamPath.make(self._prefix, name)
+
+ def get_viewer_url(self, name):
+ """Returns (str): The LogDog viewer URL for the named stream.
+
+ Args:
+ name (str): The name of the stream. This can also be a query glob.
+
+ Raises:
+ KeyError: if information needed to construct the URL is missing.
+ ValueError: if the stream prefix or name do not form a valid stream
+ path.
+ """
+ if not self._coordinator_host:
+ raise KeyError('Coordinator host is not configured')
+ if not self._project:
+ raise KeyError('Stream project is not configured')
+
+ return streamname.get_logdog_viewer_url(
+ self._coordinator_host,
+ self._project,
+ self.get_stream_path(name))
+
def _register_new_stream(self, name):
"""Registers a new stream name.
@@ -188,8 +306,8 @@ class StreamClient(object):
self._names.add(name)
@classmethod
- def _create(cls, value):
- """Returns (StreamClient): A new stream client connection.
+ def _create(cls, value, **kwargs):
+ """Returns (StreamClient): A new stream client instance.
Validates the streamserver parameters and creates a new StreamClient
instance that connects to them.
@@ -285,7 +403,7 @@ class StreamClient(object):
tags=tags,
tee=tee,
binary_file_extension=binary_file_extension)
- return self.new_connection(params)
+ return self._BasicStream(self, params, self.new_connection(params))
@contextlib.contextmanager
def binary(self, name, **kwargs):
@@ -338,7 +456,7 @@ class StreamClient(object):
tags=tags,
tee=tee,
binary_file_extension=binary_file_extension)
- return self.new_connection(params)
+ return self._BasicStream(self, params, self.new_connection(params))
@contextlib.contextmanager
def datagram(self, name, **kwargs):
@@ -389,25 +507,25 @@ class StreamClient(object):
tags=tags,
tee=tee,
binary_file_extension=binary_file_extension)
- return self._DatagramStream(self.new_connection(params))
+ return self._DatagramStream(self, params, self.new_connection(params))
class _NamedPipeStreamClient(StreamClient):
"""A StreamClient implementation that connects to a Windows named pipe.
"""
- def __init__(self, name):
+ def __init__(self, name, **kwargs):
r"""Initializes a new Windows named pipe stream client.
Args:
name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
"""
- super(_NamedPipeStreamClient, self).__init__()
+ super(_NamedPipeStreamClient, self).__init__(**kwargs)
self._name = name
@classmethod
- def _create(cls, value):
- return cls(value)
+ def _create(cls, value, **kwargs):
+ return cls(value, **kwargs)
def _connect_raw(self):
return open(self._name, 'wb')
@@ -432,20 +550,20 @@ class _UnixDomainSocketStreamClient(StreamClient):
self._fd.close()
- def __init__(self, path):
+ def __init__(self, path, **kwargs):
"""Initializes a new UNIX domain socket stream client.
Args:
path (str): The path to the named UNIX domain socket.
"""
- super(_UnixDomainSocketStreamClient, self).__init__()
+ super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
self._path = path
@classmethod
- def _create(cls, value):
+ def _create(cls, value, **kwargs):
if not os.path.exists(value):
raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
- return cls(value)
+ return cls(value, **kwargs)
def _connect_raw(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698