From ffbc1fc2ef8d74764e5f906c3c7cb78c558e99f3 Mon Sep 17 00:00:00 2001 From: John Mulligan Date: Fri, 9 Aug 2024 10:25:58 -0400 Subject: [PATCH] xxx: add a jobqueue abstraction Signed-off-by: John Mulligan --- teuthology/jobqueue/__init__.py | 0 teuthology/jobqueue/base.py | 41 +++++++++++++ teuthology/jobqueue/beanstalk.py | 68 +++++++++++++++++++++ teuthology/jobqueue/choice.py | 29 +++++++++ teuthology/jobqueue/file.py | 40 +++++++++++++ teuthology/jobqueue/sqlite.py | 100 +++++++++++++++++++++++++++++++ 6 files changed, 278 insertions(+) create mode 100644 teuthology/jobqueue/__init__.py create mode 100644 teuthology/jobqueue/base.py create mode 100644 teuthology/jobqueue/beanstalk.py create mode 100644 teuthology/jobqueue/choice.py create mode 100644 teuthology/jobqueue/file.py create mode 100644 teuthology/jobqueue/sqlite.py diff --git a/teuthology/jobqueue/__init__.py b/teuthology/jobqueue/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/teuthology/jobqueue/base.py b/teuthology/jobqueue/base.py new file mode 100644 index 0000000000..9ce40be64a --- /dev/null +++ b/teuthology/jobqueue/base.py @@ -0,0 +1,41 @@ +from typing import Dict, Any, Optional + +import enum + + +JobSpec = Dict[str, Any] + + +class QueueDirection(enum.Enum): + IN = 1 + OUT = 2 + BIDIR = 3 + + +class Job: + jid: int + + def bury(self) -> None: + raise NotImplementedError() + + def delete(self) -> None: + raise NotImplementedError() + + def job_config(self) -> JobSpec: + raise NotImplementedError() + + +class JobQueue: + def put(self, job_config: JobSpec) -> int: + raise NotImplementedError() + + def get(self) -> Optional[Job]: + raise NotImplementedError() + + @property + def tube(self) -> str: + raise NotImplementedError() + + @property + def direction(self) -> QueueDirection: + raise NotImplementedError() diff --git a/teuthology/jobqueue/beanstalk.py b/teuthology/jobqueue/beanstalk.py new file mode 100644 index 0000000000..2876adc12f --- /dev/null +++ b/teuthology/jobqueue/beanstalk.py @@ -0,0 +1,68 @@ +from typing import Optional, Self + +import yaml + +from teuthology import beanstalk +from teuthology.jobqueue import base + + +class Job(base.Job): + jid: int + + def __init__(self, job) -> None: + self._job = job + self.jid = job.jid + + def bury(self) -> None: + self._job.bury() + + def delete(self) -> None: + self._job.delete() + + def job_config(self) -> base.JobSpec: + return yaml.safe_load(self._job.body) + + +class JobQueue(base.JobQueue): + def __init__( + self, connection, tube: str, direction: base.QueueDirection + ) -> None: + self._connection = connection + self._direction = direction + if direction == base.QueueDirection.IN: + self._connection.use(tube) + self._tube = tube + if direction == base.QueueDirection.OUT: + self._tube = beanstalk.watch_tube(tube) + else: + raise ValueError( + f'invalid direction for beanstalk job queue: {direction}' + ) + + def put(self, job_config: base.JobSpec) -> int: + if self._direction != base.QueueDirection.IN: + raise ValueError('not an input queue') + job = yaml.safe_dump(job_config) + jid = beanstalk.put( + job, + ttr=60 * 60 * 24, + priority=job_config['priority'], + ) + return jid + + def get(self) -> Optional[Job]: + if self._direction != base.QueueDirection.OUT: + raise ValueError('not an output queue') + return Job(connection.reserve(timeout=60)) + + @property + def tube(self) -> str: + return self._tube + + @property + def direction(self) -> base.QueueDirection: + return self._direction + + @classmethod + def connect(tube: str, direction: base.QueueDirection) -> Self: + return cls(beanstalk.connect(), tube, direction) diff --git a/teuthology/jobqueue/choice.py b/teuthology/jobqueue/choice.py new file mode 100644 index 0000000000..615c72a2a0 --- /dev/null +++ b/teuthology/jobqueue/choice.py @@ -0,0 +1,29 @@ +from teuthology.config import config as teuth_config + +from teuthology.jobqueue.base import QueueDirection, JobQueue +from teuthology.jobqueue import beanstalk, sqlite +from teuthology.jobqueue import file as fileq + + +def from_backend( + backend: str, tube: str, direction: QueueDirection +) -> JobQueue: + if backend == 'beanstalk': + return beanstalk.JobQueue.connect(tube, QueueDirection.IN) + if backend.startswith('@'): + return fileq.JobQueue(backend.lstrip('@'), QueueDirection.IN) + if backend.startswith('sqlite:'): + return sqlite.JobQueue(backend, tube, QueueDirection.IN) + raise ValueError( + f"Unexpected queue backend: {backend!r}" + " (expected 'beanstalk', '@'," + " or 'sqlite://'" + ) + + +def from_config( + tube: str, direction: QueueDirection, backend: str = '' +) -> JobQueue: + return from_backend( + backend or teuth_config.job_queue_backend, tube, direction + ) diff --git a/teuthology/jobqueue/file.py b/teuthology/jobqueue/file.py new file mode 100644 index 0000000000..779e8d274b --- /dev/null +++ b/teuthology/jobqueue/file.py @@ -0,0 +1,40 @@ +from typing import Optional + +import yaml + +from teuthology import beanstalk +from teuthology.jobqueue import base + + +class JobQueue(base.JobQueue): + def __init__(self, path: str, direction: base.QueueDirection) -> None: + if direction != base.QueueDirection.IN: + raise ValueError('only output supported') + self._base_path = path + self._count_file_path = f'{path}.count' + self._count = 0 + try: + with open(self._count_file_path, 'r') as fh: + self._count = int(fh.read() or 0) + except FileNotFoundError: + pass + + def put(self, job_config: base.JobSpec) -> int: + jid = self._count = self._count + 1 + count_file_path = f'{self._base_path}.count' + job_config['job_id'] = str(jid) + job = yaml.safe_dump(job_config) + with open(self._base_path, 'a') as fh: + fh.write('---\n') + fh.write(job) + with open(self._count_file_path, 'w') as fh: + fh.write(str(jid)) + print(f'Job scheduled with name {job_config["name"]} and ID {jid}') + + @property + def tube(self) -> str: + return '' + + @property + def direction(self) -> base.QueueDirection: + return base.QueueDirection.IN diff --git a/teuthology/jobqueue/sqlite.py b/teuthology/jobqueue/sqlite.py new file mode 100644 index 0000000000..835e1f8c63 --- /dev/null +++ b/teuthology/jobqueue/sqlite.py @@ -0,0 +1,100 @@ +from typing import Optional, Self, Tuple + +import json +import sqlite3 +import time + +from teuthology.jobqueue import base + + +class Job(base.Job): + jid: int + + def __init__(self, jq: 'JobQueue', jid: int, data: str) -> None: + self._jq = jq + self.jid = jid + self._data = data + + def bury(self) -> None: + self.delete() + + def delete(self) -> None: + self._jq._delete(self.jid) + + def job_config(self) -> base.JobSpec: + return json.loads(self._data) + + +class JobQueue(base.JobQueue): + _retry_empty_sec = 30 + + def __init__( + self, path: str, tube: str, direction: base.QueueDirection + ) -> None: + self._path = path + self._tube = tube + # the sqlite job queue is always bidirectional + self._direction = base.QueueDirection.BIDIR + self._connect() + self._create_jobs_table() + + def put(self, job_config: base.JobSpec) -> int: + job = json.dumps(job_config) + return self._insert(job) + + def get(self) -> Optional[Job]: + result = self._select_job() + if result is None: + time.sleep(self._retry_empty_sec) + result = self._select_job() + if result is None: + return None + jid, data = result + return Job(self, jid, data) + + @property + def tube(self) -> str: + return self._tube + + @property + def direction(self) -> base.QueueDirection: + return self._direction + + def _select_job(self) -> Optional[Tuple[int, str]]: + with self._conn: + cur = self._conn.cursor() + cur.execute( + "SELECT rowid,jobdesc FROM jobs ORDER BY rowid LIMIT 1" + ) + rows = [(jid, data) for jid, data in cur.fetchall()] + if rows: + assert len(rows) == 1 + return rows[0] + return None + + def _insert(self, data: str) -> int: + with self._conn: + cur = self._conn.cursor() + cur.execute("INSERT INTO jobs VALUES (?)", (data,)) + jid = cur.lastrowid + cur.close() + return jid + + def _delete(self, jid: int) -> None: + with self._conn: + self._conn.execute("DELETE FROM jobs WHERE rowid=?", (jid,)) + + def _create_jobs_table(self) -> None: + try: + with self._conn: + self._conn.execute("CREATE TABLE jobs (jobdesc TEXT)") + except sqlite3.OperationalError: + pass + + def _connect(self) -> None: + path = self._path + if path.startswith('sqlite://'): + path = path[9:] + if path.startswith('sqlite:'): + path = path[7:] + self._conn = sqlite3.connect(path) -- 2.39.5