--- /dev/null
+from typing import Dict, Any, Optional
+
+import enum
+
+
+JobSpec = Dict[str, Any]
+
+
+class QueueDirection(enum.Enum):
+ IN = 1
+ OUT = 2
+ BIDIR = 3
+
+
+class Job:
+ jid: int
+
+ def bury(self) -> None:
+ raise NotImplementedError()
+
+ def delete(self) -> None:
+ raise NotImplementedError()
+
+ def job_config(self) -> JobSpec:
+ raise NotImplementedError()
+
+
+class JobQueue:
+ def put(self, job_config: JobSpec) -> int:
+ raise NotImplementedError()
+
+ def get(self) -> Optional[Job]:
+ raise NotImplementedError()
+
+ @property
+ def tube(self) -> str:
+ raise NotImplementedError()
+
+ @property
+ def direction(self) -> QueueDirection:
+ raise NotImplementedError()
--- /dev/null
+from typing import Optional, Self
+
+import yaml
+
+from teuthology import beanstalk
+from teuthology.jobqueue import base
+
+
+class Job(base.Job):
+ jid: int
+
+ def __init__(self, job) -> None:
+ self._job = job
+ self.jid = job.jid
+
+ def bury(self) -> None:
+ self._job.bury()
+
+ def delete(self) -> None:
+ self._job.delete()
+
+ def job_config(self) -> base.JobSpec:
+ return yaml.safe_load(self._job.body)
+
+
+class JobQueue(base.JobQueue):
+ def __init__(
+ self, connection, tube: str, direction: base.QueueDirection
+ ) -> None:
+ self._connection = connection
+ self._direction = direction
+ if direction == base.QueueDirection.IN:
+ self._connection.use(tube)
+ self._tube = tube
+ if direction == base.QueueDirection.OUT:
+ self._tube = beanstalk.watch_tube(tube)
+ else:
+ raise ValueError(
+ f'invalid direction for beanstalk job queue: {direction}'
+ )
+
+ def put(self, job_config: base.JobSpec) -> int:
+ if self._direction != base.QueueDirection.IN:
+ raise ValueError('not an input queue')
+ job = yaml.safe_dump(job_config)
+ jid = beanstalk.put(
+ job,
+ ttr=60 * 60 * 24,
+ priority=job_config['priority'],
+ )
+ return jid
+
+ def get(self) -> Optional[Job]:
+ if self._direction != base.QueueDirection.OUT:
+ raise ValueError('not an output queue')
+ return Job(connection.reserve(timeout=60))
+
+ @property
+ def tube(self) -> str:
+ return self._tube
+
+ @property
+ def direction(self) -> base.QueueDirection:
+ return self._direction
+
+ @classmethod
+ def connect(tube: str, direction: base.QueueDirection) -> Self:
+ return cls(beanstalk.connect(), tube, direction)
--- /dev/null
+from teuthology.config import config as teuth_config
+
+from teuthology.jobqueue.base import QueueDirection, JobQueue
+from teuthology.jobqueue import beanstalk, sqlite
+from teuthology.jobqueue import file as fileq
+
+
+def from_backend(
+ backend: str, tube: str, direction: QueueDirection
+) -> JobQueue:
+ if backend == 'beanstalk':
+ return beanstalk.JobQueue.connect(tube, QueueDirection.IN)
+ if backend.startswith('@'):
+ return fileq.JobQueue(backend.lstrip('@'), QueueDirection.IN)
+ if backend.startswith('sqlite:'):
+ return sqlite.JobQueue(backend, tube, QueueDirection.IN)
+ raise ValueError(
+ f"Unexpected queue backend: {backend!r}"
+ " (expected 'beanstalk', '@<path-to-file>',"
+ " or 'sqlite://<path-to-file>'"
+ )
+
+
+def from_config(
+ tube: str, direction: QueueDirection, backend: str = ''
+) -> JobQueue:
+ return from_backend(
+ backend or teuth_config.job_queue_backend, tube, direction
+ )
--- /dev/null
+from typing import Optional
+
+import yaml
+
+from teuthology import beanstalk
+from teuthology.jobqueue import base
+
+
+class JobQueue(base.JobQueue):
+ def __init__(self, path: str, direction: base.QueueDirection) -> None:
+ if direction != base.QueueDirection.IN:
+ raise ValueError('only output supported')
+ self._base_path = path
+ self._count_file_path = f'{path}.count'
+ self._count = 0
+ try:
+ with open(self._count_file_path, 'r') as fh:
+ self._count = int(fh.read() or 0)
+ except FileNotFoundError:
+ pass
+
+ def put(self, job_config: base.JobSpec) -> int:
+ jid = self._count = self._count + 1
+ count_file_path = f'{self._base_path}.count'
+ job_config['job_id'] = str(jid)
+ job = yaml.safe_dump(job_config)
+ with open(self._base_path, 'a') as fh:
+ fh.write('---\n')
+ fh.write(job)
+ with open(self._count_file_path, 'w') as fh:
+ fh.write(str(jid))
+ print(f'Job scheduled with name {job_config["name"]} and ID {jid}')
+
+ @property
+ def tube(self) -> str:
+ return ''
+
+ @property
+ def direction(self) -> base.QueueDirection:
+ return base.QueueDirection.IN
--- /dev/null
+from typing import Optional, Self, Tuple
+
+import json
+import sqlite3
+import time
+
+from teuthology.jobqueue import base
+
+
+class Job(base.Job):
+ jid: int
+
+ def __init__(self, jq: 'JobQueue', jid: int, data: str) -> None:
+ self._jq = jq
+ self.jid = jid
+ self._data = data
+
+ def bury(self) -> None:
+ self.delete()
+
+ def delete(self) -> None:
+ self._jq._delete(self.jid)
+
+ def job_config(self) -> base.JobSpec:
+ return json.loads(self._data)
+
+
+class JobQueue(base.JobQueue):
+ _retry_empty_sec = 30
+
+ def __init__(
+ self, path: str, tube: str, direction: base.QueueDirection
+ ) -> None:
+ self._path = path
+ self._tube = tube
+ # the sqlite job queue is always bidirectional
+ self._direction = base.QueueDirection.BIDIR
+ self._connect()
+ self._create_jobs_table()
+
+ def put(self, job_config: base.JobSpec) -> int:
+ job = json.dumps(job_config)
+ return self._insert(job)
+
+ def get(self) -> Optional[Job]:
+ result = self._select_job()
+ if result is None:
+ time.sleep(self._retry_empty_sec)
+ result = self._select_job()
+ if result is None:
+ return None
+ jid, data = result
+ return Job(self, jid, data)
+
+ @property
+ def tube(self) -> str:
+ return self._tube
+
+ @property
+ def direction(self) -> base.QueueDirection:
+ return self._direction
+
+ def _select_job(self) -> Optional[Tuple[int, str]]:
+ with self._conn:
+ cur = self._conn.cursor()
+ cur.execute(
+ "SELECT rowid,jobdesc FROM jobs ORDER BY rowid LIMIT 1"
+ )
+ rows = [(jid, data) for jid, data in cur.fetchall()]
+ if rows:
+ assert len(rows) == 1
+ return rows[0]
+ return None
+
+ def _insert(self, data: str) -> int:
+ with self._conn:
+ cur = self._conn.cursor()
+ cur.execute("INSERT INTO jobs VALUES (?)", (data,))
+ jid = cur.lastrowid
+ cur.close()
+ return jid
+
+ def _delete(self, jid: int) -> None:
+ with self._conn:
+ self._conn.execute("DELETE FROM jobs WHERE rowid=?", (jid,))
+
+ def _create_jobs_table(self) -> None:
+ try:
+ with self._conn:
+ self._conn.execute("CREATE TABLE jobs (jobdesc TEXT)")
+ except sqlite3.OperationalError:
+ pass
+
+ def _connect(self) -> None:
+ path = self._path
+ if path.startswith('sqlite://'):
+ path = path[9:]
+ if path.startswith('sqlite:'):
+ path = path[7:]
+ self._conn = sqlite3.connect(path)