From: Mehdi Abaakouk Date: Mon, 15 Feb 2016 21:06:54 +0000 (+0100) Subject: ceph.in: Use new python rados module X-Git-Tag: v10.1.0~369^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7216b06bc52e3d2de36ced70539c2c336e13bdb3;p=ceph.git ceph.in: Use new python rados module This implements run_in_thread inside the ceph command itself. And fixes the ceph command bootstrap when it run inside the source tree to correctly load the new rados python module. Signed-off-by: Mehdi Abaakouk --- diff --git a/src/ceph.in b/src/ceph.in index e9df0cfdaa2..6368ad0d972 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -36,7 +36,7 @@ MYPATH = os.path.abspath(__file__) MYDIR = os.path.dirname(MYPATH) DEVMODEMSG = '*** DEVELOPER MODE: setting PATH, PYTHONPATH and LD_LIBRARY_PATH ***' -def respawn_in_path(lib_path, pybind_path): +def respawn_in_path(lib_path, pybind_path, pythonlib_path): execv_cmd = ['python'] if 'CEPH_DBG' in os.environ: execv_cmd += ['-mpdb'] @@ -58,12 +58,23 @@ def respawn_in_path(lib_path, pybind_path): print >> sys.stderr, DEVMODEMSG os.execvp(py_binary, execv_cmd + sys.argv) sys.path.insert(0, os.path.join(MYDIR, pybind_path)) + sys.path.insert(0, os.path.join(MYDIR, pythonlib_path)) + +def get_pythonlib_dir(): + """Returns the name of a distutils build directory""" + import sysconfig + f = "lib.{platform}-{version[0]}.{version[1]}" + name = f.format(platform=sysconfig.get_platform(), + version=sys.version_info) + return os.path.join('build', name) if MYDIR.endswith('src') and \ os.path.exists(os.path.join(MYDIR, '.libs')) and \ - os.path.exists(os.path.join(MYDIR, 'pybind')): + os.path.exists(os.path.join(MYDIR, 'pybind')) and \ + os.path.exists(os.path.join(MYDIR, 'build')): - respawn_in_path(os.path.join(MYDIR, '.libs'), "pybind") + pythonlib_path = get_pythonlib_dir() + respawn_in_path(os.path.join(MYDIR, '.libs'), "pybind", pythonlib_path) if os.environ.has_key('PATH') and MYDIR not in os.environ['PATH']: os.environ['PATH'] += ':' + MYDIR @@ -81,9 +92,11 @@ elif os.path.exists(os.path.join(os.getcwd(), "CMakeCache.txt")) \ # Developer mode, but in a cmake build dir instead of the src dir lib_path = os.path.join(os.getcwd(), "src") pybind_path = os.path.join(src_path, "src", "pybind") - respawn_in_path(lib_path, pybind_path) + pythonlib_path = os.path.join(src_path, "src", get_pythonlib_dir()) + respawn_in_path(lib_path, pybind_path, pythonlib_path) - sys.path.insert(0, os.path.join(MYDIR, pybind_path)) + sys.path.insert(0, os.path.join(MYDIR, pybind_path)) + sys.path.insert(0, os.path.join(MYDIR, pythonlib_path)) # Add src/ to path for e.g. ceph-conf if os.environ.has_key('PATH') and lib_path not in os.environ['PATH']: @@ -101,7 +114,7 @@ import subprocess from ceph_argparse import \ concise_sig, descsort, parse_json_funcsigs, \ matchnum, validate_command, find_cmd_target, \ - send_command, json_command + send_command, json_command, run_in_thread from ceph_daemon import DaemonWatcher, admin_socket @@ -525,12 +538,12 @@ def ping_monitor(cluster_handle, name, timeout): mon_id = name[len('mon.'):] if (mon_id == '*') : - cluster_handle.connect(timeout=timeout) + run_in_thread(cluster_handle.connect, timeout=timeout) for m in monids() : - s = cluster_handle.ping_monitor(m) + s = run_in_thread(cluster_handle.ping_monitor, m) print "mon.{0}".format(m) + '\n' + s else : - s = cluster_handle.ping_monitor(mon_id) + s = run_in_thread(cluster_handle.ping_monitor, mon_id) print s return 0 @@ -668,10 +681,11 @@ def main(): clustername = parsed_args.cluster try: - cluster_handle = rados.Rados(name=name, clustername=clustername, - conf_defaults=conf_defaults, - conffile=conffile) - retargs = cluster_handle.conf_parse_argv(childargs) + cluster_handle = run_in_thread(rados.Rados, + name=name, clustername=clustername, + conf_defaults=conf_defaults, + conffile=conffile) + retargs = run_in_thread(cluster_handle.conf_parse_argv, childargs) except rados.Error as e: print >> sys.stderr, 'Error initializing cluster client: {0}'.\ format(repr(e)) @@ -714,7 +728,7 @@ def main(): try: if childargs and childargs[0] == 'ping': return ping_monitor(cluster_handle, childargs[1], timeout) - cluster_handle.connect(timeout=timeout) + run_in_thread(cluster_handle.connect, timeout=timeout) except KeyboardInterrupt: print >> sys.stderr, 'Cluster connection aborted' return 1 @@ -761,7 +775,7 @@ def main(): # this instance keeps the watch connection alive, but is # otherwise unused - rados.MonitorLog(cluster_handle, level, watch_cb, 0) + run_in_thread(cluster_handle.monitor_log, level, watch_cb, 0) # loop forever letting watch_cb print lines try: @@ -939,5 +953,5 @@ if __name__ == '__main__': retval = main() # shutdown explicitly; Rados() does not if cluster_handle: - cluster_handle.shutdown() + run_in_thread(cluster_handle.shutdown) sys.exit(retval) diff --git a/src/pybind/ceph_argparse.py b/src/pybind/ceph_argparse.py index 9a830575d44..7d5f4cfd3da 100644 --- a/src/pybind/ceph_argparse.py +++ b/src/pybind/ceph_argparse.py @@ -10,6 +10,7 @@ Copyright (C) 2013 Inktank Storage, Inc. LGPL2. See file COPYING. """ import copy +import errno import json import os import pprint @@ -17,6 +18,7 @@ import re import socket import stat import sys +import threading import types import uuid @@ -1089,6 +1091,63 @@ def find_cmd_target(childargs): return 'mon', '' +class RadosThread(threading.Thread): + def __init__(self, target, *args, **kwargs): + self.args = args + self.kwargs = kwargs + self.target = target + self.exception = None + threading.Thread.__init__(self) + + def run(self): + try: + self.retval = self.target(*self.args, **self.kwargs) + except Exception as e: + self.exception = e + + +# time in seconds between each call to t.join() for child thread +POLL_TIME_INCR = 0.5 + + +def run_in_thread(target, *args, **kwargs): + interrupt = False + timeout = kwargs.pop('timeout', 0) + countdown = timeout + t = RadosThread(target, *args, **kwargs) + + # allow the main thread to exit (presumably, avoid a join() on this + # subthread) before this thread terminates. This allows SIGINT + # exit of a blocked call. See below. + t.daemon = True + + t.start() + try: + # poll for thread exit + while t.is_alive(): + t.join(POLL_TIME_INCR) + if timeout and t.is_alive(): + countdown = countdown - POLL_TIME_INCR + if countdown <= 0: + raise KeyboardInterrupt + + t.join() # in case t exits before reaching the join() above + except KeyboardInterrupt: + # ..but allow SIGINT to terminate the waiting. Note: this + # relies on the Linux kernel behavior of delivering the signal + # to the main thread in preference to any subthread (all that's + # strictly guaranteed is that *some* thread that has the signal + # unblocked will receive it). But there doesn't seem to be + # any interface to create t with SIGINT blocked. + interrupt = True + + if interrupt: + t.retval = -errno.EINTR + if t.exception: + raise t.exception + return t.retval + + def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0, verbose=False): """ @@ -1110,8 +1169,8 @@ def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0, if verbose: print >> sys.stderr, 'submit {0} to osd.{1}'.\ format(cmd, osdid) - ret, outbuf, outs = \ - cluster.osd_command(osdid, cmd, inbuf, timeout) + ret, outbuf, outs = run_in_thread( + cluster.osd_command, osdid, cmd, inbuf, timeout) elif target[0] == 'pg': pgid = target[1] @@ -1126,17 +1185,19 @@ def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0, if verbose: print >> sys.stderr, 'submit {0} for pgid {1}'.\ format(cmd, pgid) - ret, outbuf, outs = \ - cluster.pg_command(pgid, cmd, inbuf, timeout) + ret, outbuf, outs = run_in_thread( + cluster.pg_command, pgid, cmd, inbuf, timeout) elif target[0] == 'mon': if verbose: print >> sys.stderr, '{0} to {1}'.\ format(cmd, target[0]) if target[1] == '': - ret, outbuf, outs = cluster.mon_command(cmd, inbuf, timeout) + ret, outbuf, outs = run_in_thread( + cluster.mon_command, cmd, inbuf, timeout) else: - ret, outbuf, outs = cluster.mon_command(cmd, inbuf, timeout, target[1]) + ret, outbuf, outs = run_in_thread( + cluster.mon_command, cmd, inbuf, timeout, target[1]) elif target[0] == 'mds': mds_spec = target[1]