OLD | NEW |
---|---|
1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import contextlib | 6 import contextlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import socket | 9 import socket |
10 import sys | 10 import sys |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
105 | 105 |
106 def __init__(self): | 106 def __init__(self): |
107 self._registry = {} | 107 self._registry = {} |
108 | 108 |
109 def register_protocol(self, protocol, client_cls): | 109 def register_protocol(self, protocol, client_cls): |
110 assert issubclass(client_cls, StreamClient) | 110 assert issubclass(client_cls, StreamClient) |
111 if self._registry.get(protocol) is not None: | 111 if self._registry.get(protocol) is not None: |
112 raise KeyError('Duplicate protocol registered.') | 112 raise KeyError('Duplicate protocol registered.') |
113 self._registry[protocol] = client_cls | 113 self._registry[protocol] = client_cls |
114 | 114 |
115 def create(self, uri): | 115 def create(self, uri, **kwargs): |
116 """Returns (StreamClient): A stream client for the specified URI. | |
117 | |
118 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
119 for the specified URI. | |
120 | |
121 Args: | |
122 uri (str): The streamserver URI. | |
123 kwargs: keyword arguments to forward to the stream. See | |
124 StreamClient.__init__. | |
125 | |
126 Raises: | |
127 ValueError: if the supplied URI references an invalid or improperly | |
128 configured streamserver. | |
129 """ | |
116 uri = uri.split(':', 1) | 130 uri = uri.split(':', 1) |
117 if len(uri) != 2: | 131 if len(uri) != 2: |
118 raise ValueError('Invalid stream server URI [%s]' % (uri,)) | 132 raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
119 protocol, value = uri | 133 protocol, value = uri |
120 | 134 |
121 client_cls = self._registry.get(protocol) | 135 client_cls = self._registry.get(protocol) |
122 if not client_cls: | 136 if not client_cls: |
123 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) | 137 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
124 return client_cls._create(value) | 138 return client_cls._create(value, **kwargs) |
139 | |
125 | 140 |
126 # Default (global) registry. | 141 # Default (global) registry. |
127 _default_registry = StreamProtocolRegistry() | 142 _default_registry = StreamProtocolRegistry() |
128 | 143 create = _default_registry.create |
129 | |
130 def create(uri): | |
131 """Returns (StreamClient): A stream client for the specified URI. | |
132 | |
133 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
134 for the specified URI. | |
135 | |
136 Args: | |
137 uri: The streamserver URI. | |
138 | |
139 Raises: | |
140 ValueError if the supplied URI references an invalid or improperly | |
141 configured streamserver. | |
142 """ | |
143 return _default_registry.create(uri) | |
144 | 144 |
145 | 145 |
146 class StreamClient(object): | 146 class StreamClient(object): |
147 """Abstract base class for a streamserver client. | 147 """Abstract base class for a streamserver client. |
148 """ | 148 """ |
149 | 149 |
150 class _DatagramStream(object): | 150 class _StreamBase(object): |
151 """ABC for StreamClient streams.""" | |
152 | |
153 def __init__(self, stream_client, params): | |
154 self._stream_client = stream_client | |
155 self._params = params | |
156 | |
157 @property | |
158 def params(self): | |
159 """Returns (StreamParams): The stream parameters.""" | |
160 return self._params | |
161 | |
162 @property | |
163 def path(self): | |
jbudorick
2016/10/27 13:24:06
nit: seems a bit odd to have path be a property an
dnj
2016/10/27 22:42:45
Agreed. I don't really like it, but OTOH having a
jbudorick
2016/10/27 23:42:50
sgtm.
| |
164 """Returns (streamname.StreamPath): The stream path. | |
165 | |
166 Raises: | |
167 ValueError: if the stream path is invalid, or if the stream prefix is | |
168 not defined in the client. | |
169 """ | |
170 return self._stream_client.get_stream_path(self._params.name) | |
171 | |
172 def get_viewer_url(self): | |
173 return self._stream_client.get_viewer_url(self._params.name) | |
174 | |
175 | |
176 class _BasicStream(_StreamBase): | |
177 """Wraps a basic file descriptor, offering "write" and "close".""" | |
178 | |
179 def __init__(self, stream_client, params, fd): | |
180 super(StreamClient._BasicStream, self).__init__(stream_client, params) | |
181 self._fd = fd | |
182 | |
183 @property | |
184 def fd(self): | |
185 return self._fd | |
186 | |
187 def write(self, data): | |
188 return self._fd.write(data) | |
189 | |
190 def close(self): | |
191 return self._fd.close() | |
192 | |
193 | |
194 class _DatagramStream(_StreamBase): | |
151 """Wraps a stream object to write length-prefixed datagrams.""" | 195 """Wraps a stream object to write length-prefixed datagrams.""" |
152 | 196 |
153 def __init__(self, fd): | 197 def __init__(self, stream_client, params, fd): |
198 super(StreamClient._DatagramStream, self).__init__(stream_client, params) | |
154 self._fd = fd | 199 self._fd = fd |
155 | 200 |
156 def send(self, data): | 201 def send(self, data): |
157 varint.write_uvarint(self._fd, len(data)) | 202 varint.write_uvarint(self._fd, len(data)) |
158 self._fd.write(data) | 203 self._fd.write(data) |
159 | 204 |
160 def close(self): | 205 def close(self): |
161 return self._fd.close() | 206 return self._fd.close() |
162 | 207 |
163 def __init__(self): | 208 |
209 def __init__(self, prefix=None, coordinator_host=None): | |
210 """Constructs a new base StreamClient instance. | |
211 | |
212 Args: | |
213 prefix (str or None): If not None, the log stream session prefix. | |
214 coordinator_host (str or None): If not None, the name of the Coordinator | |
215 host that this stream client is bound to. This will be used to | |
216 construct viewer URLs for generated streams. | |
217 """ | |
218 self._prefix = prefix | |
219 self._coordinator_host = coordinator_host | |
220 | |
164 self._name_lock = threading.Lock() | 221 self._name_lock = threading.Lock() |
165 self._names = set() | 222 self._names = set() |
166 | 223 |
224 @property | |
225 def prefix(self): | |
226 """Returns (str or None): The stream prefix, or None if not configured.""" | |
227 return self._prefix | |
228 | |
229 @property | |
230 def coordinator_host(self): | |
231 """Returns (str or None): The coordinator host, or None if not configured. | |
232 """ | |
233 return self._coordinator_host | |
234 | |
235 def get_stream_path(self, name): | |
236 """Returns (streamname.StreamPath): The stream path. | |
237 | |
238 Args: | |
239 name (str): The name of the stream. | |
240 | |
241 Raises: | |
242 ValueError: if the stream path is invalid, or if the stream prefix is | |
243 not defined in the client. | |
244 """ | |
245 if not self._prefix: | |
246 raise KeyError('Stream prefix is not configured') | |
247 return streamname.StreamPath.make(self._prefix, name) | |
248 | |
249 def get_viewer_url(self, stream_name): | |
250 """Returns (str): The LogDog viewer URL for the named stream. | |
251 | |
252 Args: | |
253 stream_name (str): The name of the stream. This can also be a query glob. | |
254 | |
255 Raises: | |
256 KeyError: If the prefix and Coordinator host are not configured. | |
257 """ | |
258 if not self._coordinator_host: | |
259 raise KeyError('Coordinator host is not configured') | |
260 | |
261 return streamname.get_logdog_viewer_url( | |
jbudorick
2016/10/27 13:24:06
nit: there's a streamname module and a stream_name
dnj
2016/10/27 22:42:44
Will change to 'name".
| |
262 self._coordinator_host, | |
263 self.get_stream_path(stream_name)) | |
264 | |
167 def _register_new_stream(self, name): | 265 def _register_new_stream(self, name): |
168 """Registers a new stream name. | 266 """Registers a new stream name. |
169 | 267 |
170 The Butler will internally reject any duplicate stream names. However, there | 268 The Butler will internally reject any duplicate stream names. However, there |
171 isn't really feedback when this happens except a closed stream client. This | 269 isn't really feedback when this happens except a closed stream client. This |
172 is a client-side check to provide a more user-friendly experience in the | 270 is a client-side check to provide a more user-friendly experience in the |
173 event that a user attempts to register a duplicate stream name. | 271 event that a user attempts to register a duplicate stream name. |
174 | 272 |
175 Note that this is imperfect, as something else could register stream names | 273 Note that this is imperfect, as something else could register stream names |
176 with the same Butler instance and this library has no means of tracking. | 274 with the same Butler instance and this library has no means of tracking. |
177 This is a best-effort experience, not a reliable check. | 275 This is a best-effort experience, not a reliable check. |
178 | 276 |
179 Args: | 277 Args: |
180 name (str): The name of the stream. | 278 name (str): The name of the stream. |
181 | 279 |
182 Raises: | 280 Raises: |
183 ValueError if the stream name has already been registered. | 281 ValueError if the stream name has already been registered. |
184 """ | 282 """ |
185 with self._name_lock: | 283 with self._name_lock: |
186 if name in self._names: | 284 if name in self._names: |
187 raise ValueError("Duplicate stream name [%s]" % (name,)) | 285 raise ValueError("Duplicate stream name [%s]" % (name,)) |
188 self._names.add(name) | 286 self._names.add(name) |
189 | 287 |
190 @classmethod | 288 @classmethod |
191 def _create(cls, value): | 289 def _create(cls, value, **kwargs): |
192 """Returns (StreamClient): A new stream client connection. | 290 """Returns (StreamClient): A new stream client instance. |
193 | 291 |
194 Validates the streamserver parameters and creates a new StreamClient | 292 Validates the streamserver parameters and creates a new StreamClient |
195 instance that connects to them. | 293 instance that connects to them. |
196 | 294 |
197 Implementing classes must override this. | 295 Implementing classes must override this. |
198 """ | 296 """ |
199 raise NotImplementedError() | 297 raise NotImplementedError() |
200 | 298 |
201 def _connect_raw(self): | 299 def _connect_raw(self): |
202 """Returns (file): A new file-like stream. | 300 """Returns (file): A new file-like stream. |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
278 have UTF-8 text content written to it with its `write` method, and must | 376 have UTF-8 text content written to it with its `write` method, and must |
279 be closed when finished using its `close` method. | 377 be closed when finished using its `close` method. |
280 """ | 378 """ |
281 params = StreamParams.make( | 379 params = StreamParams.make( |
282 name=name, | 380 name=name, |
283 type=StreamParams.TEXT, | 381 type=StreamParams.TEXT, |
284 content_type=content_type, | 382 content_type=content_type, |
285 tags=tags, | 383 tags=tags, |
286 tee=tee, | 384 tee=tee, |
287 binary_file_extension=binary_file_extension) | 385 binary_file_extension=binary_file_extension) |
288 return self.new_connection(params) | 386 return self._BasicStream(self, params, self.new_connection(params)) |
289 | 387 |
290 @contextlib.contextmanager | 388 @contextlib.contextmanager |
291 def binary(self, name, **kwargs): | 389 def binary(self, name, **kwargs): |
292 """Context manager to create, use, and teardown a BINARY stream. | 390 """Context manager to create, use, and teardown a BINARY stream. |
293 | 391 |
294 This context manager creates a new butler BINARY stream with the specified | 392 This context manager creates a new butler BINARY stream with the specified |
295 parameters, yields it, and closes it on teardown. | 393 parameters, yields it, and closes it on teardown. |
296 | 394 |
297 Args: | 395 Args: |
298 name (str): the LogDog name of the stream. | 396 name (str): the LogDog name of the stream. |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
331 can have UTF-8 content written to it with its `write` method, and must | 429 can have UTF-8 content written to it with its `write` method, and must |
332 be closed when finished using its `close` method. | 430 be closed when finished using its `close` method. |
333 """ | 431 """ |
334 params = StreamParams.make( | 432 params = StreamParams.make( |
335 name=name, | 433 name=name, |
336 type=StreamParams.BINARY, | 434 type=StreamParams.BINARY, |
337 content_type=content_type, | 435 content_type=content_type, |
338 tags=tags, | 436 tags=tags, |
339 tee=tee, | 437 tee=tee, |
340 binary_file_extension=binary_file_extension) | 438 binary_file_extension=binary_file_extension) |
341 return self.new_connection(params) | 439 return self._BasicStream(self, params, self.new_connection(params)) |
342 | 440 |
343 @contextlib.contextmanager | 441 @contextlib.contextmanager |
344 def datagram(self, name, **kwargs): | 442 def datagram(self, name, **kwargs): |
345 """Context manager to create, use, and teardown a DATAGRAM stream. | 443 """Context manager to create, use, and teardown a DATAGRAM stream. |
346 | 444 |
347 This context manager creates a new butler DATAAGRAM stream with the | 445 This context manager creates a new butler DATAAGRAM stream with the |
348 specified parameters, yields it, and closes it on teardown. | 446 specified parameters, yields it, and closes it on teardown. |
349 | 447 |
350 Args: | 448 Args: |
351 name (str): the LogDog name of the stream. | 449 name (str): the LogDog name of the stream. |
(...skipping 30 matching lines...) Expand all Loading... | |
382 written to it using its `send` method. This object must be closed when | 480 written to it using its `send` method. This object must be closed when |
383 finished by using its `close` method. | 481 finished by using its `close` method. |
384 """ | 482 """ |
385 params = StreamParams.make( | 483 params = StreamParams.make( |
386 name=name, | 484 name=name, |
387 type=StreamParams.DATAGRAM, | 485 type=StreamParams.DATAGRAM, |
388 content_type=content_type, | 486 content_type=content_type, |
389 tags=tags, | 487 tags=tags, |
390 tee=tee, | 488 tee=tee, |
391 binary_file_extension=binary_file_extension) | 489 binary_file_extension=binary_file_extension) |
392 return self._DatagramStream(self.new_connection(params)) | 490 return self._DatagramStream(self, params, self.new_connection(params)) |
393 | 491 |
394 | 492 |
395 class _NamedPipeStreamClient(StreamClient): | 493 class _NamedPipeStreamClient(StreamClient): |
396 """A StreamClient implementation that connects to a Windows named pipe. | 494 """A StreamClient implementation that connects to a Windows named pipe. |
397 """ | 495 """ |
398 | 496 |
399 def __init__(self, name): | 497 def __init__(self, name, **kwargs): |
400 r"""Initializes a new Windows named pipe stream client. | 498 r"""Initializes a new Windows named pipe stream client. |
401 | 499 |
402 Args: | 500 Args: |
403 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") | 501 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
404 """ | 502 """ |
405 super(_NamedPipeStreamClient, self).__init__() | 503 super(_NamedPipeStreamClient, self).__init__(**kwargs) |
406 self._name = name | 504 self._name = name |
407 | 505 |
408 @classmethod | 506 @classmethod |
409 def _create(cls, value): | 507 def _create(cls, value, **kwargs): |
410 return cls(value) | 508 return cls(value, **kwargs) |
411 | 509 |
412 def _connect_raw(self): | 510 def _connect_raw(self): |
413 return open(self._name, 'wb') | 511 return open(self._name, 'wb') |
414 | 512 |
415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | 513 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
416 | 514 |
417 | 515 |
418 class _UnixDomainSocketStreamClient(StreamClient): | 516 class _UnixDomainSocketStreamClient(StreamClient): |
419 """A StreamClient implementation that uses a UNIX domain socket. | 517 """A StreamClient implementation that uses a UNIX domain socket. |
420 """ | 518 """ |
421 | 519 |
422 class SocketFile(object): | 520 class SocketFile(object): |
423 """A write-only file-like object that writes to a UNIX socket.""" | 521 """A write-only file-like object that writes to a UNIX socket.""" |
424 | 522 |
425 def __init__(self, fd): | 523 def __init__(self, fd): |
426 self._fd = fd | 524 self._fd = fd |
427 | 525 |
428 def write(self, data): | 526 def write(self, data): |
429 self._fd.send(data) | 527 self._fd.send(data) |
430 | 528 |
431 def close(self): | 529 def close(self): |
432 self._fd.close() | 530 self._fd.close() |
433 | 531 |
434 | 532 |
435 def __init__(self, path): | 533 def __init__(self, path, **kwargs): |
436 """Initializes a new UNIX domain socket stream client. | 534 """Initializes a new UNIX domain socket stream client. |
437 | 535 |
438 Args: | 536 Args: |
439 path (str): The path to the named UNIX domain socket. | 537 path (str): The path to the named UNIX domain socket. |
440 """ | 538 """ |
441 super(_UnixDomainSocketStreamClient, self).__init__() | 539 super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
442 self._path = path | 540 self._path = path |
443 | 541 |
444 @classmethod | 542 @classmethod |
445 def _create(cls, value): | 543 def _create(cls, value, **kwargs): |
446 if not os.path.exists(value): | 544 if not os.path.exists(value): |
447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | 545 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
448 return cls(value) | 546 return cls(value, **kwargs) |
449 | 547 |
450 def _connect_raw(self): | 548 def _connect_raw(self): |
451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 549 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
452 sock.connect(self._path) | 550 sock.connect(self._path) |
453 return self.SocketFile(sock) | 551 return self.SocketFile(sock) |
454 | 552 |
455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | 553 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
OLD | NEW |