git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Remove usage of beanstalk from teuthology
author Aishwarya Mathuria <amathuri@redhat.com>
Mon, 26 Apr 2021 13:46:45 +0000 (19:16 +0530)
committer Zack Cerza <zack@redhat.com>
Tue, 31 May 2022 19:45:32 +0000 (13:45 -0600)
The following changes support the removal of Beanstalk from Teuthology. In its place, Teuthology now uses Paddles for queue management.
This PR pairs with the corresponding paddles PR: https://github.com/ceph/paddles/pull/94/files.
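For reference, the dispatcher now obtains work by asking paddles to pop the next queued job over HTTP instead of reserving one from a beanstalk tube. A minimal sketch of that interaction (the base URI and machine type are illustrative, not part of this change):

    import requests

    def pop_next_job(base_uri, machine_type):
        # paddles pops and returns the next queued job for this machine type
        uri = f"{base_uri}/queue/pop_queue?machine_type={machine_type}"
        response = requests.get(uri)
        response.raise_for_status()
        return response.json()

    job_config = pop_next_job("http://paddles.example.com", "smithi")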

The changes include:
1. Removal of all beanstalk-related code
2. The Teuthology scheduler and dispatcher now schedule and dispatch jobs through the Paddles queue
3. Support for Paddles queue management
4. The teuthology-queue command can now change the priority of jobs that are still in the queued state (see the example below)
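For example, a user could change the priority of their queued jobs with an invocation like the following (machine type, priority, and user are illustrative):

    teuthology-queue -m smithi -P 50 -u jdoe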

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
13 files changed:
scripts/dispatcher.py
scripts/kill.py
scripts/queue.py
scripts/schedule.py
scripts/worker.py
teuthology/beanstalk.py [deleted file]
teuthology/dispatcher/__init__.py
teuthology/dispatcher/supervisor.py
teuthology/kill.py
teuthology/paddles_queue.py [new file with mode: 0644]
teuthology/report.py
teuthology/schedule.py
teuthology/worker.py

index 4cb1abdea665803c62ecc5d0ef19fd2fa77f1e07..8a67587ed8f76718ac66569b6ea164c476d467ee 100644 (file)
@@ -1,9 +1,9 @@
 """
 usage: teuthology-dispatcher --help
-       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config COFNFIG --archive-dir DIR
-       teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --tube TUBE
+       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
+       teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --machine-type MACHINE_TYPE
 
-Start a dispatcher for the specified tube. Grab jobs from a beanstalk
+Start a dispatcher for the specified machine type. Grab jobs from a paddles
 queue and run the teuthology tests they describe as subprocesses. The
 subprocess invoked is a teuthology-dispatcher command run in supervisor
 mode.
@@ -15,9 +15,9 @@ at the end of the run.
 standard arguments:
   -h, --help                     show this help message and exit
   -v, --verbose                  be more verbose
-  -t, --tube TUBE                which beanstalk tube to read jobs from
   -l, --log-dir LOG_DIR          path in which to store logs
   -a DIR, --archive-dir DIR      path to archive results in
+  --machine-type MACHINE_TYPE    the machine type for the job
   --supervisor                   run dispatcher in job supervisor mode
   --bin-path BIN_PATH            teuthology bin path
   --job-config CONFIG            file descriptor of job's config file
index 31acc8b1a4a262741518a80064ac33a8f72c8853..e2a1a4ef09e9aac45920b0fe8c4164f27fe047bf 100644 (file)
@@ -12,7 +12,7 @@ usage: teuthology-kill -h
        teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN
 
 Kill running teuthology jobs:
-1. Removes any queued jobs from the beanstalk queue
+1. Removes any queued jobs from the paddles queue
 2. Kills any running jobs
 3. Nukes any machines involved
 
index 8ea5ca5c2ce1c64161596cbb36d1f4f4f6adeff6..285a0adac9599df848ad71ec575c16486ee9cba1 100644 (file)
@@ -1,21 +1,22 @@
 import docopt
 
 import teuthology.config
-import teuthology.beanstalk
+import teuthology.paddles_queue
 
 doc = """
 usage: teuthology-queue -h
-       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
-       teuthology-queue [-r] -m MACHINE_TYPE
-       teuthology-queue -m MACHINE_TYPE -D PATTERN
-       teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+       teuthology-queue -s -m MACHINE_TYPE
+       teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER
+       teuthology-queue [-r] -m MACHINE_TYPE -u USER
+       teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER
+       teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
 queue.
 
 Arguments:
-  -m, --machine_type MACHINE_TYPE [default: multi]
+  -m, --machine_type MACHINE_TYPE
                         Which machine type queue to work on.
 
 optional arguments:
@@ -28,9 +29,12 @@ optional arguments:
   -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
                         will unpause. If -m is passed, pause that queue,
                         otherwise pause all queues.
+  -P, --priority PRIORITY
+                        Change priority of queued jobs
+  -u, --user USER       User who owns the jobs
 """
 
 
 def main():
     args = docopt.docopt(doc)
-    teuthology.beanstalk.main(args)
+    teuthology.paddles_queue.main(args)
index 59a2cee298426c891b15feb928fe15d10d5c2308..ee443125ee1a821936a86b8efd27ebc7111e9892 100644 (file)
@@ -20,7 +20,7 @@ optional arguments:
                                        Queue backend name, use prefix '@'
                                        to append job config to the given
                                        file path as yaml.
-                                       [default: beanstalk]
+                                       [default: paddles]
   -n <name>, --name <name>             Name of suite run the job is part of
   -d <desc>, --description <desc>      Job description
   -o <owner>, --owner <owner>          Job owner
index a3e12c20d7bb9366ae92347e23e1b833f70b1f34..8d3228d8d031b2bceaf84bf4c65a997cda327406 100644 (file)
@@ -9,7 +9,7 @@ def main():
 
 def parse_args():
     parser = argparse.ArgumentParser(description="""
-Grab jobs from a beanstalk queue and run the teuthology tests they
+Grab jobs from a paddles queue and run the teuthology tests they
 describe. One job is run at a time.
 """)
     parser.add_argument(
@@ -29,8 +29,8 @@ describe. One job is run at a time.
         required=True,
     )
     parser.add_argument(
-        '-t', '--tube',
-        help='which beanstalk tube to read jobs from',
+        '-m', '--machine-type',
+        help='which machine type the jobs will run on',
         required=True,
     )
 
diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py
deleted file mode 100644 (file)
index ff3afb6..0000000
+++ /dev/null
@@ -1,214 +0,0 @@
-import beanstalkc
-import yaml
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology.config import config
-from teuthology import report
-
-log = logging.getLogger(__name__)
-
-
-def connect():
-    host = config.queue_host
-    port = config.queue_port
-    if host is None or port is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.teuthology_yaml))
-    return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load)
-
-
-def watch_tube(connection, tube_name):
-    """
-    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
-    the tube_name that was actually used.
-    """
-    if ',' in tube_name:
-        log.debug("Correcting tube name to 'multi'")
-        tube_name = 'multi'
-    connection.watch(tube_name)
-    connection.ignore('default')
-    return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
-    """
-    def callback(jobs_dict)
-    """
-    log.info("Checking Beanstalk Queue...")
-    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
-    if job_count == 0:
-        log.info('No jobs in Beanstalk Queue')
-        return
-
-    # Try to figure out a sane timeout based on how many jobs are in the queue
-    timeout = job_count / 2000.0 * 60
-    for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
-        job = connection.reserve(timeout=timeout)
-        if job is None or job.body is None:
-            continue
-        job_config = yaml.safe_load(job.body)
-        job_name = job_config['name']
-        job_id = job.stats()['id']
-        if pattern is not None and pattern not in job_name:
-            continue
-        processor.add_job(job_id, job_config, job)
-    end_progress()
-    processor.complete()
-
-
-def print_progress(index, total, message=None):
-    msg = "{m} ".format(m=message) if message else ''
-    sys.stderr.write("{msg}{i}/{total}\r".format(
-        msg=msg, i=index, total=total))
-    sys.stderr.flush()
-
-
-def end_progress():
-    sys.stderr.write('\n')
-    sys.stderr.flush()
-
-
-class JobProcessor(object):
-    def __init__(self):
-        self.jobs = OrderedDict()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_id = str(job_id)
-
-        job_dict = dict(
-            index=(len(self.jobs) + 1),
-            job_config=job_config,
-        )
-        if job_obj:
-            job_dict['job_obj'] = job_obj
-        self.jobs[job_id] = job_dict
-
-        self.process_job(job_id)
-
-    def process_job(self, job_id):
-        pass
-
-    def complete(self):
-        pass
-
-
-class JobPrinter(JobProcessor):
-    def __init__(self, show_desc=False, full=False):
-        super(JobPrinter, self).__init__()
-        self.show_desc = show_desc
-        self.full = full
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_index = self.jobs[job_id]['index']
-        job_priority = job_config['priority']
-        job_name = job_config['name']
-        job_desc = job_config['description']
-        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
-            i=job_index,
-            pri=job_priority,
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        if self.full:
-            pprint.pprint(job_config)
-        elif job_desc and self.show_desc:
-            for desc in job_desc.split():
-                print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
-    def __init__(self):
-        super(RunPrinter, self).__init__()
-        self.runs = list()
-
-    def process_job(self, job_id):
-        run = self.jobs[job_id]['job_config']['name']
-        if run not in self.runs:
-            self.runs.append(run)
-            print(run)
-
-
-class JobDeleter(JobProcessor):
-    def __init__(self, pattern):
-        self.pattern = pattern
-        super(JobDeleter, self).__init__()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_name = job_config['name']
-        if self.pattern in job_name:
-            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_name = job_config['name']
-        print('Deleting {job_name}/{job_id}'.format(
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        job_obj = self.jobs[job_id].get('job_obj')
-        if job_obj:
-            job_obj.delete()
-        report.try_delete_jobs(job_name, job_id)
-
-
-def pause_tube(connection, tube, duration):
-    duration = int(duration)
-    if not tube:
-        tubes = sorted(connection.tubes())
-    else:
-        tubes = [tube]
-
-    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
-    templ = prefix + ": {tubes}"
-    log.info(templ.format(dur=duration, tubes=tubes))
-    for tube in tubes:
-        connection.pause_tube(tube, duration)
-
-
-def stats_tube(connection, tube):
-    stats = connection.stats_tube(tube)
-    result = dict(
-        name=tube,
-        count=stats['current-jobs-ready'],
-        paused=(stats['pause'] != 0),
-    )
-    return result
-
-
-def main(args):
-    machine_type = args['--machine_type']
-    status = args['--status']
-    delete = args['--delete']
-    runs = args['--runs']
-    show_desc = args['--description']
-    full = args['--full']
-    pause_duration = args['--pause']
-    try:
-        connection = connect()
-        if machine_type and not pause_duration:
-            # watch_tube needs to be run before we inspect individual jobs;
-            # it is not needed for pausing tubes
-            watch_tube(connection, machine_type)
-        if status:
-            print(stats_tube(connection, machine_type))
-        elif pause_duration:
-            pause_tube(connection, machine_type, pause_duration)
-        elif delete:
-            walk_jobs(connection, machine_type,
-                      JobDeleter(delete))
-        elif runs:
-            walk_jobs(connection, machine_type,
-                      RunPrinter())
-        else:
-            walk_jobs(connection, machine_type,
-                      JobPrinter(show_desc=show_desc, full=full))
-    except KeyboardInterrupt:
-        log.info("Interrupted.")
-    finally:
-        connection.close()
index 14218835b18ec52aea266c0403aa9ee7ec16cf89..dd2930f1e6ce4efd357d30e559d86c263d77c197 100644 (file)
@@ -3,11 +3,11 @@ import os
 import subprocess
 import sys
 import yaml
+import json
 
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob
@@ -54,6 +54,12 @@ def load_config(archive_dir=None):
         else:
             teuth_config.archive_base = archive_dir
 
+def clean_config(config):
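+    # keep only the keys whose values are non-null in the job record paddles returns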
+    result = {}
+    for key in config:
+        if config[key] is not None:
+            result[key] = config[key]
+    return result
 
 def main(args):
     # run dispatcher in job supervisor mode if --supervisor passed
@@ -61,7 +67,7 @@ def main(args):
         return supervisor.main(args)
 
     verbose = args["--verbose"]
-    tube = args["--tube"]
+    machine_type = args["--machine-type"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
     exit_on_empty_queue = args["--exit-on-empty-queue"]
@@ -69,19 +75,19 @@ def main(args):
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
 
+    if machine_type is None and teuth_config.machine_type is None:
+        return
    # setup logging for dispatcher in {log_dir}
     loglevel = logging.INFO
     if verbose:
         loglevel = logging.DEBUG
     log.setLevel(loglevel)
-    log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
+    log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}")
     setup_log_file(log_file_path)
     install_except_hook()
 
     load_config(archive_dir=archive_dir)
 
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, tube)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -105,21 +111,19 @@ def main(args):
 
         load_config()
         job_procs = set(filter(lambda p: p.poll() is None, job_procs))
-        job = connection.reserve(timeout=60)
+        job = report.get_queued_job(machine_type)
         if job is None:
             if exit_on_empty_queue and not job_procs:
                 log.info("Queue is empty and no supervisor processes running; exiting!")
                 break
             continue
-
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
-
+        job = clean_config(job)
+        report.try_push_job_info(job, dict(status='running'))
+        job_id = job.get('job_id')
+        log.info('Reserved job %s', job_id)
+        log.info('Config is: %s', job)
+        job_config = job
+
         if job_config.get('stop_worker'):
             keep_running = False
 
@@ -170,12 +174,6 @@ def main(args):
                 status='fail',
                 failure_reason=error_message))
 
-        # This try/except block is to keep the worker from dying when
-        # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
 
     returncodes = set([0])
     for proc in job_procs:
index d7a695475d1033d612b15ac1a7084c47c9380a5e..459669861ba731384282fd847b872f23a663c2eb 100644 (file)
@@ -124,7 +124,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
         '--archive', job_config['archive_path'],
         '--name', job_config['name'],
     ])
-    if job_config['description'] is not None:
+    if 'description' in job_config:
         arg.extend(['--description', job_config['description']])
     job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
     arg.extend(['--', job_archive])
index 5af11b628c95b174e76548ddcef42fae754109db..f9080f0184aedefb759caf40df20ddab4011abe6 100755 (executable)
@@ -9,7 +9,6 @@ import logging
 import getpass
 
 
-from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config
 from teuthology import misc
@@ -66,7 +65,6 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
                                     "you must also pass --machine-type")
 
     if not preserve_queue:
-        remove_beanstalk_jobs(run_name, machine_type)
         remove_paddles_jobs(run_name)
     kill_processes(run_name, run_info.get('pids'))
     if owner is not None:
@@ -109,7 +107,6 @@ def find_run_info(serializer, run_name):
         if not os.path.isdir(job_dir):
             continue
         job_num += 1
-        beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
         job_info = serializer.job_info(run_name, job_id, simple=True)
         for key in job_info.keys():
             if key in run_info_fields and key not in run_info:
@@ -128,41 +125,6 @@ def remove_paddles_jobs(run_name):
         report.try_delete_jobs(run_name, job_ids)
 
 
-def remove_beanstalk_jobs(run_name, tube_name):
-    qhost = config.queue_host
-    qport = config.queue_port
-    if qhost is None or qport is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.yaml_path))
-    log.info("Checking Beanstalk Queue...")
-    beanstalk_conn = beanstalk.connect()
-    real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)
-
-    curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
-    if curjobs != 0:
-        x = 1
-        while x != curjobs:
-            x += 1
-            job = beanstalk_conn.reserve(timeout=20)
-            if job is None:
-                continue
-            job_config = yaml.safe_load(job.body)
-            if run_name == job_config['name']:
-                job_id = job.stats()['id']
-                msg = "Deleting job from queue. ID: " + \
-                    "{id} Name: {name} Desc: {desc}".format(
-                        id=str(job_id),
-                        name=job_config['name'],
-                        desc=job_config['description'],
-                    )
-                log.info(msg)
-                job.delete()
-    else:
-        print("No jobs in Beanstalk Queue")
-    beanstalk_conn.close()
-
-
 def kill_processes(run_name, pids=None):
     if pids:
         to_kill = set(pids).intersection(psutil.pids())
diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py
new file mode 100644 (file)
index 0000000..5287ea4
--- /dev/null
@@ -0,0 +1,214 @@
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+
+
+log = logging.getLogger(__name__)
+
+
+def stats_queue(machine_type):
+    stats = report.get_queue_stats(machine_type)
+    if stats['paused'] is None:
+        log.info("%s queue is currently running with %s jobs queued",
+                 stats['name'],
+                 stats['count'])
+    else:
+        log.info("%s queue is paused with %s jobs queued",
+                 stats['name'],
+                 stats['count'])
+
+
+def update_priority(machine_type, priority, user):
+    jobs = report.get_user_jobs_queue(machine_type, user)
+    for job in jobs:
+        job['priority'] = priority
+        report.try_push_job_info(job)
+
+
+def pause_queue(machine_type, pause_duration, paused_by):
+    report.pause_queue(machine_type, paused_by, pause_duration)
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+def walk_jobs(machine_type, processor, user):
+    log.info("Checking paddles queue...")
+    job_count = report.get_queue_stats(machine_type)['count']
+
+    jobs = report.get_user_jobs_queue(machine_type, user)
+    if job_count == 0:
+        log.info('No jobs in queue')
+        return
+
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = jobs[i-1]
+        if job is None:
+            continue
+        job_id = job['job_id']
+        processor.add_job(job_id, job)
+    end_progress()
+    processor.complete()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        report.try_delete_jobs(job_name, job_id)
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    user = args['--user']
+    priority = args['--priority']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    try:
+        if status:
+            stats_queue(machine_type)
+        elif pause_duration:
+            pause_queue(machine_type, pause_duration, user)
+        elif priority:
+            update_priority(machine_type, priority, user)
+        elif delete:
+            walk_jobs(machine_type,
+                      JobDeleter(delete), user)
+        elif runs:
+            walk_jobs(machine_type,
+                      RunPrinter(), user)
+        else:
+            walk_jobs(machine_type,
+                      JobPrinter(show_desc=show_desc, full=full),
+                      user)
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
index b75d0ad768ad764facea6e358708103d63bcf9e3..a28fcffb7ce2ff079f0b79174a556459e399048a 100644 (file)
@@ -6,6 +6,7 @@ import requests
 import logging
 import random
 import socket
+import threading
 from datetime import datetime
 
 import teuthology
@@ -262,6 +263,39 @@ class ResultsReporter(object):
             self.log.debug("    no jobs; skipped")
         return len(jobs)
 
+    def write_new_job(self, run_name, job_info):
+        """
+        Report a new job to the results server.
+
+        :param run_name: The name of the run. The run must already exist.
+        :param job_info: The job's info dict. Must be present since this is a new job
+        """
+        if job_info is None or not isinstance(job_info, dict):
+            raise TypeError("Job info must be a dict")
+        run_uri = "{base}/runs/{name}/jobs/".format(
+            base=self.base_uri, name=run_name,
+        )
+        job_json = json.dumps(job_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(run_uri, data=job_json, headers=headers)
+
+        if response.status_code == 200:
+            resp_json = response.json()
+            job_id = resp_json['job_id']
+            return job_id
+        else:
+            msg = response.text
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=run_uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+
+        response.raise_for_status()
+        return None
+
     def report_jobs(self, run_name, job_ids, dead=False):
         """
         Report several jobs to the results server.
@@ -291,12 +325,13 @@ class ResultsReporter(object):
             set_status(job_info, 'dead')
         job_json = json.dumps(job_info)
         headers = {'content-type': 'application/json'}
+        job_uri = os.path.join(run_uri, job_id, '')
 
         inc = random.uniform(0, 1)
         with safe_while(
                 sleep=1, increment=inc, action=f'report job {job_id}') as proceed:
             while proceed():
-                response = self.session.post(run_uri, data=job_json, headers=headers)
+                response = self.session.put(job_uri, data=job_json, headers=headers)
 
                 if response.status_code == 200:
                     return
@@ -313,15 +348,9 @@ class ResultsReporter(object):
                 else:
                     msg = response.text
 
-                if msg and msg.endswith('already exists'):
-                    job_uri = os.path.join(run_uri, job_id, '')
-                    response = self.session.put(job_uri, data=job_json,
-                                                headers=headers)
-                    if response.status_code == 200:
-                        return
-                elif msg:
+                if msg:
                     self.log.error(
-                        "POST to {uri} failed with status {status}: {msg}".format(
+                        "PUT to {uri} failed with status {status}: {msg}".format(
-                            uri=run_uri,
+                            uri=job_uri,
                             status=response.status_code,
                             msg=msg,
@@ -351,6 +380,14 @@ class ResultsReporter(object):
         self.__last_run = None
         if os.path.exists(self.last_run_file):
             os.remove(self.last_run_file)
+    
+    def get_top_job(self, machine_type):
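+        """Pop the next queued job for machine_type from paddles."""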
+        uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri,
+                                                                          machine_type=machine_type)
+        response = self.session.get(uri)
+        response.raise_for_status()
+        return response.json()
 
     def get_jobs(self, run_name, job_id=None, fields=None):
         """
@@ -458,6 +495,164 @@ class ResultsReporter(object):
         response = self.session.delete(uri)
         response.raise_for_status()
 
+    def create_queue(self, machine_type):
+        """
+        Create a queue on the results server
+
+        :param machine_type: The machine type specified for the job
+        """
+        uri = "{base}/queue/".format(
+            base=self.base_uri
+        )
+        queue_info = {'machine_type': machine_type}
+        queue_json = json.dumps(queue_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(uri, data=queue_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully created queue for {machine_type}".format(
+                machine_type=machine_type,
+            ))
+        else:
+            resp_json = response.json()
+            if resp_json:
+                msg = resp_json.get('message', '')
+            else:
+                msg = response.text
+            if msg and msg.endswith('already exists'):
+                return
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+
+        response.raise_for_status()
+
+    def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None):
+        uri = "{base}/queue/".format(
+            base=self.base_uri
+        )
+        if pause_duration is not None:
+            pause_duration = int(pause_duration)
+        queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by,
+                      'pause_duration': pause_duration}
+        queue_json = json.dumps(queue_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.put(uri, data=queue_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully updated queue for {machine_type}".format(
+                machine_type=machine_type,
+            ))
+        else:
+            msg = response.text
+            self.log.error(
+                "PUT to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+
+        response.raise_for_status()
+
+    def queue_stats(self, machine_type):
+        uri = "{base}/queue/stats/".format(
+            base=self.base_uri
+        )
+        queue_info = {'machine_type': machine_type}
+        queue_json = json.dumps(queue_info)
+
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(uri, data=queue_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully retrieved stats for queue {machine_type}".format(
+                machine_type=machine_type,
+            ))
+            return response.json()
+        else:
+            msg = response.text
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+        response.raise_for_status()
+
+    def queued_jobs(self, machine_type, user):
+        uri = "{base}/queue/queued_jobs/".format(
+            base=self.base_uri
+        )
+        request_info = {'machine_type': machine_type,
+                        'user': user}
+        request_json = json.dumps(request_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(uri, data=request_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully retrieved jobs for user {user}".format(
+                user=user,
+            ))
+            return response.json()
+        else:
+            msg = response.text
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+        response.raise_for_status()
+
+
+def create_machine_type_queue(machine_type):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    reporter.create_queue(machine_type)
+
+
+def get_user_jobs_queue(machine_type, user):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queued_jobs(machine_type, user)
+
+
+def pause_queue(machine_type, paused_by, pause_duration):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    paused = True
+    reporter.update_queue(machine_type, paused, paused_by, pause_duration)
+    paused = False
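+    # schedule the unpause, then block until the pause duration elapses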
+    timer = threading.Timer(int(pause_duration), reporter.update_queue, [machine_type, paused, paused_by])
+    timer.daemon = True
+    timer.start()
+    timer.join()
+
+
+def is_queue_paused(machine_type):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    stats = reporter.queue_stats(machine_type)
+    if stats['paused'] is not None and stats['paused'] != 0:
+        return True
+    return False
+
+
+def get_queue_stats(machine_type):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    stats = reporter.queue_stats(machine_type)
+    return stats
+
 
 def push_job_info(run_name, job_id, job_info, base_uri=None):
     """
@@ -475,6 +670,23 @@ def push_job_info(run_name, job_id, job_info, base_uri=None):
     reporter.report_job(run_name, job_id, job_info)
 
 
+def get_queued_job(machine_type):
+    """
+    Retrieve a job that is queued depending on priority
+
+    """
+    log = init_logging()
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    if is_queue_paused(machine_type):
+        log.info("Teuthology queue for machine type %s is currently paused",
+                 machine_type)
+        return None
+    else:
+        return reporter.get_top_job(machine_type)
+
+
 def try_push_job_info(job_config, extra_info=None):
     """
     Wrap push_job_info, gracefully doing nothing if:
@@ -514,6 +726,36 @@ def try_push_job_info(job_config, extra_info=None):
                       config.results_server)
 
 
+def try_create_job(job_config, extra_info=None):
+    log = init_logging()
+
+    if not config.results_server:
+        log.warning('No results_server in config; not reporting results')
+        return
+
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+
+    run_name = job_config['name']
+
+    if extra_info is not None:
+        job_info = extra_info.copy()
+        job_info.update(job_config)
+    else:
+        job_info = job_config
+
+    try:
+        log.debug("Writing job info to %s", config.results_server)
+        job_id = reporter.write_new_job(run_name, job_info)
+        log.info("Job ID: %s", job_id)
+        if job_id is not None:
+            return job_id
+    except report_exceptions:
+        log.exception("Could not report results to %s",
+                      config.results_server)
+
+
 def try_delete_jobs(run_name, job_ids, delete_empty_run=True):
     """
     Using the same error checking and retry mechanism as try_push_job_info(),
index d9af64efc4af29cd8515c283cb693669b8bce822..4836884a9e73e2624e7304181ca909bea40a303a 100644 (file)
@@ -1,9 +1,9 @@
 import os
 import yaml
 
-import teuthology.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
+from teuthology.config import config
 
 
 def main(args):
@@ -35,13 +35,13 @@ def main(args):
     backend = args['--queue-backend']
     if args['--dry-run']:
         print('---\n' + yaml.safe_dump(job_config))
-    elif backend == 'beanstalk':
+    elif backend == 'paddles':
         schedule_job(job_config, args['--num'], report_status)
     elif backend.startswith('@'):
         dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
     else:
         raise ValueError("Provided schedule backend '%s' is not supported. "
-                         "Try 'beanstalk' or '@path-to-a-file" % backend)
+                         "Try 'paddles' or '@path-to-a-file'" % backend)
 
 
 def build_config(args):
@@ -96,22 +96,19 @@ def schedule_job(job_config, num=1, report_status=True):
     """
     num = int(num)
     job = yaml.safe_dump(job_config)
-    tube = job_config.pop('tube')
-    beanstalk = teuthology.beanstalk.connect()
-    beanstalk.use(tube)
+
+    # Add a 'machine_type' queue to the paddles DB if it doesn't already exist.
+    report.create_machine_type_queue(job_config['machine_type'])
     while num > 0:
-        jid = beanstalk.put(
-            job,
-            ttr=60 * 60 * 24,
-            priority=job_config['priority'],
-        )
-        print('Job scheduled with name {name} and ID {jid}'.format(
-            name=job_config['name'], jid=jid))
-        job_config['job_id'] = str(jid)
-        if report_status:
-            report.try_push_job_info(job_config, dict(status='queued'))
-        num -= 1
 
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        print('Job scheduled with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
+        job_config['job_id'] = str(job_id)
+
+        num -= 1
 
 def dump_job_to_file(path, job_config, num=1):
     """
index b99fab33d7546d5ff02e6d9009ec604fdf4c7949..89a7304229f03d6db4829a8038d62f9715521bbd 100644 (file)
@@ -9,7 +9,6 @@ import yaml
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config
@@ -58,14 +57,22 @@ def load_config(ctx=None):
             teuth_config.archive_base = ctx.archive_dir
 
 
+def clean_config(config):
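+    # keep only the keys whose values are non-null in the job record paddles returns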
+    result = {}
+    for key in config:
+        if config[key] is not None:
+            result[key] = config[key]
+    return result
+
+
 def main(ctx):
     loglevel = logging.INFO
     if ctx.verbose:
         loglevel = logging.DEBUG
     log.setLevel(loglevel)
 
-    log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
-        pid=os.getpid(), tube=ctx.tube,))
+    log_file_path = os.path.join(ctx.log_dir, 'worker.{machine_type}.{pid}'.format(
+        pid=os.getpid(), machine_type=ctx.machine_type,))
     setup_log_file(log_file_path)
 
     install_except_hook()
@@ -74,8 +81,6 @@ def main(ctx):
 
     set_config_attr(ctx)
 
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, ctx.tube)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -98,17 +103,16 @@ def main(ctx):
 
         load_config()
 
-        job = connection.reserve(timeout=60)
+        job = report.get_queued_job(ctx.machine_type)
         if job is None:
             continue
 
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
+        job = clean_config(job)
+        report.try_push_job_info(job, dict(status='running'))
+        job_id = job.get('job_id')
+        log.info('Reserved job %s', job_id)
+        log.info('Config is: %s', job)
+        job_config = job
 
         if job_config.get('stop_worker'):
             keep_running = False
@@ -128,13 +132,6 @@ def main(ctx):
         except SkipJob:
             continue
 
-        # This try/except block is to keep the worker from dying when
-        # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
-
 
 def prep_job(job_config, log_file_path, archive_dir):
     job_id = job_config['job_id']