From: Zack Cerza Date: Tue, 15 Apr 2014 00:28:54 +0000 (-0500) Subject: Use shared methods to connect to beanstalkd X-Git-Tag: 1.1.0~1533^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1449e753abb637f957d7a406913090dcb9ec910b;p=teuthology.git Use shared methods to connect to beanstalkd Signed-off-by: Zack Cerza --- diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py index d76d914f5..fcb3c6806 100644 --- a/teuthology/beanstalk.py +++ b/teuthology/beanstalk.py @@ -7,29 +7,31 @@ from .config import config log = logging.getLogger(__name__) -def beanstalk_connect(machine_type): - qhost = config.queue_host - qport = config.queue_port - if qhost is None or qport is None: +def connect(): + host = config.queue_host + port = config.queue_port + if host is None or port is None: raise RuntimeError( 'Beanstalk queue information not found in {conf_path}'.format( conf_path=config.teuthology_yaml)) - log.info("Checking Beanstalk Queue...") - beanstalk = beanstalkc.Connection(host=qhost, port=qport) - beanstalk.watch(machine_type) - beanstalk.ignore('default') - return beanstalk + return beanstalkc.Connection(host=host, port=port) + +def watch_tube(connection, tube_name): + connection.watch(tube_name) + connection.ignore('default') -def walk_jobs(beanstalk, machine_type, show_desc=False, delete=None): - job_count = beanstalk.stats_tube(machine_type)['current-jobs-ready'] + +def walk_jobs(connection, machine_type, show_desc=False, delete=None): + log.info("Checking Beanstalk Queue...") + job_count = connection.stats_tube(machine_type)['current-jobs-ready'] if job_count == 0: log.info('No jobs in Beanstalk Queue') return x = 1 while x < job_count: x += 1 - job = beanstalk.reserve(timeout=20) + job = connection.reserve(timeout=20) if job is not None and job.body is not None: job_config = yaml.safe_load(job.body) job_name = job_config['name'] @@ -66,9 +68,10 @@ def main(args): delete = args['--delete'] show_desc = args['--description'] try: - beanstalk = beanstalk_connect(machine_type) - walk_jobs(beanstalk, machine_type, show_desc=show_desc, delete=delete) + connection = connect() + watch_tube(connection, machine_type) + walk_jobs(connection, machine_type, show_desc=show_desc, delete=delete) except KeyboardInterrupt: log.info("Interrupted.") finally: - beanstalk.close() + connection.close() diff --git a/teuthology/queue.py b/teuthology/queue.py index 2ab2e69f6..0b0f49eda 100644 --- a/teuthology/queue.py +++ b/teuthology/queue.py @@ -7,10 +7,10 @@ import sys import tempfile import time import yaml -import beanstalkc from datetime import datetime +from . import beanstalk from . import report from . import safepath from .config import config as teuth_config @@ -56,12 +56,6 @@ class filelock(object): self.fd = None -def connect(ctx): - host = ctx.teuthology_config['queue_host'] - port = ctx.teuthology_config['queue_port'] - return beanstalkc.Connection(host=host, port=port) - - def fetch_teuthology_branch(path, branch='master'): """ Make sure we have the correct teuthology branch checked out and up-to-date @@ -149,15 +143,14 @@ def worker(ctx): read_config(ctx) - beanstalk = connect(ctx) - beanstalk.watch(ctx.tube) - beanstalk.ignore('default') + connection = beanstalk.connect() + connection.watch_tube(ctx.tube) while True: if need_restart(): restart() - job = beanstalk.reserve(timeout=60) + job = connection.reserve(timeout=60) if job is None: continue diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 5aa51ed8c..870c72c96 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,6 +1,6 @@ import yaml -import teuthology.queue +import teuthology.beanstalk from teuthology.misc import get_user from teuthology.misc import read_config @@ -10,7 +10,7 @@ def main(ctx): ctx.owner = 'scheduled_{user}'.format(user=get_user()) read_config(ctx) - beanstalk = teuthology.queue.connect(ctx) + beanstalk = teuthology.beanstalk.connect() tube = ctx.worker beanstalk.use(tube)