Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import collections | 5 import collections |
| 6 import contextlib | 6 import contextlib |
| 7 import json | 7 import json |
| 8 import os | 8 import os |
| 9 import socket | 9 import socket |
| 10 import sys | 10 import sys |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 105 | 105 |
| 106 def __init__(self): | 106 def __init__(self): |
| 107 self._registry = {} | 107 self._registry = {} |
| 108 | 108 |
| 109 def register_protocol(self, protocol, client_cls): | 109 def register_protocol(self, protocol, client_cls): |
| 110 assert issubclass(client_cls, StreamClient) | 110 assert issubclass(client_cls, StreamClient) |
| 111 if self._registry.get(protocol) is not None: | 111 if self._registry.get(protocol) is not None: |
| 112 raise KeyError('Duplicate protocol registered.') | 112 raise KeyError('Duplicate protocol registered.') |
| 113 self._registry[protocol] = client_cls | 113 self._registry[protocol] = client_cls |
| 114 | 114 |
| 115 def create(self, uri): | 115 def create(self, uri, **kwargs): |
| 116 """Returns (StreamClient): A stream client for the specified URI. | |
| 117 | |
| 118 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
| 119 for the specified URI. | |
| 120 | |
| 121 Args: | |
| 122 uri (str): The streamserver URI. | |
| 123 kwargs: keyword arguments to forward to the stream. See | |
| 124 StreamClient.__init__. | |
| 125 | |
| 126 Raises: | |
| 127 ValueError: if the supplied URI references an invalid or improperly | |
| 128 configured streamserver. | |
| 129 """ | |
| 116 uri = uri.split(':', 1) | 130 uri = uri.split(':', 1) |
| 117 if len(uri) != 2: | 131 if len(uri) != 2: |
| 118 raise ValueError('Invalid stream server URI [%s]' % (uri,)) | 132 raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
| 119 protocol, value = uri | 133 protocol, value = uri |
| 120 | 134 |
| 121 client_cls = self._registry.get(protocol) | 135 client_cls = self._registry.get(protocol) |
| 122 if not client_cls: | 136 if not client_cls: |
| 123 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) | 137 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
| 124 return client_cls._create(value) | 138 return client_cls._create(value, **kwargs) |
| 139 | |
| 125 | 140 |
| 126 # Default (global) registry. | 141 # Default (global) registry. |
| 127 _default_registry = StreamProtocolRegistry() | 142 _default_registry = StreamProtocolRegistry() |
| 128 | 143 create = _default_registry.create |
| 129 | |
| 130 def create(uri): | |
| 131 """Returns (StreamClient): A stream client for the specified URI. | |
| 132 | |
| 133 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
| 134 for the specified URI. | |
| 135 | |
| 136 Args: | |
| 137 uri: The streamserver URI. | |
| 138 | |
| 139 Raises: | |
| 140 ValueError if the supplied URI references an invalid or improperly | |
| 141 configured streamserver. | |
| 142 """ | |
| 143 return _default_registry.create(uri) | |
| 144 | 144 |
| 145 | 145 |
| 146 class StreamClient(object): | 146 class StreamClient(object): |
| 147 """Abstract base class for a streamserver client. | 147 """Abstract base class for a streamserver client. |
| 148 """ | 148 """ |
| 149 | 149 |
| 150 class _DatagramStream(object): | 150 class _StreamBase(object): |
| 151 """ABC for StreamClient streams.""" | |
| 152 | |
| 153 def __init__(self, stream_client, params): | |
| 154 self._stream_client = stream_client | |
| 155 self._params = params | |
| 156 | |
| 157 @property | |
| 158 def params(self): | |
| 159 """Returns (StreamParams): The stream parameters.""" | |
| 160 return self._params | |
| 161 | |
| 162 @property | |
| 163 def path(self): | |
|
jbudorick
2016/10/27 13:24:06
nit: seems a bit odd to have path be a property an
dnj
2016/10/27 22:42:45
Agreed. I don't really like it, but OTOH having a
jbudorick
2016/10/27 23:42:50
sgtm.
| |
| 164 """Returns (streamname.StreamPath): The stream path. | |
| 165 | |
| 166 Raises: | |
| 167 ValueError: if the stream path is invalid, or if the stream prefix is | |
| 168 not defined in the client. | |
| 169 """ | |
| 170 return self._stream_client.get_stream_path(self._params.name) | |
| 171 | |
| 172 def get_viewer_url(self): | |
| 173 return self._stream_client.get_viewer_url(self._params.name) | |
| 174 | |
| 175 | |
| 176 class _BasicStream(_StreamBase): | |
| 177 """Wraps a basic file descriptor, offering "write" and "close".""" | |
| 178 | |
| 179 def __init__(self, stream_client, params, fd): | |
| 180 super(StreamClient._BasicStream, self).__init__(stream_client, params) | |
| 181 self._fd = fd | |
| 182 | |
| 183 @property | |
| 184 def fd(self): | |
| 185 return self._fd | |
| 186 | |
| 187 def write(self, data): | |
| 188 return self._fd.write(data) | |
| 189 | |
| 190 def close(self): | |
| 191 return self._fd.close() | |
| 192 | |
| 193 | |
| 194 class _DatagramStream(_StreamBase): | |
| 151 """Wraps a stream object to write length-prefixed datagrams.""" | 195 """Wraps a stream object to write length-prefixed datagrams.""" |
| 152 | 196 |
| 153 def __init__(self, fd): | 197 def __init__(self, stream_client, params, fd): |
| 198 super(StreamClient._DatagramStream, self).__init__(stream_client, params) | |
| 154 self._fd = fd | 199 self._fd = fd |
| 155 | 200 |
| 156 def send(self, data): | 201 def send(self, data): |
| 157 varint.write_uvarint(self._fd, len(data)) | 202 varint.write_uvarint(self._fd, len(data)) |
| 158 self._fd.write(data) | 203 self._fd.write(data) |
| 159 | 204 |
| 160 def close(self): | 205 def close(self): |
| 161 return self._fd.close() | 206 return self._fd.close() |
| 162 | 207 |
| 163 def __init__(self): | 208 |
| 209 def __init__(self, prefix=None, coordinator_host=None): | |
| 210 """Constructs a new base StreamClient instance. | |
| 211 | |
| 212 Args: | |
| 213 prefix (str or None): If not None, the log stream session prefix. | |
| 214 coordinator_host (str or None): If not None, the name of the Coordinator | |
| 215 host that this stream client is bound to. This will be used to | |
| 216 construct viewer URLs for generated streams. | |
| 217 """ | |
| 218 self._prefix = prefix | |
| 219 self._coordinator_host = coordinator_host | |
| 220 | |
| 164 self._name_lock = threading.Lock() | 221 self._name_lock = threading.Lock() |
| 165 self._names = set() | 222 self._names = set() |
| 166 | 223 |
| 224 @property | |
| 225 def prefix(self): | |
| 226 """Returns (str or None): The stream prefix, or None if not configured.""" | |
| 227 return self._prefix | |
| 228 | |
| 229 @property | |
| 230 def coordinator_host(self): | |
| 231 """Returns (str or None): The coordinator host, or None if not configured. | |
| 232 """ | |
| 233 return self._coordinator_host | |
| 234 | |
| 235 def get_stream_path(self, name): | |
| 236 """Returns (streamname.StreamPath): The stream path. | |
| 237 | |
| 238 Args: | |
| 239 name (str): The name of the stream. | |
| 240 | |
| 241 Raises: | |
| 242 ValueError: if the stream path is invalid, or if the stream prefix is | |
| 243 not defined in the client. | |
| 244 """ | |
| 245 if not self._prefix: | |
| 246 raise KeyError('Stream prefix is not configured') | |
| 247 return streamname.StreamPath.make(self._prefix, name) | |
| 248 | |
| 249 def get_viewer_url(self, stream_name): | |
| 250 """Returns (str): The LogDog viewer URL for the named stream. | |
| 251 | |
| 252 Args: | |
| 253 stream_name (str): The name of the stream. This can also be a query glob. | |
| 254 | |
| 255 Raises: | |
| 256 KeyError: If the prefix and Coordinator host are not configured. | |
| 257 """ | |
| 258 if not self._coordinator_host: | |
| 259 raise KeyError('Coordinator host is not configured') | |
| 260 | |
| 261 return streamname.get_logdog_viewer_url( | |
|
jbudorick
2016/10/27 13:24:06
nit: there's a streamname module and a stream_name
dnj
2016/10/27 22:42:44
Will change to 'name".
| |
| 262 self._coordinator_host, | |
| 263 self.get_stream_path(stream_name)) | |
| 264 | |
| 167 def _register_new_stream(self, name): | 265 def _register_new_stream(self, name): |
| 168 """Registers a new stream name. | 266 """Registers a new stream name. |
| 169 | 267 |
| 170 The Butler will internally reject any duplicate stream names. However, there | 268 The Butler will internally reject any duplicate stream names. However, there |
| 171 isn't really feedback when this happens except a closed stream client. This | 269 isn't really feedback when this happens except a closed stream client. This |
| 172 is a client-side check to provide a more user-friendly experience in the | 270 is a client-side check to provide a more user-friendly experience in the |
| 173 event that a user attempts to register a duplicate stream name. | 271 event that a user attempts to register a duplicate stream name. |
| 174 | 272 |
| 175 Note that this is imperfect, as something else could register stream names | 273 Note that this is imperfect, as something else could register stream names |
| 176 with the same Butler instance and this library has no means of tracking. | 274 with the same Butler instance and this library has no means of tracking. |
| 177 This is a best-effort experience, not a reliable check. | 275 This is a best-effort experience, not a reliable check. |
| 178 | 276 |
| 179 Args: | 277 Args: |
| 180 name (str): The name of the stream. | 278 name (str): The name of the stream. |
| 181 | 279 |
| 182 Raises: | 280 Raises: |
| 183 ValueError if the stream name has already been registered. | 281 ValueError if the stream name has already been registered. |
| 184 """ | 282 """ |
| 185 with self._name_lock: | 283 with self._name_lock: |
| 186 if name in self._names: | 284 if name in self._names: |
| 187 raise ValueError("Duplicate stream name [%s]" % (name,)) | 285 raise ValueError("Duplicate stream name [%s]" % (name,)) |
| 188 self._names.add(name) | 286 self._names.add(name) |
| 189 | 287 |
| 190 @classmethod | 288 @classmethod |
| 191 def _create(cls, value): | 289 def _create(cls, value, **kwargs): |
| 192 """Returns (StreamClient): A new stream client connection. | 290 """Returns (StreamClient): A new stream client instance. |
| 193 | 291 |
| 194 Validates the streamserver parameters and creates a new StreamClient | 292 Validates the streamserver parameters and creates a new StreamClient |
| 195 instance that connects to them. | 293 instance that connects to them. |
| 196 | 294 |
| 197 Implementing classes must override this. | 295 Implementing classes must override this. |
| 198 """ | 296 """ |
| 199 raise NotImplementedError() | 297 raise NotImplementedError() |
| 200 | 298 |
| 201 def _connect_raw(self): | 299 def _connect_raw(self): |
| 202 """Returns (file): A new file-like stream. | 300 """Returns (file): A new file-like stream. |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 278 have UTF-8 text content written to it with its `write` method, and must | 376 have UTF-8 text content written to it with its `write` method, and must |
| 279 be closed when finished using its `close` method. | 377 be closed when finished using its `close` method. |
| 280 """ | 378 """ |
| 281 params = StreamParams.make( | 379 params = StreamParams.make( |
| 282 name=name, | 380 name=name, |
| 283 type=StreamParams.TEXT, | 381 type=StreamParams.TEXT, |
| 284 content_type=content_type, | 382 content_type=content_type, |
| 285 tags=tags, | 383 tags=tags, |
| 286 tee=tee, | 384 tee=tee, |
| 287 binary_file_extension=binary_file_extension) | 385 binary_file_extension=binary_file_extension) |
| 288 return self.new_connection(params) | 386 return self._BasicStream(self, params, self.new_connection(params)) |
| 289 | 387 |
| 290 @contextlib.contextmanager | 388 @contextlib.contextmanager |
| 291 def binary(self, name, **kwargs): | 389 def binary(self, name, **kwargs): |
| 292 """Context manager to create, use, and teardown a BINARY stream. | 390 """Context manager to create, use, and teardown a BINARY stream. |
| 293 | 391 |
| 294 This context manager creates a new butler BINARY stream with the specified | 392 This context manager creates a new butler BINARY stream with the specified |
| 295 parameters, yields it, and closes it on teardown. | 393 parameters, yields it, and closes it on teardown. |
| 296 | 394 |
| 297 Args: | 395 Args: |
| 298 name (str): the LogDog name of the stream. | 396 name (str): the LogDog name of the stream. |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 331 can have UTF-8 content written to it with its `write` method, and must | 429 can have UTF-8 content written to it with its `write` method, and must |
| 332 be closed when finished using its `close` method. | 430 be closed when finished using its `close` method. |
| 333 """ | 431 """ |
| 334 params = StreamParams.make( | 432 params = StreamParams.make( |
| 335 name=name, | 433 name=name, |
| 336 type=StreamParams.BINARY, | 434 type=StreamParams.BINARY, |
| 337 content_type=content_type, | 435 content_type=content_type, |
| 338 tags=tags, | 436 tags=tags, |
| 339 tee=tee, | 437 tee=tee, |
| 340 binary_file_extension=binary_file_extension) | 438 binary_file_extension=binary_file_extension) |
| 341 return self.new_connection(params) | 439 return self._BasicStream(self, params, self.new_connection(params)) |
| 342 | 440 |
| 343 @contextlib.contextmanager | 441 @contextlib.contextmanager |
| 344 def datagram(self, name, **kwargs): | 442 def datagram(self, name, **kwargs): |
| 345 """Context manager to create, use, and teardown a DATAGRAM stream. | 443 """Context manager to create, use, and teardown a DATAGRAM stream. |
| 346 | 444 |
| 347 This context manager creates a new butler DATAAGRAM stream with the | 445 This context manager creates a new butler DATAAGRAM stream with the |
| 348 specified parameters, yields it, and closes it on teardown. | 446 specified parameters, yields it, and closes it on teardown. |
| 349 | 447 |
| 350 Args: | 448 Args: |
| 351 name (str): the LogDog name of the stream. | 449 name (str): the LogDog name of the stream. |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 382 written to it using its `send` method. This object must be closed when | 480 written to it using its `send` method. This object must be closed when |
| 383 finished by using its `close` method. | 481 finished by using its `close` method. |
| 384 """ | 482 """ |
| 385 params = StreamParams.make( | 483 params = StreamParams.make( |
| 386 name=name, | 484 name=name, |
| 387 type=StreamParams.DATAGRAM, | 485 type=StreamParams.DATAGRAM, |
| 388 content_type=content_type, | 486 content_type=content_type, |
| 389 tags=tags, | 487 tags=tags, |
| 390 tee=tee, | 488 tee=tee, |
| 391 binary_file_extension=binary_file_extension) | 489 binary_file_extension=binary_file_extension) |
| 392 return self._DatagramStream(self.new_connection(params)) | 490 return self._DatagramStream(self, params, self.new_connection(params)) |
| 393 | 491 |
| 394 | 492 |
| 395 class _NamedPipeStreamClient(StreamClient): | 493 class _NamedPipeStreamClient(StreamClient): |
| 396 """A StreamClient implementation that connects to a Windows named pipe. | 494 """A StreamClient implementation that connects to a Windows named pipe. |
| 397 """ | 495 """ |
| 398 | 496 |
| 399 def __init__(self, name): | 497 def __init__(self, name, **kwargs): |
| 400 r"""Initializes a new Windows named pipe stream client. | 498 r"""Initializes a new Windows named pipe stream client. |
| 401 | 499 |
| 402 Args: | 500 Args: |
| 403 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") | 501 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
| 404 """ | 502 """ |
| 405 super(_NamedPipeStreamClient, self).__init__() | 503 super(_NamedPipeStreamClient, self).__init__(**kwargs) |
| 406 self._name = name | 504 self._name = name |
| 407 | 505 |
| 408 @classmethod | 506 @classmethod |
| 409 def _create(cls, value): | 507 def _create(cls, value, **kwargs): |
| 410 return cls(value) | 508 return cls(value, **kwargs) |
| 411 | 509 |
| 412 def _connect_raw(self): | 510 def _connect_raw(self): |
| 413 return open(self._name, 'wb') | 511 return open(self._name, 'wb') |
| 414 | 512 |
| 415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | 513 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
| 416 | 514 |
| 417 | 515 |
| 418 class _UnixDomainSocketStreamClient(StreamClient): | 516 class _UnixDomainSocketStreamClient(StreamClient): |
| 419 """A StreamClient implementation that uses a UNIX domain socket. | 517 """A StreamClient implementation that uses a UNIX domain socket. |
| 420 """ | 518 """ |
| 421 | 519 |
| 422 class SocketFile(object): | 520 class SocketFile(object): |
| 423 """A write-only file-like object that writes to a UNIX socket.""" | 521 """A write-only file-like object that writes to a UNIX socket.""" |
| 424 | 522 |
| 425 def __init__(self, fd): | 523 def __init__(self, fd): |
| 426 self._fd = fd | 524 self._fd = fd |
| 427 | 525 |
| 428 def write(self, data): | 526 def write(self, data): |
| 429 self._fd.send(data) | 527 self._fd.send(data) |
| 430 | 528 |
| 431 def close(self): | 529 def close(self): |
| 432 self._fd.close() | 530 self._fd.close() |
| 433 | 531 |
| 434 | 532 |
| 435 def __init__(self, path): | 533 def __init__(self, path, **kwargs): |
| 436 """Initializes a new UNIX domain socket stream client. | 534 """Initializes a new UNIX domain socket stream client. |
| 437 | 535 |
| 438 Args: | 536 Args: |
| 439 path (str): The path to the named UNIX domain socket. | 537 path (str): The path to the named UNIX domain socket. |
| 440 """ | 538 """ |
| 441 super(_UnixDomainSocketStreamClient, self).__init__() | 539 super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
| 442 self._path = path | 540 self._path = path |
| 443 | 541 |
| 444 @classmethod | 542 @classmethod |
| 445 def _create(cls, value): | 543 def _create(cls, value, **kwargs): |
| 446 if not os.path.exists(value): | 544 if not os.path.exists(value): |
| 447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | 545 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| 448 return cls(value) | 546 return cls(value, **kwargs) |
| 449 | 547 |
| 450 def _connect_raw(self): | 548 def _connect_raw(self): |
| 451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 549 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| 452 sock.connect(self._path) | 550 sock.connect(self._path) |
| 453 return self.SocketFile(sock) | 551 return self.SocketFile(sock) |
| 454 | 552 |
| 455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | 553 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
| OLD | NEW |