Chromium Code Reviews| Index: client/libs/logdog/stream.py |
| diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py |
| index 3d60a6c6c948807e209f6819c53297ffa7f540ce..34e95b6f5f1dfa99faaebf76d70a5d4372264580 100644 |
| --- a/client/libs/logdog/stream.py |
| +++ b/client/libs/logdog/stream.py |
| @@ -112,7 +112,21 @@ class StreamProtocolRegistry(object): |
| raise KeyError('Duplicate protocol registered.') |
| self._registry[protocol] = client_cls |
| - def create(self, uri): |
| + def create(self, uri, **kwargs): |
| + """Returns (StreamClient): A stream client for the specified URI. |
| + |
| + This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| + for the specified URI. |
| + |
| + Args: |
| + uri (str): The streamserver URI. |
| + kwargs: keyword arguments to forward to the stream. See |
| + StreamClient.__init__. |
| + |
| + Raises: |
| + ValueError: if the supplied URI references an invalid or improperly |
| + configured streamserver. |
| + """ |
| uri = uri.split(':', 1) |
| if len(uri) != 2: |
| raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
| @@ -121,36 +135,67 @@ class StreamProtocolRegistry(object): |
| client_cls = self._registry.get(protocol) |
| if not client_cls: |
| raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
| - return client_cls._create(value) |
| + return client_cls._create(value, **kwargs) |
| + |
| # Default (global) registry. |
| _default_registry = StreamProtocolRegistry() |
| +create = _default_registry.create |
| -def create(uri): |
| - """Returns (StreamClient): A stream client for the specified URI. |
| +class StreamClient(object): |
| + """Abstract base class for a streamserver client. |
| + """ |
| - This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| - for the specified URI. |
| + class _StreamBase(object): |
| + """ABC for StreamClient streams.""" |
| - Args: |
| - uri: The streamserver URI. |
| + def __init__(self, stream_client, params): |
| + self._stream_client = stream_client |
| + self._params = params |
| - Raises: |
| - ValueError if the supplied URI references an invalid or improperly |
| - configured streamserver. |
| - """ |
| - return _default_registry.create(uri) |
| + @property |
| + def params(self): |
| + """Returns (StreamParams): The stream parameters.""" |
| + return self._params |
| + @property |
| + def path(self): |
|
jbudorick
2016/10/27 13:24:06
nit: seems a bit odd to have path be a property an
dnj
2016/10/27 22:42:45
Agreed. I don't really like it, but OTOH having a
jbudorick
2016/10/27 23:42:50
sgtm.
|
| + """Returns (streamname.StreamPath): The stream path. |
| -class StreamClient(object): |
| - """Abstract base class for a streamserver client. |
| - """ |
| + Raises: |
| + ValueError: if the stream path is invalid, or if the stream prefix is |
| + not defined in the client. |
| + """ |
| + return self._stream_client.get_stream_path(self._params.name) |
| + |
| + def get_viewer_url(self): |
| + return self._stream_client.get_viewer_url(self._params.name) |
| + |
| + |
| + class _BasicStream(_StreamBase): |
| + """Wraps a basic file descriptor, offering "write" and "close".""" |
| + |
| + def __init__(self, stream_client, params, fd): |
| + super(StreamClient._BasicStream, self).__init__(stream_client, params) |
| + self._fd = fd |
| + |
| + @property |
| + def fd(self): |
| + return self._fd |
| + |
| + def write(self, data): |
| + return self._fd.write(data) |
| + |
| + def close(self): |
| + return self._fd.close() |
| - class _DatagramStream(object): |
| + |
| + class _DatagramStream(_StreamBase): |
| """Wraps a stream object to write length-prefixed datagrams.""" |
| - def __init__(self, fd): |
| + def __init__(self, stream_client, params, fd): |
| + super(StreamClient._DatagramStream, self).__init__(stream_client, params) |
| self._fd = fd |
| def send(self, data): |
| @@ -160,10 +205,63 @@ class StreamClient(object): |
| def close(self): |
| return self._fd.close() |
| - def __init__(self): |
| + |
| + def __init__(self, prefix=None, coordinator_host=None): |
| + """Constructs a new base StreamClient instance. |
| + |
| + Args: |
| + prefix (str or None): If not None, the log stream session prefix. |
| + coordinator_host (str or None): If not None, the name of the Coordinator |
| + host that this stream client is bound to. This will be used to |
| + construct viewer URLs for generated streams. |
| + """ |
| + self._prefix = prefix |
| + self._coordinator_host = coordinator_host |
| + |
| self._name_lock = threading.Lock() |
| self._names = set() |
| + @property |
| + def prefix(self): |
| + """Returns (str or None): The stream prefix, or None if not configured.""" |
| + return self._prefix |
| + |
| + @property |
| + def coordinator_host(self): |
| + """Returns (str or None): The coordinator host, or None if not configured. |
| + """ |
| + return self._coordinator_host |
| + |
| + def get_stream_path(self, name): |
| + """Returns (streamname.StreamPath): The stream path. |
| + |
| + Args: |
| + name (str): The name of the stream. |
| + |
| + Raises: |
| + ValueError: if the stream path is invalid, or if the stream prefix is |
| + not defined in the client. |
| + """ |
| + if not self._prefix: |
| + raise KeyError('Stream prefix is not configured') |
| + return streamname.StreamPath.make(self._prefix, name) |
| + |
| + def get_viewer_url(self, stream_name): |
| + """Returns (str): The LogDog viewer URL for the named stream. |
| + |
| + Args: |
| + stream_name (str): The name of the stream. This can also be a query glob. |
| + |
| + Raises: |
| + KeyError: If the prefix and Coordinator host are not configured. |
| + """ |
| + if not self._coordinator_host: |
| + raise KeyError('Coordinator host is not configured') |
| + |
| + return streamname.get_logdog_viewer_url( |
|
jbudorick
2016/10/27 13:24:06
nit: there's a streamname module and a stream_name
dnj
2016/10/27 22:42:44
Will change to 'name".
|
| + self._coordinator_host, |
| + self.get_stream_path(stream_name)) |
| + |
| def _register_new_stream(self, name): |
| """Registers a new stream name. |
| @@ -188,8 +286,8 @@ class StreamClient(object): |
| self._names.add(name) |
| @classmethod |
| - def _create(cls, value): |
| - """Returns (StreamClient): A new stream client connection. |
| + def _create(cls, value, **kwargs): |
| + """Returns (StreamClient): A new stream client instance. |
| Validates the streamserver parameters and creates a new StreamClient |
| instance that connects to them. |
| @@ -285,7 +383,7 @@ class StreamClient(object): |
| tags=tags, |
| tee=tee, |
| binary_file_extension=binary_file_extension) |
| - return self.new_connection(params) |
| + return self._BasicStream(self, params, self.new_connection(params)) |
| @contextlib.contextmanager |
| def binary(self, name, **kwargs): |
| @@ -338,7 +436,7 @@ class StreamClient(object): |
| tags=tags, |
| tee=tee, |
| binary_file_extension=binary_file_extension) |
| - return self.new_connection(params) |
| + return self._BasicStream(self, params, self.new_connection(params)) |
| @contextlib.contextmanager |
| def datagram(self, name, **kwargs): |
| @@ -389,25 +487,25 @@ class StreamClient(object): |
| tags=tags, |
| tee=tee, |
| binary_file_extension=binary_file_extension) |
| - return self._DatagramStream(self.new_connection(params)) |
| + return self._DatagramStream(self, params, self.new_connection(params)) |
| class _NamedPipeStreamClient(StreamClient): |
| """A StreamClient implementation that connects to a Windows named pipe. |
| """ |
| - def __init__(self, name): |
| + def __init__(self, name, **kwargs): |
| r"""Initializes a new Windows named pipe stream client. |
| Args: |
| name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
| """ |
| - super(_NamedPipeStreamClient, self).__init__() |
| + super(_NamedPipeStreamClient, self).__init__(**kwargs) |
| self._name = name |
| @classmethod |
| - def _create(cls, value): |
| - return cls(value) |
| + def _create(cls, value, **kwargs): |
| + return cls(value, **kwargs) |
| def _connect_raw(self): |
| return open(self._name, 'wb') |
| @@ -432,20 +530,20 @@ class _UnixDomainSocketStreamClient(StreamClient): |
| self._fd.close() |
| - def __init__(self, path): |
| + def __init__(self, path, **kwargs): |
| """Initializes a new UNIX domain socket stream client. |
| Args: |
| path (str): The path to the named UNIX domain socket. |
| """ |
| - super(_UnixDomainSocketStreamClient, self).__init__() |
| + super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
| self._path = path |
| @classmethod |
| - def _create(cls, value): |
| + def _create(cls, value, **kwargs): |
| if not os.path.exists(value): |
| raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| - return cls(value) |
| + return cls(value, **kwargs) |
| def _connect_raw(self): |
| sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |