]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
Add PCP task 860/head
authorZack Cerza <zack@redhat.com>
Tue, 19 Apr 2016 18:44:33 +0000 (12:44 -0600)
committerZack Cerza <zack@redhat.com>
Mon, 23 May 2016 23:11:12 +0000 (17:11 -0600)
Signed-off-by: Zack Cerza <zack@redhat.com>
docs/siteconfig.rst
setup.py
teuthology/run.py
teuthology/task/pcp.j2 [new file with mode: 0644]
teuthology/task/pcp.py [new file with mode: 0644]
teuthology/test/task/__init__.py
teuthology/test/task/test_pcp.py [new file with mode: 0644]

index 2dfe6637f017238af97ba77b2202cba6edbf8038..2d41fda468700ed80be6f662f4d4600c23281ab9 100644 (file)
@@ -213,3 +213,6 @@ Here is a sample configuration with many of the options set and documented::
         # The size of each volume, in GB
         #
         size: 10 # GB
+
+    # The host running a [PCP](http://pcp.io/) manager
+    pcp_host: http://pcp.front.sepia.ceph.com:44323/
index 3677be543623ec68265980107562cecbe2e28d79..a058eae5d1d56ec9bc321e90fd2eaf386e6223f9 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -84,6 +84,7 @@ setup(
                       'python-neutronclient',
                       'prettytable',
                       'libvirt-python',
+                      'python-dateutil',
                       ],
 
 
index dcbb66f5828839a7ebcb2729dcef840bb2faa949..a6e40e76b2628d42ecc7750f36cc020304848011 100644 (file)
@@ -221,6 +221,7 @@ def get_initial_tasks(lock, config, machine_type):
 
     if 'roles' in config:
         init_tasks.extend([
+            {'pcp': None},
             {'selinux': None},
             {'ansible.cephlab': None},
             {'clock.check': None}
diff --git a/teuthology/task/pcp.j2 b/teuthology/task/pcp.j2
new file mode 100644 (file)
index 0000000..fe82611
--- /dev/null
@@ -0,0 +1,15 @@
+<html>
+<head>
+<title>{% if job_id %}job {{ job_id }} {% endif %}performance data</title>
+</head>
+{% for metric in graphs.keys() %}
+{% if mode == 'static' %}
+{% set url = graphs[metric].file.split('/')[-1] %}
+{% else %}
+{% set url = graphs[metric].url %}
+{% endif %}
+<p>{{ metric }}
+<img src="{{ url }}"></img>
+</p>
+{% endfor %}
+</html>
diff --git a/teuthology/task/pcp.py b/teuthology/task/pcp.py
new file mode 100644 (file)
index 0000000..c368936
--- /dev/null
@@ -0,0 +1,327 @@
+# maybe run pcp role?
+import datetime
+import dateutil.tz
+import jinja2
+import logging
+import os
+import requests
+import time
+import urllib
+import urlparse
+
+from teuthology.config import config as teuth_config
+from teuthology.orchestra import run
+
+from teuthology import misc
+
+from . import Task
+
+log = logging.getLogger(__name__)
+
+
+class PCPDataSource(object):
+    def __init__(self, hosts, time_from, time_until='now'):
+        self.hosts = hosts
+        self.time_from = time_from
+        self.time_until = time_until
+
+
+class PCPArchive(PCPDataSource):
+    archive_base_path = '/var/log/pcp/pmlogger'
+    archive_file_extensions = ('0', 'index', 'meta')
+
+    def get_archive_input_dir(self, host):
+        return os.path.join(
+            self.archive_base_path,
+            host,
+        )
+
+    def get_pmlogextract_cmd(self, host):
+        cmd = [
+            'pmlogextract',
+            '-S', self._format_time(self.time_from),
+            '-T', self._format_time(self.time_until),
+            run.Raw(os.path.join(
+                self.get_archive_input_dir(host),
+                '*.0')),
+        ]
+        return cmd
+
+    @staticmethod
+    def _format_time(seconds):
+        if isinstance(seconds, basestring):
+            return seconds
+        return "@ %s" % time.asctime(time.gmtime(seconds))
+
+
+class PCPGrapher(PCPDataSource):
+    _endpoint = '/'
+
+    def __init__(self, hosts, time_from, time_until='now'):
+        super(PCPGrapher, self).__init__(hosts, time_from, time_until)
+        self.base_url = urlparse.urljoin(
+            teuth_config.pcp_host,
+            self._endpoint)
+
+
+class GrafanaGrapher(PCPGrapher):
+    _endpoint = '/grafana/index.html#/dashboard/script/index.js'
+
+    def __init__(self, hosts, time_from, time_until='now', job_id=None):
+        super(GrafanaGrapher, self).__init__(hosts, time_from, time_until)
+        self.job_id = job_id
+
+    def build_graph_url(self):
+        config = dict(
+            hosts=','.join(self.hosts),
+            time_from=self._format_time(self.time_from),
+        )
+        if self.time_until:
+            config['time_to'] = self._format_time(self.time_until)
+        args = urllib.urlencode(config)
+        template = "{base_url}?{args}"
+        return template.format(base_url=self.base_url, args=args)
+
+    @staticmethod
+    def _format_time(seconds):
+        if isinstance(seconds, basestring):
+            return seconds
+        seconds = int(seconds)
+        dt = datetime.datetime.fromtimestamp(seconds, dateutil.tz.tzutc())
+        return dt.strftime('%Y-%m-%dT%H:%M:%S')
+
+
+class GraphiteGrapher(PCPGrapher):
+    metrics = [
+        'kernel.all.load.1 minute',
+        'mem.util.free',
+        'mem.util.used',
+        'network.interface.*.bytes.*',
+        'disk.all.read_bytes',
+        'disk.all.write_bytes',
+    ]
+
+    graph_defaults = dict(
+        width='1200',
+        height='300',
+        hideLegend='false',
+        format='png',
+    )
+    _endpoint = '/graphite/render'
+
+    def __init__(self, hosts, time_from, time_until='now', dest_dir=None,
+                 job_id=None):
+        super(GraphiteGrapher, self).__init__(hosts, time_from, time_until)
+        self.dest_dir = dest_dir
+        self.job_id = job_id
+
+    def build_graph_urls(self):
+        if not hasattr(self, 'graphs'):
+            self.graphs = dict()
+        for metric in self.metrics:
+            metric_dict = self.graphs.get(metric, dict())
+            metric_dict['url'] = self.get_graph_url(metric)
+            self.graphs[metric] = metric_dict
+
+    def _check_dest_dir(self):
+        if not self.dest_dir:
+            raise RuntimeError("Must provide a dest_dir!")
+
+    def write_html(self, mode='dynamic'):
+        self._check_dest_dir()
+        generated_html = self.generate_html(mode=mode)
+        html_path = os.path.join(self.dest_dir, 'pcp.html')
+        with open(html_path, 'w') as f:
+            f.write(generated_html)
+
+    def generate_html(self, mode='dynamic'):
+        self.build_graph_urls()
+        cwd = os.path.dirname(__file__)
+        loader = jinja2.loaders.FileSystemLoader(cwd)
+        env = jinja2.Environment(loader=loader)
+        template = env.get_template('pcp.j2')
+        data = template.render(
+            job_id=self.job_id,
+            graphs=self.graphs,
+            mode=mode,
+        )
+        return data
+
+    def download_graphs(self):
+        self._check_dest_dir()
+        self.build_graph_urls()
+        for metric in self.graphs.keys():
+            url = self.graphs[metric]['url']
+            filename = self._sanitize_metric_name(metric) + '.png'
+            self.graphs[metric]['file'] = graph_path = os.path.join(
+                self.dest_dir,
+                filename,
+            )
+            resp = requests.get(url)
+            if not resp.ok:
+                log.warn(
+                    "Graph download failed with error %s %s: %s",
+                    resp.status_code,
+                    resp.reason,
+                    url,
+                )
+                continue
+            with open(graph_path, 'wb') as f:
+                f.write(resp.content)
+
+    def get_graph_url(self, metric):
+        config = dict(self.graph_defaults)
+        config.update({
+            'from': self.time_from,
+            'until': self.time_until,
+            # urlencode with doseq=True encodes each item as a separate
+            # 'target=' arg
+            'target': self.get_target_globs(metric),
+        })
+        args = urllib.urlencode(config, doseq=True)
+        template = "{base_url}?{args}"
+        return template.format(base_url=self.base_url, args=args)
+
+    def get_target_globs(self, metric=''):
+        globs = ['*{}*'.format(host) for host in self.hosts]
+        if metric:
+            globs = ['{}.{}'.format(glob, metric) for glob in globs]
+        return globs
+
+    @staticmethod
+    def _sanitize_metric_name(metric):
+        result = metric
+        replacements = [
+            (' ', '_'),
+            ('*', '_all_'),
+        ]
+        for rep in replacements:
+            result = result.replace(rep[0], rep[1])
+        return result
+
+
+class PCP(Task):
+    """
+    Collects performance data using PCP during a job.
+
+    Configuration options include:
+        ``graphite``: Whether to render PNG graphs using Graphite (default:
+            True)
+        ``grafana``: Whether to build (and submit to paddles) a link to a
+            dynamic Grafana dashboard containing graphs of performance data
+            (default: True)
+        ``fetch_archives``: Whether to assemble and ship a raw PCP archive
+        containing performance data to the job's output archive (default:
+            False)
+    """
+    enabled = True
+
+    def __init__(self, ctx, config):
+        super(PCP, self).__init__(ctx, config)
+        if teuth_config.get('pcp_host') is None:
+            self.enabled = False
+        self.log = log
+        self.job_id = self.ctx.config.get('job_id')
+        # until the job stops, we may want to render graphs reflecting the most
+        # current data
+        self.stop_time = 'now'
+        self.use_graphite = self.config.get('graphite', True)
+        self.use_grafana = self.config.get('grafana', True)
+        # fetch_archives defaults to False for now because of various bugs in
+        # pmlogextract
+        self.fetch_archives = self.config.get('fetch_archives', False)
+
+    def setup(self):
+        if not self.enabled:
+            return
+        super(PCP, self).setup()
+        self.start_time = int(time.time())
+        log.debug("start_time: %s", self.start_time)
+        self.setup_collectors()
+
+    def setup_collectors(self):
+        log.debug("cluster: %s", self.cluster)
+        hosts = [rem.shortname for rem in self.cluster.remotes.keys()]
+        self.setup_grafana(hosts)
+        self.setup_graphite(hosts)
+        self.setup_archive(hosts)
+
+    def setup_grafana(self, hosts):
+        if self.use_grafana:
+            self.grafana = GrafanaGrapher(
+                hosts=hosts,
+                time_from=self.start_time,
+                time_until=self.stop_time,
+                job_id=self.job_id,
+            )
+
+    def setup_graphite(self, hosts):
+        if not hasattr(self.ctx, 'archive'):
+            self.use_graphite = False
+        if self.use_graphite:
+            out_dir = os.path.join(
+                self.ctx.archive,
+                'pcp',
+                'graphite',
+            )
+            if not os.path.exists(out_dir):
+                os.makedirs(out_dir)
+            self.graphite = GraphiteGrapher(
+                hosts=hosts,
+                time_from=self.start_time,
+                time_until=self.stop_time,
+                dest_dir=out_dir,
+                job_id=self.job_id,
+            )
+
+    def setup_archive(self, hosts):
+        if not hasattr(self.ctx, 'archive'):
+            self.fetch_archives = False
+        if self.fetch_archives:
+            self.archiver = PCPArchive(
+                hosts=hosts,
+                time_from=self.start_time,
+                time_until=self.stop_time,
+            )
+
+    def begin(self):
+        if not self.enabled:
+            return
+        if self.use_grafana:
+            log.info(
+                "PCP+Grafana dashboard: %s",
+                self.grafana.build_graph_url(),
+            )
+        if self.use_graphite:
+            self.graphite.write_html()
+
+    def end(self):
+        if not self.enabled:
+            return
+        self.stop_time = int(time.time())
+        self.setup_collectors()
+        log.debug("stop_time: %s", self.stop_time)
+        if self.use_grafana:
+            grafana_url = self.grafana.build_graph_url()
+            log.info(
+                "PCP+Grafana dashboard: %s",
+                grafana_url,
+            )
+            if hasattr(self.ctx, 'summary'):
+                self.ctx.summary['pcp_grafana_url'] = grafana_url
+        if self.use_graphite:
+            self.graphite.download_graphs()
+            self.graphite.write_html(mode='static')
+        if self.fetch_archives:
+            for remote in self.cluster.remotes.keys():
+                log.info("Copying PCP data into archive...")
+                cmd = self.archiver.get_pmlogextract_cmd(remote.shortname)
+                archive_out_path = os.path.join(
+                    misc.get_testdir(),
+                    'pcp_archive_%s' % remote.shortname,
+                )
+                cmd.append(archive_out_path)
+                remote.run(args=cmd)
+
+
+task = PCP
index 687744667a0f15a6f3ff685de4414af75c78e873..a2b804cf621246e140716083b816a19b7a4ad325 100644 (file)
@@ -28,6 +28,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 assert task.config['key_1'] == 'overridden'
@@ -40,6 +41,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task_hosts = task.cluster.remotes.keys()
@@ -56,6 +58,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with raises(RuntimeError):
                 with self.klass(self.ctx, self.task_config):
@@ -71,6 +74,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task_hosts = task.cluster.remotes.keys()
@@ -88,6 +92,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task_hosts = task.cluster.remotes.keys()
@@ -106,6 +111,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task_hosts = task.cluster.remotes.keys()
@@ -125,6 +131,7 @@ class TestTask(object):
         with patch.multiple(
             self.klass,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task_hosts = task.cluster.remotes.keys()
@@ -138,6 +145,7 @@ class TestTask(object):
             self.klass,
             setup=DEFAULT,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task.setup.assert_called_once_with()
@@ -147,6 +155,7 @@ class TestTask(object):
             self.klass,
             setup=DEFAULT,
             begin=DEFAULT,
+            end=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
                 task.begin.assert_called_once_with()
@@ -168,6 +177,7 @@ class TestTask(object):
             self.klass,
             setup=DEFAULT,
             begin=DEFAULT,
+            end=DEFAULT,
             teardown=DEFAULT,
         ):
             with self.klass(self.ctx, self.task_config) as task:
@@ -186,6 +196,7 @@ class TestTask(object):
             self.klass,
             setup=DEFAULT,
             begin=DEFAULT,
+            end=DEFAULT,
             teardown=fake_teardown,
         ):
             with self.klass(self.ctx, self.task_config):
diff --git a/teuthology/test/task/test_pcp.py b/teuthology/test/task/test_pcp.py
new file mode 100644 (file)
index 0000000..3c0ab6b
--- /dev/null
@@ -0,0 +1,363 @@
+import os
+import urlparse
+
+from mock import patch, DEFAULT, Mock, MagicMock
+from pytest import raises
+
+from teuthology.config import config, FakeNamespace
+from teuthology.orchestra.cluster import Cluster
+from teuthology.orchestra.remote import Remote
+from teuthology.orchestra.run import Raw
+from teuthology.task.pcp import (PCPDataSource, PCPArchive, PCPGrapher,
+                                 GrafanaGrapher, GraphiteGrapher, PCP)
+
+from . import TestTask
+
+pcp_host = 'http://pcp.front.sepia.ceph.com:44323/'
+
+
+class TestPCPDataSource(object):
+    klass = PCPDataSource
+
+    def setup(self):
+        config.pcp_host = pcp_host
+
+    def test_init(self):
+        hosts = ['host1', 'host2']
+        time_from = 'now-2h'
+        time_until = 'now'
+        obj = self.klass(
+            hosts=hosts,
+            time_from=time_from,
+            time_until=time_until,
+        )
+        assert obj.hosts == hosts
+        assert obj.time_from == time_from
+        assert obj.time_until == time_until
+
+
+class TestPCPArchive(TestPCPDataSource):
+    klass = PCPArchive
+
+    def test_get_archive_input_dir(self):
+        hosts = ['host1', 'host2']
+        time_from = 'now-1d'
+        obj = self.klass(
+            hosts=hosts,
+            time_from=time_from,
+        )
+        assert obj.get_archive_input_dir('host1') == \
+            '/var/log/pcp/pmlogger/host1'
+
+    def test_get_pmlogextract_cmd(self):
+        obj = self.klass(
+            hosts=['host1'],
+            time_from='now-3h',
+            time_until='now-1h',
+        )
+        expected = [
+            'pmlogextract',
+            '-S', 'now-3h',
+            '-T', 'now-1h',
+            Raw('/var/log/pcp/pmlogger/host1/*.0'),
+        ]
+        assert obj.get_pmlogextract_cmd('host1') == expected
+
+    def test_format_time(self):
+        assert self.klass._format_time(1462893484) == \
+            '@ Tue May 10 15:18:04 2016'
+
+    def test_format_time_now(self):
+        assert self.klass._format_time('now-1h') == 'now-1h'
+
+
+class TestPCPGrapher(TestPCPDataSource):
+    klass = PCPGrapher
+
+    def test_init(self):
+        hosts = ['host1', 'host2']
+        time_from = 'now-2h'
+        time_until = 'now'
+        obj = self.klass(
+            hosts=hosts,
+            time_from=time_from,
+            time_until=time_until,
+        )
+        assert obj.hosts == hosts
+        assert obj.time_from == time_from
+        assert obj.time_until == time_until
+        expected_url = urlparse.urljoin(config.pcp_host, self.klass._endpoint)
+        assert obj.base_url == expected_url
+
+
+class TestGrafanaGrapher(TestPCPGrapher):
+    klass = GrafanaGrapher
+
+    def test_build_graph_url(self):
+        hosts = ['host1']
+        time_from = 'now-3h'
+        time_until = 'now-1h'
+        obj = self.klass(
+            hosts=hosts,
+            time_from=time_from,
+            time_until=time_until,
+        )
+        base_url = urlparse.urljoin(
+            config.pcp_host,
+            'grafana/index.html#/dashboard/script/index.js',
+        )
+        assert obj.base_url == base_url
+        got_url = obj.build_graph_url()
+        parsed_query = urlparse.parse_qs(got_url.split('?')[1])
+        assert parsed_query['hosts'] == hosts
+        assert len(parsed_query['time_from']) == 1
+        assert parsed_query['time_from'][0] == time_from
+        assert len(parsed_query['time_to']) == 1
+        assert parsed_query['time_to'][0] == time_until
+
+    def test_format_time(self):
+        assert self.klass._format_time(1462893484) == \
+            '2016-05-10T15:18:04'
+
+    def test_format_time_now(self):
+        assert self.klass._format_time('now-1h') == 'now-1h'
+
+
+class TestGraphiteGrapher(TestPCPGrapher):
+    klass = GraphiteGrapher
+
+    def test_build_graph_urls(self):
+        obj = self.klass(
+            hosts=['host1', 'host2'],
+            time_from='now-3h',
+            time_until='now-1h',
+        )
+        expected_urls = [obj.get_graph_url(m) for m in obj.metrics]
+        obj.build_graph_urls()
+        built_urls = []
+        for metric in obj.graphs.keys():
+            built_urls.append(obj.graphs[metric]['url'])
+        assert len(built_urls) == len(expected_urls)
+        assert sorted(built_urls) == sorted(expected_urls)
+
+    def test_check_dest_dir(self):
+        obj = self.klass(
+            hosts=['host1'],
+            time_from='now-3h',
+        )
+        assert obj.dest_dir is None
+        with raises(RuntimeError):
+            obj._check_dest_dir()
+
+    def test_generate_html_dynamic(self):
+        obj = self.klass(
+            hosts=['host1'],
+            time_from='now-3h',
+        )
+        html = obj.generate_html()
+        assert config.pcp_host in html
+
+    def test_download_graphs(self):
+        dest_dir = '/fake/path'
+        obj = self.klass(
+            hosts=['host1'],
+            time_from='now-3h',
+            dest_dir=dest_dir,
+        )
+        _format = obj.graph_defaults.get('format')
+        with patch('teuthology.task.pcp.requests.get', create=True) as m_get:
+            m_resp = Mock()
+            m_resp.ok = True
+            m_get.return_value = m_resp
+            with patch('teuthology.task.pcp.open', create=True) as m_open:
+                m_open.return_value = MagicMock(spec=file)
+                obj.download_graphs()
+        expected_filenames = []
+        for metric in obj.metrics:
+            expected_filenames.append(
+                "{}.{}".format(
+                    os.path.join(
+                        dest_dir,
+                        obj._sanitize_metric_name(metric),
+                    ),
+                    _format,
+                )
+            )
+        graph_filenames = []
+        for metric in obj.graphs.keys():
+            graph_filenames.append(obj.graphs[metric]['file'])
+        assert sorted(graph_filenames) == sorted(expected_filenames)
+
+    def test_generate_html_static(self):
+        obj = self.klass(
+            hosts=['host1'],
+            time_from='now-3h',
+            dest_dir='/fake/path',
+        )
+        with patch('teuthology.task.pcp.requests.get', create=True) as m_get:
+            m_resp = Mock()
+            m_resp.ok = True
+            m_get.return_value = m_resp
+            with patch('teuthology.task.pcp.open', create=True) as m_open:
+                m_open.return_value = MagicMock(spec=file)
+                obj.download_graphs()
+        html = obj.generate_html(mode='static')
+        assert config.pcp_host not in html
+
+    def test_sanitize_metric_name(self):
+        sanitized_metrics = {
+            'foo.bar': 'foo.bar',
+            'foo.*': 'foo._all_',
+            'foo.bar baz': 'foo.bar_baz',
+            'foo.*.bar baz': 'foo._all_.bar_baz',
+        }
+        for in_, out in sanitized_metrics.iteritems():
+            assert self.klass._sanitize_metric_name(in_) == out
+
+    def test_get_target_globs(self):
+        obj = self.klass(
+            hosts=['host1'],
+            time_from='now-3h',
+        )
+        assert obj.get_target_globs() == ['*host1*']
+        assert obj.get_target_globs('a.metric') == ['*host1*.a.metric']
+        obj.hosts.append('host2')
+        assert obj.get_target_globs() == ['*host1*', '*host2*']
+        assert obj.get_target_globs('a.metric') == \
+            ['*host1*.a.metric', '*host2*.a.metric']
+
+
+class TestPCPTask(TestTask):
+    klass = PCP
+    task_name = 'pcp'
+
+    def setup(self):
+        self.ctx = FakeNamespace()
+        self.ctx.cluster = Cluster()
+        self.ctx.cluster.add(Remote('user@remote1'), ['role1'])
+        self.ctx.cluster.add(Remote('user@remote2'), ['role2'])
+        self.ctx.config = dict()
+        self.task_config = dict()
+        config.pcp_host = pcp_host
+
+    def test_init(self):
+        task = self.klass(self.ctx, self.task_config)
+        assert task.stop_time == 'now'
+
+    def test_disabled(self):
+        config.pcp_host = None
+        with self.klass(self.ctx, self.task_config) as task:
+            assert task.enabled is False
+            assert not hasattr(task, 'grafana')
+            assert not hasattr(task, 'graphite')
+            assert not hasattr(task, 'archiver')
+
+    def test_setup(self):
+        with patch.multiple(
+            self.klass,
+            setup_collectors=DEFAULT,
+            begin=DEFAULT,
+            end=DEFAULT,
+        ):
+            with self.klass(self.ctx, self.task_config) as task:
+                task.setup_collectors.assert_called_once_with()
+                assert isinstance(task.start_time, int)
+
+    def test_setup_collectors(self):
+        with patch.multiple(
+            self.klass,
+            begin=DEFAULT,
+            end=DEFAULT,
+        ):
+            with self.klass(self.ctx, self.task_config) as task:
+                assert hasattr(task, 'grafana')
+                assert not hasattr(task, 'graphite')
+                assert not hasattr(task, 'archiver')
+            self.task_config['grafana'] = False
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'grafana')
+
+    @patch('os.makedirs')
+    def test_setup_grafana(self, m_makedirs):
+        with patch.multiple(
+            self.klass,
+            begin=DEFAULT,
+            end=DEFAULT,
+        ):
+            self.ctx.archive = '/fake/path'
+            with self.klass(self.ctx, self.task_config) as task:
+                assert hasattr(task, 'grafana')
+            self.task_config['grafana'] = False
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'grafana')
+
+    @patch('os.makedirs')
+    @patch('teuthology.task.pcp.GraphiteGrapher')
+    def test_setup_graphite(self, m_graphite_grapher, m_makedirs):
+        with patch.multiple(
+            self.klass,
+            begin=DEFAULT,
+            end=DEFAULT,
+        ):
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'graphite')
+            self.task_config['graphite'] = False
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'graphite')
+            self.ctx.archive = '/fake/path'
+            self.task_config['graphite'] = True
+            with self.klass(self.ctx, self.task_config) as task:
+                assert hasattr(task, 'graphite')
+            self.task_config['graphite'] = False
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'graphite')
+
+    @patch('os.makedirs')
+    @patch('teuthology.task.pcp.PCPArchive')
+    def test_setup_archiver(self, m_archive, m_makedirs):
+        with patch.multiple(
+            self.klass,
+            begin=DEFAULT,
+            end=DEFAULT,
+        ):
+            self.task_config['fetch_archives'] = True
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'archiver')
+            self.task_config['fetch_archives'] = False
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'archiver')
+            self.ctx.archive = '/fake/path'
+            self.task_config['fetch_archives'] = True
+            with self.klass(self.ctx, self.task_config) as task:
+                assert hasattr(task, 'archiver')
+            self.task_config['fetch_archives'] = False
+            with self.klass(self.ctx, self.task_config) as task:
+                assert not hasattr(task, 'archiver')
+
+    @patch('os.makedirs')
+    @patch('teuthology.task.pcp.GrafanaGrapher')
+    @patch('teuthology.task.pcp.GraphiteGrapher')
+    def test_begin(self, m_grafana, m_graphite, m_makedirs):
+        with patch.multiple(
+            self.klass,
+            end=DEFAULT,
+        ):
+            with self.klass(self.ctx, self.task_config) as task:
+                task.grafana.build_graph_url.assert_called_once_with()
+            self.task_config['graphite'] = True
+            self.ctx.archive = '/fake/path'
+            with self.klass(self.ctx, self.task_config) as task:
+                task.graphite.write_html.assert_called_once_with()
+
+    @patch('os.makedirs')
+    @patch('teuthology.task.pcp.GrafanaGrapher')
+    @patch('teuthology.task.pcp.GraphiteGrapher')
+    def test_end(self, m_grafana, m_graphite, m_makedirs):
+        with self.klass(self.ctx, self.task_config) as task:
+            pass
+        assert isinstance(task.stop_time, int)
+        return
+        self.task_config['graphite'] = True
+        self.ctx.archive = '/fake/path'
+        with self.klass(self.ctx, self.task_config) as task:
+            task.graphite.write_html.assert_called_once_with()