]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph.in: Use new python rados module
authorMehdi Abaakouk <sileht@redhat.com>
Mon, 15 Feb 2016 21:06:54 +0000 (22:06 +0100)
committerMehdi Abaakouk <sileht@redhat.com>
Wed, 17 Feb 2016 11:32:38 +0000 (12:32 +0100)
This implements run_in_thread inside the ceph command itself.

And fixes the ceph command bootstrap when it run inside the
source tree to correctly load the new rados python module.

Signed-off-by: Mehdi Abaakouk <sileht@redhat.com>
src/ceph.in
src/pybind/ceph_argparse.py

index e9df0cfdaa29c2d654ef6b03fa46e1f6f38bb229..6368ad0d972538136c10c5b670192b81da174c25 100755 (executable)
@@ -36,7 +36,7 @@ MYPATH = os.path.abspath(__file__)
 MYDIR = os.path.dirname(MYPATH)
 DEVMODEMSG = '*** DEVELOPER MODE: setting PATH, PYTHONPATH and LD_LIBRARY_PATH ***'
 
-def respawn_in_path(lib_path, pybind_path):
+def respawn_in_path(lib_path, pybind_path, pythonlib_path):
     execv_cmd = ['python']
     if 'CEPH_DBG' in os.environ:
         execv_cmd += ['-mpdb']
@@ -58,12 +58,23 @@ def respawn_in_path(lib_path, pybind_path):
         print >> sys.stderr, DEVMODEMSG
         os.execvp(py_binary, execv_cmd + sys.argv)
     sys.path.insert(0, os.path.join(MYDIR, pybind_path))
+    sys.path.insert(0, os.path.join(MYDIR, pythonlib_path))
+
+def get_pythonlib_dir():
+    """Returns the name of a distutils build directory"""
+    import sysconfig
+    f = "lib.{platform}-{version[0]}.{version[1]}"
+    name = f.format(platform=sysconfig.get_platform(),
+                    version=sys.version_info)
+    return os.path.join('build', name)
 
 if MYDIR.endswith('src') and \
    os.path.exists(os.path.join(MYDIR, '.libs')) and \
-   os.path.exists(os.path.join(MYDIR, 'pybind')):
+   os.path.exists(os.path.join(MYDIR, 'pybind')) and \
+   os.path.exists(os.path.join(MYDIR, 'build')):
 
-    respawn_in_path(os.path.join(MYDIR, '.libs'), "pybind")
+    pythonlib_path = get_pythonlib_dir()
+    respawn_in_path(os.path.join(MYDIR, '.libs'), "pybind", pythonlib_path)
     if os.environ.has_key('PATH') and MYDIR not in os.environ['PATH']:
         os.environ['PATH'] += ':' + MYDIR
 
@@ -81,9 +92,11 @@ elif os.path.exists(os.path.join(os.getcwd(), "CMakeCache.txt")) \
         # Developer mode, but in a cmake build dir instead of the src dir
         lib_path = os.path.join(os.getcwd(), "src")
         pybind_path = os.path.join(src_path, "src", "pybind")
-        respawn_in_path(lib_path, pybind_path)
+        pythonlib_path = os.path.join(src_path, "src", get_pythonlib_dir())
+        respawn_in_path(lib_path, pybind_path, pythonlib_path)
 
-    sys.path.insert(0, os.path.join(MYDIR, pybind_path))
+        sys.path.insert(0, os.path.join(MYDIR, pybind_path))
+        sys.path.insert(0, os.path.join(MYDIR, pythonlib_path))
 
     # Add src/ to path for e.g. ceph-conf
     if os.environ.has_key('PATH') and lib_path not in os.environ['PATH']:
@@ -101,7 +114,7 @@ import subprocess
 from ceph_argparse import \
     concise_sig, descsort, parse_json_funcsigs, \
     matchnum, validate_command, find_cmd_target, \
-    send_command, json_command
+    send_command, json_command, run_in_thread
 
 from ceph_daemon import DaemonWatcher, admin_socket
 
@@ -525,12 +538,12 @@ def ping_monitor(cluster_handle, name, timeout):
 
     mon_id = name[len('mon.'):]
     if (mon_id == '*') :
-        cluster_handle.connect(timeout=timeout)
+        run_in_thread(cluster_handle.connect, timeout=timeout)
         for m in monids() :
-            s = cluster_handle.ping_monitor(m)
+            s = run_in_thread(cluster_handle.ping_monitor, m)
             print "mon.{0}".format(m) + '\n' + s
     else :
-            s = cluster_handle.ping_monitor(mon_id)
+            s = run_in_thread(cluster_handle.ping_monitor, mon_id)
             print s
     return 0
 
@@ -668,10 +681,11 @@ def main():
         clustername = parsed_args.cluster
 
     try:
-        cluster_handle = rados.Rados(name=name, clustername=clustername,
-                                     conf_defaults=conf_defaults,
-                                     conffile=conffile)
-        retargs = cluster_handle.conf_parse_argv(childargs)
+        cluster_handle = run_in_thread(rados.Rados,
+                                       name=name, clustername=clustername,
+                                       conf_defaults=conf_defaults,
+                                       conffile=conffile)
+        retargs = run_in_thread(cluster_handle.conf_parse_argv, childargs)
     except rados.Error as e:
         print >> sys.stderr, 'Error initializing cluster client: {0}'.\
             format(repr(e))
@@ -714,7 +728,7 @@ def main():
     try:
         if childargs and childargs[0] == 'ping':
             return ping_monitor(cluster_handle, childargs[1], timeout)
-        cluster_handle.connect(timeout=timeout)
+        run_in_thread(cluster_handle.connect, timeout=timeout)
     except KeyboardInterrupt:
         print >> sys.stderr, 'Cluster connection aborted'
         return 1
@@ -761,7 +775,7 @@ def main():
 
         # this instance keeps the watch connection alive, but is
         # otherwise unused
-        rados.MonitorLog(cluster_handle, level, watch_cb, 0)
+        run_in_thread(cluster_handle.monitor_log, level, watch_cb, 0)
 
         # loop forever letting watch_cb print lines
         try:
@@ -939,5 +953,5 @@ if __name__ == '__main__':
     retval = main()
     # shutdown explicitly; Rados() does not
     if cluster_handle:
-        cluster_handle.shutdown()
+        run_in_thread(cluster_handle.shutdown)
     sys.exit(retval)
index 9a830575d44856e7ad386e6fa56816744ce8174b..7d5f4cfd3da051af49a084edd61d736a74336a78 100644 (file)
@@ -10,6 +10,7 @@ Copyright (C) 2013 Inktank Storage, Inc.
 LGPL2.  See file COPYING.
 """
 import copy
+import errno
 import json
 import os
 import pprint
@@ -17,6 +18,7 @@ import re
 import socket
 import stat
 import sys
+import threading
 import types
 import uuid
 
@@ -1089,6 +1091,63 @@ def find_cmd_target(childargs):
     return 'mon', ''
 
 
+class RadosThread(threading.Thread):
+    def __init__(self, target, *args, **kwargs):
+        self.args = args
+        self.kwargs = kwargs
+        self.target = target
+       self.exception = None
+        threading.Thread.__init__(self)
+
+    def run(self):
+        try:
+               self.retval = self.target(*self.args, **self.kwargs)
+       except Exception as e:
+               self.exception = e
+
+
+# time in seconds between each call to t.join() for child thread
+POLL_TIME_INCR = 0.5
+
+
+def run_in_thread(target, *args, **kwargs):
+    interrupt = False
+    timeout = kwargs.pop('timeout', 0)
+    countdown = timeout
+    t = RadosThread(target, *args, **kwargs)
+
+    # allow the main thread to exit (presumably, avoid a join() on this
+    # subthread) before this thread terminates.  This allows SIGINT
+    # exit of a blocked call.  See below.
+    t.daemon = True
+
+    t.start()
+    try:
+        # poll for thread exit
+        while t.is_alive():
+            t.join(POLL_TIME_INCR)
+            if timeout and t.is_alive():
+                countdown = countdown - POLL_TIME_INCR
+                if countdown <= 0:
+                    raise KeyboardInterrupt
+
+        t.join()        # in case t exits before reaching the join() above
+    except KeyboardInterrupt:
+        # ..but allow SIGINT to terminate the waiting.  Note: this
+        # relies on the Linux kernel behavior of delivering the signal
+        # to the main thread in preference to any subthread (all that's
+        # strictly guaranteed is that *some* thread that has the signal
+        # unblocked will receive it).  But there doesn't seem to be
+        # any interface to create t with SIGINT blocked.
+        interrupt = True
+
+    if interrupt:
+        t.retval = -errno.EINTR
+    if t.exception:
+        raise t.exception
+    return t.retval
+
+
 def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0,
                  verbose=False):
     """
@@ -1110,8 +1169,8 @@ def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0,
             if verbose:
                 print >> sys.stderr, 'submit {0} to osd.{1}'.\
                     format(cmd, osdid)
-            ret, outbuf, outs = \
-                cluster.osd_command(osdid, cmd, inbuf, timeout)
+            ret, outbuf, outs = run_in_thread(
+                cluster.osd_commandosdid, cmd, inbuf, timeout)
 
         elif target[0] == 'pg':
             pgid = target[1]
@@ -1126,17 +1185,19 @@ def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0,
             if verbose:
                 print >> sys.stderr, 'submit {0} for pgid {1}'.\
                     format(cmd, pgid)
-            ret, outbuf, outs = \
-                cluster.pg_command(pgid, cmd, inbuf, timeout)
+            ret, outbuf, outs = run_in_thread(
+                cluster.pg_commandpgid, cmd, inbuf, timeout)
 
         elif target[0] == 'mon':
             if verbose:
                 print >> sys.stderr, '{0} to {1}'.\
                     format(cmd, target[0])
             if target[1] == '':
-                ret, outbuf, outs = cluster.mon_command(cmd, inbuf, timeout)
+                ret, outbuf, outs = run_in_thread(
+                    cluster.mon_command, cmd, inbuf, timeout)
             else:
-                ret, outbuf, outs = cluster.mon_command(cmd, inbuf, timeout, target[1])
+                ret, outbuf, outs = run_in_thread(
+                    cluster.mon_command, cmd, inbuf, timeout, target[1])
         elif target[0] == 'mds':
             mds_spec = target[1]