Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(87)

Side by Side Diff: client/libs/logdog/stream.py

Issue 2453273002: Update LogDog client library to generate URLs. (Closed)
Patch Set: Forgot project, oops. Addressed nits. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 import collections 5 import collections
6 import contextlib 6 import contextlib
7 import json 7 import json
8 import os 8 import os
9 import socket 9 import socket
10 import sys 10 import sys
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 105
106 def __init__(self): 106 def __init__(self):
107 self._registry = {} 107 self._registry = {}
108 108
109 def register_protocol(self, protocol, client_cls): 109 def register_protocol(self, protocol, client_cls):
110 assert issubclass(client_cls, StreamClient) 110 assert issubclass(client_cls, StreamClient)
111 if self._registry.get(protocol) is not None: 111 if self._registry.get(protocol) is not None:
112 raise KeyError('Duplicate protocol registered.') 112 raise KeyError('Duplicate protocol registered.')
113 self._registry[protocol] = client_cls 113 self._registry[protocol] = client_cls
114 114
115 def create(self, uri): 115 def create(self, uri, **kwargs):
116 """Returns (StreamClient): A stream client for the specified URI.
117
118 This uses the default StreamProtocolRegistry to instantiate a StreamClient
119 for the specified URI.
120
121 Args:
122 uri (str): The streamserver URI.
123 kwargs: keyword arguments to forward to the stream. See
124 StreamClient.__init__.
125
126 Raises:
127 ValueError: if the supplied URI references an invalid or improperly
128 configured streamserver.
129 """
116 uri = uri.split(':', 1) 130 uri = uri.split(':', 1)
117 if len(uri) != 2: 131 if len(uri) != 2:
118 raise ValueError('Invalid stream server URI [%s]' % (uri,)) 132 raise ValueError('Invalid stream server URI [%s]' % (uri,))
119 protocol, value = uri 133 protocol, value = uri
120 134
121 client_cls = self._registry.get(protocol) 135 client_cls = self._registry.get(protocol)
122 if not client_cls: 136 if not client_cls:
123 raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) 137 raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
124 return client_cls._create(value) 138 return client_cls._create(value, **kwargs)
139
125 140
126 # Default (global) registry. 141 # Default (global) registry.
127 _default_registry = StreamProtocolRegistry() 142 _default_registry = StreamProtocolRegistry()
128 143 create = _default_registry.create
129
130 def create(uri):
131 """Returns (StreamClient): A stream client for the specified URI.
132
133 This uses the default StreamProtocolRegistry to instantiate a StreamClient
134 for the specified URI.
135
136 Args:
137 uri: The streamserver URI.
138
139 Raises:
140 ValueError if the supplied URI references an invalid or improperly
141 configured streamserver.
142 """
143 return _default_registry.create(uri)
144 144
145 145
146 class StreamClient(object): 146 class StreamClient(object):
147 """Abstract base class for a streamserver client. 147 """Abstract base class for a streamserver client.
148 """ 148 """
149 149
150 class _DatagramStream(object): 150 class _StreamBase(object):
151 """ABC for StreamClient streams."""
152
153 def __init__(self, stream_client, params):
154 self._stream_client = stream_client
155 self._params = params
156
157 @property
158 def params(self):
159 """Returns (StreamParams): The stream parameters."""
160 return self._params
161
162 @property
163 def path(self):
164 """Returns (streamname.StreamPath): The stream path.
165
166 Raises:
167 ValueError: if the stream path is invalid, or if the stream prefix is
168 not defined in the client.
169 """
170 return self._stream_client.get_stream_path(self._params.name)
171
172 def get_viewer_url(self):
173 """Returns (str): The viewer URL for this stream.
174
175 Raises:
176 KeyError: if information needed to construct the URL is missing.
177 ValueError: if the stream prefix or name do not form a valid stream
178 path.
179 """
180 return self._stream_client.get_viewer_url(self._params.name)
181
182
183 class _BasicStream(_StreamBase):
184 """Wraps a basic file descriptor, offering "write" and "close"."""
185
186 def __init__(self, stream_client, params, fd):
187 super(StreamClient._BasicStream, self).__init__(stream_client, params)
188 self._fd = fd
189
190 @property
191 def fd(self):
192 return self._fd
193
194 def write(self, data):
195 return self._fd.write(data)
196
197 def close(self):
198 return self._fd.close()
199
200
201 class _DatagramStream(_StreamBase):
151 """Wraps a stream object to write length-prefixed datagrams.""" 202 """Wraps a stream object to write length-prefixed datagrams."""
152 203
153 def __init__(self, fd): 204 def __init__(self, stream_client, params, fd):
205 super(StreamClient._DatagramStream, self).__init__(stream_client, params)
154 self._fd = fd 206 self._fd = fd
155 207
156 def send(self, data): 208 def send(self, data):
157 varint.write_uvarint(self._fd, len(data)) 209 varint.write_uvarint(self._fd, len(data))
158 self._fd.write(data) 210 self._fd.write(data)
159 211
160 def close(self): 212 def close(self):
161 return self._fd.close() 213 return self._fd.close()
162 214
163 def __init__(self): 215
216 def __init__(self, project=None, prefix=None, coordinator_host=None):
217 """Constructs a new base StreamClient instance.
218
219 Args:
220 project (str or None): If not None, the name of the log stream project.
221 prefix (str or None): If not None, the log stream session prefix.
222 coordinator_host (str or None): If not None, the name of the Coordinator
223 host that this stream client is bound to. This will be used to
224 construct viewer URLs for generated streams.
225 """
226 self._project = project
227 self._prefix = prefix
228 self._coordinator_host = coordinator_host
229
164 self._name_lock = threading.Lock() 230 self._name_lock = threading.Lock()
165 self._names = set() 231 self._names = set()
166 232
233 @property
234 def project(self):
235 """Returns (str or None): The stream project, or None if not configured."""
236 return self._project
237
238 @property
239 def prefix(self):
240 """Returns (str or None): The stream prefix, or None if not configured."""
241 return self._prefix
242
243 @property
244 def coordinator_host(self):
245 """Returns (str or None): The coordinator host, or None if not configured.
246 """
247 return self._coordinator_host
248
249 def get_stream_path(self, name):
250 """Returns (streamname.StreamPath): The stream path.
251
252 Args:
253 name (str): The name of the stream.
254
255 Raises:
256 KeyError: if information needed to construct the path is missing.
257 ValueError: if the stream path is invalid, or if the stream prefix is
258 not defined in the client.
259 """
260 if not self._prefix:
261 raise KeyError('Stream prefix is not configured')
262 return streamname.StreamPath.make(self._prefix, name)
263
264 def get_viewer_url(self, name):
265 """Returns (str): The LogDog viewer URL for the named stream.
266
267 Args:
268 name (str): The name of the stream. This can also be a query glob.
269
270 Raises:
271 KeyError: if information needed to construct the URL is missing.
272 ValueError: if the stream prefix or name do not form a valid stream
273 path.
274 """
275 if not self._coordinator_host:
276 raise KeyError('Coordinator host is not configured')
277 if not self._project:
278 raise KeyError('Stream project is not configured')
279
280 return streamname.get_logdog_viewer_url(
281 self._coordinator_host,
282 self._project,
283 self.get_stream_path(name))
284
167 def _register_new_stream(self, name): 285 def _register_new_stream(self, name):
168 """Registers a new stream name. 286 """Registers a new stream name.
169 287
170 The Butler will internally reject any duplicate stream names. However, there 288 The Butler will internally reject any duplicate stream names. However, there
171 isn't really feedback when this happens except a closed stream client. This 289 isn't really feedback when this happens except a closed stream client. This
172 is a client-side check to provide a more user-friendly experience in the 290 is a client-side check to provide a more user-friendly experience in the
173 event that a user attempts to register a duplicate stream name. 291 event that a user attempts to register a duplicate stream name.
174 292
175 Note that this is imperfect, as something else could register stream names 293 Note that this is imperfect, as something else could register stream names
176 with the same Butler instance and this library has no means of tracking. 294 with the same Butler instance and this library has no means of tracking.
177 This is a best-effort experience, not a reliable check. 295 This is a best-effort experience, not a reliable check.
178 296
179 Args: 297 Args:
180 name (str): The name of the stream. 298 name (str): The name of the stream.
181 299
182 Raises: 300 Raises:
183 ValueError if the stream name has already been registered. 301 ValueError if the stream name has already been registered.
184 """ 302 """
185 with self._name_lock: 303 with self._name_lock:
186 if name in self._names: 304 if name in self._names:
187 raise ValueError("Duplicate stream name [%s]" % (name,)) 305 raise ValueError("Duplicate stream name [%s]" % (name,))
188 self._names.add(name) 306 self._names.add(name)
189 307
190 @classmethod 308 @classmethod
191 def _create(cls, value): 309 def _create(cls, value, **kwargs):
192 """Returns (StreamClient): A new stream client connection. 310 """Returns (StreamClient): A new stream client instance.
193 311
194 Validates the streamserver parameters and creates a new StreamClient 312 Validates the streamserver parameters and creates a new StreamClient
195 instance that connects to them. 313 instance that connects to them.
196 314
197 Implementing classes must override this. 315 Implementing classes must override this.
198 """ 316 """
199 raise NotImplementedError() 317 raise NotImplementedError()
200 318
201 def _connect_raw(self): 319 def _connect_raw(self):
202 """Returns (file): A new file-like stream. 320 """Returns (file): A new file-like stream.
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
278 have UTF-8 text content written to it with its `write` method, and must 396 have UTF-8 text content written to it with its `write` method, and must
279 be closed when finished using its `close` method. 397 be closed when finished using its `close` method.
280 """ 398 """
281 params = StreamParams.make( 399 params = StreamParams.make(
282 name=name, 400 name=name,
283 type=StreamParams.TEXT, 401 type=StreamParams.TEXT,
284 content_type=content_type, 402 content_type=content_type,
285 tags=tags, 403 tags=tags,
286 tee=tee, 404 tee=tee,
287 binary_file_extension=binary_file_extension) 405 binary_file_extension=binary_file_extension)
288 return self.new_connection(params) 406 return self._BasicStream(self, params, self.new_connection(params))
289 407
290 @contextlib.contextmanager 408 @contextlib.contextmanager
291 def binary(self, name, **kwargs): 409 def binary(self, name, **kwargs):
292 """Context manager to create, use, and teardown a BINARY stream. 410 """Context manager to create, use, and teardown a BINARY stream.
293 411
294 This context manager creates a new butler BINARY stream with the specified 412 This context manager creates a new butler BINARY stream with the specified
295 parameters, yields it, and closes it on teardown. 413 parameters, yields it, and closes it on teardown.
296 414
297 Args: 415 Args:
298 name (str): the LogDog name of the stream. 416 name (str): the LogDog name of the stream.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
331 can have UTF-8 content written to it with its `write` method, and must 449 can have UTF-8 content written to it with its `write` method, and must
332 be closed when finished using its `close` method. 450 be closed when finished using its `close` method.
333 """ 451 """
334 params = StreamParams.make( 452 params = StreamParams.make(
335 name=name, 453 name=name,
336 type=StreamParams.BINARY, 454 type=StreamParams.BINARY,
337 content_type=content_type, 455 content_type=content_type,
338 tags=tags, 456 tags=tags,
339 tee=tee, 457 tee=tee,
340 binary_file_extension=binary_file_extension) 458 binary_file_extension=binary_file_extension)
341 return self.new_connection(params) 459 return self._BasicStream(self, params, self.new_connection(params))
342 460
343 @contextlib.contextmanager 461 @contextlib.contextmanager
344 def datagram(self, name, **kwargs): 462 def datagram(self, name, **kwargs):
345 """Context manager to create, use, and teardown a DATAGRAM stream. 463 """Context manager to create, use, and teardown a DATAGRAM stream.
346 464
347 This context manager creates a new butler DATAAGRAM stream with the 465 This context manager creates a new butler DATAAGRAM stream with the
348 specified parameters, yields it, and closes it on teardown. 466 specified parameters, yields it, and closes it on teardown.
349 467
350 Args: 468 Args:
351 name (str): the LogDog name of the stream. 469 name (str): the LogDog name of the stream.
(...skipping 30 matching lines...) Expand all
382 written to it using its `send` method. This object must be closed when 500 written to it using its `send` method. This object must be closed when
383 finished by using its `close` method. 501 finished by using its `close` method.
384 """ 502 """
385 params = StreamParams.make( 503 params = StreamParams.make(
386 name=name, 504 name=name,
387 type=StreamParams.DATAGRAM, 505 type=StreamParams.DATAGRAM,
388 content_type=content_type, 506 content_type=content_type,
389 tags=tags, 507 tags=tags,
390 tee=tee, 508 tee=tee,
391 binary_file_extension=binary_file_extension) 509 binary_file_extension=binary_file_extension)
392 return self._DatagramStream(self.new_connection(params)) 510 return self._DatagramStream(self, params, self.new_connection(params))
393 511
394 512
395 class _NamedPipeStreamClient(StreamClient): 513 class _NamedPipeStreamClient(StreamClient):
396 """A StreamClient implementation that connects to a Windows named pipe. 514 """A StreamClient implementation that connects to a Windows named pipe.
397 """ 515 """
398 516
399 def __init__(self, name): 517 def __init__(self, name, **kwargs):
400 r"""Initializes a new Windows named pipe stream client. 518 r"""Initializes a new Windows named pipe stream client.
401 519
402 Args: 520 Args:
403 name (str): The name of the Windows named pipe to use (e.g., "\\.\name") 521 name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
404 """ 522 """
405 super(_NamedPipeStreamClient, self).__init__() 523 super(_NamedPipeStreamClient, self).__init__(**kwargs)
406 self._name = name 524 self._name = name
407 525
408 @classmethod 526 @classmethod
409 def _create(cls, value): 527 def _create(cls, value, **kwargs):
410 return cls(value) 528 return cls(value, **kwargs)
411 529
412 def _connect_raw(self): 530 def _connect_raw(self):
413 return open(self._name, 'wb') 531 return open(self._name, 'wb')
414 532
415 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) 533 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
416 534
417 535
418 class _UnixDomainSocketStreamClient(StreamClient): 536 class _UnixDomainSocketStreamClient(StreamClient):
419 """A StreamClient implementation that uses a UNIX domain socket. 537 """A StreamClient implementation that uses a UNIX domain socket.
420 """ 538 """
421 539
422 class SocketFile(object): 540 class SocketFile(object):
423 """A write-only file-like object that writes to a UNIX socket.""" 541 """A write-only file-like object that writes to a UNIX socket."""
424 542
425 def __init__(self, fd): 543 def __init__(self, fd):
426 self._fd = fd 544 self._fd = fd
427 545
428 def write(self, data): 546 def write(self, data):
429 self._fd.send(data) 547 self._fd.send(data)
430 548
431 def close(self): 549 def close(self):
432 self._fd.close() 550 self._fd.close()
433 551
434 552
435 def __init__(self, path): 553 def __init__(self, path, **kwargs):
436 """Initializes a new UNIX domain socket stream client. 554 """Initializes a new UNIX domain socket stream client.
437 555
438 Args: 556 Args:
439 path (str): The path to the named UNIX domain socket. 557 path (str): The path to the named UNIX domain socket.
440 """ 558 """
441 super(_UnixDomainSocketStreamClient, self).__init__() 559 super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
442 self._path = path 560 self._path = path
443 561
444 @classmethod 562 @classmethod
445 def _create(cls, value): 563 def _create(cls, value, **kwargs):
446 if not os.path.exists(value): 564 if not os.path.exists(value):
447 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) 565 raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
448 return cls(value) 566 return cls(value, **kwargs)
449 567
450 def _connect_raw(self): 568 def _connect_raw(self):
451 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 569 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
452 sock.connect(self._path) 570 sock.connect(self._path)
453 return self.SocketFile(sock) 571 return self.SocketFile(sock)
454 572
455 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) 573 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
OLDNEW
« no previous file with comments | « client/libs/logdog/bootstrap.py ('k') | client/libs/logdog/streamname.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698