Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(17)

Side by Side Diff: client/libs/logdog/stream.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Updated w/ comments. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file.
4
5 import collections
6 import contextlib
7 import os
8 import json
nodir 2016/05/12 21:29:57 Sort
dnj (Google) 2016/05/16 16:11:12 Done.
9 import socket
10 import sys
11 import threading
12 import types
13
14 from client.libs.logdog import streamname, varint
15
16
17 _StreamParamsBase = collections.namedtuple('_StreamParamsBase',
18 ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
19
20
21 class StreamParams(_StreamParamsBase):
22 """Defines the set of parameters to apply to a new stream.
23 """
nodir 2016/05/12 21:29:57 Join lines for consistency? here and below
dnj (Google) 2016/05/16 16:11:12 Done.
24
25 # A text content stream.
26 TEXT = 'text'
27 # A binary content stream.
28 BINARY = 'binary'
29 # A datagram content stream.
30 DATAGRAM = 'datagram'
31
32 # Tee parameter to tee this stream through the Butler's STDOUT.
33 TEE_STDOUT = 'stdout'
34 # Tee parameter to tee this stream through the Butler's STDERR.
35 TEE_STDERR = 'stderr'
36
37 @classmethod
38 def make(cls, **kwargs):
39 """Returns (StreamParams): A new StreamParams instance with supplied values.
40
41 Any parameter that isn't supplied will be set to None.
42
43 Args:
44 kwargs (dict): Named parameters to apply.
45 """
46 return cls(**{f: kwargs.get(f) for f in cls._fields})
47
48 def validate(self):
49 """Raises (ValueError): if the parameters are not valid."""
50 streamname.validate_stream_name(self.name)
51
52 if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
53 raise ValueError('Invalid type (%s)' % (self.type,))
54
55 if self.tags is not None:
56 if not isinstance(self.tags, collections.Mapping):
57 raise ValueError('Invalid tags type (%s)' % (self.tags,))
58 for k, v in self.tags.iteritems():
59 streamname.validate_tag(k, v)
60
61 if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR):
62 raise ValueError('Invalid tee type (%s)' % (self.tee,))
63
64 if not isinstance(self.binary_file_extension,
65 (types.NoneType, types.StringTypes)):
66 raise ValueError('Invalid binary file extension type (%s)' % (
67 self.binary_file_extension,))
68
69 def to_json(self):
70 """Returns (str): The JSON representation of the StreamParams.
71
72 Converts stream parameters to JSON for Butler consumption.
73
74 Raises:
75 ValueError: if these parameters are not valid.
76 """
77 self.validate()
78
79 obj = {
80 'name': self.name,
81 'type': self.type,
82 }
83
84 def maybe_add(key, value):
85 if value is not None:
86 obj[key] = value
87 maybe_add('contentType', self.content_type)
88 maybe_add('tags', self.tags)
89 maybe_add('tee', self.tee)
90 maybe_add('binaryFileExtension', self.binary_file_extension)
91
92 # Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
93 return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
94
95
96 class StreamProtocolRegistry(object):
97 """Registry of streamserver URI protocols and their client classes.
98 """
99
100 def __init__(self):
101 self._registry = {}
102
103 def register_protocol(self, protocol, client_cls):
104 assert issubclass(client_cls, StreamClient)
105 if self._registry.get(protocol) is not None:
106 raise KeyError('Duplicate protocol registered.')
107 self._registry[protocol] = client_cls
108
109 def create(self, uri):
110 uri = uri.split(':', 1)
111 if len(uri) != 2:
112 raise ValueError('Invalid stream server URI [%s]' % (uri,))
113 protocol, value = uri
114
115 client_cls = self._registry.get(protocol)
116 if not client_cls:
117 raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
118 return client_cls._create(value)
119
120 # Default (global) registry.
121 _default_registry = StreamProtocolRegistry()
122
123
124 def create(uri):
125 """Returns (StreamClient): A stream client for the specified URI.
126
127 This uses the default StreamProtocolRegistry to instantiate a StreamClient
128 for the specified URI.
129
130 Args:
131 uri: The streamserver URI.
132
133 Raises:
134 ValueError if the supplied URI references an invalid or improperly
135 configured streamserver.
136 """
137 return _default_registry.create(uri)
138
139
140 class StreamClient(object):
141 """Abstract base class for a streamserver client.
142 """
143
144 class _DatagramStream(object):
145 """Wraps a stream object to write length-prefixed datagrams."""
146
147 def __init__(self, fd):
148 self._fd = fd
149
150 def send(self, data):
151 varint.write_uvarint(self._fd, len(data))
152 self._fd.write(data)
153
154 def close(self):
155 return self._fd.close()
156
157 def __init__(self):
158 self._name_lock = threading.Lock()
159 self._names = set()
160
161 def _register_new_stream(self, name):
162 """Registers a new stream name.
163
164 The Butler will internally reject any duplicate stream names. However, there
165 isn't really feedback when this happens except a closed stream client. This
166 is a client-side check to provide a more user-friendly experience in the
167 event that a user attempts to register a duplicate stream name.
168
169 Note that this is imperfect, as something else could register stream names
170 with the same Butler instance and this library has no means of tracking.
171 This is a best-effort experience, not a reliable check.
172
173 Args:
174 name (str): The name of the stream.
175
176 Raises:
177 ValueError if the stream name has already been registered.
178 """
179 with self._name_lock:
180 if name in self._names:
181 raise ValueError("Duplicate stream name [%s]" % (name,))
182 self._names.add(name)
183
184 @classmethod
185 def _create(cls, value):
186 """Returns (StreamClient): A new stream client connection.
187
188 Validates the streamserver parameters and creates a new StreamClient
189 instance that connects to them.
190
191 Implementing classes must override this.
192 """
193 raise NotImplementedError()
194
195 def _connect_raw(self):
196 """Returns (file): A new file-like stream.
197
198 Creates a new raw connection to the streamserver. This connection MUST not
199 have any data written to it past initialization (if needed) when it has been
200 returned.
201
202 The file-like object must implement `write` and `close`.
203
204 Implementing classes must override this.
205 """
206 raise NotImplementedError()
207
208 def new_connection(self, params):
nodir 2016/05/12 21:29:57 Document type of |params|
dnj (Google) 2016/05/16 16:11:11 Done.
209 """Returns (file): A new configured stream.
210
211 The returned object implements (minimally) `write` and `close`.
212
213 Creates a new LogDog stream with the specified parameters.
214
215 Raises:
216 ValueError if the stream name has already been used.
217 """
218 self._register_new_stream(params.name)
219 params_json = params.to_json()
220
221 fd = self._connect_raw()
222 varint.write_uvarint(fd, len(params_json))
223 fd.write(params_json)
224 return fd
225
226 @contextlib.contextmanager
227 def text(self, name, **kwargs):
228 """Context manager to create, use, and teardown a TEXT stream.
229
230 This context manager creates a new butler TEXT stream with the specified
231 parameters, yields it, and closes it on teardown.
232
233 Args:
234 name (str): the LogDog name of the stream.
235 kwargs (dict): Log stream parameters. These may be any keyword arguments
236 accepted by `open_text`.
237
238 Returns (file): A file-like object to a Butler UTF-8 text stream supporting
239 `write`.
240 """
241 fd = None
242 try:
243 fd = self.open_text(name, **kwargs)
244 yield fd
245 finally:
246 if fd is not None:
247 fd.close()
248
249 def open_text(self, name, content_type=None, tags=None, tee=None,
250 binary_file_extension=None):
251 """Returns (file): A file-like object for a single text stream.
252
253 This creates a new butler TEXT stream with the specified parameters.
254
255 Args:
256 name (str): the LogDog name of the stream.
257 content_type (str): The optional content type of the stream. If None, a
258 default content type will be chosen by the Butler.
259 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
260 tee (str): Describes how stream data should be tee'd through the Butler.
261 One of StreamParams' TEE arguments.
262 binary_file_extension (str): A custom binary file extension. If not
263 provided, a default extension may be chosen or the binary stream may
264 not be emitted.
265
266 Returns (file): A file-like object to a Butler text stream. This object can
267 have UTF-8 text content written to it with its `write` method, and must
268 be closed when finished using its `close` method.
269 """
270 params = StreamParams.make(
271 name=name,
272 type=StreamParams.TEXT,
273 content_type=content_type,
274 tags=tags,
275 tee=tee,
276 binary_file_extension=binary_file_extension)
277 return self.new_connection(params)
278
279 @contextlib.contextmanager
280 def binary(self, name, **kwargs):
281 """Context manager to create, use, and teardown a BINARY stream.
282
283 This context manager creates a new butler BINARY stream with the specified
284 parameters, yields it, and closes it on teardown.
285
286 Args:
287 name (str): the LogDog name of the stream.
288 kwargs (dict): Log stream parameters. These may be any keyword arguments
289 accepted by `open_binary`.
290
291 Returns (file): A file-like object to a Butler binary stream supporting
292 `write`.
293 """
294 fd = None
295 try:
296 fd = self.open_binary(name, **kwargs)
297 yield fd
298 finally:
299 if fd is not None:
300 fd.close()
301
302 def open_binary(self, name, content_type=None, tags=None, tee=None,
303 binary_file_extension=None):
304 """Returns (file): A file-like object for a single binary stream.
305
306 This creates a new butler BINARY stream with the specified parameters.
307
308 Args:
309 name (str): the LogDog name of the stream.
310 content_type (str): The optional content type of the stream. If None, a
311 default content type will be chosen by the Butler.
312 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
313 tee (str): Describes how stream data should be tee'd through the Butler.
314 One of StreamParams' TEE arguments.
315 binary_file_extension (str): A custom binary file extension. If not
316 provided, a default extension may be chosen or the binary stream may
317 not be emitted.
318
319 Returns (file): A file-like object to a Butler binary stream. This object
320 can have UTF-8 content written to it with its `write` method, and must
321 be closed when finished using its `close` method.
322 """
323 params = StreamParams.make(
324 name=name,
325 type=StreamParams.BINARY,
326 content_type=content_type,
327 tags=tags,
328 tee=tee,
329 binary_file_extension=binary_file_extension)
330 return self.new_connection(params)
331
332 @contextlib.contextmanager
333 def datagram(self, name, **kwargs):
334 """Context manager to create, use, and teardown a DATAGRAM stream.
335
336 This context manager creates a new butler DATAAGRAM stream with the
337 specified parameters, yields it, and closes it on teardown.
338
339 Args:
340 name (str): the LogDog name of the stream.
341 kwargs (dict): Log stream parameters. These may be any keyword arguments
342 accepted by `open_datagram`.
343
344 Returns (_DatagramStream): A datagram stream object. Datagrams can be
345 written to it using its `send` method.
346 """
347 fd = None
348 try:
349 fd = self.open_datagram(name, **kwargs)
350 yield fd
351 finally:
352 if fd is not None:
353 fd.close()
354
355 def open_datagram(self, name, content_type=None, tags=None, tee=None,
356 binary_file_extension=None):
357 """Creates a new butler DATAGRAM stream with the specified parameters.
358
359 Args:
360 name (str): the LogDog name of the stream.
361 content_type (str): The optional content type of the stream. If None, a
362 default content type will be chosen by the Butler.
363 tags (dict): An optional key/value dictionary pair of LogDog stream tags.
364 tee (str): Describes how stream data should be tee'd through the Butler.
365 One of StreamParams' TEE arguments.
366 binary_file_extension (str): A custom binary file extension. If not
367 provided, a default extension may be chosen or the binary stream may
368 not be emitted.
369
370 Returns (_DatagramStream): A datagram stream object. Datagrams can be
371 written to it using its `send` method. This object must be closed when
372 finished by using its `close` method.
373 """
374 params = StreamParams.make(
375 name=name,
376 type=StreamParams.DATAGRAM,
377 content_type=content_type,
378 tags=tags,
379 tee=tee,
380 binary_file_extension=binary_file_extension)
381 return self._DatagramStream(self.new_connection(params))
382
383
384 class _NamedPipeStreamClient(StreamClient):
385 """A StreamClient implementation that connects to a Windows named pipe.
386 """
387
388 def __init__(self, name):
389 r"""Initializes a new Windows named pipe stream client.
390
391 Args:
392 name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
393 """
394 super(_NamedPipeStreamClient, self).__init__()
395 self._name = name
396
397 @classmethod
398 def _create(cls, value):
399 return cls(value)
400
401 def _connect_raw(self):
402 return open(self._name, 'wb')
403
404 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
405
406
407 class _UnixDomainSocketStreamClient(StreamClient):
408 """A StreamClient implementation that uses a UNIX domain socket.
409 """
410
411 def __init__(self, path):
412 """Initializes a new UNIX domain socket stream client.
413
414 Args:
415 path (str): The path to the named UNIX domain socket.
416 """
417 super(_UnixDomainSocketStreamClient, self).__init__()
418 self._path = path
419
420 @classmethod
421 def _create(cls, value):
422 if not os.path.exists(value):
423 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
424 return cls(value)
425
426 def _connect_raw(self):
427 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
428 sock.connect(self._path)
429 return sock
430
431 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698