| OLD | NEW |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import collections | 5 import collections |
| 6 import contextlib | 6 import contextlib |
| 7 import json | 7 import json |
| 8 import os | 8 import os |
| 9 import socket | 9 import socket |
| 10 import sys | 10 import sys |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 105 | 105 |
| 106 def __init__(self): | 106 def __init__(self): |
| 107 self._registry = {} | 107 self._registry = {} |
| 108 | 108 |
| 109 def register_protocol(self, protocol, client_cls): | 109 def register_protocol(self, protocol, client_cls): |
| 110 assert issubclass(client_cls, StreamClient) | 110 assert issubclass(client_cls, StreamClient) |
| 111 if self._registry.get(protocol) is not None: | 111 if self._registry.get(protocol) is not None: |
| 112 raise KeyError('Duplicate protocol registered.') | 112 raise KeyError('Duplicate protocol registered.') |
| 113 self._registry[protocol] = client_cls | 113 self._registry[protocol] = client_cls |
| 114 | 114 |
| 115 def create(self, uri): | 115 def create(self, uri, **kwargs): |
| 116 """Returns (StreamClient): A stream client for the specified URI. |
| 117 |
| 118 This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| 119 for the specified URI. |
| 120 |
| 121 Args: |
| 122 uri (str): The streamserver URI. |
| 123 kwargs: keyword arguments to forward to the stream. See |
| 124 StreamClient.__init__. |
| 125 |
| 126 Raises: |
| 127 ValueError: if the supplied URI references an invalid or improperly |
| 128 configured streamserver. |
| 129 """ |
| 116 uri = uri.split(':', 1) | 130 uri = uri.split(':', 1) |
| 117 if len(uri) != 2: | 131 if len(uri) != 2: |
| 118 raise ValueError('Invalid stream server URI [%s]' % (uri,)) | 132 raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
| 119 protocol, value = uri | 133 protocol, value = uri |
| 120 | 134 |
| 121 client_cls = self._registry.get(protocol) | 135 client_cls = self._registry.get(protocol) |
| 122 if not client_cls: | 136 if not client_cls: |
| 123 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) | 137 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
| 124 return client_cls._create(value) | 138 return client_cls._create(value, **kwargs) |
| 139 |
| 125 | 140 |
| 126 # Default (global) registry. | 141 # Default (global) registry. |
| 127 _default_registry = StreamProtocolRegistry() | 142 _default_registry = StreamProtocolRegistry() |
| 128 | 143 create = _default_registry.create |
| 129 | |
| 130 def create(uri): | |
| 131 """Returns (StreamClient): A stream client for the specified URI. | |
| 132 | |
| 133 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
| 134 for the specified URI. | |
| 135 | |
| 136 Args: | |
| 137 uri: The streamserver URI. | |
| 138 | |
| 139 Raises: | |
| 140 ValueError if the supplied URI references an invalid or improperly | |
| 141 configured streamserver. | |
| 142 """ | |
| 143 return _default_registry.create(uri) | |
| 144 | 144 |
| 145 | 145 |
| 146 class StreamClient(object): | 146 class StreamClient(object): |
| 147 """Abstract base class for a streamserver client. | 147 """Abstract base class for a streamserver client. |
| 148 """ | 148 """ |
| 149 | 149 |
| 150 class _DatagramStream(object): | 150 class _StreamBase(object): |
| 151 """ABC for StreamClient streams.""" |
| 152 |
| 153 def __init__(self, stream_client, params): |
| 154 self._stream_client = stream_client |
| 155 self._params = params |
| 156 |
| 157 @property |
| 158 def params(self): |
| 159 """Returns (StreamParams): The stream parameters.""" |
| 160 return self._params |
| 161 |
| 162 @property |
| 163 def path(self): |
| 164 """Returns (streamname.StreamPath): The stream path. |
| 165 |
| 166 Raises: |
| 167 ValueError: if the stream path is invalid, or if the stream prefix is |
| 168 not defined in the client. |
| 169 """ |
| 170 return self._stream_client.get_stream_path(self._params.name) |
| 171 |
| 172 def get_viewer_url(self): |
| 173 """Returns (str): The viewer URL for this stream. |
| 174 |
| 175 Raises: |
| 176 KeyError: if information needed to construct the URL is missing. |
| 177 ValueError: if the stream prefix or name do not form a valid stream |
| 178 path. |
| 179 """ |
| 180 return self._stream_client.get_viewer_url(self._params.name) |
| 181 |
| 182 |
| 183 class _BasicStream(_StreamBase): |
| 184 """Wraps a basic file descriptor, offering "write" and "close".""" |
| 185 |
| 186 def __init__(self, stream_client, params, fd): |
| 187 super(StreamClient._BasicStream, self).__init__(stream_client, params) |
| 188 self._fd = fd |
| 189 |
| 190 @property |
| 191 def fd(self): |
| 192 return self._fd |
| 193 |
| 194 def write(self, data): |
| 195 return self._fd.write(data) |
| 196 |
| 197 def close(self): |
| 198 return self._fd.close() |
| 199 |
| 200 |
| 201 class _DatagramStream(_StreamBase): |
| 151 """Wraps a stream object to write length-prefixed datagrams.""" | 202 """Wraps a stream object to write length-prefixed datagrams.""" |
| 152 | 203 |
| 153 def __init__(self, fd): | 204 def __init__(self, stream_client, params, fd): |
| 205 super(StreamClient._DatagramStream, self).__init__(stream_client, params) |
| 154 self._fd = fd | 206 self._fd = fd |
| 155 | 207 |
| 156 def send(self, data): | 208 def send(self, data): |
| 157 varint.write_uvarint(self._fd, len(data)) | 209 varint.write_uvarint(self._fd, len(data)) |
| 158 self._fd.write(data) | 210 self._fd.write(data) |
| 159 | 211 |
| 160 def close(self): | 212 def close(self): |
| 161 return self._fd.close() | 213 return self._fd.close() |
| 162 | 214 |
| 163 def __init__(self): | 215 |
| 216 def __init__(self, project=None, prefix=None, coordinator_host=None): |
| 217 """Constructs a new base StreamClient instance. |
| 218 |
| 219 Args: |
| 220 project (str or None): If not None, the name of the log stream project. |
| 221 prefix (str or None): If not None, the log stream session prefix. |
| 222 coordinator_host (str or None): If not None, the name of the Coordinator |
| 223 host that this stream client is bound to. This will be used to |
| 224 construct viewer URLs for generated streams. |
| 225 """ |
| 226 self._project = project |
| 227 self._prefix = prefix |
| 228 self._coordinator_host = coordinator_host |
| 229 |
| 164 self._name_lock = threading.Lock() | 230 self._name_lock = threading.Lock() |
| 165 self._names = set() | 231 self._names = set() |
| 166 | 232 |
| 233 @property |
| 234 def project(self): |
| 235 """Returns (str or None): The stream project, or None if not configured.""" |
| 236 return self._project |
| 237 |
| 238 @property |
| 239 def prefix(self): |
| 240 """Returns (str or None): The stream prefix, or None if not configured.""" |
| 241 return self._prefix |
| 242 |
| 243 @property |
| 244 def coordinator_host(self): |
| 245 """Returns (str or None): The coordinator host, or None if not configured. |
| 246 """ |
| 247 return self._coordinator_host |
| 248 |
| 249 def get_stream_path(self, name): |
| 250 """Returns (streamname.StreamPath): The stream path. |
| 251 |
| 252 Args: |
| 253 name (str): The name of the stream. |
| 254 |
| 255 Raises: |
| 256 KeyError: if information needed to construct the path is missing. |
| 257 ValueError: if the stream path is invalid, or if the stream prefix is |
| 258 not defined in the client. |
| 259 """ |
| 260 if not self._prefix: |
| 261 raise KeyError('Stream prefix is not configured') |
| 262 return streamname.StreamPath.make(self._prefix, name) |
| 263 |
| 264 def get_viewer_url(self, name): |
| 265 """Returns (str): The LogDog viewer URL for the named stream. |
| 266 |
| 267 Args: |
| 268 name (str): The name of the stream. This can also be a query glob. |
| 269 |
| 270 Raises: |
| 271 KeyError: if information needed to construct the URL is missing. |
| 272 ValueError: if the stream prefix or name do not form a valid stream |
| 273 path. |
| 274 """ |
| 275 if not self._coordinator_host: |
| 276 raise KeyError('Coordinator host is not configured') |
| 277 if not self._project: |
| 278 raise KeyError('Stream project is not configured') |
| 279 |
| 280 return streamname.get_logdog_viewer_url( |
| 281 self._coordinator_host, |
| 282 self._project, |
| 283 self.get_stream_path(name)) |
| 284 |
| 167 def _register_new_stream(self, name): | 285 def _register_new_stream(self, name): |
| 168 """Registers a new stream name. | 286 """Registers a new stream name. |
| 169 | 287 |
| 170 The Butler will internally reject any duplicate stream names. However, there | 288 The Butler will internally reject any duplicate stream names. However, there |
| 171 isn't really feedback when this happens except a closed stream client. This | 289 isn't really feedback when this happens except a closed stream client. This |
| 172 is a client-side check to provide a more user-friendly experience in the | 290 is a client-side check to provide a more user-friendly experience in the |
| 173 event that a user attempts to register a duplicate stream name. | 291 event that a user attempts to register a duplicate stream name. |
| 174 | 292 |
| 175 Note that this is imperfect, as something else could register stream names | 293 Note that this is imperfect, as something else could register stream names |
| 176 with the same Butler instance and this library has no means of tracking. | 294 with the same Butler instance and this library has no means of tracking. |
| 177 This is a best-effort experience, not a reliable check. | 295 This is a best-effort experience, not a reliable check. |
| 178 | 296 |
| 179 Args: | 297 Args: |
| 180 name (str): The name of the stream. | 298 name (str): The name of the stream. |
| 181 | 299 |
| 182 Raises: | 300 Raises: |
| 183 ValueError if the stream name has already been registered. | 301 ValueError if the stream name has already been registered. |
| 184 """ | 302 """ |
| 185 with self._name_lock: | 303 with self._name_lock: |
| 186 if name in self._names: | 304 if name in self._names: |
| 187 raise ValueError("Duplicate stream name [%s]" % (name,)) | 305 raise ValueError("Duplicate stream name [%s]" % (name,)) |
| 188 self._names.add(name) | 306 self._names.add(name) |
| 189 | 307 |
| 190 @classmethod | 308 @classmethod |
| 191 def _create(cls, value): | 309 def _create(cls, value, **kwargs): |
| 192 """Returns (StreamClient): A new stream client connection. | 310 """Returns (StreamClient): A new stream client instance. |
| 193 | 311 |
| 194 Validates the streamserver parameters and creates a new StreamClient | 312 Validates the streamserver parameters and creates a new StreamClient |
| 195 instance that connects to them. | 313 instance that connects to them. |
| 196 | 314 |
| 197 Implementing classes must override this. | 315 Implementing classes must override this. |
| 198 """ | 316 """ |
| 199 raise NotImplementedError() | 317 raise NotImplementedError() |
| 200 | 318 |
| 201 def _connect_raw(self): | 319 def _connect_raw(self): |
| 202 """Returns (file): A new file-like stream. | 320 """Returns (file): A new file-like stream. |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 278 have UTF-8 text content written to it with its `write` method, and must | 396 have UTF-8 text content written to it with its `write` method, and must |
| 279 be closed when finished using its `close` method. | 397 be closed when finished using its `close` method. |
| 280 """ | 398 """ |
| 281 params = StreamParams.make( | 399 params = StreamParams.make( |
| 282 name=name, | 400 name=name, |
| 283 type=StreamParams.TEXT, | 401 type=StreamParams.TEXT, |
| 284 content_type=content_type, | 402 content_type=content_type, |
| 285 tags=tags, | 403 tags=tags, |
| 286 tee=tee, | 404 tee=tee, |
| 287 binary_file_extension=binary_file_extension) | 405 binary_file_extension=binary_file_extension) |
| 288 return self.new_connection(params) | 406 return self._BasicStream(self, params, self.new_connection(params)) |
| 289 | 407 |
| 290 @contextlib.contextmanager | 408 @contextlib.contextmanager |
| 291 def binary(self, name, **kwargs): | 409 def binary(self, name, **kwargs): |
| 292 """Context manager to create, use, and teardown a BINARY stream. | 410 """Context manager to create, use, and teardown a BINARY stream. |
| 293 | 411 |
| 294 This context manager creates a new butler BINARY stream with the specified | 412 This context manager creates a new butler BINARY stream with the specified |
| 295 parameters, yields it, and closes it on teardown. | 413 parameters, yields it, and closes it on teardown. |
| 296 | 414 |
| 297 Args: | 415 Args: |
| 298 name (str): the LogDog name of the stream. | 416 name (str): the LogDog name of the stream. |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 331 can have UTF-8 content written to it with its `write` method, and must | 449 can have UTF-8 content written to it with its `write` method, and must |
| 332 be closed when finished using its `close` method. | 450 be closed when finished using its `close` method. |
| 333 """ | 451 """ |
| 334 params = StreamParams.make( | 452 params = StreamParams.make( |
| 335 name=name, | 453 name=name, |
| 336 type=StreamParams.BINARY, | 454 type=StreamParams.BINARY, |
| 337 content_type=content_type, | 455 content_type=content_type, |
| 338 tags=tags, | 456 tags=tags, |
| 339 tee=tee, | 457 tee=tee, |
| 340 binary_file_extension=binary_file_extension) | 458 binary_file_extension=binary_file_extension) |
| 341 return self.new_connection(params) | 459 return self._BasicStream(self, params, self.new_connection(params)) |
| 342 | 460 |
| 343 @contextlib.contextmanager | 461 @contextlib.contextmanager |
| 344 def datagram(self, name, **kwargs): | 462 def datagram(self, name, **kwargs): |
| 345 """Context manager to create, use, and teardown a DATAGRAM stream. | 463 """Context manager to create, use, and teardown a DATAGRAM stream. |
| 346 | 464 |
| 347 This context manager creates a new butler DATAAGRAM stream with the | 465 This context manager creates a new butler DATAAGRAM stream with the |
| 348 specified parameters, yields it, and closes it on teardown. | 466 specified parameters, yields it, and closes it on teardown. |
| 349 | 467 |
| 350 Args: | 468 Args: |
| 351 name (str): the LogDog name of the stream. | 469 name (str): the LogDog name of the stream. |
| (...skipping 30 matching lines...) Expand all Loading... |
| 382 written to it using its `send` method. This object must be closed when | 500 written to it using its `send` method. This object must be closed when |
| 383 finished by using its `close` method. | 501 finished by using its `close` method. |
| 384 """ | 502 """ |
| 385 params = StreamParams.make( | 503 params = StreamParams.make( |
| 386 name=name, | 504 name=name, |
| 387 type=StreamParams.DATAGRAM, | 505 type=StreamParams.DATAGRAM, |
| 388 content_type=content_type, | 506 content_type=content_type, |
| 389 tags=tags, | 507 tags=tags, |
| 390 tee=tee, | 508 tee=tee, |
| 391 binary_file_extension=binary_file_extension) | 509 binary_file_extension=binary_file_extension) |
| 392 return self._DatagramStream(self.new_connection(params)) | 510 return self._DatagramStream(self, params, self.new_connection(params)) |
| 393 | 511 |
| 394 | 512 |
| 395 class _NamedPipeStreamClient(StreamClient): | 513 class _NamedPipeStreamClient(StreamClient): |
| 396 """A StreamClient implementation that connects to a Windows named pipe. | 514 """A StreamClient implementation that connects to a Windows named pipe. |
| 397 """ | 515 """ |
| 398 | 516 |
| 399 def __init__(self, name): | 517 def __init__(self, name, **kwargs): |
| 400 r"""Initializes a new Windows named pipe stream client. | 518 r"""Initializes a new Windows named pipe stream client. |
| 401 | 519 |
| 402 Args: | 520 Args: |
| 403 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") | 521 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
| 404 """ | 522 """ |
| 405 super(_NamedPipeStreamClient, self).__init__() | 523 super(_NamedPipeStreamClient, self).__init__(**kwargs) |
| 406 self._name = name | 524 self._name = name |
| 407 | 525 |
| 408 @classmethod | 526 @classmethod |
| 409 def _create(cls, value): | 527 def _create(cls, value, **kwargs): |
| 410 return cls(value) | 528 return cls(value, **kwargs) |
| 411 | 529 |
| 412 def _connect_raw(self): | 530 def _connect_raw(self): |
| 413 return open(self._name, 'wb') | 531 return open(self._name, 'wb') |
| 414 | 532 |
| 415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | 533 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
| 416 | 534 |
| 417 | 535 |
| 418 class _UnixDomainSocketStreamClient(StreamClient): | 536 class _UnixDomainSocketStreamClient(StreamClient): |
| 419 """A StreamClient implementation that uses a UNIX domain socket. | 537 """A StreamClient implementation that uses a UNIX domain socket. |
| 420 """ | 538 """ |
| 421 | 539 |
| 422 class SocketFile(object): | 540 class SocketFile(object): |
| 423 """A write-only file-like object that writes to a UNIX socket.""" | 541 """A write-only file-like object that writes to a UNIX socket.""" |
| 424 | 542 |
| 425 def __init__(self, fd): | 543 def __init__(self, fd): |
| 426 self._fd = fd | 544 self._fd = fd |
| 427 | 545 |
| 428 def write(self, data): | 546 def write(self, data): |
| 429 self._fd.send(data) | 547 self._fd.send(data) |
| 430 | 548 |
| 431 def close(self): | 549 def close(self): |
| 432 self._fd.close() | 550 self._fd.close() |
| 433 | 551 |
| 434 | 552 |
| 435 def __init__(self, path): | 553 def __init__(self, path, **kwargs): |
| 436 """Initializes a new UNIX domain socket stream client. | 554 """Initializes a new UNIX domain socket stream client. |
| 437 | 555 |
| 438 Args: | 556 Args: |
| 439 path (str): The path to the named UNIX domain socket. | 557 path (str): The path to the named UNIX domain socket. |
| 440 """ | 558 """ |
| 441 super(_UnixDomainSocketStreamClient, self).__init__() | 559 super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
| 442 self._path = path | 560 self._path = path |
| 443 | 561 |
| 444 @classmethod | 562 @classmethod |
| 445 def _create(cls, value): | 563 def _create(cls, value, **kwargs): |
| 446 if not os.path.exists(value): | 564 if not os.path.exists(value): |
| 447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | 565 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| 448 return cls(value) | 566 return cls(value, **kwargs) |
| 449 | 567 |
| 450 def _connect_raw(self): | 568 def _connect_raw(self): |
| 451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 569 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| 452 sock.connect(self._path) | 570 sock.connect(self._path) |
| 453 return self.SocketFile(sock) | 571 return self.SocketFile(sock) |
| 454 | 572 |
| 455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | 573 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
| OLD | NEW |