| OLD | NEW |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import collections | 5 import collections |
| 6 import contextlib | 6 import contextlib |
| 7 import json | 7 import json |
| 8 import os | 8 import os |
| 9 import socket | 9 import socket |
| 10 import sys | 10 import sys |
| 11 import threading | 11 import threading |
| 12 import types | 12 import types |
| 13 | 13 |
| 14 from libs.logdog import streamname, varint | 14 from libs.logdog import streamname, varint |
| 15 | 15 |
| 16 | 16 |
| 17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', | 17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', |
| 18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) | 18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) |
| 19 | 19 |
| 20 | 20 |
| 21 # Magic number at the beginning of a Butler stream |
| 22 # |
| 23 # See "ProtocolFrameHeaderMagic" in: |
| 24 # <luci-go>/logdog/client/butlerlib/streamproto |
| 25 BUTLER_MAGIC = 'BTLR1\x1e' |
| 26 |
| 27 |
| 21 class StreamParams(_StreamParamsBase): | 28 class StreamParams(_StreamParamsBase): |
| 22 """Defines the set of parameters to apply to a new stream.""" | 29 """Defines the set of parameters to apply to a new stream.""" |
| 23 | 30 |
| 24 # A text content stream. | 31 # A text content stream. |
| 25 TEXT = 'text' | 32 TEXT = 'text' |
| 26 # A binary content stream. | 33 # A binary content stream. |
| 27 BINARY = 'binary' | 34 BINARY = 'binary' |
| 28 # A datagram content stream. | 35 # A datagram content stream. |
| 29 DATAGRAM = 'datagram' | 36 DATAGRAM = 'datagram' |
| 30 | 37 |
| (...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 215 params (StreamParams): The parameters to use with the new connection. | 222 params (StreamParams): The parameters to use with the new connection. |
| 216 | 223 |
| 217 Raises: | 224 Raises: |
| 218 ValueError if the stream name has already been used, or if the parameters | 225 ValueError if the stream name has already been used, or if the parameters |
| 219 are not valid. | 226 are not valid. |
| 220 """ | 227 """ |
| 221 self._register_new_stream(params.name) | 228 self._register_new_stream(params.name) |
| 222 params_json = params.to_json() | 229 params_json = params.to_json() |
| 223 | 230 |
| 224 fd = self._connect_raw() | 231 fd = self._connect_raw() |
| 232 fd.write(BUTLER_MAGIC) |
| 225 varint.write_uvarint(fd, len(params_json)) | 233 varint.write_uvarint(fd, len(params_json)) |
| 226 fd.write(params_json) | 234 fd.write(params_json) |
| 227 return fd | 235 return fd |
| 228 | 236 |
| 229 @contextlib.contextmanager | 237 @contextlib.contextmanager |
| 230 def text(self, name, **kwargs): | 238 def text(self, name, **kwargs): |
| 231 """Context manager to create, use, and teardown a TEXT stream. | 239 """Context manager to create, use, and teardown a TEXT stream. |
| 232 | 240 |
| 233 This context manager creates a new butler TEXT stream with the specified | 241 This context manager creates a new butler TEXT stream with the specified |
| 234 parameters, yields it, and closes it on teardown. | 242 parameters, yields it, and closes it on teardown. |
| (...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 404 def _connect_raw(self): | 412 def _connect_raw(self): |
| 405 return open(self._name, 'wb') | 413 return open(self._name, 'wb') |
| 406 | 414 |
| 407 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | 415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
| 408 | 416 |
| 409 | 417 |
| 410 class _UnixDomainSocketStreamClient(StreamClient): | 418 class _UnixDomainSocketStreamClient(StreamClient): |
| 411 """A StreamClient implementation that uses a UNIX domain socket. | 419 """A StreamClient implementation that uses a UNIX domain socket. |
| 412 """ | 420 """ |
| 413 | 421 |
| 422 class SocketFile(object): |
| 423 """A write-only file-like object that writes to a UNIX socket.""" |
| 424 |
| 425 def __init__(self, fd): |
| 426 self._fd = fd |
| 427 |
| 428 def write(self, data): |
| 429 self._fd.send(data) |
| 430 |
| 431 def close(self): |
| 432 self._fd.close() |
| 433 |
| 434 |
| 414 def __init__(self, path): | 435 def __init__(self, path): |
| 415 """Initializes a new UNIX domain socket stream client. | 436 """Initializes a new UNIX domain socket stream client. |
| 416 | 437 |
| 417 Args: | 438 Args: |
| 418 path (str): The path to the named UNIX domain socket. | 439 path (str): The path to the named UNIX domain socket. |
| 419 """ | 440 """ |
| 420 super(_UnixDomainSocketStreamClient, self).__init__() | 441 super(_UnixDomainSocketStreamClient, self).__init__() |
| 421 self._path = path | 442 self._path = path |
| 422 | 443 |
| 423 @classmethod | 444 @classmethod |
| 424 def _create(cls, value): | 445 def _create(cls, value): |
| 425 if not os.path.exists(value): | 446 if not os.path.exists(value): |
| 426 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | 447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| 427 return cls(value) | 448 return cls(value) |
| 428 | 449 |
| 429 def _connect_raw(self): | 450 def _connect_raw(self): |
| 430 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| 431 sock.connect(self._path) | 452 sock.connect(self._path) |
| 432 return sock | 453 return self.SocketFile(sock) |
| 433 | 454 |
| 434 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | 455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
| OLD | NEW |