OLD | NEW |
1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import contextlib | 6 import contextlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import socket | 9 import socket |
10 import sys | 10 import sys |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
105 | 105 |
106 def __init__(self): | 106 def __init__(self): |
107 self._registry = {} | 107 self._registry = {} |
108 | 108 |
109 def register_protocol(self, protocol, client_cls): | 109 def register_protocol(self, protocol, client_cls): |
110 assert issubclass(client_cls, StreamClient) | 110 assert issubclass(client_cls, StreamClient) |
111 if self._registry.get(protocol) is not None: | 111 if self._registry.get(protocol) is not None: |
112 raise KeyError('Duplicate protocol registered.') | 112 raise KeyError('Duplicate protocol registered.') |
113 self._registry[protocol] = client_cls | 113 self._registry[protocol] = client_cls |
114 | 114 |
115 def create(self, uri): | 115 def create(self, uri, **kwargs): |
| 116 """Returns (StreamClient): A stream client for the specified URI. |
| 117 |
| 118 This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| 119 for the specified URI. |
| 120 |
| 121 Args: |
| 122 uri (str): The streamserver URI. |
| 123 kwargs: keyword arguments to forward to the stream. See |
| 124 StreamClient.__init__. |
| 125 |
| 126 Raises: |
| 127 ValueError: if the supplied URI references an invalid or improperly |
| 128 configured streamserver. |
| 129 """ |
116 uri = uri.split(':', 1) | 130 uri = uri.split(':', 1) |
117 if len(uri) != 2: | 131 if len(uri) != 2: |
118 raise ValueError('Invalid stream server URI [%s]' % (uri,)) | 132 raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
119 protocol, value = uri | 133 protocol, value = uri |
120 | 134 |
121 client_cls = self._registry.get(protocol) | 135 client_cls = self._registry.get(protocol) |
122 if not client_cls: | 136 if not client_cls: |
123 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) | 137 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
124 return client_cls._create(value) | 138 return client_cls._create(value, **kwargs) |
| 139 |
125 | 140 |
126 # Default (global) registry. | 141 # Default (global) registry. |
127 _default_registry = StreamProtocolRegistry() | 142 _default_registry = StreamProtocolRegistry() |
128 | 143 create = _default_registry.create |
129 | |
130 def create(uri): | |
131 """Returns (StreamClient): A stream client for the specified URI. | |
132 | |
133 This uses the default StreamProtocolRegistry to instantiate a StreamClient | |
134 for the specified URI. | |
135 | |
136 Args: | |
137 uri: The streamserver URI. | |
138 | |
139 Raises: | |
140 ValueError if the supplied URI references an invalid or improperly | |
141 configured streamserver. | |
142 """ | |
143 return _default_registry.create(uri) | |
144 | 144 |
145 | 145 |
146 class StreamClient(object): | 146 class StreamClient(object): |
147 """Abstract base class for a streamserver client. | 147 """Abstract base class for a streamserver client. |
148 """ | 148 """ |
149 | 149 |
150 class _DatagramStream(object): | 150 class _StreamBase(object): |
| 151 """ABC for StreamClient streams.""" |
| 152 |
| 153 def __init__(self, stream_client, params): |
| 154 self._stream_client = stream_client |
| 155 self._params = params |
| 156 |
| 157 @property |
| 158 def params(self): |
| 159 """Returns (StreamParams): The stream parameters.""" |
| 160 return self._params |
| 161 |
| 162 @property |
| 163 def path(self): |
| 164 """Returns (streamname.StreamPath): The stream path. |
| 165 |
| 166 Raises: |
| 167 ValueError: if the stream path is invalid, or if the stream prefix is |
| 168 not defined in the client. |
| 169 """ |
| 170 return self._stream_client.get_stream_path(self._params.name) |
| 171 |
| 172 def get_viewer_url(self): |
| 173 """Returns (str): The viewer URL for this stream. |
| 174 |
| 175 Raises: |
| 176 KeyError: if information needed to construct the URL is missing. |
| 177 ValueError: if the stream prefix or name do not form a valid stream |
| 178 path. |
| 179 """ |
| 180 return self._stream_client.get_viewer_url(self._params.name) |
| 181 |
| 182 |
| 183 class _BasicStream(_StreamBase): |
| 184 """Wraps a basic file descriptor, offering "write" and "close".""" |
| 185 |
| 186 def __init__(self, stream_client, params, fd): |
| 187 super(StreamClient._BasicStream, self).__init__(stream_client, params) |
| 188 self._fd = fd |
| 189 |
| 190 @property |
| 191 def fd(self): |
| 192 return self._fd |
| 193 |
| 194 def write(self, data): |
| 195 return self._fd.write(data) |
| 196 |
| 197 def close(self): |
| 198 return self._fd.close() |
| 199 |
| 200 |
| 201 class _DatagramStream(_StreamBase): |
151 """Wraps a stream object to write length-prefixed datagrams.""" | 202 """Wraps a stream object to write length-prefixed datagrams.""" |
152 | 203 |
153 def __init__(self, fd): | 204 def __init__(self, stream_client, params, fd): |
| 205 super(StreamClient._DatagramStream, self).__init__(stream_client, params) |
154 self._fd = fd | 206 self._fd = fd |
155 | 207 |
156 def send(self, data): | 208 def send(self, data): |
157 varint.write_uvarint(self._fd, len(data)) | 209 varint.write_uvarint(self._fd, len(data)) |
158 self._fd.write(data) | 210 self._fd.write(data) |
159 | 211 |
160 def close(self): | 212 def close(self): |
161 return self._fd.close() | 213 return self._fd.close() |
162 | 214 |
163 def __init__(self): | 215 |
| 216 def __init__(self, project=None, prefix=None, coordinator_host=None): |
| 217 """Constructs a new base StreamClient instance. |
| 218 |
| 219 Args: |
| 220 project (str or None): If not None, the name of the log stream project. |
| 221 prefix (str or None): If not None, the log stream session prefix. |
| 222 coordinator_host (str or None): If not None, the name of the Coordinator |
| 223 host that this stream client is bound to. This will be used to |
| 224 construct viewer URLs for generated streams. |
| 225 """ |
| 226 self._project = project |
| 227 self._prefix = prefix |
| 228 self._coordinator_host = coordinator_host |
| 229 |
164 self._name_lock = threading.Lock() | 230 self._name_lock = threading.Lock() |
165 self._names = set() | 231 self._names = set() |
166 | 232 |
| 233 @property |
| 234 def project(self): |
| 235 """Returns (str or None): The stream project, or None if not configured.""" |
| 236 return self._project |
| 237 |
| 238 @property |
| 239 def prefix(self): |
| 240 """Returns (str or None): The stream prefix, or None if not configured.""" |
| 241 return self._prefix |
| 242 |
| 243 @property |
| 244 def coordinator_host(self): |
| 245 """Returns (str or None): The coordinator host, or None if not configured. |
| 246 """ |
| 247 return self._coordinator_host |
| 248 |
| 249 def get_stream_path(self, name): |
| 250 """Returns (streamname.StreamPath): The stream path. |
| 251 |
| 252 Args: |
| 253 name (str): The name of the stream. |
| 254 |
| 255 Raises: |
| 256 KeyError: if information needed to construct the path is missing. |
| 257 ValueError: if the stream path is invalid, or if the stream prefix is |
| 258 not defined in the client. |
| 259 """ |
| 260 if not self._prefix: |
| 261 raise KeyError('Stream prefix is not configured') |
| 262 return streamname.StreamPath.make(self._prefix, name) |
| 263 |
| 264 def get_viewer_url(self, name): |
| 265 """Returns (str): The LogDog viewer URL for the named stream. |
| 266 |
| 267 Args: |
| 268 name (str): The name of the stream. This can also be a query glob. |
| 269 |
| 270 Raises: |
| 271 KeyError: if information needed to construct the URL is missing. |
| 272 ValueError: if the stream prefix or name do not form a valid stream |
| 273 path. |
| 274 """ |
| 275 if not self._coordinator_host: |
| 276 raise KeyError('Coordinator host is not configured') |
| 277 if not self._project: |
| 278 raise KeyError('Stream project is not configured') |
| 279 |
| 280 return streamname.get_logdog_viewer_url( |
| 281 self._coordinator_host, |
| 282 self._project, |
| 283 self.get_stream_path(name)) |
| 284 |
167 def _register_new_stream(self, name): | 285 def _register_new_stream(self, name): |
168 """Registers a new stream name. | 286 """Registers a new stream name. |
169 | 287 |
170 The Butler will internally reject any duplicate stream names. However, there | 288 The Butler will internally reject any duplicate stream names. However, there |
171 isn't really feedback when this happens except a closed stream client. This | 289 isn't really feedback when this happens except a closed stream client. This |
172 is a client-side check to provide a more user-friendly experience in the | 290 is a client-side check to provide a more user-friendly experience in the |
173 event that a user attempts to register a duplicate stream name. | 291 event that a user attempts to register a duplicate stream name. |
174 | 292 |
175 Note that this is imperfect, as something else could register stream names | 293 Note that this is imperfect, as something else could register stream names |
176 with the same Butler instance and this library has no means of tracking. | 294 with the same Butler instance and this library has no means of tracking. |
177 This is a best-effort experience, not a reliable check. | 295 This is a best-effort experience, not a reliable check. |
178 | 296 |
179 Args: | 297 Args: |
180 name (str): The name of the stream. | 298 name (str): The name of the stream. |
181 | 299 |
182 Raises: | 300 Raises: |
183 ValueError if the stream name has already been registered. | 301 ValueError if the stream name has already been registered. |
184 """ | 302 """ |
185 with self._name_lock: | 303 with self._name_lock: |
186 if name in self._names: | 304 if name in self._names: |
187 raise ValueError("Duplicate stream name [%s]" % (name,)) | 305 raise ValueError("Duplicate stream name [%s]" % (name,)) |
188 self._names.add(name) | 306 self._names.add(name) |
189 | 307 |
190 @classmethod | 308 @classmethod |
191 def _create(cls, value): | 309 def _create(cls, value, **kwargs): |
192 """Returns (StreamClient): A new stream client connection. | 310 """Returns (StreamClient): A new stream client instance. |
193 | 311 |
194 Validates the streamserver parameters and creates a new StreamClient | 312 Validates the streamserver parameters and creates a new StreamClient |
195 instance that connects to them. | 313 instance that connects to them. |
196 | 314 |
197 Implementing classes must override this. | 315 Implementing classes must override this. |
198 """ | 316 """ |
199 raise NotImplementedError() | 317 raise NotImplementedError() |
200 | 318 |
201 def _connect_raw(self): | 319 def _connect_raw(self): |
202 """Returns (file): A new file-like stream. | 320 """Returns (file): A new file-like stream. |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
278 have UTF-8 text content written to it with its `write` method, and must | 396 have UTF-8 text content written to it with its `write` method, and must |
279 be closed when finished using its `close` method. | 397 be closed when finished using its `close` method. |
280 """ | 398 """ |
281 params = StreamParams.make( | 399 params = StreamParams.make( |
282 name=name, | 400 name=name, |
283 type=StreamParams.TEXT, | 401 type=StreamParams.TEXT, |
284 content_type=content_type, | 402 content_type=content_type, |
285 tags=tags, | 403 tags=tags, |
286 tee=tee, | 404 tee=tee, |
287 binary_file_extension=binary_file_extension) | 405 binary_file_extension=binary_file_extension) |
288 return self.new_connection(params) | 406 return self._BasicStream(self, params, self.new_connection(params)) |
289 | 407 |
290 @contextlib.contextmanager | 408 @contextlib.contextmanager |
291 def binary(self, name, **kwargs): | 409 def binary(self, name, **kwargs): |
292 """Context manager to create, use, and teardown a BINARY stream. | 410 """Context manager to create, use, and teardown a BINARY stream. |
293 | 411 |
294 This context manager creates a new butler BINARY stream with the specified | 412 This context manager creates a new butler BINARY stream with the specified |
295 parameters, yields it, and closes it on teardown. | 413 parameters, yields it, and closes it on teardown. |
296 | 414 |
297 Args: | 415 Args: |
298 name (str): the LogDog name of the stream. | 416 name (str): the LogDog name of the stream. |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
331 can have UTF-8 content written to it with its `write` method, and must | 449 can have UTF-8 content written to it with its `write` method, and must |
332 be closed when finished using its `close` method. | 450 be closed when finished using its `close` method. |
333 """ | 451 """ |
334 params = StreamParams.make( | 452 params = StreamParams.make( |
335 name=name, | 453 name=name, |
336 type=StreamParams.BINARY, | 454 type=StreamParams.BINARY, |
337 content_type=content_type, | 455 content_type=content_type, |
338 tags=tags, | 456 tags=tags, |
339 tee=tee, | 457 tee=tee, |
340 binary_file_extension=binary_file_extension) | 458 binary_file_extension=binary_file_extension) |
341 return self.new_connection(params) | 459 return self._BasicStream(self, params, self.new_connection(params)) |
342 | 460 |
343 @contextlib.contextmanager | 461 @contextlib.contextmanager |
344 def datagram(self, name, **kwargs): | 462 def datagram(self, name, **kwargs): |
345 """Context manager to create, use, and teardown a DATAGRAM stream. | 463 """Context manager to create, use, and teardown a DATAGRAM stream. |
346 | 464 |
347 This context manager creates a new butler DATAAGRAM stream with the | 465 This context manager creates a new butler DATAAGRAM stream with the |
348 specified parameters, yields it, and closes it on teardown. | 466 specified parameters, yields it, and closes it on teardown. |
349 | 467 |
350 Args: | 468 Args: |
351 name (str): the LogDog name of the stream. | 469 name (str): the LogDog name of the stream. |
(...skipping 30 matching lines...) Expand all Loading... |
382 written to it using its `send` method. This object must be closed when | 500 written to it using its `send` method. This object must be closed when |
383 finished by using its `close` method. | 501 finished by using its `close` method. |
384 """ | 502 """ |
385 params = StreamParams.make( | 503 params = StreamParams.make( |
386 name=name, | 504 name=name, |
387 type=StreamParams.DATAGRAM, | 505 type=StreamParams.DATAGRAM, |
388 content_type=content_type, | 506 content_type=content_type, |
389 tags=tags, | 507 tags=tags, |
390 tee=tee, | 508 tee=tee, |
391 binary_file_extension=binary_file_extension) | 509 binary_file_extension=binary_file_extension) |
392 return self._DatagramStream(self.new_connection(params)) | 510 return self._DatagramStream(self, params, self.new_connection(params)) |
393 | 511 |
394 | 512 |
395 class _NamedPipeStreamClient(StreamClient): | 513 class _NamedPipeStreamClient(StreamClient): |
396 """A StreamClient implementation that connects to a Windows named pipe. | 514 """A StreamClient implementation that connects to a Windows named pipe. |
397 """ | 515 """ |
398 | 516 |
399 def __init__(self, name): | 517 def __init__(self, name, **kwargs): |
400 r"""Initializes a new Windows named pipe stream client. | 518 r"""Initializes a new Windows named pipe stream client. |
401 | 519 |
402 Args: | 520 Args: |
403 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") | 521 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
404 """ | 522 """ |
405 super(_NamedPipeStreamClient, self).__init__() | 523 super(_NamedPipeStreamClient, self).__init__(**kwargs) |
406 self._name = name | 524 self._name = name |
407 | 525 |
408 @classmethod | 526 @classmethod |
409 def _create(cls, value): | 527 def _create(cls, value, **kwargs): |
410 return cls(value) | 528 return cls(value, **kwargs) |
411 | 529 |
412 def _connect_raw(self): | 530 def _connect_raw(self): |
413 return open(self._name, 'wb') | 531 return open(self._name, 'wb') |
414 | 532 |
415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) | 533 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
416 | 534 |
417 | 535 |
418 class _UnixDomainSocketStreamClient(StreamClient): | 536 class _UnixDomainSocketStreamClient(StreamClient): |
419 """A StreamClient implementation that uses a UNIX domain socket. | 537 """A StreamClient implementation that uses a UNIX domain socket. |
420 """ | 538 """ |
421 | 539 |
422 class SocketFile(object): | 540 class SocketFile(object): |
423 """A write-only file-like object that writes to a UNIX socket.""" | 541 """A write-only file-like object that writes to a UNIX socket.""" |
424 | 542 |
425 def __init__(self, fd): | 543 def __init__(self, fd): |
426 self._fd = fd | 544 self._fd = fd |
427 | 545 |
428 def write(self, data): | 546 def write(self, data): |
429 self._fd.send(data) | 547 self._fd.send(data) |
430 | 548 |
431 def close(self): | 549 def close(self): |
432 self._fd.close() | 550 self._fd.close() |
433 | 551 |
434 | 552 |
435 def __init__(self, path): | 553 def __init__(self, path, **kwargs): |
436 """Initializes a new UNIX domain socket stream client. | 554 """Initializes a new UNIX domain socket stream client. |
437 | 555 |
438 Args: | 556 Args: |
439 path (str): The path to the named UNIX domain socket. | 557 path (str): The path to the named UNIX domain socket. |
440 """ | 558 """ |
441 super(_UnixDomainSocketStreamClient, self).__init__() | 559 super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
442 self._path = path | 560 self._path = path |
443 | 561 |
444 @classmethod | 562 @classmethod |
445 def _create(cls, value): | 563 def _create(cls, value, **kwargs): |
446 if not os.path.exists(value): | 564 if not os.path.exists(value): |
447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) | 565 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
448 return cls(value) | 566 return cls(value, **kwargs) |
449 | 567 |
450 def _connect_raw(self): | 568 def _connect_raw(self): |
451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 569 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
452 sock.connect(self._path) | 570 sock.connect(self._path) |
453 return self.SocketFile(sock) | 571 return self.SocketFile(sock) |
454 | 572 |
455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) | 573 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |
OLD | NEW |