+import contextlib
import itertools
import logging
import os
PROMETHEUS_MULTIPROC_DIR = Path("~/.cache/teuthology-exporter").expanduser()
-PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True)
os.environ["PROMETHEUS_MULTIPROC_DIR"] = str(PROMETHEUS_MULTIPROC_DIR)
# We can't import prometheus_client until after we set PROMETHEUS_MULTIPROC_DIR
CollectorRegistry,
)
-registry = CollectorRegistry()
-multiprocess.MultiProcessCollector(registry)
-
MACHINE_TYPES = list(config.active_machine_types)
+REGISTRY = None
class TeuthologyExporter:
port = 61764 # int(''.join([str((ord(c) - 100) % 10) for c in "teuth"]))
def __init__(self, interval=60):
- for file in PROMETHEUS_MULTIPROC_DIR.iterdir():
- file.unlink()
+ if REGISTRY:
+ for file in PROMETHEUS_MULTIPROC_DIR.iterdir():
+ file.unlink()
self.interval = interval
self.metrics = [
Dispatchers(),
self._created_time = time.perf_counter()
def start(self):
- start_http_server(self.port, registry=registry)
+ if REGISTRY:
+ start_http_server(self.port, registry=REGISTRY)
self.loop()
def update(self):
def restart(self):
# Use the dispatcher's restart function - note that by using this here,
# it restarts the exporter, *not* the dispatcher.
- return teuthology.dispatcher.restart(log=log)
+ if REGISTRY:
+ return teuthology.dispatcher.restart(log=log)
+
+
+class SingletonMeta(type):
+ _instances = {}
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ instance = super().__call__(*args, **kwargs)
+ cls._instances[cls] = instance
+ return cls._instances[cls]
-class TeuthologyMetric:
+class TeuthologyMetric(metaclass=SingletonMeta):
def __init__(self):
- pass
+ if REGISTRY:
+ self._init()
+
+ def _init(self):
+ raise NotImplementedError
def update(self):
+ if REGISTRY:
+ self._update()
+
+ def _update(self):
+ raise NotImplementedError
+
+ def record(self, **kwargs):
+ if REGISTRY:
+ self._record(**kwargs)
+
+ def _record(self, **_):
+ raise NotImplementedError
+
+ @contextlib.contextmanager
+ def time(self, **labels):
+ if REGISTRY:
+ yield self._time(**labels)
+ else:
+ yield
+
+ def _time(self):
raise NotImplementedError
class Dispatchers(TeuthologyMetric):
- def __init__(self):
+ def _init(self):
self.metric = Gauge(
- "teuthology_dispatchers", "Teuthology Dispatchers", ["machine_type"]
+ "teuthology_dispatchers",
+ "Teuthology Dispatchers",
+ ["machine_type"],
)
- def update(self):
+ def _update(self):
dispatcher_procs = teuthology.dispatcher.find_dispatcher_processes()
for machine_type in MACHINE_TYPES:
self.metric.labels(machine_type).set(
class BeanstalkQueue(TeuthologyMetric):
- def __init__(self):
+ def _init(self):
self.length = Gauge(
- "beanstalk_queue_length", "Beanstalk Queue Length", ["machine_type"]
+ "beanstalk_queue_length",
+ "Beanstalk Queue Length",
+ ["machine_type"],
)
self.paused = Gauge(
"beanstalk_queue_paused", "Beanstalk Queue is Paused", ["machine_type"]
)
- def update(self):
+ def _update(self):
for machine_type in MACHINE_TYPES:
queue_stats = beanstalk.stats_tube(beanstalk.connect(), machine_type)
self.length.labels(machine_type).set(queue_stats["count"])
class JobProcesses(TeuthologyMetric):
- def __init__(self):
+ def _init(self):
self.metric = Gauge(
"teuthology_job_processes",
"Teuthology Job Processes",
)
- def update(self):
-
+ def _update(self):
attrs = ["pid", "cmdline"]
total = 0
for proc in psutil.process_iter(attrs=attrs):
class Nodes(TeuthologyMetric):
- def __init__(self):
+ def _init(self):
self.metric = Gauge(
- "teuthology_nodes", "Teuthology Nodes", ["machine_type", "locked", "up"]
+ "teuthology_nodes",
+ "Teuthology Nodes",
+ ["machine_type", "locked", "up"],
)
- def update(self):
+ def _update(self):
for machine_type in MACHINE_TYPES:
nodes = list_locks(machine_type=machine_type)
for up, locked in itertools.product([True, False], [True, False]):
)
-class _JobResults(TeuthologyMetric):
- def __init__(self):
+class JobResults(TeuthologyMetric):
+ def _init(self):
self.metric = Counter(
"teuthology_job_results",
"Teuthology Job Results",
)
# As this is to be used within job processes, we implement record() rather than update()
- def record(self, machine_type, status):
- self.metric.labels(machine_type=machine_type, status=status).inc()
-
+ def _record(self, **labels):
+ self.metric.labels(**labels).inc()
-JobResults = _JobResults()
-
-class _NodeReimagingResults(TeuthologyMetric):
- def __init__(self):
+class NodeReimagingResults(TeuthologyMetric):
+ def _init(self):
self.metric = Counter(
"teuthology_reimaging_results",
"Teuthology Reimaging Results",
)
# As this is to be used within job processes, we implement record() rather than update()
- def record(self, machine_type, status):
- self.metric.labels(machine_type=machine_type, status=status).inc()
+ def _record(self, **labels):
+ if REGISTRY:
+ self.metric.labels(**labels).inc()
-NodeReimagingResults = _NodeReimagingResults()
+class NodeLockingTime(TeuthologyMetric):
+ def _init(self):
+ self.metric = Summary(
+ "teuthology_node_locking_duration_seconds",
+ "Time spent waiting to lock nodes",
+ ["machine_type", "count"],
+ )
-NodeLockingTime = Summary(
- "teuthology_node_locking_duration_seconds",
- "Time spent waiting to lock nodes",
- ["machine_type", "count"],
-)
+ def _time(self, **labels):
+ yield self.metric.labels(**labels).time()
-NodeReimagingTime = Summary(
- "teuthology_node_reimaging_duration_seconds",
- "Time spent reimaging nodes",
- ["machine_type", "count"],
-)
-JobTime = Summary(
- "teuthology_job_duration_seconds",
- "Time spent executing a job",
- ["suite"],
-)
+class NodeReimagingTime(TeuthologyMetric):
+ def _init(self):
+ self.metric = Summary(
+ "teuthology_node_reimaging_duration_seconds",
+ "Time spent reimaging nodes",
+ ["machine_type", "count"],
+ )
-TaskTime = Summary(
- "teuthology_task_duration_seconds",
- "Time spent executing a task",
- ["name", "phase"],
-)
+ def _time(self, **labels):
+ yield self.metric.labels(**labels).time()
-BootstrapTime = Summary(
- "teuthology_bootstrap_duration_seconds",
- "Time spent running teuthology's bootstrap script",
-)
+class JobTime(TeuthologyMetric):
+ def _init(self):
+ self.metric = Summary(
+ "teuthology_job_duration_seconds",
+ "Time spent executing a job",
+ ["suite"],
+ )
+
+ def _time(self, **labels):
+ yield self.metric.labels(**labels).time()
-def main(args):
+
+class TaskTime(TeuthologyMetric):
+ def _init(self):
+ self.metric = Summary(
+ "teuthology_task_duration_seconds",
+ "Time spent executing a task",
+ ["name", "phase"],
+ )
+
+ def _time(self, **labels):
+ yield self.metric.labels(**labels).time()
+
+
+class BootstrapTime(TeuthologyMetric):
+ def _init(self):
+ self.metric = Summary(
+ "teuthology_bootstrap_duration_seconds",
+ "Time spent running teuthology's bootstrap script",
+ )
+
+ def _time(self, **labels):
+ yield self.metric.labels(**labels).time()
+
+
+def find_exporter_process() -> int | None:
+ attrs = ['pid', 'uids', 'cmdline']
+ for proc in psutil.process_iter(attrs=attrs):
+ try:
+ cmdline = proc.info['cmdline']
+ except psutil.AccessDenied:
+ continue
+ pid = proc.info['pid']
+ if not cmdline:
+ continue
+ if not [i for i in cmdline if i.split('/')[-1] == 'teuthology-exporter']:
+ continue
+ if os.getuid() not in proc.info['uids']:
+ continue
+ return pid
+
+
+def main(args) -> int:
+ if pid := find_exporter_process():
+ if os.getpid() != pid:
+ log.error(f"teuthology-exporter is already running as PID {pid}")
+ return 2
exporter = TeuthologyExporter(interval=int(args["--interval"]))
- exporter.start()
+ try:
+ exporter.start()
+ except Exception:
+ log.exception("Exporter failed")
+ return 1
+ else:
+ return 0
+
+
+pid = find_exporter_process()
+if pid:
+ PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True)
+ REGISTRY = CollectorRegistry()
+ multiprocess.MultiProcessCollector(REGISTRY)