git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
exporter: With no exporter running, metrics collection should no-op 1981/head
author Zack Cerza <zack@redhat.com>
Thu, 25 Jul 2024 20:29:19 +0000 (14:29 -0600)
committer Zack Cerza <zack@redhat.com>
Fri, 2 Aug 2024 18:50:37 +0000 (12:50 -0600)
The new import tests exposed a ValueError raised by prometheus_client regarding
duplicate metrics in the multiprocess directory.

Signed-off-by: Zack Cerza <zack@redhat.com>
teuthology/dispatcher/__init__.py
teuthology/dispatcher/supervisor.py
teuthology/exporter.py
teuthology/kill.py
teuthology/provision/__init__.py
teuthology/repo_utils.py
teuthology/report.py
teuthology/run_tasks.py

index e97d17513b3e8bb01143a320d1edc6413ecf696a..15c4003c990ff9c489126365effaf89ce96fcf00 100644 (file)
@@ -313,7 +313,10 @@ def lock_machines(job_config):
     fake_ctx = supervisor.create_fake_context(job_config, block=True)
     machine_type = job_config["machine_type"]
     count = len(job_config['roles'])
-    with exporter.NodeLockingTime.labels(machine_type, count).time():
+    with exporter.NodeLockingTime().time(
+        machine_type=machine_type,
+        count=count,
+    ):
         lock_ops.block_and_lock_machines(
             fake_ctx,
             count,
index 2eb52f6637172c2a34d8eb76bd72eb79530a6e57..d2e86de364ceb4bfefa9743bda735ef749d0c898 100644 (file)
@@ -44,10 +44,10 @@ def main(args):
         # If a job (e.g. from the nop suite) doesn't need nodes, avoid
         # submitting a zero here.
         if node_count:
-            with exporter.NodeReimagingTime.labels(
-                job_config["machine_type"],
-                node_count
-            ).time():
+            with exporter.NodeReimagingTime().time(
+                machine_type=job_config["machine_type"],
+                node_count=node_count,
+            ):
                 reimage(job_config)
         else:
             reimage(job_config)
@@ -57,7 +57,7 @@ def main(args):
     try:
         suite = job_config.get("suite")
         if suite:
-            with exporter.JobTime.labels(suite).time():
+            with exporter.JobTime().time(suite=suite):
                 return run_job(
                     job_config,
                     args.bin_path,
index 4c7c10a4c25b08f7295264eeb1128fb9fa8a4f8d..30aead87567d46da143fd5dabb6814a92bafc04f 100644 (file)
@@ -1,3 +1,4 @@
+import contextlib
 import itertools
 import logging
 import os
@@ -15,7 +16,6 @@ log = logging.getLogger(__name__)
 
 
 PROMETHEUS_MULTIPROC_DIR = Path("~/.cache/teuthology-exporter").expanduser()
-PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True)
 os.environ["PROMETHEUS_MULTIPROC_DIR"] = str(PROMETHEUS_MULTIPROC_DIR)
 
 # We can't import prometheus_client until after we set PROMETHEUS_MULTIPROC_DIR
@@ -28,18 +28,17 @@ from prometheus_client import (  # noqa: E402
     CollectorRegistry,
 )
 
-registry = CollectorRegistry()
-multiprocess.MultiProcessCollector(registry)
-
 MACHINE_TYPES = list(config.active_machine_types)
+REGISTRY = None
 
 
 class TeuthologyExporter:
     port = 61764  # int(''.join([str((ord(c) - 100) % 10) for c in "teuth"]))
 
     def __init__(self, interval=60):
-        for file in PROMETHEUS_MULTIPROC_DIR.iterdir():
-            file.unlink()
+        if REGISTRY:
+            for file in PROMETHEUS_MULTIPROC_DIR.iterdir():
+                file.unlink()
         self.interval = interval
         self.metrics = [
             Dispatchers(),
@@ -50,7 +49,8 @@ class TeuthologyExporter:
         self._created_time = time.perf_counter()
 
     def start(self):
-        start_http_server(self.port, registry=registry)
+        if REGISTRY:
+            start_http_server(self.port, registry=REGISTRY)
         self.loop()
 
     def update(self):
@@ -85,24 +85,61 @@ class TeuthologyExporter:
     def restart(self):
         # Use the dispatcher's restart function - note that by using this here,
         # it restarts the exporter, *not* the dispatcher.
-        return teuthology.dispatcher.restart(log=log)
+        if REGISTRY:
+            return teuthology.dispatcher.restart(log=log)
+
+
+class SingletonMeta(type):
+    _instances = {}
+    def __call__(cls, *args, **kwargs):
+        if cls not in cls._instances:
+            instance = super().__call__(*args, **kwargs)
+            cls._instances[cls] = instance
+        return cls._instances[cls]
 
 
-class TeuthologyMetric:
+class TeuthologyMetric(metaclass=SingletonMeta):
     def __init__(self):
-        pass
+        if REGISTRY:
+            self._init()
+
+    def _init(self):
+        raise NotImplementedError
 
     def update(self):
+        if REGISTRY:
+            self._update()
+
+    def _update(self):
+        raise NotImplementedError
+
+    def record(self, **kwargs):
+        if REGISTRY:
+            self._record(**kwargs)
+
+    def _record(self, **_):
+        raise NotImplementedError
+
+    @contextlib.contextmanager
+    def time(self, **labels):
+        if REGISTRY:
+            yield self._time(**labels)
+        else:
+            yield
+
+    def _time(self):
         raise NotImplementedError
 
 
 class Dispatchers(TeuthologyMetric):
-    def __init__(self):
+    def _init(self):
         self.metric = Gauge(
-            "teuthology_dispatchers", "Teuthology Dispatchers", ["machine_type"]
+            "teuthology_dispatchers",
+            "Teuthology Dispatchers",
+            ["machine_type"],
         )
 
-    def update(self):
+    def _update(self):
         dispatcher_procs = teuthology.dispatcher.find_dispatcher_processes()
         for machine_type in MACHINE_TYPES:
             self.metric.labels(machine_type).set(
@@ -111,15 +148,17 @@ class Dispatchers(TeuthologyMetric):
 
 
 class BeanstalkQueue(TeuthologyMetric):
-    def __init__(self):
+    def _init(self):
         self.length = Gauge(
-            "beanstalk_queue_length", "Beanstalk Queue Length", ["machine_type"]
+            "beanstalk_queue_length",
+            "Beanstalk Queue Length",
+            ["machine_type"],
         )
         self.paused = Gauge(
             "beanstalk_queue_paused", "Beanstalk Queue is Paused", ["machine_type"]
         )
 
-    def update(self):
+    def _update(self):
         for machine_type in MACHINE_TYPES:
             queue_stats = beanstalk.stats_tube(beanstalk.connect(), machine_type)
             self.length.labels(machine_type).set(queue_stats["count"])
@@ -127,14 +166,13 @@ class BeanstalkQueue(TeuthologyMetric):
 
 
 class JobProcesses(TeuthologyMetric):
-    def __init__(self):
+    def _init(self):
         self.metric = Gauge(
             "teuthology_job_processes",
             "Teuthology Job Processes",
         )
 
-    def update(self):
-
+    def _update(self):
         attrs = ["pid", "cmdline"]
         total = 0
         for proc in psutil.process_iter(attrs=attrs):
@@ -168,12 +206,14 @@ class JobProcesses(TeuthologyMetric):
 
 
 class Nodes(TeuthologyMetric):
-    def __init__(self):
+    def _init(self):
         self.metric = Gauge(
-            "teuthology_nodes", "Teuthology Nodes", ["machine_type", "locked", "up"]
+            "teuthology_nodes",
+            "Teuthology Nodes",
+            ["machine_type", "locked", "up"],
         )
 
-    def update(self):
+    def _update(self):
         for machine_type in MACHINE_TYPES:
             nodes = list_locks(machine_type=machine_type)
             for up, locked in itertools.product([True, False], [True, False]):
@@ -182,8 +222,8 @@ class Nodes(TeuthologyMetric):
                 )
 
 
-class _JobResults(TeuthologyMetric):
-    def __init__(self):
+class JobResults(TeuthologyMetric):
+    def _init(self):
         self.metric = Counter(
             "teuthology_job_results",
             "Teuthology Job Results",
@@ -191,15 +231,12 @@ class _JobResults(TeuthologyMetric):
         )
 
     # As this is to be used within job processes, we implement record() rather than update()
-    def record(self, machine_type, status):
-        self.metric.labels(machine_type=machine_type, status=status).inc()
-
+    def _record(self, **labels):
+        self.metric.labels(**labels).inc()
 
-JobResults = _JobResults()
 
-
-class _NodeReimagingResults(TeuthologyMetric):
-    def __init__(self):
+class NodeReimagingResults(TeuthologyMetric):
+    def _init(self):
         self.metric = Counter(
             "teuthology_reimaging_results",
             "Teuthology Reimaging Results",
@@ -207,42 +244,104 @@ class _NodeReimagingResults(TeuthologyMetric):
         )
 
     # As this is to be used within job processes, we implement record() rather than update()
-    def record(self, machine_type, status):
-        self.metric.labels(machine_type=machine_type, status=status).inc()
+    def _record(self, **labels):
+        if REGISTRY:
+            self.metric.labels(**labels).inc()
 
 
-NodeReimagingResults = _NodeReimagingResults()
+class NodeLockingTime(TeuthologyMetric):
+    def _init(self):
+        self.metric = Summary(
+            "teuthology_node_locking_duration_seconds",
+            "Time spent waiting to lock nodes",
+            ["machine_type", "count"],
+        )
 
-NodeLockingTime = Summary(
-    "teuthology_node_locking_duration_seconds",
-    "Time spent waiting to lock nodes",
-    ["machine_type", "count"],
-)
+    def _time(self, **labels):
+        yield self.metric.labels(**labels).time()
 
-NodeReimagingTime = Summary(
-    "teuthology_node_reimaging_duration_seconds",
-    "Time spent reimaging nodes",
-    ["machine_type", "count"],
-)
 
-JobTime = Summary(
-    "teuthology_job_duration_seconds",
-    "Time spent executing a job",
-    ["suite"],
-)
+class NodeReimagingTime(TeuthologyMetric):
+    def _init(self):
+        self.metric = Summary(
+            "teuthology_node_reimaging_duration_seconds",
+            "Time spent reimaging nodes",
+            ["machine_type", "count"],
+        )
 
-TaskTime = Summary(
-    "teuthology_task_duration_seconds",
-    "Time spent executing a task",
-    ["name", "phase"],
-)
+    def _time(self, **labels):
+        yield self.metric.labels(**labels).time()
 
-BootstrapTime = Summary(
-    "teuthology_bootstrap_duration_seconds",
-    "Time spent running teuthology's bootstrap script",
-)
 
+class JobTime(TeuthologyMetric):
+    def _init(self):
+        self.metric = Summary(
+            "teuthology_job_duration_seconds",
+            "Time spent executing a job",
+            ["suite"],
+        )
+
+    def _time(self, **labels):
+        yield self.metric.labels(**labels).time()
 
-def main(args):
+
+class TaskTime(TeuthologyMetric):
+    def _init(self):
+        self.metric = Summary(
+            "teuthology_task_duration_seconds",
+            "Time spent executing a task",
+            ["name", "phase"],
+        )
+
+    def _time(self, **labels):
+        yield self.metric.labels(**labels).time()
+
+
+class BootstrapTime(TeuthologyMetric):
+    def _init(self):
+        self.metric = Summary(
+            "teuthology_bootstrap_duration_seconds",
+            "Time spent running teuthology's bootstrap script",
+        )
+
+    def _time(self, **labels):
+        yield self.metric.labels(**labels).time()
+
+
+def find_exporter_process() -> int | None:
+    attrs = ['pid', 'uids', 'cmdline']
+    for proc in psutil.process_iter(attrs=attrs):
+        try:
+            cmdline = proc.info['cmdline']
+        except psutil.AccessDenied:
+            continue
+        pid = proc.info['pid']
+        if not cmdline:
+            continue
+        if not [i for i in cmdline if i.split('/')[-1] == 'teuthology-exporter']:
+            continue
+        if os.getuid() not in proc.info['uids']:
+            continue
+        return pid
+
+
+def main(args) -> int:
+    if pid := find_exporter_process():
+        if os.getpid() != pid:
+            log.error(f"teuthology-exporter is already running as PID {pid}")
+            return 2
     exporter = TeuthologyExporter(interval=int(args["--interval"]))
-    exporter.start()
+    try:
+        exporter.start()
+    except Exception:
+        log.exception("Exporter failed")
+        return 1
+    else:
+        return 0
+
+
+pid = find_exporter_process()
+if pid:
+    PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True)
+    REGISTRY = CollectorRegistry()
+    multiprocess.MultiProcessCollector(REGISTRY)
index a18e0ac8ad81361e90c3e540444f492a4bff37f7..137e49080e77a445b846f1f5f8f047f74c0dcca0 100755 (executable)
@@ -95,9 +95,9 @@ def kill_job(run_name, job_id, archive_base=None, owner=None, skip_unlock=False)
         return
     report.try_push_job_info(job_info, dict(status="dead"))
     if 'machine_type' in job_info:
-        teuthology.exporter.JobResults.record(
-            job_info["machine_type"],
-            job_info.get("status", "dead")
+        teuthology.exporter.JobResults().record(
+            machine_type=job_info["machine_type"],
+            status=job_info.get("status", "dead")
         )
     else:
         log.warn(f"Job {job_id} has no machine_type; cannot report via Prometheus")
index 5afc5ed7e17dc1a708e3655966ea5456db8c16fc..48392eabae7c98b18d51fb488d17e635f8db5161 100644 (file)
@@ -48,9 +48,9 @@ def reimage(ctx, machine_name, machine_type):
         # or SystemExit
         raise
     finally:
-        teuthology.exporter.NodeReimagingResults.record(
-            machine_type,
-            status,
+        teuthology.exporter.NodeReimagingResults().record(
+            machine_type=machine_type,
+            status=status,
         )
     return result
 
index 6ab0747ade2db5dd5b47dd922298b817e04c8326..79fd92edaa20456741524318bc4fb11d9212de40 100644 (file)
@@ -440,7 +440,7 @@ def fetch_teuthology(branch, commit=None, lock=True):
 
 
 def bootstrap_teuthology(dest_path):
-    with exporter.BootstrapTime.time():
+    with exporter.BootstrapTime().time():
         log.info("Bootstrapping %s", dest_path)
         # This magic makes the bootstrap script not attempt to clobber an
         # existing virtualenv. But the branch's bootstrap needs to actually
index e5382133b5a8a57d386b63848a8a6946281f85fd..f0a44720170ecbc0abb5e665db2f8374e305a65f 100644 (file)
@@ -474,7 +474,10 @@ def push_job_info(run_name, job_id, job_info, base_uri=None):
     reporter.report_job(run_name, job_id, job_info)
     status = get_status(job_info)
     if status in ["pass", "fail", "dead"] and "machine_type" in job_info:
-        teuthology.exporter.JobResults.record(job_info["machine_type"], status)
+        teuthology.exporter.JobResults().record(
+            machine_type=job_info["machine_type"],
+            status=status,
+        )
 
 
 def try_push_job_info(job_config, extra_info=None):
@@ -584,7 +587,10 @@ def try_mark_run_dead(run_name):
                 log.info("Marking job {job_id} as dead".format(job_id=job_id))
                 reporter.report_job(run_name, job['job_id'], dead=True)
                 if "machine_type" in job:
-                    teuthology.exporter.JobResults.record(job["machine_type"], job["status"])
+                    teuthology.exporter.JobResults().record(
+                        machine_type=job["machine_type"],
+                        status=job["status"],
+                    )
             except report_exceptions:
                 log.exception("Could not mark job as dead: {job_id}".format(
                     job_id=job_id))
index 1af2c682079048b765f49fce8eb73935e3d5a609..267d8fd3f3f854587e0682ae6bade0e04c8996fe 100644 (file)
@@ -105,7 +105,10 @@ def run_tasks(tasks, ctx):
             manager = run_one_task(taskname, ctx=ctx, config=config)
             if hasattr(manager, '__enter__'):
                 stack.append((taskname, manager))
-                with exporter.TaskTime.labels(taskname, "enter").time():
+                with exporter.TaskTime().time(
+                    name=taskname,
+                    phase="enter"
+                ):
                     manager.__enter__()
     except BaseException as e:
         if isinstance(e, ConnectionLostError):
@@ -150,7 +153,10 @@ def run_tasks(tasks, ctx):
                 log.debug('Unwinding manager %s', taskname)
                 timer.mark('%s exit' % taskname)
                 try:
-                    with exporter.TaskTime.labels(taskname, "exit").time():
+                    with exporter.TaskTime().time(
+                        name=taskname,
+                        phase="exit"
+                    ):
                         suppress = manager.__exit__(*exc_info)
                 except Exception as e:
                     if isinstance(e, ConnectionLostError):