Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(284)

Side by Side Diff: client/libs/logdog/stream.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file.
4
5 import collections
6 import contextlib
7 import os
8 import json
9 import socket
10 import sys
11 import threading
12 import types
13
14 from client.libs.logdog import streamname, varint
15
16
17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase',
18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
19
20
21 class StreamParams(_StreamParamsBase):
martiniss 2016/05/10 22:12:38 This could be generated from a protobuf, right? Or
dnj (Google) 2016/05/12 17:54:02 Nope, this is a Go struct, so we need a Python ver
22 """Defines the set of parameters to apply to a new stream.
23 """
24
25 # A text content stream.
26 TEXT = 'text'
27 # A binary content stream.
28 BINARY = 'binary'
29 # A datagram content stream.
30 DATAGRAM = 'datagram'
31
32 # Tee parameter to tee this stream through the Butler's STDOUT.
33 TEE_STDOUT = 'stdout'
34 # Tee parameter to tee this stream through the Butler's STDERR.
35 TEE_STDERR = 'stderr'
36
37 @classmethod
38 def make(cls, **kwargs):
39 """Returns (StreamParams): A new StreamParams instance with supplied values.
40
41 Any parameter that isn't supplied will be set to None.
42
43 Args:
44 kwargs (dict): Named parameters to apply.
45 """
46 return cls(**{f: kwargs.get(f) for f in cls._fields})
47
48 def validate(self):
49 """Raises (ValueError): if the parameters are not valid."""
50 streamname.validate_stream_name(self.name)
51
52 if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
53 raise ValueError('Invalid type (%s)' % (self.type,))
54
55 if self.tags is not None:
56 if not isinstance(self.tags, collections.Mapping):
57 raise ValueError('Invalid tags type (%s)' % (self.tags,))
58 for k, v in self.tags.iteritems():
59 streamname.validate_tag(k, v)
60
61 if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR):
62 raise ValueError('Invalid tee type (%s)' % (self.tee,))
63
64 if not isinstance(self.binary_file_extension,
65 (types.NoneType, types.StringTypes)):
66 raise ValueError('Invalid binary file extension type (%s)' % (
67 self.binary_file_extension,))
68
69 def to_json(self):
70 """Returns (str): The JSON representation of the StreamParams.
71
72 Converts stream parameters to JSON for Butler consumption.
73
74 Raises:
75 ValueError: if these parameters are not valid.
76 """
77 self.validate()
78
79 obj = {
80 'name': self.name,
81 'type': self.type,
82 }
83
84 def maybe_add(key, value):
85 if value is not None:
86 obj[key] = value
87 maybe_add('contentType', self.content_type)
88 maybe_add('tags', self.tags)
89 maybe_add('tee', self.tee)
90 maybe_add('binaryFileExtension', self.binary_file_extension)
91
92 # Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
93 return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
94
95
96 class StreamProtocolRegistry(object):
97 """Registry of streamserver URI protocols and their client classes.
98 """
99
100 def __init__(self):
101 self._registry = {}
102
103 def register_protocol(self, protocol, client_cls):
104 assert issubclass(client_cls, StreamClient)
105 if self._registry.get(protocol) is not None:
106 raise KeyError('Duplicate protocol registered.')
107 self._registry[protocol] = client_cls
108
109 def create(self, uri):
110 uri = uri.split(':', 1)
111 if len(uri) != 2:
112 raise ValueError('Invalid stream server URI [%s]' % (uri,))
113 protocol, value = uri
114
115 client_cls = self._registry.get(protocol)
116 if not client_cls:
117 raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
118 return client_cls._create(value)
119
120 # Default (global) registry.
121 _default_registry = StreamProtocolRegistry()
122
123
124 def create(uri):
125 """Returns (StreamClient): A stream client for the specified URI.
126
127 This uses the default StreamProtocolRegistry to instantiate a StreamClient
128 for the specified URI.
129
130 Args:
131 uri: The streamserver URI.
132
133 Raises:
134 ValueError if the supplied URI references an invalid or improperly
135 configured streamserver.
136 """
137 return _default_registry.create(uri)
138
139
140 class StreamClient(object):
141 """Abstract base class for a streamserver client.
142 """
143
144 class _DatagramStream(object):
145 """Wraps a stream object to write length-prefixed datagrams."""
146
147 def __init__(self, fd):
148 self._fd = fd
149
150 def send(self, data):
151 varint.write_uvarint(self._fd, len(data))
152 self._fd.write(data)
153
154 def close(self):
155 return self._fd.close()
156
157 def __init__(self):
158 self._name_lock = threading.Lock()
159 self._names = set()
160
161 def _register_new_stream(self, name):
162 """Registers a new stream name.
163
164 The Butler will internally reject any duplicate stream names. However, there
165 isn't really feedback when this happens except a closed stream client. This
166 is a client-side check to provide a more user-friendly experience in the
167 event that a user attempts to register a duplicate stream name.
168
169 Note that this is imperfect, as somethig else could register stream names
martiniss 2016/05/10 22:12:38 typo
dnj (Google) 2016/05/12 17:54:03 Done.
170 with the same Butler instance and this library has no means of tracking.
171 This is a best-effort experience, not a reliable check.
172
173 Args:
174 name (str): The name of the stream.
175
176 Raises:
177 ValueError if the stream name has already been registered.
178 """
179 with self._name_lock:
180 if name in self._names:
181 raise ValueError("Duplicate stream name [%s]" % (name,))
182 self._names.add(name)
183
184 @classmethod
185 def _create(cls, value):
186 """Returns (StreamClient): A new stream client connection.
187
188 Validates the streamserver parameters and creates a new StreamClient
189 instance that connects to them.
190
191 Implementing classes must override this.
192 """
193 raise NotImplementedError()
194
195 def _connect_raw(self):
196 """Returns (file): A new file-like stream.
197
198 Creates a new raw connection to the streamserver. This connection MUST not
199 have any data written to it past initialization (if needed) when it has been
200 returned.
201
202 The file-like object must implement `write` and `close`.
203
204 Implementing classes must override this.
205 """
206 raise NotImplementedError()
207
208 def new_connection(self, params):
209 """Returns (file): A new configured stream.
210
211 The returned object implements (minimally) `write` and `close`.
212
213 Creates a new LogDog stream with the specified parameters.
214
215 Raises:
216 ValueError if the stream name has already been used.
217 """
218 self._register_new_stream(params.name)
219 params_json = params.to_json()
220
221 fd = self._connect_raw()
222 varint.write_uvarint(fd, len(params_json))
223 fd.write(params_json)
224 return fd
225
226 @contextlib.contextmanager
227 def text(self, name, **kwargs):
228 """Context manager to create, use, and teardown a TEXT stream.
229
230 This context manager creates a new butler TEXT stream with the specified
231 name and parameters, yields it, and closes it on teardown.
232
233 Args:
234 name (str): the LogDog name of the stream.
235 kwargs (dict): Log stream parameters. These may be any keyword arguments
236 accepted by `open_text`.
237
238 Returns (file): A file-like object to a Butler UTF-8 text stream supporting
239 `write`.
240 """
241 fd = None
242 try:
243 fd = self.open_text(name, **kwargs)
244 yield fd
245 finally:
246 if fd is not None:
247 fd.close()
248
249 def open_text(self, name, content_type=None, tags=None, tee=None,
250 binary_file_extension=None):
251 """Returns (file): A file-like object for a single text stream.
252
253 This creates a new butler TEXT stream with the specified name and
254 parameters.
255
256 Args:
257 name (str): the LogDog name of the stream.
258 content_type (str): The optional content type of the stream. If None, a
259 default content type will be chosen by the Butler.
260 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
261 tee (str): Describes how stream data should be tee'd through the Butler.
262 One of StreamParams' TEE arguments.
263 binary_file_extension (str): A custom binary file extension. If not
264 provided, a default extension may be chosen or the binary stream may
265 not be emitted.
266
267 Returns (file): A file-like object to a Butler text stream. This object can
268 have UTF-8 text content written to it with its `write` method, and must
269 be closed when finished using its `close` method.
270 """
271 params = StreamParams.make(
272 name=name,
273 type=StreamParams.TEXT,
274 content_type=content_type,
275 tags=tags,
276 tee=tee,
277 binary_file_extension=binary_file_extension)
278 return self.new_connection(params)
279
280 @contextlib.contextmanager
281 def binary(self, name, **kwargs):
282 """Context manager to create, use, and teardown a BINARY stream.
283
284 This context manager creates a new butler BINARY stream with the specified
285 name and parameters, yields it, and closes it on teardown.
286
287 Args:
288 name (str): the LogDog name of the stream.
289 kwargs (dict): Log stream parameters. These may be any keyword arguments
290 accepted by `open_binary`.
291
292 Returns (file): A file-like object to a Butler binary stream supporting
293 `write`.
294 """
295 fd = None
296 try:
297 fd = self.open_binary(name, **kwargs)
298 yield fd
299 finally:
300 if fd is not None:
301 fd.close()
302
303 def open_binary(self, name, content_type=None, tags=None, tee=None,
304 binary_file_extension=None):
305 """Returns (file): A file-like object for a single binary stream.
306
307 This creates a new butler BINARY stream with the specified name and
308 parameters.
309
310 Args:
311 name (str): the LogDog name of the stream.
312 content_type (str): The optional content type of the stream. If None, a
313 default content type will be chosen by the Butler.
314 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
315 tee (str): Describes how stream data should be tee'd through the Butler.
316 One of StreamParams' TEE arguments.
317 binary_file_extension (str): A custom binary file extension. If not
318 provided, a default extension may be chosen or the binary stream may
319 not be emitted.
320
321 Returns (file): A file-like object to a Butler binary stream. This object
322 can have UTF-8 content written to it with its `write` method, and must
323 be closed when finished using its `close` method.
324 """
325 params = StreamParams.make(
326 name=name,
327 type=StreamParams.BINARY,
328 content_type=content_type,
329 tags=tags,
330 tee=tee,
331 binary_file_extension=binary_file_extension)
332 return self.new_connection(params)
333
334 @contextlib.contextmanager
335 def datagram(self, name, **kwargs):
336 """Context manager to create, use, and teardown a DATAGRAM stream.
337
338 This context manager creates a new butler DATAAGRAM stream with the
339 specified name and parameters, yields it, and closes it on teardown.
340
341 Args:
342 name (str): the LogDog name of the stream.
343 kwargs (dict): Log stream parameters. These may be any keyword arguments
344 accepted by `open_datagram`.
345
346 Returns (_DatagramStream): A datagram stream object. Datagrams can be
347 written to it using its `send` method.
348 """
349 fd = None
350 try:
351 fd = self.open_datagram(name, **kwargs)
352 yield fd
353 finally:
354 if fd is not None:
355 fd.close()
356
357 def open_datagram(self, name, content_type=None, tags=None, tee=None,
358 binary_file_extension=None):
359 """Returns: (file): A file-like object for a single text stream.
martiniss 2016/05/10 22:12:38 Is this correct? I think this was copy pasted.
dnj (Google) 2016/05/12 17:54:02 Yep was copy/pasted, cleaned that up.
360
361 This creates a new butler TEXT stream with the specified name and
362 parameters.
363
364 Args:
365 name (str): the LogDog name of the stream.
366 content_type (str): The optional content type of the stream. If None, a
367 default content type will be chosen by the Butler.
368 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
369 tee (str): Describes how stream data should be tee'd through the Butler.
370 One of StreamParams' TEE arguments.
371 binary_file_extension (str): A custom binary file extension. If not
372 provided, a default extension may be chosen or the binary stream may
373 not be emitted.
374
375 Returns (_DatagramStream): A datagram stream object. Datagrams can be
376 written to it using its `send` method. This object must be closed when
377 finished by using its `close` method.
378 """
379 params = StreamParams.make(
380 name=name,
381 type=StreamParams.DATAGRAM,
382 content_type=content_type,
383 tags=tags,
384 tee=tee,
385 binary_file_extension=binary_file_extension)
386 return self._DatagramStream(self.new_connection(params))
387
388
389 class _NamedPipeStreamClient(StreamClient):
390 """A StreamClient implementation that connects to a Windows named pipe.
martiniss 2016/05/10 22:12:38 Windows? Why does line 394 say UNIX? Bad copy-past
dnj (Google) 2016/05/12 17:54:02 Was copy/pasted.
391 """
392
393 def __init__(self, name):
394 r"""Initializes a new UNIX domain socket stream client.
395
396 Args:
397 name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
398 """
399 super(_NamedPipeStreamClient, self).__init__()
400 self._name = name
401
402 @classmethod
403 def _create(cls, value):
404 return cls(value)
405
406 def _connect_raw(self):
407 return open(self._name, 'wb')
408
409 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
410
411
412 class _UnixDomainSocketStreamClient(StreamClient):
413 """A StreamClient implementation that uses a UNIX domain socket.
414 """
415
416 def __init__(self, path):
417 """Initializes a new UNIX domain socket stream client.
418
419 Args:
420 path (str): The path to the named UNIX domain socket.
421 """
422 super(_UnixDomainSocketStreamClient, self).__init__()
423 self._path = path
424
425 @classmethod
426 def _create(cls, value):
427 if not os.path.exists(value):
428 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
429 return cls(value)
430
431 def _connect_raw(self):
432 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
433 sock.connect(self._path)
434 return sock
435
436 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698