| Index: client/libs/logdog/stream.py
 | 
| diff --git a/client/libs/logdog/stream.py b/client/libs/logdog/stream.py
 | 
| index 3d60a6c6c948807e209f6819c53297ffa7f540ce..ea415e1ffc199315af53b5993373a200311476f4 100644
 | 
| --- a/client/libs/logdog/stream.py
 | 
| +++ b/client/libs/logdog/stream.py
 | 
| @@ -112,7 +112,21 @@ class StreamProtocolRegistry(object):
 | 
|        raise KeyError('Duplicate protocol registered.')
 | 
|      self._registry[protocol] = client_cls
 | 
|  
 | 
| -  def create(self, uri):
 | 
| +  def create(self, uri, **kwargs):
 | 
| +    """Returns (StreamClient): A stream client for the specified URI.
 | 
| +
 | 
| +    This uses the default StreamProtocolRegistry to instantiate a StreamClient
 | 
| +    for the specified URI.
 | 
| +
 | 
| +    Args:
 | 
| +      uri (str): The streamserver URI.
 | 
| +      kwargs: keyword arguments to forward to the stream. See
 | 
| +          StreamClient.__init__.
 | 
| +
 | 
| +    Raises:
 | 
| +      ValueError: if the supplied URI references an invalid or improperly
 | 
| +          configured streamserver.
 | 
| +    """
 | 
|      uri = uri.split(':', 1)
 | 
|      if len(uri) != 2:
 | 
|        raise ValueError('Invalid stream server URI [%s]' % (uri,))
 | 
| @@ -121,36 +135,74 @@ class StreamProtocolRegistry(object):
 | 
|      client_cls = self._registry.get(protocol)
 | 
|      if not client_cls:
 | 
|        raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
 | 
| -    return client_cls._create(value)
 | 
| +    return client_cls._create(value, **kwargs)
 | 
| +
 | 
|  
 | 
|  # Default (global) registry.
 | 
|  _default_registry = StreamProtocolRegistry()
 | 
| +create = _default_registry.create
 | 
|  
 | 
|  
 | 
| -def create(uri):
 | 
| -  """Returns (StreamClient): A stream client for the specified URI.
 | 
| +class StreamClient(object):
 | 
| +  """Abstract base class for a streamserver client.
 | 
| +  """
 | 
|  
 | 
| -  This uses the default StreamProtocolRegistry to instantiate a StreamClient
 | 
| -  for the specified URI.
 | 
| +  class _StreamBase(object):
 | 
| +    """ABC for StreamClient streams."""
 | 
|  
 | 
| -  Args:
 | 
| -    uri: The streamserver URI.
 | 
| +    def __init__(self, stream_client, params):
 | 
| +      self._stream_client = stream_client
 | 
| +      self._params = params
 | 
|  
 | 
| -  Raises:
 | 
| -    ValueError if the supplied URI references an invalid or improperly
 | 
| -        configured streamserver.
 | 
| -  """
 | 
| -  return _default_registry.create(uri)
 | 
| +    @property
 | 
| +    def params(self):
 | 
| +      """Returns (StreamParams): The stream parameters."""
 | 
| +      return self._params
 | 
|  
 | 
| +    @property
 | 
| +    def path(self):
 | 
| +      """Returns (streamname.StreamPath): The stream path.
 | 
|  
 | 
| -class StreamClient(object):
 | 
| -  """Abstract base class for a streamserver client.
 | 
| -  """
 | 
| +      Raises:
 | 
| +        ValueError: if the stream path is invalid, or if the stream prefix is
 | 
| +            not defined in the client.
 | 
| +      """
 | 
| +      return self._stream_client.get_stream_path(self._params.name)
 | 
| +
 | 
| +    def get_viewer_url(self):
 | 
| +      """Returns (str): The viewer URL for this stream.
 | 
| +
 | 
| +      Raises:
 | 
| +        KeyError: if information needed to construct the URL is missing.
 | 
| +        ValueError: if the stream prefix or name do not form a valid stream
 | 
| +            path.
 | 
| +      """
 | 
| +      return self._stream_client.get_viewer_url(self._params.name)
 | 
| +
 | 
| +
 | 
| +  class _BasicStream(_StreamBase):
 | 
| +    """Wraps a basic file descriptor, offering "write" and "close"."""
 | 
| +
 | 
| +    def __init__(self, stream_client, params, fd):
 | 
| +      super(StreamClient._BasicStream, self).__init__(stream_client, params)
 | 
| +      self._fd = fd
 | 
| +
 | 
| +    @property
 | 
| +    def fd(self):
 | 
| +      return self._fd
 | 
| +
 | 
| +    def write(self, data):
 | 
| +      return self._fd.write(data)
 | 
| +
 | 
| +    def close(self):
 | 
| +      return self._fd.close()
 | 
|  
 | 
| -  class _DatagramStream(object):
 | 
| +
 | 
| +  class _DatagramStream(_StreamBase):
 | 
|      """Wraps a stream object to write length-prefixed datagrams."""
 | 
|  
 | 
| -    def __init__(self, fd):
 | 
| +    def __init__(self, stream_client, params, fd):
 | 
| +      super(StreamClient._DatagramStream, self).__init__(stream_client, params)
 | 
|        self._fd = fd
 | 
|  
 | 
|      def send(self, data):
 | 
| @@ -160,10 +212,76 @@ class StreamClient(object):
 | 
|      def close(self):
 | 
|        return self._fd.close()
 | 
|  
 | 
| -  def __init__(self):
 | 
| +
 | 
| +  def __init__(self, project=None, prefix=None, coordinator_host=None):
 | 
| +    """Constructs a new base StreamClient instance.
 | 
| +
 | 
| +    Args:
 | 
| +      project (str or None): If not None, the name of the log stream project.
 | 
| +      prefix (str or None): If not None, the log stream session prefix.
 | 
| +      coordinator_host (str or None): If not None, the name of the Coordinator
 | 
| +          host that this stream client is bound to. This will be used to
 | 
| +          construct viewer URLs for generated streams.
 | 
| +    """
 | 
| +    self._project = project
 | 
| +    self._prefix = prefix
 | 
| +    self._coordinator_host = coordinator_host
 | 
| +
 | 
|      self._name_lock = threading.Lock()
 | 
|      self._names = set()
 | 
|  
 | 
| +  @property
 | 
| +  def project(self):
 | 
| +    """Returns (str or None): The stream project, or None if not configured."""
 | 
| +    return self._project
 | 
| +
 | 
| +  @property
 | 
| +  def prefix(self):
 | 
| +    """Returns (str or None): The stream prefix, or None if not configured."""
 | 
| +    return self._prefix
 | 
| +
 | 
| +  @property
 | 
| +  def coordinator_host(self):
 | 
| +    """Returns (str or None): The coordinator host, or None if not configured.
 | 
| +    """
 | 
| +    return self._coordinator_host
 | 
| +
 | 
| +  def get_stream_path(self, name):
 | 
| +    """Returns (streamname.StreamPath): The stream path.
 | 
| +
 | 
| +    Args:
 | 
| +      name (str): The name of the stream.
 | 
| +
 | 
| +    Raises:
 | 
| +      KeyError: if information needed to construct the path is missing.
 | 
| +      ValueError: if the stream path is invalid, or if the stream prefix is
 | 
| +          not defined in the client.
 | 
| +    """
 | 
| +    if not self._prefix:
 | 
| +      raise KeyError('Stream prefix is not configured')
 | 
| +    return streamname.StreamPath.make(self._prefix, name)
 | 
| +
 | 
| +  def get_viewer_url(self, name):
 | 
| +    """Returns (str): The LogDog viewer URL for the named stream.
 | 
| +
 | 
| +    Args:
 | 
| +      name (str): The name of the stream. This can also be a query glob.
 | 
| +
 | 
| +    Raises:
 | 
| +      KeyError: if information needed to construct the URL is missing.
 | 
| +      ValueError: if the stream prefix or name do not form a valid stream
 | 
| +          path.
 | 
| +    """
 | 
| +    if not self._coordinator_host:
 | 
| +      raise KeyError('Coordinator host is not configured')
 | 
| +    if not self._project:
 | 
| +      raise KeyError('Stream project is not configured')
 | 
| +
 | 
| +    return streamname.get_logdog_viewer_url(
 | 
| +        self._coordinator_host,
 | 
| +        self._project,
 | 
| +        self.get_stream_path(name))
 | 
| +
 | 
|    def _register_new_stream(self, name):
 | 
|      """Registers a new stream name.
 | 
|  
 | 
| @@ -188,8 +306,8 @@ class StreamClient(object):
 | 
|        self._names.add(name)
 | 
|  
 | 
|    @classmethod
 | 
| -  def _create(cls, value):
 | 
| -    """Returns (StreamClient): A new stream client connection.
 | 
| +  def _create(cls, value, **kwargs):
 | 
| +    """Returns (StreamClient): A new stream client instance.
 | 
|  
 | 
|      Validates the streamserver parameters and creates a new StreamClient
 | 
|      instance that connects to them.
 | 
| @@ -285,7 +403,7 @@ class StreamClient(object):
 | 
|          tags=tags,
 | 
|          tee=tee,
 | 
|          binary_file_extension=binary_file_extension)
 | 
| -    return self.new_connection(params)
 | 
| +    return self._BasicStream(self, params, self.new_connection(params))
 | 
|  
 | 
|    @contextlib.contextmanager
 | 
|    def binary(self, name, **kwargs):
 | 
| @@ -338,7 +456,7 @@ class StreamClient(object):
 | 
|          tags=tags,
 | 
|          tee=tee,
 | 
|          binary_file_extension=binary_file_extension)
 | 
| -    return self.new_connection(params)
 | 
| +    return self._BasicStream(self, params, self.new_connection(params))
 | 
|  
 | 
|    @contextlib.contextmanager
 | 
|    def datagram(self, name, **kwargs):
 | 
| @@ -389,25 +507,25 @@ class StreamClient(object):
 | 
|          tags=tags,
 | 
|          tee=tee,
 | 
|          binary_file_extension=binary_file_extension)
 | 
| -    return self._DatagramStream(self.new_connection(params))
 | 
| +    return self._DatagramStream(self, params, self.new_connection(params))
 | 
|  
 | 
|  
 | 
|  class _NamedPipeStreamClient(StreamClient):
 | 
|    """A StreamClient implementation that connects to a Windows named pipe.
 | 
|    """
 | 
|  
 | 
| -  def __init__(self, name):
 | 
| +  def __init__(self, name, **kwargs):
 | 
|      r"""Initializes a new Windows named pipe stream client.
 | 
|  
 | 
|      Args:
 | 
|        name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
 | 
|      """
 | 
| -    super(_NamedPipeStreamClient, self).__init__()
 | 
| +    super(_NamedPipeStreamClient, self).__init__(**kwargs)
 | 
|      self._name = name
 | 
|  
 | 
|    @classmethod
 | 
| -  def _create(cls, value):
 | 
| -    return cls(value)
 | 
| +  def _create(cls, value, **kwargs):
 | 
| +    return cls(value, **kwargs)
 | 
|  
 | 
|    def _connect_raw(self):
 | 
|      return open(self._name, 'wb')
 | 
| @@ -432,20 +550,20 @@ class _UnixDomainSocketStreamClient(StreamClient):
 | 
|        self._fd.close()
 | 
|  
 | 
|  
 | 
| -  def __init__(self, path):
 | 
| +  def __init__(self, path, **kwargs):
 | 
|      """Initializes a new UNIX domain socket stream client.
 | 
|  
 | 
|      Args:
 | 
|        path (str): The path to the named UNIX domain socket.
 | 
|      """
 | 
| -    super(_UnixDomainSocketStreamClient, self).__init__()
 | 
| +    super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
 | 
|      self._path = path
 | 
|  
 | 
|    @classmethod
 | 
| -  def _create(cls, value):
 | 
| +  def _create(cls, value, **kwargs):
 | 
|      if not os.path.exists(value):
 | 
|        raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
 | 
| -    return cls(value)
 | 
| +    return cls(value, **kwargs)
 | 
|  
 | 
|    def _connect_raw(self):
 | 
|      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
| 
 |