]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
xxx: use jobqueue abstraction in dispatcher
authorJohn Mulligan <phlogistonjohn@asynchrono.us>
Fri, 9 Aug 2024 14:38:38 +0000 (10:38 -0400)
committerJohn Mulligan <phlogistonjohn@asynchrono.us>
Fri, 9 Aug 2024 15:02:06 +0000 (11:02 -0400)
Signed-off-by: John Mulligan <phlogistonjohn@asynchrono.us>
teuthology/dispatcher/__init__.py

index 59f8ae3279d8e27509e2322d6cc32c0ea8bf6815..c6fe0c87b1de62c928773751150efa31df9e4e88 100644 (file)
@@ -13,7 +13,6 @@ from teuthology import (
     setup_log_file,
     install_except_hook,
     # modules
-    beanstalk,
     exporter,
     report,
     repo_utils,
@@ -24,6 +23,9 @@ from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, Skip
 from teuthology.lock import ops as lock_ops
 from teuthology.util.time import parse_timestamp
 from teuthology import safepath
+from teuthology.jobqueue.base import QueueDirection
+import teuthology.jobqueue.choice
+
 
 log = logging.getLogger(__name__)
 start_time = datetime.datetime.now(datetime.timezone.utc)
@@ -88,8 +90,7 @@ def main(args):
 
     load_config(archive_dir=archive_dir)
 
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, args.tube)
+    queue = teuthology.jobqueue.choice.from_config(args.tube, QueueDirection.OUT)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -118,7 +119,7 @@ def main(args):
             if rc is not None:
                 worst_returncode = max([worst_returncode, rc])
                 job_procs.remove(proc)
-        job = connection.reserve(timeout=60)
+        job = queue.get()
         if job is None:
             if args.exit_on_empty_queue and not job_procs:
                 log.info("Queue is empty and no supervisor processes running; exiting!")
@@ -129,8 +130,8 @@ def main(args):
         job.bury()
         job_id = job.jid
         log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
+        job_config = job.job_config()
+        log.info('Config is: %s', job_config)
         job_config['job_id'] = str(job_id)
 
         if job_config.get('stop_worker'):