OLD | NEW |
1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import contextlib | 6 import contextlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import socket | 9 import socket |
10 import sys | 10 import sys |
11 import threading | 11 import threading |
12 import types | 12 import types |
13 | 13 |
14 from libs.logdog import streamname, varint | 14 from libs.logdog import streamname, varint |
15 | 15 |
16 | 16 |
17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', | 17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', |
18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) | 18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) |
19 | 19 |
20 | 20 |
| 21 # Magic number at the beginning of a Butler stream |
| 22 # |
| 23 # See "ProtocolFrameHeaderMagic" in: |
| 24 # <luci-go>/logdog/client/butlerlib/streamproto |
| 25 BUTLER_MAGIC = 'BTLR1\x1e' |
| 26 |
| 27 |
21 class StreamParams(_StreamParamsBase): | 28 class StreamParams(_StreamParamsBase): |
22 """Defines the set of parameters to apply to a new stream.""" | 29 """Defines the set of parameters to apply to a new stream.""" |
23 | 30 |
24 # A text content stream. | 31 # A text content stream. |
25 TEXT = 'text' | 32 TEXT = 'text' |
26 # A binary content stream. | 33 # A binary content stream. |
27 BINARY = 'binary' | 34 BINARY = 'binary' |
28 # A datagram content stream. | 35 # A datagram content stream. |
29 DATAGRAM = 'datagram' | 36 DATAGRAM = 'datagram' |
30 | 37 |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
215 params (StreamParams): The parameters to use with the new connection. | 222 params (StreamParams): The parameters to use with the new connection. |
216 | 223 |
217 Raises: | 224 Raises: |
218 ValueError if the stream name has already been used, or if the parameters | 225 ValueError if the stream name has already been used, or if the parameters |
219 are not valid. | 226 are not valid. |
220 """ | 227 """ |
221 self._register_new_stream(params.name) | 228 self._register_new_stream(params.name) |
222 params_json = params.to_json() | 229 params_json = params.to_json() |
223 | 230 |
224 fd = self._connect_raw() | 231 fd = self._connect_raw() |
| 232 fd.write(BUTLER_MAGIC) |
225 varint.write_uvarint(fd, len(params_json)) | 233 varint.write_uvarint(fd, len(params_json)) |
226 fd.write(params_json) | 234 fd.write(params_json) |
227 return fd | 235 return fd |
228 | 236 |
229 @contextlib.contextmanager | 237 @contextlib.contextmanager |
230 def text(self, name, **kwargs): | 238 def text(self, name, **kwargs): |
231 """Context manager to create, use, and teardown a TEXT stream. | 239 """Context manager to create, use, and teardown a TEXT stream. |
232 | 240 |
233 This context manager creates a new butler TEXT stream with the specified | 241 This context manager creates a new butler TEXT stream with the specified |
234 parameters, yields it, and closes it on teardown. | 242 parameters, yields it, and closes it on teardown. |
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
404 def _connect_raw(self): | 412 def _connect_raw(self): |
405 return open(self._name, 'wb') | 413 return open(self._name, 'wb') |
406 | 414 |
407 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | 415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
408 | 416 |
409 | 417 |
410 class _UnixDomainSocketStreamClient(StreamClient): | 418 class _UnixDomainSocketStreamClient(StreamClient): |
411 """A StreamClient implementation that uses a UNIX domain socket. | 419 """A StreamClient implementation that uses a UNIX domain socket. |
412 """ | 420 """ |
413 | 421 |
| 422 class SocketFile(object): |
| 423 """A write-only file-like object that writes to a UNIX socket.""" |
| 424 |
| 425 def __init__(self, fd): |
| 426 self._fd = fd |
| 427 |
| 428 def write(self, data): |
| 429 self._fd.send(data) |
| 430 |
| 431 def close(self): |
| 432 self._fd.close() |
| 433 |
| 434 |
414 def __init__(self, path): | 435 def __init__(self, path): |
415 """Initializes a new UNIX domain socket stream client. | 436 """Initializes a new UNIX domain socket stream client. |
416 | 437 |
417 Args: | 438 Args: |
418 path (str): The path to the named UNIX domain socket. | 439 path (str): The path to the named UNIX domain socket. |
419 """ | 440 """ |
420 super(_UnixDomainSocketStreamClient, self).__init__() | 441 super(_UnixDomainSocketStreamClient, self).__init__() |
421 self._path = path | 442 self._path = path |
422 | 443 |
423 @classmethod | 444 @classmethod |
424 def _create(cls, value): | 445 def _create(cls, value): |
425 if not os.path.exists(value): | 446 if not os.path.exists(value): |
426 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | 447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
427 return cls(value) | 448 return cls(value) |
428 | 449 |
429 def _connect_raw(self): | 450 def _connect_raw(self): |
430 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
431 sock.connect(self._path) | 452 sock.connect(self._path) |
432 return sock | 453 return self.SocketFile(sock) |
433 | 454 |
434 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | 455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
OLD | NEW |