| Index: client/libs/logdog/stream.py
|
| diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py
|
| index 3d60a6c6c948807e209f6819c53297ffa7f540ce..ea415e1ffc199315af53b5993373a200311476f4 100644
|
| --- a/client/libs/logdog/stream.py
|
| +++ b/client/libs/logdog/stream.py
|
| @@ -112,7 +112,21 @@ class StreamProtocolRegistry(object):
|
| raise KeyError('Duplicate protocol registered.')
|
| self._registry[protocol] = client_cls
|
|
|
| - def create(self, uri):
|
| + def create(self, uri, **kwargs):
|
| + """Returns (StreamClient): A stream client for the specified URI.
|
| +
|
| + This uses the default StreamProtocolRegistry to instantiate a StreamClient
|
| + for the specified URI.
|
| +
|
| + Args:
|
| + uri (str): The streamserver URI.
|
| + kwargs: keyword arguments to forward to the stream. See
|
| + StreamClient.__init__.
|
| +
|
| + Raises:
|
| + ValueError: if the supplied URI references an invalid or improperly
|
| + configured streamserver.
|
| + """
|
| uri = uri.split(':', 1)
|
| if len(uri) != 2:
|
| raise ValueError('Invalid stream server URI [%s]' % (uri,))
|
| @@ -121,36 +135,74 @@ class StreamProtocolRegistry(object):
|
| client_cls = self._registry.get(protocol)
|
| if not client_cls:
|
| raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
|
| - return client_cls._create(value)
|
| + return client_cls._create(value, **kwargs)
|
| +
|
|
|
| # Default (global) registry.
|
| _default_registry = StreamProtocolRegistry()
|
| +create = _default_registry.create
|
|
|
|
|
| -def create(uri):
|
| - """Returns (StreamClient): A stream client for the specified URI.
|
| +class StreamClient(object):
|
| + """Abstract base class for a streamserver client.
|
| + """
|
|
|
| - This uses the default StreamProtocolRegistry to instantiate a StreamClient
|
| - for the specified URI.
|
| + class _StreamBase(object):
|
| + """ABC for StreamClient streams."""
|
|
|
| - Args:
|
| - uri: The streamserver URI.
|
| + def __init__(self, stream_client, params):
|
| + self._stream_client = stream_client
|
| + self._params = params
|
|
|
| - Raises:
|
| - ValueError if the supplied URI references an invalid or improperly
|
| - configured streamserver.
|
| - """
|
| - return _default_registry.create(uri)
|
| + @property
|
| + def params(self):
|
| + """Returns (StreamParams): The stream parameters."""
|
| + return self._params
|
|
|
| + @property
|
| + def path(self):
|
| + """Returns (streamname.StreamPath): The stream path.
|
|
|
| -class StreamClient(object):
|
| - """Abstract base class for a streamserver client.
|
| - """
|
| + Raises:
|
| + ValueError: if the stream path is invalid, or if the stream prefix is
|
| + not defined in the client.
|
| + """
|
| + return self._stream_client.get_stream_path(self._params.name)
|
| +
|
| + def get_viewer_url(self):
|
| + """Returns (str): The viewer URL for this stream.
|
| +
|
| + Raises:
|
| + KeyError: if information needed to construct the URL is missing.
|
| + ValueError: if the stream prefix or name do not form a valid stream
|
| + path.
|
| + """
|
| + return self._stream_client.get_viewer_url(self._params.name)
|
| +
|
| +
|
| + class _BasicStream(_StreamBase):
|
| + """Wraps a basic file descriptor, offering "write" and "close"."""
|
| +
|
| + def __init__(self, stream_client, params, fd):
|
| + super(StreamClient._BasicStream, self).__init__(stream_client, params)
|
| + self._fd = fd
|
| +
|
| + @property
|
| + def fd(self):
|
| + return self._fd
|
| +
|
| + def write(self, data):
|
| + return self._fd.write(data)
|
| +
|
| + def close(self):
|
| + return self._fd.close()
|
|
|
| - class _DatagramStream(object):
|
| +
|
| + class _DatagramStream(_StreamBase):
|
| """Wraps a stream object to write length-prefixed datagrams."""
|
|
|
| - def __init__(self, fd):
|
| + def __init__(self, stream_client, params, fd):
|
| + super(StreamClient._DatagramStream, self).__init__(stream_client, params)
|
| self._fd = fd
|
|
|
| def send(self, data):
|
| @@ -160,10 +212,76 @@ class StreamClient(object):
|
| def close(self):
|
| return self._fd.close()
|
|
|
| - def __init__(self):
|
| +
|
| + def __init__(self, project=None, prefix=None, coordinator_host=None):
|
| + """Constructs a new base StreamClient instance.
|
| +
|
| + Args:
|
| + project (str or None): If not None, the name of the log stream project.
|
| + prefix (str or None): If not None, the log stream session prefix.
|
| + coordinator_host (str or None): If not None, the name of the Coordinator
|
| + host that this stream client is bound to. This will be used to
|
| + construct viewer URLs for generated streams.
|
| + """
|
| + self._project = project
|
| + self._prefix = prefix
|
| + self._coordinator_host = coordinator_host
|
| +
|
| self._name_lock = threading.Lock()
|
| self._names = set()
|
|
|
| + @property
|
| + def project(self):
|
| + """Returns (str or None): The stream project, or None if not configured."""
|
| + return self._project
|
| +
|
| + @property
|
| + def prefix(self):
|
| + """Returns (str or None): The stream prefix, or None if not configured."""
|
| + return self._prefix
|
| +
|
| + @property
|
| + def coordinator_host(self):
|
| + """Returns (str or None): The coordinator host, or None if not configured.
|
| + """
|
| + return self._coordinator_host
|
| +
|
| + def get_stream_path(self, name):
|
| + """Returns (streamname.StreamPath): The stream path.
|
| +
|
| + Args:
|
| + name (str): The name of the stream.
|
| +
|
| + Raises:
|
| + KeyError: if information needed to construct the path is missing.
|
| + ValueError: if the stream path is invalid, or if the stream prefix is
|
| + not defined in the client.
|
| + """
|
| + if not self._prefix:
|
| + raise KeyError('Stream prefix is not configured')
|
| + return streamname.StreamPath.make(self._prefix, name)
|
| +
|
| + def get_viewer_url(self, name):
|
| + """Returns (str): The LogDog viewer URL for the named stream.
|
| +
|
| + Args:
|
| + name (str): The name of the stream. This can also be a query glob.
|
| +
|
| + Raises:
|
| + KeyError: if information needed to construct the URL is missing.
|
| + ValueError: if the stream prefix or name do not form a valid stream
|
| + path.
|
| + """
|
| + if not self._coordinator_host:
|
| + raise KeyError('Coordinator host is not configured')
|
| + if not self._project:
|
| + raise KeyError('Stream project is not configured')
|
| +
|
| + return streamname.get_logdog_viewer_url(
|
| + self._coordinator_host,
|
| + self._project,
|
| + self.get_stream_path(name))
|
| +
|
| def _register_new_stream(self, name):
|
| """Registers a new stream name.
|
|
|
| @@ -188,8 +306,8 @@ class StreamClient(object):
|
| self._names.add(name)
|
|
|
| @classmethod
|
| - def _create(cls, value):
|
| - """Returns (StreamClient): A new stream client connection.
|
| + def _create(cls, value, **kwargs):
|
| + """Returns (StreamClient): A new stream client instance.
|
|
|
| Validates the streamserver parameters and creates a new StreamClient
|
| instance that connects to them.
|
| @@ -285,7 +403,7 @@ class StreamClient(object):
|
| tags=tags,
|
| tee=tee,
|
| binary_file_extension=binary_file_extension)
|
| - return self.new_connection(params)
|
| + return self._BasicStream(self, params, self.new_connection(params))
|
|
|
| @contextlib.contextmanager
|
| def binary(self, name, **kwargs):
|
| @@ -338,7 +456,7 @@ class StreamClient(object):
|
| tags=tags,
|
| tee=tee,
|
| binary_file_extension=binary_file_extension)
|
| - return self.new_connection(params)
|
| + return self._BasicStream(self, params, self.new_connection(params))
|
|
|
| @contextlib.contextmanager
|
| def datagram(self, name, **kwargs):
|
| @@ -389,25 +507,25 @@ class StreamClient(object):
|
| tags=tags,
|
| tee=tee,
|
| binary_file_extension=binary_file_extension)
|
| - return self._DatagramStream(self.new_connection(params))
|
| + return self._DatagramStream(self, params, self.new_connection(params))
|
|
|
|
|
| class _NamedPipeStreamClient(StreamClient):
|
| """A StreamClient implementation that connects to a Windows named pipe.
|
| """
|
|
|
| - def __init__(self, name):
|
| + def __init__(self, name, **kwargs):
|
| r"""Initializes a new Windows named pipe stream client.
|
|
|
| Args:
|
| name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
|
| """
|
| - super(_NamedPipeStreamClient, self).__init__()
|
| + super(_NamedPipeStreamClient, self).__init__(**kwargs)
|
| self._name = name
|
|
|
| @classmethod
|
| - def _create(cls, value):
|
| - return cls(value)
|
| + def _create(cls, value, **kwargs):
|
| + return cls(value, **kwargs)
|
|
|
| def _connect_raw(self):
|
| return open(self._name, 'wb')
|
| @@ -432,20 +550,20 @@ class _UnixDomainSocketStreamClient(StreamClient):
|
| self._fd.close()
|
|
|
|
|
| - def __init__(self, path):
|
| + def __init__(self, path, **kwargs):
|
| """Initializes a new UNIX domain socket stream client.
|
|
|
| Args:
|
| path (str): The path to the named UNIX domain socket.
|
| """
|
| - super(_UnixDomainSocketStreamClient, self).__init__()
|
| + super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
|
| self._path = path
|
|
|
| @classmethod
|
| - def _create(cls, value):
|
| + def _create(cls, value, **kwargs):
|
| if not os.path.exists(value):
|
| raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
|
| - return cls(value)
|
| + return cls(value, **kwargs)
|
|
|
| def _connect_raw(self):
|
| sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|