log = logging.getLogger(__name__)
-def beanstalk_connect(machine_type):
- qhost = config.queue_host
- qport = config.queue_port
- if qhost is None or qport is None:
+def connect():
+ host = config.queue_host
+ port = config.queue_port
+ if host is None or port is None:
raise RuntimeError(
'Beanstalk queue information not found in {conf_path}'.format(
conf_path=config.teuthology_yaml))
- log.info("Checking Beanstalk Queue...")
- beanstalk = beanstalkc.Connection(host=qhost, port=qport)
- beanstalk.watch(machine_type)
- beanstalk.ignore('default')
- return beanstalk
+ return beanstalkc.Connection(host=host, port=port)
+
+def watch_tube(connection, tube_name):
+ connection.watch(tube_name)
+ connection.ignore('default')
-def walk_jobs(beanstalk, machine_type, show_desc=False, delete=None):
- job_count = beanstalk.stats_tube(machine_type)['current-jobs-ready']
+
+def walk_jobs(connection, machine_type, show_desc=False, delete=None):
+ log.info("Checking Beanstalk Queue...")
+ job_count = connection.stats_tube(machine_type)['current-jobs-ready']
if job_count == 0:
log.info('No jobs in Beanstalk Queue')
return
x = 1
while x < job_count:
x += 1
- job = beanstalk.reserve(timeout=20)
+ job = connection.reserve(timeout=20)
if job is not None and job.body is not None:
job_config = yaml.safe_load(job.body)
job_name = job_config['name']
delete = args['--delete']
show_desc = args['--description']
try:
- beanstalk = beanstalk_connect(machine_type)
- walk_jobs(beanstalk, machine_type, show_desc=show_desc, delete=delete)
+ connection = connect()
+ watch_tube(connection, machine_type)
+ walk_jobs(connection, machine_type, show_desc=show_desc, delete=delete)
except KeyboardInterrupt:
log.info("Interrupted.")
finally:
- beanstalk.close()
+ connection.close()
import tempfile
import time
import yaml
-import beanstalkc
from datetime import datetime
+from . import beanstalk
from . import report
from . import safepath
from .config import config as teuth_config
self.fd = None
-def connect(ctx):
- host = ctx.teuthology_config['queue_host']
- port = ctx.teuthology_config['queue_port']
- return beanstalkc.Connection(host=host, port=port)
-
-
def fetch_teuthology_branch(path, branch='master'):
"""
Make sure we have the correct teuthology branch checked out and up-to-date
read_config(ctx)
- beanstalk = connect(ctx)
- beanstalk.watch(ctx.tube)
- beanstalk.ignore('default')
+ connection = beanstalk.connect()
+ connection.watch_tube(ctx.tube)
while True:
if need_restart():
restart()
- job = beanstalk.reserve(timeout=60)
+ job = connection.reserve(timeout=60)
if job is None:
continue