]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Use shared methods to connect to beanstalkd
authorZack Cerza <zack@cerza.org>
Tue, 15 Apr 2014 00:28:54 +0000 (19:28 -0500)
committerZack Cerza <zack@cerza.org>
Thu, 17 Apr 2014 17:27:41 +0000 (12:27 -0500)
Signed-off-by: Zack Cerza <zack.cerza@inktank.com>
teuthology/beanstalk.py
teuthology/queue.py
teuthology/schedule.py

index d76d914f5b7445f1ec6ab1df362524b5e961ac06..fcb3c6806dcffd8416df89abcb057ec864e8cf5f 100644 (file)
@@ -7,29 +7,31 @@ from .config import config
 log = logging.getLogger(__name__)
 
 
-def beanstalk_connect(machine_type):
-    qhost = config.queue_host
-    qport = config.queue_port
-    if qhost is None or qport is None:
+def connect():
+    host = config.queue_host
+    port = config.queue_port
+    if host is None or port is None:
         raise RuntimeError(
             'Beanstalk queue information not found in {conf_path}'.format(
                 conf_path=config.teuthology_yaml))
-    log.info("Checking Beanstalk Queue...")
-    beanstalk = beanstalkc.Connection(host=qhost, port=qport)
-    beanstalk.watch(machine_type)
-    beanstalk.ignore('default')
-    return beanstalk
+    return beanstalkc.Connection(host=host, port=port)
+
 
+def watch_tube(connection, tube_name):
+    connection.watch(tube_name)
+    connection.ignore('default')
 
-def walk_jobs(beanstalk, machine_type, show_desc=False, delete=None):
-    job_count = beanstalk.stats_tube(machine_type)['current-jobs-ready']
+
+def walk_jobs(connection, machine_type, show_desc=False, delete=None):
+    log.info("Checking Beanstalk Queue...")
+    job_count = connection.stats_tube(machine_type)['current-jobs-ready']
     if job_count == 0:
         log.info('No jobs in Beanstalk Queue')
         return
     x = 1
     while x < job_count:
         x += 1
-        job = beanstalk.reserve(timeout=20)
+        job = connection.reserve(timeout=20)
         if job is not None and job.body is not None:
             job_config = yaml.safe_load(job.body)
             job_name = job_config['name']
@@ -66,9 +68,10 @@ def main(args):
     delete = args['--delete']
     show_desc = args['--description']
     try:
-        beanstalk = beanstalk_connect(machine_type)
-        walk_jobs(beanstalk, machine_type, show_desc=show_desc, delete=delete)
+        connection = connect()
+        watch_tube(connection, machine_type)
+        walk_jobs(connection, machine_type, show_desc=show_desc, delete=delete)
     except KeyboardInterrupt:
         log.info("Interrupted.")
     finally:
-        beanstalk.close()
+        connection.close()
index 2ab2e69f658c099a8d76352c961a339a8e0c64d0..0b0f49edab5899d259013497a2ee2c6f0b647fba 100644 (file)
@@ -7,10 +7,10 @@ import sys
 import tempfile
 import time
 import yaml
-import beanstalkc
 
 from datetime import datetime
 
+from . import beanstalk
 from . import report
 from . import safepath
 from .config import config as teuth_config
@@ -56,12 +56,6 @@ class filelock(object):
         self.fd = None
 
 
-def connect(ctx):
-    host = ctx.teuthology_config['queue_host']
-    port = ctx.teuthology_config['queue_port']
-    return beanstalkc.Connection(host=host, port=port)
-
-
 def fetch_teuthology_branch(path, branch='master'):
     """
     Make sure we have the correct teuthology branch checked out and up-to-date
@@ -149,15 +143,14 @@ def worker(ctx):
 
     read_config(ctx)
 
-    beanstalk = connect(ctx)
-    beanstalk.watch(ctx.tube)
-    beanstalk.ignore('default')
+    connection = beanstalk.connect()
+    connection.watch_tube(ctx.tube)
 
     while True:
         if need_restart():
             restart()
 
-        job = beanstalk.reserve(timeout=60)
+        job = connection.reserve(timeout=60)
         if job is None:
             continue
 
index 5aa51ed8cfe829dec836a9c8a05f5450c363ee60..870c72c96d8c3eb579795f77cb13aeaacaf1303a 100644 (file)
@@ -1,6 +1,6 @@
 import yaml
 
-import teuthology.queue
+import teuthology.beanstalk
 from teuthology.misc import get_user
 from teuthology.misc import read_config
 
@@ -10,7 +10,7 @@ def main(ctx):
         ctx.owner = 'scheduled_{user}'.format(user=get_user())
     read_config(ctx)
 
-    beanstalk = teuthology.queue.connect(ctx)
+    beanstalk = teuthology.beanstalk.connect()
 
     tube = ctx.worker
     beanstalk.use(tube)