# Source code for tapes.reporting

from __future__ import print_function
import logging
from threading import Event, Thread

import abc
import six
import tapes


# Module-level logger registered under the name 'tapes.reporting'.
reporting_logger = logging.getLogger('tapes.reporting')


@six.add_metaclass(abc.ABCMeta)
class Reporter(object):
    """Abstract base class for reporters that read from a registry.

    Subclasses must implement ``start`` and ``stop``.
    """

    def __init__(self, registry=None):
        """Binds the reporter to a registry.

        :param registry: The registry to report from; when ``None``,
            ``tapes._global_registry`` is used instead
        """
        if registry is None:
            registry = tapes._global_registry
        self.registry = registry

    @abc.abstractmethod
    def start(self):
        """Starts reporting."""

    @abc.abstractmethod
    def stop(self):
        """Stops reporting."""


@six.add_metaclass(abc.ABCMeta)
class ScheduledReporter(Reporter):
    """Super class for scheduled reporters. Handles scheduling via a ``Thread``."""

    def __init__(self, interval, registry=None):
        """Creates a reporter that reports with the given interval.

        Suitable for push style reporting. Uses a Python ``Thread`` for
        implementation, so reporting will block your application from doing
        anything else.

        :param interval: A ``timedelta`` instance
        :param registry: The registry to report from, defaults to the global one
        """
        super(ScheduledReporter, self).__init__(registry)
        self.interval = interval
        # Worker thread; stays ``None`` until ``start()`` is called.
        self.thread = None
        # Set by ``stop()`` to wake the worker from its wait and end its loop.
        self.termination_event = Event()

    @abc.abstractmethod
    def report(self):
        """Override in subclasses.

        A Python ``Thread`` is used for scheduling, so whatever this ends up
        doing, it should be pretty fast.
        """
        pass

    def start(self):
        """Starts a thread that calls ``report()`` once per interval.

        The first report is made as soon as the thread runs; afterwards the
        thread waits ``interval.total_seconds()`` between reports, or until
        ``stop()`` is called, whichever comes first.
        """
        reporting_logger.debug('Starting reporter %s', self.__class__.__name__)

        # A previous stop() leaves the event set; without clearing it, a
        # restarted worker would report once and then exit immediately.
        self.termination_event.clear()

        def _report():
            while True:
                self.report()
                # wait() returns True only when the event was set, i.e. when
                # stop() was requested; on a plain timeout it returns False.
                terminated = self.termination_event.wait(self.interval.total_seconds())
                if terminated:
                    return

        self.thread = Thread(target=_report)
        self.thread.start()
        reporting_logger.debug('Started reporter %s', self.__class__.__name__)

    def stop(self):
        """Signals the reporting thread to finish and waits for it to exit.

        Calling this before ``start()`` is a no-op apart from the log messages.
        """
        reporting_logger.debug('Stopping reporter %s', self.__class__.__name__)
        self.termination_event.set()
        # No thread exists if start() was never called; nothing to join then.
        if self.thread is not None:
            self.thread.join()
        reporting_logger.debug('Stopped reporter %s', self.__class__.__name__)