git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles
author    Aishwarya Mathuria <amathuri@redhat.com>
          Mon, 4 Oct 2021 13:54:41 +0000 (19:24 +0530)
committer Aishwarya Mathuria <amathuri@redhat.com>
          Fri, 3 Jun 2022 14:35:24 +0000 (20:05 +0530)
With the --queue-backend argument, the user can specify which backend (paddles or beanstalk) to use for maintaining the teuthology jobs queue.
To avoid overlapping job IDs, a job scheduled in beanstalk is also written to paddles, which returns a unique ID.
Teuthology treats this ID as the job ID throughout the run of the job.
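
In outline, scheduling against beanstalk now works roughly as follows (a condensed sketch of the beanstalk_schedule_job added below; the schedule_one helper name is illustrative and error handling is omitted):

    import yaml

    import teuthology.beanstalk
    from teuthology import report

    def schedule_one(job_config):
        # Paddles assigns the unique job_id used for the rest of the run.
        job_id = report.try_create_job(job_config, dict(status='queued'))
        job_config['job_id'] = str(job_id)
        # The job config, now carrying that ID, is what gets queued in beanstalk.
        connection = teuthology.beanstalk.connect()
        connection.use(job_config.pop('tube'))   # tube name, e.g. the machine type
        connection.put(yaml.safe_dump(job_config),
                       ttr=60 * 60 * 24,
                       priority=job_config['priority'])
        return job_id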

To differentiate between the two queue backends, the teuthology-queue command has been split into teuthology-paddles-queue and teuthology-beanstalk-queue.
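For example, teuthology-beanstalk-queue -s -m MACHINE_TYPE shows the status of a beanstalk tube, while teuthology-paddles-queue -s -m MACHINE_TYPE queries the paddles-backed queue; the full usage for each command appears in the new scripts below.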

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
15 files changed:
scripts/beanstalk_queue.py [new file with mode: 0644]
scripts/dispatcher.py
scripts/paddles_queue.py [new file with mode: 0644]
scripts/queue.py [deleted file]
teuthology/beanstalk.py [new file with mode: 0644]
teuthology/config.py
teuthology/dispatcher/__init__.py
teuthology/dispatcher/supervisor.py
teuthology/kill.py
teuthology/orchestra/run.py
teuthology/report.py
teuthology/schedule.py
teuthology/test/test_dispatcher.py
teuthology/test/test_worker.py
teuthology/worker.py

diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py
new file mode 100644 (file)
index 0000000..88a8242
--- /dev/null
@@ -0,0 +1,35 @@
+import docopt
+
+import teuthology.config
+import teuthology.beanstalk
+
+doc = """
+usage: teuthology-beanstalk-queue -h
+       teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
+       teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
+       teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
+       teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
+List Jobs in queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+Arguments:
+  -m, --machine_type MACHINE_TYPE [default: multi]
+                        Which machine type queue to work on.
+optional arguments:
+  -h, --help            Show this help message and exit
+  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
+  -d, --description     Show job descriptions
+  -r, --runs            Only show run names
+  -f, --full            Print the entire job config. Use with caution.
+  -s, --status          Prints the status of the queue
+  -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
+                        will unpause. If -m is passed, pause that queue,
+                        otherwise pause all queues.
+"""
+
+
+def main():
+
+    args = docopt.docopt(doc)
+    print(args)
+    teuthology.beanstalk.main(args)
diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index 8a67587ed8f76718ac66569b6ea164c476d467ee..5e64b382d84d2f032dca0343dcd897225e48d09b 100644 (file)
@@ -1,9 +1,9 @@
 """
 usage: teuthology-dispatcher --help
        teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
-       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
+       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
 
-Start a dispatcher for the specified machine type. Grab jobs from a paddles
+Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
 queue and run the teuthology tests they describe as subprocesses. The
 subprocess invoked is a teuthology-dispatcher command run in supervisor
 mode.
@@ -22,6 +22,7 @@ standard arguments:
   --bin-path BIN_PATH            teuthology bin path
   --job-config CONFIG            file descriptor of job's config file
   --exit-on-empty-queue          if the queue is empty, exit
+  --queue-backend BACKEND        choose between paddles and beanstalk
 """
 
 import docopt
diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py
new file mode 100644 (file)
index 0000000..3c69d77
--- /dev/null
@@ -0,0 +1,46 @@
+import docopt
+
+import teuthology.config
+import teuthology.paddles_queue
+
+doc = """
+usage: teuthology-paddles-queue -h
+       teuthology-paddles-queue -s -m MACHINE_TYPE
+       teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+       teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
+       teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
+
+List Jobs in queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+
+Arguments:
+  -m, --machine_type MACHINE_TYPE
+                        Which machine type queue to work on.
+
+optional arguments:
+  -h, --help            Show this help message and exit
+  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
+  -d, --description     Show job descriptions
+  -r, --runs            Only show run names
+  -f, --full            Print the entire job config. Use with caution.
+  -s, --status          Prints the status of the queue
+  -t, --time SECONDS    Pause queues for a number of seconds.
+                        If -m is passed, pause that queue,
+                        otherwise pause all queues.
+  -p, --pause           Pause queue
+  -u, --unpause         Unpause queue
+  -P, --priority PRIORITY
+                        Change priority of queued jobs
+  -U, --user USER       User who owns the jobs
+  -R, --run-name RUN_NAME
+                        Used to change priority of all jobs in the run.
+"""
+
+
+def main():
+    args = docopt.docopt(doc)
+    teuthology.paddles_queue.main(args)
diff --git a/scripts/queue.py b/scripts/queue.py
deleted file mode 100644 (file)
index a07598a..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-import docopt
-
-import teuthology.config
-import teuthology.paddles_queue
-
-doc = """
-usage: teuthology-queue -h
-       teuthology-queue -s -m MACHINE_TYPE
-       teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
-       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
-       teuthology-queue [-r] -m MACHINE_TYPE -U USER
-       teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
-       teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
-       teuthology-queue -u -m MACHINE_TYPE -U USER
-
-List Jobs in queue.
-If -D is passed, then jobs with PATTERN in the job name are deleted from the
-queue.
-
-Arguments:
-  -m, --machine_type MACHINE_TYPE
-                        Which machine type queue to work on.
-
-optional arguments:
-  -h, --help            Show this help message and exit
-  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
-  -d, --description     Show job descriptions
-  -r, --runs            Only show run names
-  -f, --full            Print the entire job config. Use with caution.
-  -s, --status          Prints the status of the queue
-  -t, --time SECONDS    Pause queues for a number of seconds.
-                        If -m is passed, pause that queue,
-                        otherwise pause all queues.
-  -p, --pause           Pause queue
-  -u, --unpause         Unpause queue
-  -P, --priority PRIORITY
-                        Change priority of queued jobs
-  -U, --user USER       User who owns the jobs
-  -R, --run_name RUN_NAME
-                        Used to change priority of all jobs in the run.
-"""
-
-
-def main():
-    args = docopt.docopt(doc)
-    teuthology.paddles_queue.main(args)
diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py
new file mode 100644 (file)
index 0000000..a1165be
--- /dev/null
@@ -0,0 +1,214 @@
+import beanstalkc
+import yaml
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology.config import config
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+    host = config.queue_host
+    port = config.queue_port
+    if host is None or port is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.teuthology_yaml))
+    return beanstalkc.Connection(host=host, port=port)
+
+
+def watch_tube(connection, tube_name):
+    """
+    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+    the tube_name that was actually used.
+    """
+    if ',' in tube_name:
+        log.debug("Correcting tube name to 'multi'")
+        tube_name = 'multi'
+    connection.watch(tube_name)
+    connection.ignore('default')
+    return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+    """
+    def callback(jobs_dict)
+    """
+    log.info("Checking Beanstalk Queue...")
+    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+    if job_count == 0:
+        log.info('No jobs in Beanstalk Queue')
+        return
+
+    # Try to figure out a sane timeout based on how many jobs are in the queue
+    timeout = job_count / 2000.0 * 60
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = connection.reserve(timeout=timeout)
+        if job is None or job.body is None:
+            continue
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        if pattern is not None and pattern not in job_name:
+            continue
+        processor.add_job(job_id, job_config, job)
+    end_progress()
+    processor.complete()
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        job_obj = self.jobs[job_id].get('job_obj')
+        if job_obj:
+            job_obj.delete()
+        report.try_delete_jobs(job_name, job_id)
+
+
+def pause_tube(connection, tube, duration):
+    duration = int(duration)
+    if not tube:
+        tubes = sorted(connection.tubes())
+    else:
+        tubes = [tube]
+
+    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+    templ = prefix + ": {tubes}"
+    log.info(templ.format(dur=duration, tubes=tubes))
+    for tube in tubes:
+        connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+    stats = connection.stats_tube(tube)
+    result = dict(
+        name=tube,
+        count=stats['current-jobs-ready'],
+        paused=(stats['pause'] != 0),
+    )
+    return result
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    connection = connect()
+    try:
+        if machine_type and not pause_duration:
+            # watch_tube needs to be run before we inspect individual jobs;
+            # it is not needed for pausing tubes
+            watch_tube(connection, machine_type)
+        if status:
+            print(stats_tube(connection, machine_type))
+        elif pause_duration:
+            pause_tube(connection, machine_type, pause_duration)
+        elif delete:
+            walk_jobs(connection, machine_type,
+                      JobDeleter(delete))
+        elif runs:
+            walk_jobs(connection, machine_type,
+                      RunPrinter())
+        else:
+            walk_jobs(connection, machine_type,
+                      JobPrinter(show_desc=show_desc, full=full))
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
+    finally:
+        connection.close()
diff --git a/teuthology/config.py b/teuthology/config.py
index 6da6cdd7f14131cec886fc3145b064b1fbc9eda6..b42ff48d1a91a29442739da95740752f82d6fab3 100644 (file)
@@ -143,6 +143,7 @@ class TeuthologyConfig(YamlConfig):
         'archive_upload_key': None,
         'archive_upload_url': None,
         'automated_scheduling': False,
+        'backend': 'paddles',
         'reserve_machines': 5,
         'ceph_git_base_url': 'https://github.com/ceph/',
         'ceph_git_url': None,
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
index 603ec1ce725a349b196cd30aaa578d889d88aa7a..f80818cf149635fd59a7e16edc07cbc7def13fc3 100644 (file)
@@ -8,6 +8,7 @@ from datetime import datetime
 from time import sleep
 
 from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob
@@ -57,6 +58,8 @@ def load_config(archive_dir=None):
 def clean_config(config):
     result = {}
     for key in config:
+        if key == 'status':
+            continue
         if config[key] is not None:
             result[key] = config[key]
     return result
@@ -70,7 +73,11 @@ def main(args):
     machine_type = args["--machine-type"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
     exit_on_empty_queue = args["--exit-on-empty-queue"]
+    backend = args['--queue-backend']
 
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
@@ -88,6 +95,10 @@ def main(args):
 
     load_config(archive_dir=archive_dir)
 
+    if backend == 'beanstalk':
+        connection = beanstalk.connect()
+        beanstalk.watch_tube(connection, machine_type)
+
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -111,19 +122,26 @@ def main(args):
 
         load_config()
         job_procs = set(filter(lambda p: p.poll() is None, job_procs))
-        job = report.get_queued_job(machine_type)
-        if job is None:
-            if exit_on_empty_queue and not job_procs:
-                log.info("Queue is empty and no supervisor processes running; exiting!")
-                break
-            continue
-        job = clean_config(job)
-        report.try_push_job_info(job, dict(status='running'))
-        job_id = job.get('job_id')
-        log.info('Reserved job %s', job_id)
-        log.info('Config is: %s', job)
-        job_config = job
-        
+        if backend == 'beanstalk':
+            job = connection.reserve(timeout=60)
+            if job is None:
+                continue
+            job.bury()
+            job_config = yaml.safe_load(job.body)
+            job_id = job_config.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job.body)
+        else:
+            job = report.get_queued_job(machine_type)
+            if job is None:
+                if exit_on_empty_queue and not job_procs:
+                    log.info("Queue is empty and no supervisor processes running; exiting!")
+                    break
+                continue
+            job = clean_config(job)
+            report.try_push_job_info(job, dict(status='running'))
+            job_id = job.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job)
+            job_config = job
+
         if job_config.get('stop_worker'):
             keep_running = False
 
@@ -174,6 +192,13 @@ def main(args):
                 status='fail',
                 failure_reason=error_message))
 
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        if backend == 'beanstalk':
+            try:
+                job.delete()
+            except Exception:
+                log.exception("Saw exception while trying to delete job")
 
     returncodes = set([0])
     for proc in job_procs:
@@ -201,7 +226,7 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
 
 
 def pause_queue(machine_type, paused, paused_by, pause_duration=None):
-    if paused == True:
+    if paused:
         report.pause_queue(machine_type, paused, paused_by, pause_duration)
         '''
         If there is a pause duration specified
@@ -211,5 +236,5 @@ def pause_queue(machine_type, paused, paused_by, pause_duration=None):
             sleep(int(pause_duration))
             paused = False
             report.pause_queue(machine_type, paused, paused_by)
-    elif paused == False:
+    elif not paused:
         report.pause_queue(machine_type, paused, paused_by)
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
index 459669861ba731384282fd847b872f23a663c2eb..edb273a1996737c29d87b6906f471ef3d986a879 100644 (file)
@@ -67,6 +67,12 @@ def main(args):
 def run_job(job_config, teuth_bin_path, archive_dir, verbose):
     safe_archive = safepath.munge(job_config['name'])
     if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'], job_config['job_id'])
+            except Exception as e:
+                log.warning("Unable to delete job %s, exception occurred: %s",
+                            job_config['job_id'], e)
         job_archive = os.path.join(archive_dir, safe_archive)
         args = [
             os.path.join(teuth_bin_path, 'teuthology-results'),
diff --git a/teuthology/kill.py b/teuthology/kill.py
index 63a63ab4ae0125ffa3f27d57103f8451d2c3da61..7ebf560166569036bb62bc9bfd0a1771820bed28 100755 (executable)
@@ -8,8 +8,9 @@ import tempfile
 import logging
 import getpass
 
-
+from teuthology import beanstalk
 from teuthology import report
+from teuthology.config import config
 from teuthology import misc
 
 log = logging.getLogger(__name__)
@@ -64,6 +65,7 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
                                     "you must also pass --machine-type")
 
     if not preserve_queue:
+        if config.backend == 'beanstalk':
+            remove_beanstalk_jobs(run_name, machine_type)
         remove_paddles_jobs(run_name)
     kill_processes(run_name, run_info.get('pids'))
     if owner is not None:
@@ -101,10 +103,13 @@ def find_run_info(serializer, run_name):
     job_info = {}
     job_num = 0
     jobs = serializer.jobs_for_run(run_name)
+    job_total = len(jobs)
     for (job_id, job_dir) in jobs.items():
         if not os.path.isdir(job_dir):
             continue
         job_num += 1
+        if config.backend == 'beanstalk':
+            beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
         job_info = serializer.job_info(run_name, job_id, simple=True)
         for key in job_info.keys():
             if key in run_info_fields and key not in run_info:
@@ -123,6 +128,41 @@ def remove_paddles_jobs(run_name):
         report.try_delete_jobs(run_name, job_ids)
 
 
+def remove_beanstalk_jobs(run_name, tube_name):
+    qhost = config.queue_host
+    qport = config.queue_port
+    if qhost is None or qport is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.yaml_path))
+    log.info("Checking Beanstalk Queue...")
+    beanstalk_conn = beanstalk.connect()
+    real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)
+
+    curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
+    if curjobs != 0:
+        for _ in range(curjobs):
+            job = beanstalk_conn.reserve(timeout=20)
+            if job is None:
+                continue
+            job_config = yaml.safe_load(job.body)
+            if run_name == job_config['name']:
+                job_id = job_config['job_id']
+                msg = "Deleting job from queue. ID: " + \
+                    "{id} Name: {name} Desc: {desc}".format(
+                        id=str(job_id),
+                        name=job_config['name'],
+                        desc=job_config['description'],
+                    )
+                log.info(msg)
+                job.delete()
+    else:
+        print("No jobs in Beanstalk Queue")
+    beanstalk_conn.close()
+
+
 def kill_processes(run_name, pids=None):
     if pids:
         to_kill = set(pids).intersection(psutil.pids())
diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py
index f31dfd0d7fc1db89feab1f67abdc95c809a9731d..6235b0d36e4913d0bfb30ff8c5a1a0be62de28fe 100644 (file)
@@ -182,7 +182,6 @@ class RemoteProcess(object):
                     command=self.command, exitstatus=self.returncode,
                     node=self.hostname, label=self.label
                 )
-
     def _get_exitstatus(self):
         """
         :returns: the remote command's exit status (return code). Note that
diff --git a/teuthology/report.py b/teuthology/report.py
index 5d8cc9c3c87af35b698493a133415974fb73fff6..50b7f8966b1bf91dae553a0f222403be0b7b6655 100644 (file)
@@ -385,13 +385,13 @@ class ResultsReporter(object):
         if os.path.exists(self.last_run_file):
             os.remove(self.last_run_file)
     
-    def get_top_job(self, machine_type):
+    def get_top_job(self, queue):
 
-        uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri,
-                                                                          machine_type=machine_type)
+        uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri,
+                                                                          queue=queue)
         inc = random.uniform(0, 1)
         with safe_while(
-                sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed:
+                sleep=1, increment=inc, action=f'get job from {queue}') as proceed:
             while proceed():
                 response = self.session.get(uri)
                 if response.status_code == 200:
@@ -505,7 +505,7 @@ class ResultsReporter(object):
         response = self.session.delete(uri)
         response.raise_for_status()
 
-    def create_queue(self, machine_type):
+    def create_queue(self, queue):
         """
         Create a queue on the results server
 
@@ -514,19 +514,19 @@ class ResultsReporter(object):
         uri = "{base}/queue/".format(
             base=self.base_uri
         )
-        queue_info = {'machine_type': machine_type}
+        queue_info = {'queue': queue}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
 
         inc = random.uniform(0, 1)
         with safe_while(
-                sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed:
+                sleep=1, increment=inc, action=f'creating queue {queue}') as proceed:
             while proceed():
                 response = self.session.post(uri, data=queue_json, headers=headers)
 
                 if response.status_code == 200:
-                    self.log.info("Successfully created queue for {machine_type}".format(
-                        machine_type=machine_type,
+                    self.log.info("Successfully created queue {queue}".format(
+                        queue=queue,
                     ))
                     return
                 else:
@@ -546,26 +546,26 @@ class ResultsReporter(object):
 
         response.raise_for_status()
 
-    def update_queue(self, machine_type, paused, paused_by, pause_duration=None):
+    def update_queue(self, queue, paused, paused_by, pause_duration=None):
         uri = "{base}/queue/".format(
             base=self.base_uri
         )
         if pause_duration is not None:
             pause_duration = int(pause_duration)
-        queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by,
+        queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by,
                       'pause_duration': pause_duration}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
 
         inc = random.uniform(0, 1)
         with safe_while(
-                sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed:
+                sleep=1, increment=inc, action=f'updating queue {queue}') as proceed:
             while proceed():
                 response = self.session.put(uri, data=queue_json, headers=headers)
 
                 if response.status_code == 200:
-                    self.log.info("Successfully updated queue for {machine_type}".format(
-                        machine_type=machine_type,
+                    self.log.info("Successfully updated queue {queue}".format(
+                        queue=queue,
                     ))
                     return
                 else:
@@ -580,23 +580,23 @@ class ResultsReporter(object):
         response.raise_for_status()
     
 
-    def queue_stats(self, machine_type):
+    def queue_stats(self, queue):
         uri = "{base}/queue/stats/".format(
             base=self.base_uri
         )
-        queue_info = {'machine_type': machine_type}
+        queue_info = {'queue': queue}
         queue_json = json.dumps(queue_info)
 
         headers = {'content-type': 'application/json'}
         inc = random.uniform(0, 1)
         with safe_while(
-                sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed:
+                sleep=1, increment=inc, action=f'stats for queue {queue}') as proceed:
             while proceed():
                 response = self.session.post(uri, data=queue_json, headers=headers)
 
                 if response.status_code == 200:
-                    self.log.info("Successfully retrieved stats for queue {machine_type}".format(
-                        machine_type=machine_type,
+                    self.log.info("Successfully retrieved stats for queue {queue}".format(
+                        queue=queue,
                     ))
                     return response.json()
                 else:
@@ -609,18 +609,18 @@ class ResultsReporter(object):
                         ))
         response.raise_for_status()
     
-    def queued_jobs(self, machine_type, user, run_name):
+    def queued_jobs(self, queue, user, run_name):
         uri = "{base}/queue/queued_jobs/".format(
             base=self.base_uri
         )
+        request_info = {'queue': queue}
         if run_name is not None:
             filter_field = run_name
-            request_info = {'machine_type': machine_type,
-                            'run_name': run_name}
+            uri += "?run_name=" + str(run_name)
         else:
             filter_field = user
-            request_info = {'machine_type': machine_type,
-                            'user': user}
+            uri += "?user=" + str(user)
+            
         request_json = json.dumps(request_info)
         headers = {'content-type': 'application/json'}
         inc = random.uniform(0, 1)
@@ -645,11 +645,14 @@ class ResultsReporter(object):
         response.raise_for_status()
 
 
-def create_machine_type_queue(machine_type):
+def create_machine_type_queue(queue):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    reporter.create_queue(machine_type)
+    if ',' in queue:
+        queue = 'multi'
+    reporter.create_queue(queue)
+    return queue
 
 
 def get_user_jobs_queue(machine_type, user, run_name=None):
@@ -709,12 +712,16 @@ def get_queued_job(machine_type):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    if is_queue_paused(machine_type) == True:
-        log.info("Teuthology queue for machine type %s is currently paused",
-                  machine_type)
+    if ',' in machine_type:
+        queue = 'multi'
+    else:
+        queue = machine_type
+    if is_queue_paused(queue):
+        log.info("Teuthology queue %s is currently paused",
+                  queue)
         return None
     else:
-        return reporter.get_top_job(machine_type)
+        return reporter.get_top_job(queue)
 
 
 def try_push_job_info(job_config, extra_info=None):
diff --git a/teuthology/schedule.py b/teuthology/schedule.py
index 4ccf780845b15a5d51542957335c87749c181e85..81dd4d548f1a85daad782fce19339d3563ad3409 100644 (file)
@@ -1,6 +1,7 @@
 import os
 import yaml
 
+import teuthology.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
 
@@ -22,11 +23,6 @@ def main(args):
             if args[opt]:
                 raise ValueError(msg_fmt.format(opt=opt))
 
-    if args['--first-in-suite'] or args['--last-in-suite']:
-        report_status = False
-    else:
-        report_status = True
-
     name = args['--name']
     if not name or name.isdigit():
         raise ValueError("Please use a more descriptive value for --name")
@@ -34,13 +30,15 @@ def main(args):
     backend = args['--queue-backend']
     if args['--dry-run']:
         print('---\n' + yaml.safe_dump(job_config))
-    elif backend == 'paddles':
-        schedule_job(job_config, args['--num'], report_status)
     elif backend.startswith('@'):
         dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
+    elif backend == 'paddles':
+        paddles_schedule_job(job_config, args['--num'])
+    elif backend == 'beanstalk':
+        beanstalk_schedule_job(job_config, args['--num'])
     else:
         raise ValueError("Provided schedule backend '%s' is not supported. "
-                         "Try 'paddles' or '@path-to-a-file" % backend)
+                         "Try 'paddles', 'beanstalk' or '@path-to-a-file'" % backend)
 
 
 def build_config(args):
@@ -86,9 +84,9 @@ def build_config(args):
     return job_config
 
 
-def schedule_job(job_config, num=1, report_status=True):
+def paddles_schedule_job(job_config, num=1):
     """
-    Schedule a job.
+    Schedule a job with Paddles as the backend.
 
     :param job_config: The complete job dict
     :param num:      The number of times to schedule the job
@@ -97,16 +95,44 @@ def schedule_job(job_config, num=1, report_status=True):
     '''
     Add 'machine_type' queue to DB here.
     '''
-    report.create_machine_type_queue(job_config['machine_type'])
+    queue = report.create_machine_type_queue(job_config['machine_type'])
+    job_config['queue'] = queue
     while num > 0:
-
         job_id = report.try_create_job(job_config, dict(status='queued'))
-        print('Job scheduled with name {name} and ID {job_id}'.format(
+        print('Job scheduled in Paddles with name {name} and ID {job_id}'.format(
             name=job_config['name'], job_id=job_id))
         job_config['job_id'] = str(job_id)
-        
+
+        num -= 1
+
+
+def beanstalk_schedule_job(job_config, num=1):
+    """
+    Schedule a job with Beanstalk as the backend.
+
+    :param job_config: The complete job dict
+    :param num:      The number of times to schedule the job
+    """
+    num = int(num)
+    tube = job_config.pop('tube')
+    beanstalk = teuthology.beanstalk.connect()
+    beanstalk.use(tube)
+    queue = report.create_machine_type_queue(job_config['machine_type'])
+    job_config['queue'] = queue
+    while num > 0:
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        job_config['job_id'] = str(job_id)
+        job = yaml.safe_dump(job_config)
+        _ = beanstalk.put(
+            job,
+            ttr=60 * 60 * 24,
+            priority=job_config['priority'],
+        )
+        print('Job scheduled in Beanstalk with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
         num -= 1
 
+
 def dump_job_to_file(path, job_config, num=1):
     """
     Schedule a job.
@@ -134,4 +160,3 @@ def dump_job_to_file(path, job_config, num=1):
             num -= 1
     with open(count_file_path, 'w') as f:
         f.write(str(jid))
-
diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py
index 6b0dddfe2f4c988803ef2ee02a3852f5cc6509d1..9a6d0ff564cd31addc26686159adc6fe92f0e83f 100644 (file)
@@ -65,7 +65,8 @@ class TestDispatcher(unittest.TestCase):
             '--description': 'the_description',
             '--machine-type': 'test_queue',
             '--supervisor': False,
-            '--verbose': False
+            '--verbose': False,
+            '--queue-backend': 'paddles'
         }
 
         m = mock.MagicMock()
diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
index c5ed88799f20c4e971b6f56ad94168b607ee5580..6c5cf756d2c961400be553b6d94c8476be980b52 100644 (file)
@@ -1,3 +1,4 @@
+import beanstalkc
 import os
 
 from unittest.mock import patch, Mock, MagicMock
@@ -43,7 +44,8 @@ class TestWorker(object):
     @patch("os.symlink")
     def test_symlink_success(self, m_symlink):
         worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
-        m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log")
+        m_symlink.assert_called_with(
+            "path/to/worker.log", "path/to/archive/worker.log")
 
     @patch("teuthology.worker.log")
     @patch("os.symlink")
@@ -135,7 +137,8 @@ class TestWorker(object):
         m_popen.return_value = m_p
         m_t_config.results_server = False
         worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
 
     @patch("teuthology.worker.report.try_push_job_info")
     @patch("teuthology.worker.symlink_worker_log")
@@ -151,7 +154,8 @@ class TestWorker(object):
         process = Mock()
         process.poll.return_value = "not None"
         worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
         m_try_push.assert_called_with(
             dict(name=config["name"], job_id=config["job_id"]),
             dict(status='dead')
@@ -175,7 +179,8 @@ class TestWorker(object):
         m_proc.poll.return_value = "not None"
         m_popen.return_value = m_proc
         worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
 
     @patch("os.path.isdir")
     @patch("teuthology.worker.fetch_teuthology")
@@ -211,63 +216,97 @@ class TestWorker(object):
         assert m_fetch_qa_suite.called_once_with_args(branch='main')
         assert got_config['suite_path'] == '/suite/path'
 
-    def build_fake_jobs(self, job_bodies):
+    def build_fake_jobs(self, m_connection, m_job, job_bodies):
         """
+        Given patched copies of:
+            beanstalkc.Connection
+            beanstalkc.Job
         And a list of basic job bodies, return a list of mocked Job objects
         """
+        # Make sure instantiating m_job returns a new object each time
+        m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job)
         jobs = []
         job_id = 0
         for job_body in job_bodies:
             job_id += 1
-            job = {}
-            job['job_id'] = job_id
-            job['body'] = job_body
+            job = m_job(conn=m_connection, jid=job_id, body=job_body)
+            job.jid = job_id
+            job_body += '\njob_id: ' + str(job_id)
+            job.body = job_body
             jobs.append(job)
         return jobs
 
-
-    @patch("teuthology.worker.setup_log_file")
-    @patch("os.path.isdir", return_value=True)
-    @patch("teuthology.worker.fetch_teuthology")
-    @patch("teuthology.worker.fetch_qa_suite")
     @patch("teuthology.worker.run_job")
+    @patch("teuthology.worker.prep_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'foo: bar',
+                'stop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_prep_job.return_value = (dict(), '/bin/path')
+        worker.main(self.ctx)
+        # There should be one reserve call per item in the jobs list
+        expected_reserve_calls = [
+            dict(timeout=60) for i in range(len(jobs))
+        ]
+        got_reserve_calls = [
+            call[1] for call in m_connection.reserve.call_args_list
+        ]
+        assert got_reserve_calls == expected_reserve_calls
+        for job in jobs:
+            job.bury.assert_called_once_with()
+            job.delete.assert_called_once_with()
+
     @patch("teuthology.worker.report.try_push_job_info")
-    @patch("teuthology.worker.report.get_queued_job")
-    @patch("teuthology.worker.clean_config")
+    @patch("teuthology.worker.run_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
     def test_main_loop_13925(
-        self, m_setup_log_file, m_isdir,
-        m_fetch_teuthology, m_fetch_qa_suite, m_run_job,
-        m_try_push_job_info, m_get_queued_job, m_clean_config
-                       ):
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+        m_try_push_job_info,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'name: name',
+                'name: name\nstop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
         m_fetch_qa_suite.side_effect = [
             '/suite/path',
             MaxWhileTries(),
             MaxWhileTries(),
         ]
-        job = {
-            'job_id': '1',
-            'description': 'DESC',
-            'email': 'EMAIL',
-            'first_in_suite': False,
-            'last_in_suite': True,
-            'machine_type': 'test_queue',
-            'name': 'NAME',
-            'owner': 'OWNER',
-            'priority': 99,
-            'results_timeout': '6',
-            'verbose': False,
-            'stop_worker': True
-        }
-        m_get_queued_job.return_value = job
-        m_clean_config.return_value = job
-
-        mock_prep_job_patcher = patch('teuthology.worker.prep_job')
-        mock_prep_job = mock_prep_job_patcher.start()
-        mock_prep_job.return_value = (dict(), '/teuth/bin/path')
-
         worker.main(self.ctx)
-        mock_prep_job_patcher.stop()
-        assert len(m_run_job.call_args_list) == 1
-        assert len(m_try_push_job_info.call_args_list) == 1
-        assert m_try_push_job_info.called_once_with(job, dict(status='running'))
-
+        assert len(m_run_job.call_args_list) == 0
+        assert len(m_try_push_job_info.call_args_list) == len(jobs)
+        for i in range(len(jobs)):
+            push_call = m_try_push_job_info.call_args_list[i]
+            assert push_call[0][1]['status'] == 'dead'
diff --git a/teuthology/worker.py b/teuthology/worker.py
index 89a7304229f03d6db4829a8038d62f9715521bbd..5cff69c880640723b086fe8da503b5b57d6ac32a 100644 (file)
@@ -9,6 +9,7 @@ import yaml
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config
@@ -57,14 +58,6 @@ def load_config(ctx=None):
             teuth_config.archive_base = ctx.archive_dir
 
 
-def clean_config(config):
-    result = {}
-    for key in config:
-        if config[key] is not None:
-            result[key] = config[key]
-    return result
-
-
 def main(ctx):
     loglevel = logging.INFO
     if ctx.verbose:
@@ -81,6 +74,8 @@ def main(ctx):
 
     set_config_attr(ctx)
 
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, ctx.tube)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -103,16 +98,16 @@ def main(ctx):
 
         load_config()
 
-        job = report.get_queued_job(ctx.machine_type)
+        job = connection.reserve(timeout=60)
         if job is None:
             continue
 
-        job = clean_config(job)
-        report.try_push_job_info(job, dict(status='running'))
-        job_id = job.get('job_id')
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_config = yaml.safe_load(job.body)
+        job_id = job_config.get('job_id')
         log.info('Reserved job %s', job_id)
-        log.info('Config is: %s', job)
-        job_config = job
+        log.info('Config is: %s', job.body)
 
         if job_config.get('stop_worker'):
             keep_running = False
@@ -132,6 +127,13 @@ def main(ctx):
         except SkipJob:
             continue
 
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
+
 
 def prep_job(job_config, log_file_path, archive_dir):
     job_id = job_config['job_id']