# Source code for tapes.distributed.registry
from multiprocessing import Process
import functools
import zmq
from ..registry import Registry, BaseRegistry
from .meter import MeterProxy
from .counter import CounterProxy
from .message import Message
from .timer import TimerProxy
from .histogram import HistogramProxy
# Default 0MQ endpoint used by both ``RegistryAggregator`` and
# ``DistributedRegistry`` when no ``socket_addr`` is passed; the aggregator
# and its proxies have to agree on the address.
_DEFAULT_IPC = 'ipc://tapes_metrics.ipc'
def _registry_aggregator(reporter, socket_addr):
    """Receive metric updates over 0MQ and fold them into a single registry.

    Binds a ``SUB`` socket on ``socket_addr``, installs a fresh ``Registry`` on
    ``reporter``, starts the reporter and then blocks, applying every
    ``(type, name, value)`` JSON message to the matching metric. The loop ends
    when a message whose type is ``'shutdown'`` arrives; messages with any
    other unrecognised type are ignored.

    :param reporter: the reporter to run; its ``registry`` attribute is replaced
    :param socket_addr: the 0MQ address to bind the subscriber socket to
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Set the high-water mark before binding: libzmq documents that most
        # socket options only take effect for subsequent bind/connect calls.
        socket.set_hwm(0)
        socket.bind(socket_addr)
        socket.setsockopt_string(zmq.SUBSCRIBE, u'')

        registry = Registry()
        reporter.registry = registry
        reporter.start()
        try:
            while True:
                type_, name, value = socket.recv_json()
                if type_ == 'meter':
                    registry.meter(name).mark(value)
                elif type_ == 'timer':
                    registry.timer(name).update(value)
                elif type_ == 'counter':
                    registry.counter(name).increment(value)
                elif type_ == 'histogram':
                    registry.histogram(name).update(value)
                elif type_ == 'shutdown':
                    # Leave the loop so the teardown below actually runs and
                    # we never call recv_json() on a closed socket.
                    break
        finally:
            # Stop the reporter whether we were asked to shut down or the
            # loop died on a malformed message.
            reporter.stop()
        socket.unbind(socket_addr)
    finally:
        socket.close()
        context.destroy()
class RegistryAggregator(object):
    """Aggregates multiple registry proxies and reports on the unified metrics."""

    def __init__(self, reporter, socket_addr=_DEFAULT_IPC):
        """Constructs a metrics registry aggregator.

        The ``registry`` field on the ``reporter`` argument will be reset to an
        implementation instance prior to calling ``start()``. Any previously
        set registry is not guaranteed to be used.

        :param reporter: the reporter to use
        :param socket_addr: the 0MQ socket address; has to be the same as
            corresponding proxies'
        """
        super(RegistryAggregator, self).__init__()
        self.socket_addr = socket_addr
        self.reporter = reporter
        # Handle of the forked aggregator process; stays ``None`` until
        # ``start(fork=True)`` has been called.
        self.process = None

    def start(self, fork=True):
        """Starts the registry aggregator.

        :param fork: whether to fork a process; if ``False``, blocks and stays
            in the existing process
        """
        if not fork:
            _registry_aggregator(self.reporter, self.socket_addr)
            return

        process = Process(
            target=_registry_aggregator,
            args=(self.reporter, self.socket_addr),
        )
        process.start()
        self.process = process

    def stop(self):
        """Terminates the forked process.

        Only meaningful if started as a fork. When no forked process exists
        (never started, or started with ``fork=False``) this is a no-op
        instead of failing on the missing process handle.

        :return: ``None``
        """
        process = self.process
        if process is None:
            return
        process.terminate()
        process.join()
class DistributedRegistry(BaseRegistry):
    """A registry proxy that pushes metrics data to a ``RegistryAggregator``."""

    def __init__(self, socket_addr=_DEFAULT_IPC):
        """
        :param socket_addr: the 0MQ IPC socket address; has to be the same as
            corresponding aggregator's
        """
        super(DistributedRegistry, self).__init__()
        self.stats = dict()
        self.socket_addr = socket_addr
        # Both stay ``None`` until ``connect()`` is called.
        self.zmq_context = None
        self.socket = None

    def meter(self, name):
        """Returns the meter proxy registered under ``name``, creating it if needed."""
        return self._get_or_add_stat(name, functools.partial(MeterProxy, self.socket, name))

    def timer(self, name):
        """Returns the timer proxy registered under ``name``, creating it if needed."""
        return self._get_or_add_stat(name, functools.partial(TimerProxy, self.socket, name))

    def gauge(self, name, producer):
        """Gauges are not supported by the distributed registry.

        :raises NotImplementedError: always
        """
        raise NotImplementedError('Gauge is unavailable in distributed mode')

    def counter(self, name):
        """Returns the counter proxy registered under ``name``, creating it if needed."""
        return self._get_or_add_stat(name, functools.partial(CounterProxy, self.socket, name))

    def histogram(self, name):
        """Returns the histogram proxy registered under ``name``, creating it if needed."""
        return self._get_or_add_stat(name, functools.partial(HistogramProxy, self.socket, name))

    def connect(self):
        """Connects to the 0MQ socket and starts publishing.

        Proxies that were created before the connection existed are re-pointed
        at the newly created socket.
        """
        self.zmq_context = zmq.Context()
        sock = self.zmq_context.socket(zmq.PUB)
        sock.set_hwm(0)
        sock.setsockopt(zmq.LINGER, 0)
        sock.connect(self.socket_addr)

        def _reset_socket(values):
            # Anything exposing ``.values`` is treated as a nested container
            # and walked recursively; everything else is treated as a proxy.
            # Only the attribute lookup is guarded, so an unrelated
            # AttributeError raised deeper down is not mistaken for
            # "this is a leaf".
            for value in values:
                try:
                    children = value.values
                except AttributeError:
                    value.socket = sock
                else:
                    _reset_socket(children())

        _reset_socket(self.stats.values())
        self.socket = sock

    def close(self):
        """Sends a shutdown message and tears down the 0MQ socket and context.

        Does nothing when the registry is not connected, so calling it before
        ``connect()`` or calling it twice is safe.
        """
        sock = self.socket
        if sock is None:
            return
        # NOTE(review): LINGER is set to 0 in connect(), so this message may be
        # discarded if it has not been flushed before close() -- confirm that
        # best-effort delivery is intended.
        sock.send_json(Message('shutdown', 'noname', -1))
        sock.disconnect(self.socket_addr)
        sock.close()
        self.zmq_context.destroy()
        # Return to the not-connected state so a repeated close() is a no-op.
        self.socket = None
        self.zmq_context = None