Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(142)

Side by Side Diff: client/libs/logdog/stream.py

Issue 2453273002: Update LogDog client library to generate URLs. (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 import collections 5 import collections
6 import contextlib 6 import contextlib
7 import json 7 import json
8 import os 8 import os
9 import socket 9 import socket
10 import sys 10 import sys
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 105
106 def __init__(self): 106 def __init__(self):
107 self._registry = {} 107 self._registry = {}
108 108
109 def register_protocol(self, protocol, client_cls): 109 def register_protocol(self, protocol, client_cls):
110 assert issubclass(client_cls, StreamClient) 110 assert issubclass(client_cls, StreamClient)
111 if self._registry.get(protocol) is not None: 111 if self._registry.get(protocol) is not None:
112 raise KeyError('Duplicate protocol registered.') 112 raise KeyError('Duplicate protocol registered.')
113 self._registry[protocol] = client_cls 113 self._registry[protocol] = client_cls
114 114
115 def create(self, uri): 115 def create(self, uri, **kwargs):
116 """Returns (StreamClient): A stream client for the specified URI.
117
118 This uses the default StreamProtocolRegistry to instantiate a StreamClient
119 for the specified URI.
120
121 Args:
122 uri (str): The streamserver URI.
123 kwargs: keyword arguments to forward to the stream. See
124 StreamClient.__init__.
125
126 Raises:
127 ValueError: if the supplied URI references an invalid or improperly
128 configured streamserver.
129 """
116 uri = uri.split(':', 1) 130 uri = uri.split(':', 1)
117 if len(uri) != 2: 131 if len(uri) != 2:
118 raise ValueError('Invalid stream server URI [%s]' % (uri,)) 132 raise ValueError('Invalid stream server URI [%s]' % (uri,))
119 protocol, value = uri 133 protocol, value = uri
120 134
121 client_cls = self._registry.get(protocol) 135 client_cls = self._registry.get(protocol)
122 if not client_cls: 136 if not client_cls:
123 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) 137 raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
124 return client_cls._create(value) 138 return client_cls._create(value, **kwargs)
139
125 140
126 # Default (global) registry. 141 # Default (global) registry.
127 _default_registry = StreamProtocolRegistry() 142 _default_registry = StreamProtocolRegistry()
128 143 create = _default_registry.create
129
130 def create(uri):
131 """Returns (StreamClient): A stream client for the specified URI.
132
133 This uses the default StreamProtocolRegistry to instantiate a StreamClient
134 for the specified URI.
135
136 Args:
137 uri: The streamserver URI.
138
139 Raises:
140 ValueError if the supplied URI references an invalid or improperly
141 configured streamserver.
142 """
143 return _default_registry.create(uri)
144 144
145 145
146 class StreamClient(object): 146 class StreamClient(object):
147 """Abstract base class for a streamserver client. 147 """Abstract base class for a streamserver client.
148 """ 148 """
149 149
150 class _DatagramStream(object): 150 class _StreamBase(object):
151 """ABC for StreamClient streams."""
152
153 def __init__(self, stream_client, params):
154 self._stream_client = stream_client
155 self._params = params
156
157 @property
158 def params(self):
159 """Returns (StreamParams): The stream parameters."""
160 return self._params
161
162 @property
163 def path(self):
jbudorick 2016/10/27 13:24:06 nit: seems a bit odd to have path be a property an
dnj 2016/10/27 22:42:45 Agreed. I don't really like it, but OTOH having a
jbudorick 2016/10/27 23:42:50 sgtm.
164 """Returns (streamname.StreamPath): The stream path.
165
166 Raises:
167 ValueError: if the stream path is invalid, or if the stream prefix is
168 not defined in the client.
169 """
170 return self._stream_client.get_stream_path(self._params.name)
171
172 def get_viewer_url(self):
173 return self._stream_client.get_viewer_url(self._params.name)
174
175
176 class _BasicStream(_StreamBase):
177 """Wraps a basic file descriptor, offering "write" and "close"."""
178
179 def __init__(self, stream_client, params, fd):
180 super(StreamClient._BasicStream, self).__init__(stream_client, params)
181 self._fd = fd
182
183 @property
184 def fd(self):
185 return self._fd
186
187 def write(self, data):
188 return self._fd.write(data)
189
190 def close(self):
191 return self._fd.close()
192
193
194 class _DatagramStream(_StreamBase):
151 """Wraps a stream object to write length-prefixed datagrams.""" 195 """Wraps a stream object to write length-prefixed datagrams."""
152 196
153 def __init__(self, fd): 197 def __init__(self, stream_client, params, fd):
198 super(StreamClient._DatagramStream, self).__init__(stream_client, params)
154 self._fd = fd 199 self._fd = fd
155 200
156 def send(self, data): 201 def send(self, data):
157 varint.write_uvarint(self._fd, len(data)) 202 varint.write_uvarint(self._fd, len(data))
158 self._fd.write(data) 203 self._fd.write(data)
159 204
160 def close(self): 205 def close(self):
161 return self._fd.close() 206 return self._fd.close()
162 207
163 def __init__(self): 208
209 def __init__(self, prefix=None, coordinator_host=None):
210 """Constructs a new base StreamClient instance.
211
212 Args:
213 prefix (str or None): If not None, the log stream session prefix.
214 coordinator_host (str or None): If not None, the name of the Coordinator
215 host that this stream client is bound to. This will be used to
216 construct viewer URLs for generated streams.
217 """
218 self._prefix = prefix
219 self._coordinator_host = coordinator_host
220
164 self._name_lock = threading.Lock() 221 self._name_lock = threading.Lock()
165 self._names = set() 222 self._names = set()
166 223
224 @property
225 def prefix(self):
226 """Returns (str or None): The stream prefix, or None if not configured."""
227 return self._prefix
228
229 @property
230 def coordinator_host(self):
231 """Returns (str or None): The coordinator host, or None if not configured.
232 """
233 return self._coordinator_host
234
235 def get_stream_path(self, name):
236 """Returns (streamname.StreamPath): The stream path.
237
238 Args:
239 name (str): The name of the stream.
240
241 Raises:
242 ValueError: if the stream path is invalid, or if the stream prefix is
243 not defined in the client.
244 """
245 if not self._prefix:
246 raise KeyError('Stream prefix is not configured')
247 return streamname.StreamPath.make(self._prefix, name)
248
249 def get_viewer_url(self, stream_name):
250 """Returns (str): The LogDog viewer URL for the named stream.
251
252 Args:
253 stream_name (str): The name of the stream. This can also be a query glob.
254
255 Raises:
256 KeyError: If the prefix and Coordinator host are not configured.
257 """
258 if not self._coordinator_host:
259 raise KeyError('Coordinator host is not configured')
260
261 return streamname.get_logdog_viewer_url(
jbudorick 2016/10/27 13:24:06 nit: there's a streamname module and a stream_name
dnj 2016/10/27 22:42:44 Will change to 'name".
262 self._coordinator_host,
263 self.get_stream_path(stream_name))
264
167 def _register_new_stream(self, name): 265 def _register_new_stream(self, name):
168 """Registers a new stream name. 266 """Registers a new stream name.
169 267
170 The Butler will internally reject any duplicate stream names. However, there 268 The Butler will internally reject any duplicate stream names. However, there
171 isn't really feedback when this happens except a closed stream client. This 269 isn't really feedback when this happens except a closed stream client. This
172 is a client-side check to provide a more user-friendly experience in the 270 is a client-side check to provide a more user-friendly experience in the
173 event that a user attempts to register a duplicate stream name. 271 event that a user attempts to register a duplicate stream name.
174 272
175 Note that this is imperfect, as something else could register stream names 273 Note that this is imperfect, as something else could register stream names
176 with the same Butler instance and this library has no means of tracking. 274 with the same Butler instance and this library has no means of tracking.
177 This is a best-effort experience, not a reliable check. 275 This is a best-effort experience, not a reliable check.
178 276
179 Args: 277 Args:
180 name (str): The name of the stream. 278 name (str): The name of the stream.
181 279
182 Raises: 280 Raises:
183 ValueError if the stream name has already been registered. 281 ValueError if the stream name has already been registered.
184 """ 282 """
185 with self._name_lock: 283 with self._name_lock:
186 if name in self._names: 284 if name in self._names:
187 raise ValueError("Duplicate stream name [%s]" % (name,)) 285 raise ValueError("Duplicate stream name [%s]" % (name,))
188 self._names.add(name) 286 self._names.add(name)
189 287
190 @classmethod 288 @classmethod
191 def _create(cls, value): 289 def _create(cls, value, **kwargs):
192 """Returns (StreamClient): A new stream client connection. 290 """Returns (StreamClient): A new stream client instance.
193 291
194 Validates the streamserver parameters and creates a new StreamClient 292 Validates the streamserver parameters and creates a new StreamClient
195 instance that connects to them. 293 instance that connects to them.
196 294
197 Implementing classes must override this. 295 Implementing classes must override this.
198 """ 296 """
199 raise NotImplementedError() 297 raise NotImplementedError()
200 298
201 def _connect_raw(self): 299 def _connect_raw(self):
202 """Returns (file): A new file-like stream. 300 """Returns (file): A new file-like stream.
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
278 have UTF-8 text content written to it with its `write` method, and must 376 have UTF-8 text content written to it with its `write` method, and must
279 be closed when finished using its `close` method. 377 be closed when finished using its `close` method.
280 """ 378 """
281 params = StreamParams.make( 379 params = StreamParams.make(
282 name=name, 380 name=name,
283 type=StreamParams.TEXT, 381 type=StreamParams.TEXT,
284 content_type=content_type, 382 content_type=content_type,
285 tags=tags, 383 tags=tags,
286 tee=tee, 384 tee=tee,
287 binary_file_extension=binary_file_extension) 385 binary_file_extension=binary_file_extension)
288 return self.new_connection(params) 386 return self._BasicStream(self, params, self.new_connection(params))
289 387
290 @contextlib.contextmanager 388 @contextlib.contextmanager
291 def binary(self, name, **kwargs): 389 def binary(self, name, **kwargs):
292 """Context manager to create, use, and teardown a BINARY stream. 390 """Context manager to create, use, and teardown a BINARY stream.
293 391
294 This context manager creates a new butler BINARY stream with the specified 392 This context manager creates a new butler BINARY stream with the specified
295 parameters, yields it, and closes it on teardown. 393 parameters, yields it, and closes it on teardown.
296 394
297 Args: 395 Args:
298 name (str): the LogDog name of the stream. 396 name (str): the LogDog name of the stream.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
331 can have UTF-8 content written to it with its `write` method, and must 429 can have UTF-8 content written to it with its `write` method, and must
332 be closed when finished using its `close` method. 430 be closed when finished using its `close` method.
333 """ 431 """
334 params = StreamParams.make( 432 params = StreamParams.make(
335 name=name, 433 name=name,
336 type=StreamParams.BINARY, 434 type=StreamParams.BINARY,
337 content_type=content_type, 435 content_type=content_type,
338 tags=tags, 436 tags=tags,
339 tee=tee, 437 tee=tee,
340 binary_file_extension=binary_file_extension) 438 binary_file_extension=binary_file_extension)
341 return self.new_connection(params) 439 return self._BasicStream(self, params, self.new_connection(params))
342 440
343 @contextlib.contextmanager 441 @contextlib.contextmanager
344 def datagram(self, name, **kwargs): 442 def datagram(self, name, **kwargs):
345 """Context manager to create, use, and teardown a DATAGRAM stream. 443 """Context manager to create, use, and teardown a DATAGRAM stream.
346 444
347 This context manager creates a new butler DATAAGRAM stream with the 445 This context manager creates a new butler DATAAGRAM stream with the
348 specified parameters, yields it, and closes it on teardown. 446 specified parameters, yields it, and closes it on teardown.
349 447
350 Args: 448 Args:
351 name (str): the LogDog name of the stream. 449 name (str): the LogDog name of the stream.
(...skipping 30 matching lines...) Expand all
382 written to it using its `send` method. This object must be closed when 480 written to it using its `send` method. This object must be closed when
383 finished by using its `close` method. 481 finished by using its `close` method.
384 """ 482 """
385 params = StreamParams.make( 483 params = StreamParams.make(
386 name=name, 484 name=name,
387 type=StreamParams.DATAGRAM, 485 type=StreamParams.DATAGRAM,
388 content_type=content_type, 486 content_type=content_type,
389 tags=tags, 487 tags=tags,
390 tee=tee, 488 tee=tee,
391 binary_file_extension=binary_file_extension) 489 binary_file_extension=binary_file_extension)
392 return self._DatagramStream(self.new_connection(params)) 490 return self._DatagramStream(self, params, self.new_connection(params))
393 491
394 492
395 class _NamedPipeStreamClient(StreamClient): 493 class _NamedPipeStreamClient(StreamClient):
396 """A StreamClient implementation that connects to a Windows named pipe. 494 """A StreamClient implementation that connects to a Windows named pipe.
397 """ 495 """
398 496
399 def __init__(self, name): 497 def __init__(self, name, **kwargs):
400 r"""Initializes a new Windows named pipe stream client. 498 r"""Initializes a new Windows named pipe stream client.
401 499
402 Args: 500 Args:
403 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") 501 name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
404 """ 502 """
405 super(_NamedPipeStreamClient, self).__init__() 503 super(_NamedPipeStreamClient, self).__init__(**kwargs)
406 self._name = name 504 self._name = name
407 505
408 @classmethod 506 @classmethod
409 def _create(cls, value): 507 def _create(cls, value, **kwargs):
410 return cls(value) 508 return cls(value, **kwargs)
411 509
412 def _connect_raw(self): 510 def _connect_raw(self):
413 return open(self._name, 'wb') 511 return open(self._name, 'wb')
414 512
415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) 513 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
416 514
417 515
418 class _UnixDomainSocketStreamClient(StreamClient): 516 class _UnixDomainSocketStreamClient(StreamClient):
419 """A StreamClient implementation that uses a UNIX domain socket. 517 """A StreamClient implementation that uses a UNIX domain socket.
420 """ 518 """
421 519
422 class SocketFile(object): 520 class SocketFile(object):
423 """A write-only file-like object that writes to a UNIX socket.""" 521 """A write-only file-like object that writes to a UNIX socket."""
424 522
425 def __init__(self, fd): 523 def __init__(self, fd):
426 self._fd = fd 524 self._fd = fd
427 525
428 def write(self, data): 526 def write(self, data):
429 self._fd.send(data) 527 self._fd.send(data)
430 528
431 def close(self): 529 def close(self):
432 self._fd.close() 530 self._fd.close()
433 531
434 532
435 def __init__(self, path): 533 def __init__(self, path, **kwargs):
436 """Initializes a new UNIX domain socket stream client. 534 """Initializes a new UNIX domain socket stream client.
437 535
438 Args: 536 Args:
439 path (str): The path to the named UNIX domain socket. 537 path (str): The path to the named UNIX domain socket.
440 """ 538 """
441 super(_UnixDomainSocketStreamClient, self).__init__() 539 super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
442 self._path = path 540 self._path = path
443 541
444 @classmethod 542 @classmethod
445 def _create(cls, value): 543 def _create(cls, value, **kwargs):
446 if not os.path.exists(value): 544 if not os.path.exists(value):
447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) 545 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
448 return cls(value) 546 return cls(value, **kwargs)
449 547
450 def _connect_raw(self): 548 def _connect_raw(self):
451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 549 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
452 sock.connect(self._path) 550 sock.connect(self._path)
453 return self.SocketFile(sock) 551 return self.SocketFile(sock)
454 552
455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) 553 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698