]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
xxx: add a jobqueue abstraction
authorJohn Mulligan <phlogistonjohn@asynchrono.us>
Fri, 9 Aug 2024 14:25:58 +0000 (10:25 -0400)
committerJohn Mulligan <phlogistonjohn@asynchrono.us>
Fri, 9 Aug 2024 14:25:58 +0000 (10:25 -0400)
Signed-off-by: John Mulligan <phlogistonjohn@asynchrono.us>
teuthology/jobqueue/__init__.py [new file with mode: 0644]
teuthology/jobqueue/base.py [new file with mode: 0644]
teuthology/jobqueue/beanstalk.py [new file with mode: 0644]
teuthology/jobqueue/choice.py [new file with mode: 0644]
teuthology/jobqueue/file.py [new file with mode: 0644]
teuthology/jobqueue/sqlite.py [new file with mode: 0644]

diff --git a/teuthology/jobqueue/__init__.py b/teuthology/jobqueue/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/teuthology/jobqueue/base.py b/teuthology/jobqueue/base.py
new file mode 100644 (file)
index 0000000..9ce40be
--- /dev/null
@@ -0,0 +1,41 @@
+from typing import Dict, Any, Optional
+
+import enum
+
+
+JobSpec = Dict[str, Any]
+
+
+class QueueDirection(enum.Enum):
+    IN = 1
+    OUT = 2
+    BIDIR = 3
+
+
+class Job:
+    jid: int
+
+    def bury(self) -> None:
+        raise NotImplementedError()
+
+    def delete(self) -> None:
+        raise NotImplementedError()
+
+    def job_config(self) -> JobSpec:
+        raise NotImplementedError()
+
+
+class JobQueue:
+    def put(self, job_config: JobSpec) -> int:
+        raise NotImplementedError()
+
+    def get(self) -> Optional[Job]:
+        raise NotImplementedError()
+
+    @property
+    def tube(self) -> str:
+        raise NotImplementedError()
+
+    @property
+    def direction(self) -> QueueDirection:
+        raise NotImplementedError()
diff --git a/teuthology/jobqueue/beanstalk.py b/teuthology/jobqueue/beanstalk.py
new file mode 100644 (file)
index 0000000..2876adc
--- /dev/null
@@ -0,0 +1,68 @@
+from typing import Optional, Self
+
+import yaml
+
+from teuthology import beanstalk
+from teuthology.jobqueue import base
+
+
+class Job(base.Job):
+    jid: int
+
+    def __init__(self, job) -> None:
+        self._job = job
+        self.jid = job.jid
+
+    def bury(self) -> None:
+        self._job.bury()
+
+    def delete(self) -> None:
+        self._job.delete()
+
+    def job_config(self) -> base.JobSpec:
+        return yaml.safe_load(self._job.body)
+
+
+class JobQueue(base.JobQueue):
+    def __init__(
+        self, connection, tube: str, direction: base.QueueDirection
+    ) -> None:
+        self._connection = connection
+        self._direction = direction
+        if direction == base.QueueDirection.IN:
+            self._connection.use(tube)
+            self._tube = tube
+        if direction == base.QueueDirection.OUT:
+            self._tube = beanstalk.watch_tube(tube)
+        else:
+            raise ValueError(
+                f'invalid direction for beanstalk job queue: {direction}'
+            )
+
+    def put(self, job_config: base.JobSpec) -> int:
+        if self._direction != base.QueueDirection.IN:
+            raise ValueError('not an input queue')
+        job = yaml.safe_dump(job_config)
+        jid = beanstalk.put(
+            job,
+            ttr=60 * 60 * 24,
+            priority=job_config['priority'],
+        )
+        return jid
+
+    def get(self) -> Optional[Job]:
+        if self._direction != base.QueueDirection.OUT:
+            raise ValueError('not an output queue')
+        return Job(connection.reserve(timeout=60))
+
+    @property
+    def tube(self) -> str:
+        return self._tube
+
+    @property
+    def direction(self) -> base.QueueDirection:
+        return self._direction
+
+    @classmethod
+    def connect(tube: str, direction: base.QueueDirection) -> Self:
+        return cls(beanstalk.connect(), tube, direction)
diff --git a/teuthology/jobqueue/choice.py b/teuthology/jobqueue/choice.py
new file mode 100644 (file)
index 0000000..615c72a
--- /dev/null
@@ -0,0 +1,29 @@
+from teuthology.config import config as teuth_config
+
+from teuthology.jobqueue.base import QueueDirection, JobQueue
+from teuthology.jobqueue import beanstalk, sqlite
+from teuthology.jobqueue import file as fileq
+
+
+def from_backend(
+    backend: str, tube: str, direction: QueueDirection
+) -> JobQueue:
+    if backend == 'beanstalk':
+        return beanstalk.JobQueue.connect(tube, QueueDirection.IN)
+    if backend.startswith('@'):
+        return fileq.JobQueue(backend.lstrip('@'), QueueDirection.IN)
+    if backend.startswith('sqlite:'):
+        return sqlite.JobQueue(backend, tube, QueueDirection.IN)
+    raise ValueError(
+        f"Unexpected queue backend: {backend!r}"
+        " (expected 'beanstalk', '@<path-to-file>',"
+        " or 'sqlite://<path-to-file>'"
+    )
+
+
+def from_config(
+    tube: str, direction: QueueDirection, backend: str = ''
+) -> JobQueue:
+    return from_backend(
+        backend or teuth_config.job_queue_backend, tube, direction
+    )
diff --git a/teuthology/jobqueue/file.py b/teuthology/jobqueue/file.py
new file mode 100644 (file)
index 0000000..779e8d2
--- /dev/null
@@ -0,0 +1,40 @@
+from typing import Optional
+
+import yaml
+
+from teuthology import beanstalk
+from teuthology.jobqueue import base
+
+
+class JobQueue(base.JobQueue):
+    def __init__(self, path: str, direction: base.QueueDirection) -> None:
+        if direction != base.QueueDirection.IN:
+            raise ValueError('only output supported')
+        self._base_path = path
+        self._count_file_path = f'{path}.count'
+        self._count = 0
+        try:
+            with open(self._count_file_path, 'r') as fh:
+                self._count = int(fh.read() or 0)
+        except FileNotFoundError:
+            pass
+
+    def put(self, job_config: base.JobSpec) -> int:
+        jid = self._count = self._count + 1
+        count_file_path = f'{self._base_path}.count'
+        job_config['job_id'] = str(jid)
+        job = yaml.safe_dump(job_config)
+        with open(self._base_path, 'a') as fh:
+            fh.write('---\n')
+            fh.write(job)
+        with open(self._count_file_path, 'w') as fh:
+            fh.write(str(jid))
+        print(f'Job scheduled with name {job_config["name"]} and ID {jid}')
+
+    @property
+    def tube(self) -> str:
+        return ''
+
+    @property
+    def direction(self) -> base.QueueDirection:
+        return base.QueueDirection.IN
diff --git a/teuthology/jobqueue/sqlite.py b/teuthology/jobqueue/sqlite.py
new file mode 100644 (file)
index 0000000..835e1f8
--- /dev/null
@@ -0,0 +1,100 @@
+from typing import Optional, Self, Tuple
+
+import json
+import sqlite3
+import time
+
+from teuthology.jobqueue import base
+
+
+class Job(base.Job):
+    jid: int
+
+    def __init__(self, jq: 'JobQueue', jid: int, data: str) -> None:
+        self._jq = jq
+        self.jid = jid
+        self._data = data
+
+    def bury(self) -> None:
+        self.delete()
+
+    def delete(self) -> None:
+        self._jq._delete(self.jid)
+
+    def job_config(self) -> base.JobSpec:
+        return json.loads(self._data)
+
+
+class JobQueue(base.JobQueue):
+    _retry_empty_sec = 30
+
+    def __init__(
+        self, path: str, tube: str, direction: base.QueueDirection
+    ) -> None:
+        self._path = path
+        self._tube = tube
+        # the sqlite job queue is always bidirectional
+        self._direction = base.QueueDirection.BIDIR
+        self._connect()
+        self._create_jobs_table()
+
+    def put(self, job_config: base.JobSpec) -> int:
+        job = json.dumps(job_config)
+        return self._insert(job)
+
+    def get(self) -> Optional[Job]:
+        result = self._select_job()
+        if result is None:
+            time.sleep(self._retry_empty_sec)
+            result = self._select_job()
+            if result is None:
+                return None
+        jid, data = result
+        return Job(self, jid, data)
+
+    @property
+    def tube(self) -> str:
+        return self._tube
+
+    @property
+    def direction(self) -> base.QueueDirection:
+        return self._direction
+
+    def _select_job(self) -> Optional[Tuple[int, str]]:
+        with self._conn:
+            cur = self._conn.cursor()
+            cur.execute(
+                "SELECT rowid,jobdesc FROM jobs ORDER BY rowid LIMIT 1"
+            )
+            rows = [(jid, data) for jid, data in cur.fetchall()]
+        if rows:
+            assert len(rows) == 1
+            return rows[0]
+        return None
+
+    def _insert(self, data: str) -> int:
+        with self._conn:
+            cur = self._conn.cursor()
+            cur.execute("INSERT INTO jobs VALUES (?)", (data,))
+            jid = cur.lastrowid
+            cur.close()
+        return jid
+
+    def _delete(self, jid: int) -> None:
+        with self._conn:
+            self._conn.execute("DELETE FROM jobs WHERE rowid=?", (jid,))
+
+    def _create_jobs_table(self) -> None:
+        try:
+            with self._conn:
+                self._conn.execute("CREATE TABLE jobs (jobdesc TEXT)")
+        except sqlite3.OperationalError:
+            pass
+
+    def _connect(self) -> None:
+        path = self._path
+        if path.startswith('sqlite://'):
+            path = path[9:]
+        if path.startswith('sqlite:'):
+            path = path[7:]
+        self._conn = sqlite3.connect(path)