Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: client/libs/logdog/stream.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Remove "client." from package name, make pylint happy. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file.
4
5 import collections
6 import contextlib
7 import json
8 import os
9 import socket
10 import sys
11 import threading
12 import types
13
14 from libs.logdog import streamname, varint
15
16
17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase',
18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
19
20
21 class StreamParams(_StreamParamsBase):
22 """Defines the set of parameters to apply to a new stream."""
23
24 # A text content stream.
25 TEXT = 'text'
26 # A binary content stream.
27 BINARY = 'binary'
28 # A datagram content stream.
29 DATAGRAM = 'datagram'
30
31 # Tee parameter to tee this stream through the Butler's STDOUT.
32 TEE_STDOUT = 'stdout'
33 # Tee parameter to tee this stream through the Butler's STDERR.
34 TEE_STDERR = 'stderr'
35
36 @classmethod
37 def make(cls, **kwargs):
38 """Returns (StreamParams): A new StreamParams instance with supplied values.
39
40 Any parameter that isn't supplied will be set to None.
41
42 Args:
43 kwargs (dict): Named parameters to apply.
44 """
45 return cls(**{f: kwargs.get(f) for f in cls._fields})
46
47 def validate(self):
48 """Raises (ValueError): if the parameters are not valid."""
49 streamname.validate_stream_name(self.name)
50
51 if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
52 raise ValueError('Invalid type (%s)' % (self.type,))
53
54 if self.tags is not None:
55 if not isinstance(self.tags, collections.Mapping):
56 raise ValueError('Invalid tags type (%s)' % (self.tags,))
57 for k, v in self.tags.iteritems():
58 streamname.validate_tag(k, v)
59
60 if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR):
61 raise ValueError('Invalid tee type (%s)' % (self.tee,))
62
63 if not isinstance(self.binary_file_extension,
64 (types.NoneType, types.StringTypes)):
65 raise ValueError('Invalid binary file extension type (%s)' % (
66 self.binary_file_extension,))
67
68 def to_json(self):
69 """Returns (str): The JSON representation of the StreamParams.
70
71 Converts stream parameters to JSON for Butler consumption.
72
73 Raises:
74 ValueError: if these parameters are not valid.
75 """
76 self.validate()
77
78 obj = {
79 'name': self.name,
80 'type': self.type,
81 }
82
83 def maybe_add(key, value):
84 if value is not None:
85 obj[key] = value
86 maybe_add('contentType', self.content_type)
87 maybe_add('tags', self.tags)
88 maybe_add('tee', self.tee)
89 maybe_add('binaryFileExtension', self.binary_file_extension)
90
91 # Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
92 return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
93
94
95 class StreamProtocolRegistry(object):
96 """Registry of streamserver URI protocols and their client classes.
97 """
98
99 def __init__(self):
100 self._registry = {}
101
102 def register_protocol(self, protocol, client_cls):
103 assert issubclass(client_cls, StreamClient)
104 if self._registry.get(protocol) is not None:
105 raise KeyError('Duplicate protocol registered.')
106 self._registry[protocol] = client_cls
107
108 def create(self, uri):
109 uri = uri.split(':', 1)
110 if len(uri) != 2:
111 raise ValueError('Invalid stream server URI [%s]' % (uri,))
112 protocol, value = uri
113
114 client_cls = self._registry.get(protocol)
115 if not client_cls:
116 raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
117 return client_cls._create(value)
118
119 # Default (global) registry.
120 _default_registry = StreamProtocolRegistry()
121
122
123 def create(uri):
124 """Returns (StreamClient): A stream client for the specified URI.
125
126 This uses the default StreamProtocolRegistry to instantiate a StreamClient
127 for the specified URI.
128
129 Args:
130 uri: The streamserver URI.
131
132 Raises:
133 ValueError if the supplied URI references an invalid or improperly
134 configured streamserver.
135 """
136 return _default_registry.create(uri)
137
138
139 class StreamClient(object):
140 """Abstract base class for a streamserver client.
141 """
142
143 class _DatagramStream(object):
144 """Wraps a stream object to write length-prefixed datagrams."""
145
146 def __init__(self, fd):
147 self._fd = fd
148
149 def send(self, data):
150 varint.write_uvarint(self._fd, len(data))
151 self._fd.write(data)
152
153 def close(self):
154 return self._fd.close()
155
156 def __init__(self):
157 self._name_lock = threading.Lock()
158 self._names = set()
159
160 def _register_new_stream(self, name):
161 """Registers a new stream name.
162
163 The Butler will internally reject any duplicate stream names. However, there
164 isn't really feedback when this happens except a closed stream client. This
165 is a client-side check to provide a more user-friendly experience in the
166 event that a user attempts to register a duplicate stream name.
167
168 Note that this is imperfect, as something else could register stream names
169 with the same Butler instance and this library has no means of tracking.
170 This is a best-effort experience, not a reliable check.
171
172 Args:
173 name (str): The name of the stream.
174
175 Raises:
176 ValueError if the stream name has already been registered.
177 """
178 with self._name_lock:
179 if name in self._names:
180 raise ValueError("Duplicate stream name [%s]" % (name,))
181 self._names.add(name)
182
183 @classmethod
184 def _create(cls, value):
185 """Returns (StreamClient): A new stream client connection.
186
187 Validates the streamserver parameters and creates a new StreamClient
188 instance that connects to them.
189
190 Implementing classes must override this.
191 """
192 raise NotImplementedError()
193
194 def _connect_raw(self):
195 """Returns (file): A new file-like stream.
196
197 Creates a new raw connection to the streamserver. This connection MUST not
198 have any data written to it past initialization (if needed) when it has been
199 returned.
200
201 The file-like object must implement `write` and `close`.
202
203 Implementing classes must override this.
204 """
205 raise NotImplementedError()
206
207 def new_connection(self, params):
208 """Returns (file): A new configured stream.
209
210 The returned object implements (minimally) `write` and `close`.
211
212 Creates a new LogDog stream with the specified parameters.
213
214 Args:
215 params (StreamParams): The parameters to use with the new connection.
216
217 Raises:
218 ValueError if the stream name has already been used, or if the parameters
219 are not valid.
220 """
221 self._register_new_stream(params.name)
222 params_json = params.to_json()
223
224 fd = self._connect_raw()
225 varint.write_uvarint(fd, len(params_json))
226 fd.write(params_json)
227 return fd
228
229 @contextlib.contextmanager
230 def text(self, name, **kwargs):
231 """Context manager to create, use, and teardown a TEXT stream.
232
233 This context manager creates a new butler TEXT stream with the specified
234 parameters, yields it, and closes it on teardown.
235
236 Args:
237 name (str): the LogDog name of the stream.
238 kwargs (dict): Log stream parameters. These may be any keyword arguments
239 accepted by `open_text`.
240
241 Returns (file): A file-like object to a Butler UTF-8 text stream supporting
242 `write`.
243 """
244 fd = None
245 try:
246 fd = self.open_text(name, **kwargs)
247 yield fd
248 finally:
249 if fd is not None:
250 fd.close()
251
252 def open_text(self, name, content_type=None, tags=None, tee=None,
253 binary_file_extension=None):
254 """Returns (file): A file-like object for a single text stream.
255
256 This creates a new butler TEXT stream with the specified parameters.
257
258 Args:
259 name (str): the LogDog name of the stream.
260 content_type (str): The optional content type of the stream. If None, a
261 default content type will be chosen by the Butler.
262 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
263 tee (str): Describes how stream data should be tee'd through the Butler.
264 One of StreamParams' TEE arguments.
265 binary_file_extension (str): A custom binary file extension. If not
266 provided, a default extension may be chosen or the binary stream may
267 not be emitted.
268
269 Returns (file): A file-like object to a Butler text stream. This object can
270 have UTF-8 text content written to it with its `write` method, and must
271 be closed when finished using its `close` method.
272 """
273 params = StreamParams.make(
274 name=name,
275 type=StreamParams.TEXT,
276 content_type=content_type,
277 tags=tags,
278 tee=tee,
279 binary_file_extension=binary_file_extension)
280 return self.new_connection(params)
281
282 @contextlib.contextmanager
283 def binary(self, name, **kwargs):
284 """Context manager to create, use, and teardown a BINARY stream.
285
286 This context manager creates a new butler BINARY stream with the specified
287 parameters, yields it, and closes it on teardown.
288
289 Args:
290 name (str): the LogDog name of the stream.
291 kwargs (dict): Log stream parameters. These may be any keyword arguments
292 accepted by `open_binary`.
293
294 Returns (file): A file-like object to a Butler binary stream supporting
295 `write`.
296 """
297 fd = None
298 try:
299 fd = self.open_binary(name, **kwargs)
300 yield fd
301 finally:
302 if fd is not None:
303 fd.close()
304
305 def open_binary(self, name, content_type=None, tags=None, tee=None,
306 binary_file_extension=None):
307 """Returns (file): A file-like object for a single binary stream.
308
309 This creates a new butler BINARY stream with the specified parameters.
310
311 Args:
312 name (str): the LogDog name of the stream.
313 content_type (str): The optional content type of the stream. If None, a
314 default content type will be chosen by the Butler.
315 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
316 tee (str): Describes how stream data should be tee'd through the Butler.
317 One of StreamParams' TEE arguments.
318 binary_file_extension (str): A custom binary file extension. If not
319 provided, a default extension may be chosen or the binary stream may
320 not be emitted.
321
322 Returns (file): A file-like object to a Butler binary stream. This object
323 can have UTF-8 content written to it with its `write` method, and must
324 be closed when finished using its `close` method.
325 """
326 params = StreamParams.make(
327 name=name,
328 type=StreamParams.BINARY,
329 content_type=content_type,
330 tags=tags,
331 tee=tee,
332 binary_file_extension=binary_file_extension)
333 return self.new_connection(params)
334
335 @contextlib.contextmanager
336 def datagram(self, name, **kwargs):
337 """Context manager to create, use, and teardown a DATAGRAM stream.
338
339 This context manager creates a new butler DATAAGRAM stream with the
340 specified parameters, yields it, and closes it on teardown.
341
342 Args:
343 name (str): the LogDog name of the stream.
344 kwargs (dict): Log stream parameters. These may be any keyword arguments
345 accepted by `open_datagram`.
346
347 Returns (_DatagramStream): A datagram stream object. Datagrams can be
348 written to it using its `send` method.
349 """
350 fd = None
351 try:
352 fd = self.open_datagram(name, **kwargs)
353 yield fd
354 finally:
355 if fd is not None:
356 fd.close()
357
358 def open_datagram(self, name, content_type=None, tags=None, tee=None,
359 binary_file_extension=None):
360 """Creates a new butler DATAGRAM stream with the specified parameters.
361
362 Args:
363 name (str): the LogDog name of the stream.
364 content_type (str): The optional content type of the stream. If None, a
365 default content type will be chosen by the Butler.
366 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
367 tee (str): Describes how stream data should be tee'd through the Butler.
368 One of StreamParams' TEE arguments.
369 binary_file_extension (str): A custom binary file extension. If not
370 provided, a default extension may be chosen or the binary stream may
371 not be emitted.
372
373 Returns (_DatagramStream): A datagram stream object. Datagrams can be
374 written to it using its `send` method. This object must be closed when
375 finished by using its `close` method.
376 """
377 params = StreamParams.make(
378 name=name,
379 type=StreamParams.DATAGRAM,
380 content_type=content_type,
381 tags=tags,
382 tee=tee,
383 binary_file_extension=binary_file_extension)
384 return self._DatagramStream(self.new_connection(params))
385
386
387 class _NamedPipeStreamClient(StreamClient):
388 """A StreamClient implementation that connects to a Windows named pipe.
389 """
390
391 def __init__(self, name):
392 r"""Initializes a new Windows named pipe stream client.
393
394 Args:
395 name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
396 """
397 super(_NamedPipeStreamClient, self).__init__()
398 self._name = name
399
400 @classmethod
401 def _create(cls, value):
402 return cls(value)
403
404 def _connect_raw(self):
405 return open(self._name, 'wb')
406
407 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
408
409
410 class _UnixDomainSocketStreamClient(StreamClient):
411 """A StreamClient implementation that uses a UNIX domain socket.
412 """
413
414 def __init__(self, path):
415 """Initializes a new UNIX domain socket stream client.
416
417 Args:
418 path (str): The path to the named UNIX domain socket.
419 """
420 super(_UnixDomainSocketStreamClient, self).__init__()
421 self._path = path
422
423 @classmethod
424 def _create(cls, value):
425 if not os.path.exists(value):
426 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
427 return cls(value)
428
429 def _connect_raw(self):
430 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431 sock.connect(self._path)
432 return sock
433
434 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
OLDNEW
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698