Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(713)

Side by Side Diff: client/libs/logdog/stream.py

Issue 2251413004: LogDog: Properly emit Butler stream magic. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | client/libs/logdog/tests/stream_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 import collections 5 import collections
6 import contextlib 6 import contextlib
7 import json 7 import json
8 import os 8 import os
9 import socket 9 import socket
10 import sys 10 import sys
11 import threading 11 import threading
12 import types 12 import types
13 13
14 from libs.logdog import streamname, varint 14 from libs.logdog import streamname, varint
15 15
16 16
17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase', 17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase',
18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension')) 18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
19 19
20 20
21 # Magic number at the beginning of a Butler stream
22 #
23 # See "ProtocolFrameHeaderMagic" in:
24 # <luci-go>/logdog/client/butlerlib/streamproto
25 BUTLER_MAGIC = 'BTLR1\x1e'
26
27
21 class StreamParams(_StreamParamsBase): 28 class StreamParams(_StreamParamsBase):
22 """Defines the set of parameters to apply to a new stream.""" 29 """Defines the set of parameters to apply to a new stream."""
23 30
24 # A text content stream. 31 # A text content stream.
25 TEXT = 'text' 32 TEXT = 'text'
26 # A binary content stream. 33 # A binary content stream.
27 BINARY = 'binary' 34 BINARY = 'binary'
28 # A datagram content stream. 35 # A datagram content stream.
29 DATAGRAM = 'datagram' 36 DATAGRAM = 'datagram'
30 37
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after
215 params (StreamParams): The parameters to use with the new connection. 222 params (StreamParams): The parameters to use with the new connection.
216 223
217 Raises: 224 Raises:
218 ValueError if the stream name has already been used, or if the parameters 225 ValueError if the stream name has already been used, or if the parameters
219 are not valid. 226 are not valid.
220 """ 227 """
221 self._register_new_stream(params.name) 228 self._register_new_stream(params.name)
222 params_json = params.to_json() 229 params_json = params.to_json()
223 230
224 fd = self._connect_raw() 231 fd = self._connect_raw()
232 fd.write(BUTLER_MAGIC)
225 varint.write_uvarint(fd, len(params_json)) 233 varint.write_uvarint(fd, len(params_json))
226 fd.write(params_json) 234 fd.write(params_json)
227 return fd 235 return fd
228 236
229 @contextlib.contextmanager 237 @contextlib.contextmanager
230 def text(self, name, **kwargs): 238 def text(self, name, **kwargs):
231 """Context manager to create, use, and teardown a TEXT stream. 239 """Context manager to create, use, and teardown a TEXT stream.
232 240
233 This context manager creates a new butler TEXT stream with the specified 241 This context manager creates a new butler TEXT stream with the specified
234 parameters, yields it, and closes it on teardown. 242 parameters, yields it, and closes it on teardown.
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
404 def _connect_raw(self): 412 def _connect_raw(self):
405 return open(self._name, 'wb') 413 return open(self._name, 'wb')
406 414
407 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) 415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
408 416
409 417
410 class _UnixDomainSocketStreamClient(StreamClient): 418 class _UnixDomainSocketStreamClient(StreamClient):
411 """A StreamClient implementation that uses a UNIX domain socket. 419 """A StreamClient implementation that uses a UNIX domain socket.
412 """ 420 """
413 421
422 class SocketFile(object):
423 """A write-only file-like object that writes to a UNIX socket."""
424
425 def __init__(self, fd):
426 self._fd = fd
427
428 def write(self, data):
429 self._fd.send(data)
430
431 def close(self):
432 self._fd.close()
433
434
414 def __init__(self, path): 435 def __init__(self, path):
415 """Initializes a new UNIX domain socket stream client. 436 """Initializes a new UNIX domain socket stream client.
416 437
417 Args: 438 Args:
418 path (str): The path to the named UNIX domain socket. 439 path (str): The path to the named UNIX domain socket.
419 """ 440 """
420 super(_UnixDomainSocketStreamClient, self).__init__() 441 super(_UnixDomainSocketStreamClient, self).__init__()
421 self._path = path 442 self._path = path
422 443
423 @classmethod 444 @classmethod
424 def _create(cls, value): 445 def _create(cls, value):
425 if not os.path.exists(value): 446 if not os.path.exists(value):
426 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) 447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
427 return cls(value) 448 return cls(value)
428 449
429 def _connect_raw(self): 450 def _connect_raw(self):
430 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431 sock.connect(self._path) 452 sock.connect(self._path)
432 return sock 453 return self.SocketFile(sock)
433 454
434 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) 455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
OLDNEW
« no previous file with comments | « no previous file | client/libs/logdog/tests/stream_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698