#!@Python3_EXECUTABLE@ # -*- mode:python -*- # vim: ts=4 sw=4 smarttab expandtab # # Processed in Makefile to add python #! line and version variable # # """ ceph.in becomes ceph, the command-line management tool for Ceph clusters. This is a replacement for tools/ceph.cc and tools/common.cc. Copyright (C) 2013 Inktank Storage, Inc. This is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 2, as published by the Free Software Foundation. See file COPYING. """ from time import sleep import grp import os import pwd import re import shutil import stat import sys import time import platform from typing import Dict, List, Sequence, Tuple try: input = raw_input except NameError: pass CEPH_GIT_VER = "@CEPH_GIT_VER@" CEPH_GIT_NICE_VER = "@CEPH_GIT_NICE_VER@" CEPH_RELEASE = "@CEPH_RELEASE@" CEPH_RELEASE_NAME = "@CEPH_RELEASE_NAME@" CEPH_RELEASE_TYPE = "@CEPH_RELEASE_TYPE@" # priorities from src/common/perf_counters.h PRIO_CRITICAL = 10 PRIO_INTERESTING = 8 PRIO_USEFUL = 5 PRIO_UNINTERESTING = 2 PRIO_DEBUGONLY = 0 PRIO_DEFAULT = PRIO_INTERESTING # Make life easier on developers: # If our parent dir contains CMakeCache.txt and bin/init-ceph, # assume we're running from a build dir (i.e. src/build/bin/ceph) # and tweak sys.path and LD_LIBRARY_PATH to use built files. # Since this involves re-execing, if CEPH_DBG is set in the environment # re-exec with -mpdb. Also, if CEPH_DEV is in the env, suppress # the warning message about the DEVELOPER MODE. MYPATH = os.path.abspath(__file__) MYDIR = os.path.dirname(MYPATH) MYPDIR = os.path.dirname(MYDIR) DEVMODEMSG = '*** DEVELOPER MODE: setting PATH, PYTHONPATH and LD_LIBRARY_PATH ***' def add_to_ld_path(path_name, path): paths = re.split('[ :]', os.environ.get(path_name, '')) if path in paths: return 0 else: paths.insert(0, path) os.environ[path_name] = ':'.join(paths) return 1 def respawn_in_path(lib_path, pybind_path, pythonlib_path, asan_lib_path): if platform.system() == "Darwin": lib_path_var = "DYLD_LIBRARY_PATH" else: lib_path_var = "LD_LIBRARY_PATH" ld_paths_changed = 0 preload_libcxx = os.environ.get('CEPH_PRELOAD_LIBCXX') if preload_libcxx: ld_paths_changed += add_to_ld_path('LD_PRELOAD', preload_libcxx) if asan_lib_path: ld_paths_changed += add_to_ld_path('LD_PRELOAD', asan_lib_path) ld_paths_changed += add_to_ld_path(lib_path_var, lib_path) if ld_paths_changed > 0: if "CEPH_DEV" not in os.environ: print(DEVMODEMSG, file=sys.stderr) execv_cmd = [] if 'CEPH_DBG' in os.environ: execv_cmd += ['@Python3_EXECUTABLE@', '-mpdb'] execv_cmd += sys.argv os.execvp(execv_cmd[0], execv_cmd) else: sys.path.insert(0, pybind_path) sys.path.insert(0, pythonlib_path) def get_pythonlib_dir(): """Returns the name of a distutils build directory""" return "lib.{version[0]}".format(version=sys.version_info) def get_cmake_variables(*names): vars = dict((name, None) for name in names) for line in open(os.path.join(MYPDIR, "CMakeCache.txt")): # parse lines like "WITH_ASAN:BOOL=ON" for name in names: if line.startswith("{}:".format(name)): type_value = line.split(":")[1].strip() t, v = type_value.split("=") if t == 'BOOL': v = v.upper() in ('TRUE', '1', 'Y', 'YES', 'ON') vars[name] = v break if all(vars.values()): break return [vars[name] for name in names] if os.path.exists(os.path.join(MYPDIR, "CMakeCache.txt")) \ and os.path.exists(os.path.join(MYPDIR, "bin/init-ceph")): src_path, with_asan, asan_lib_path = \ get_cmake_variables("ceph_SOURCE_DIR", "WITH_ASAN", "ASAN_LIBRARY") if src_path is None: # Huh, maybe we're not really in a cmake environment? pass else: # Developer mode, but in a cmake build dir instead of the src dir lib_path = os.path.join(MYPDIR, "lib") bin_path = os.path.join(MYPDIR, "bin") pybind_path = os.path.join(src_path, "src", "pybind") pythonlib_path = os.path.join(lib_path, "cython_modules", get_pythonlib_dir()) respawn_in_path(lib_path, pybind_path, pythonlib_path, asan_lib_path if with_asan else None) if 'PATH' in os.environ and bin_path not in os.environ['PATH']: os.environ['PATH'] = os.pathsep.join([bin_path, os.environ['PATH']]) import argparse import errno import json import rados import shlex import signal import string import subprocess from ceph_argparse import \ concise_sig, descsort_key, parse_json_funcsigs, \ validate_command, find_cmd_target, \ json_command, run_in_thread, Flag from ceph_daemon import admin_socket, DaemonWatcher, Termsize # just a couple of globals verbose = False cluster_handle = None def raw_write(buf): sys.stdout.flush() sys.stdout.buffer.write(buf) def osdids(): ret, outbuf, outs = json_command(cluster_handle, prefix='osd ls') if ret: raise RuntimeError('Can\'t contact mon for osd list') return [line.decode('utf-8') for line in outbuf.split(b'\n') if line] def monids(): ret, outbuf, outs = json_command(cluster_handle, prefix='mon dump', argdict={'format': 'json'}) if ret: raise RuntimeError('Can\'t contact mon for mon list') d = json.loads(outbuf.decode('utf-8')) return [m['name'] for m in d['mons']] def mdsids(): ret, outbuf, outs = json_command(cluster_handle, prefix='fs dump', argdict={'format': 'json'}) if ret: raise RuntimeError('Can\'t contact mon for mds list') d = json.loads(outbuf.decode('utf-8')) l = [] for info in d['standbys']: l.append(info['name']) for fs in d['filesystems']: for info in fs['mdsmap']['info'].values(): l.append(info['name']) return l def mgrids(): ret, outbuf, outs = json_command(cluster_handle, prefix='mgr dump', argdict={'format': 'json'}) if ret: raise RuntimeError('Can\'t contact mon for mgr list') d = json.loads(outbuf.decode('utf-8')) l = [] l.append(d['active_name']) # we can only send tell commands to the active mgr #for i in d['standbys']: # l.append(i['name']) return l def ids_by_service(service): ids = {"mon": monids, "osd": osdids, "mds": mdsids, "mgr": mgrids} return ids[service]() def validate_target(target): """ this function will return true iff target is a correct target, such as mon.a/osd.2/mds.a/mgr. target: array, likes ['osd', '2'] return: bool, or raise RuntimeError """ if len(target) == 2: # for case "service.id" service_name, service_id = target[0], target[1] try: exist_ids = ids_by_service(service_name) except KeyError: print('WARN: {0} is not a legal service name, should be one of mon/osd/mds/mgr'.format(service_name), file=sys.stderr) return False if service_id in exist_ids or len(exist_ids) > 0 and service_id == '*': return True else: print('WARN: the service id you provided does not exist. service id should ' 'be one of {0}.'.format('/'.join(exist_ids)), file=sys.stderr) return False elif len(target) == 1 and target[0] in ['mgr', 'mon']: return True else: print('WARN: \"{0}\" is not a legal target. it should be one of mon./osd./mds./mgr'.format('.'.join(target)), file=sys.stderr) return False # these args must be passed to all child programs GLOBAL_ARGS = { 'client_id': '--id', 'client_name': '--name', 'cluster': '--cluster', 'cephconf': '--conf', } def parse_cmdargs(args=None, target='') -> Tuple[argparse.ArgumentParser, argparse.Namespace, List[str]]: """ Consume generic arguments from the start of the ``args`` list. Call this first to handle arguments that are not handled by a command description provided by the server. :returns: three tuple of ArgumentParser instance, Namespace instance containing parsed values, and list of un-handled arguments """ # alias: let the line-wrapping be sane AP = argparse.ArgumentParser # format our own help parser = AP(description='Ceph administration tool', add_help=False) parser.add_argument('--completion', action='store_true', help=argparse.SUPPRESS) parser.add_argument('-h', '--help', help='request mon help', action='store_true') parser.add_argument('-c', '--conf', dest='cephconf', help='ceph configuration file') parser.add_argument('-i', '--in-file', dest='input_file', help='input file, or "-" for stdin') parser.add_argument('-o', '--out-file', dest='output_file', help='output file, or "-" for stdout') parser.add_argument('--setuser', dest='setuser', help='set user file permission') parser.add_argument('--setgroup', dest='setgroup', help='set group file permission') parser.add_argument('--id', '--user', dest='client_id', help='client id for authentication') parser.add_argument('--name', '-n', dest='client_name', help='client name for authentication') parser.add_argument('--cluster', help='cluster name') parser.add_argument('--admin-daemon', dest='admin_socket', help='submit admin-socket command (e.g. "help\" for' \ 'a list of available commands)') parser.add_argument('-s', '--status', action='store_true', help='show cluster status') parser.add_argument('-w', '--watch', action='store_true', help='watch live cluster changes') parser.add_argument('--watch-debug', action='store_true', help='watch debug events') parser.add_argument('--watch-info', action='store_true', help='watch info events') parser.add_argument('--watch-sec', action='store_true', help='watch security events') parser.add_argument('--watch-warn', action='store_true', help='watch warn events') parser.add_argument('--watch-error', action='store_true', help='watch error events') parser.add_argument('-W', '--watch-channel', dest="watch_channel", help="watch live cluster changes on a specific channel " "(e.g., cluster, audit, cephadm, or '*' for all)") parser.add_argument('--version', '-v', action="store_true", help="display version") parser.add_argument('--verbose', action="store_true", help="make verbose") parser.add_argument('--concise', dest='verbose', action="store_false", help="make less verbose") parser.add_argument('-f', '--format', choices=['json', 'json-pretty', 'xml', 'xml-pretty', 'plain', 'yaml'], help="Note: yaml is only valid for orch commands", dest='output_format') parser.add_argument('--connect-timeout', dest='cluster_timeout', type=int, help='set a timeout for connecting to the cluster') parser.add_argument('--block', action='store_true', help='block until completion (scrub and deep-scrub only)') parser.add_argument('--period', '-p', default=1, type=float, help='polling period, default 1.0 second (for ' \ 'polling commands only)') # returns a Namespace with the parsed args, and a list of all extras parsed_args, extras = parser.parse_known_args(args) return parser, parsed_args, extras def hdr(s): print('\n', s, '\n', '=' * len(s)) def do_basic_help(parser, args): """ Print basic parser help If the cluster is available, get and print monitor help """ hdr('General usage:') parser.print_help() print_locally_handled_command_help() def print_locally_handled_command_help(): hdr("Local commands:") print(""" ping Send simple presence/life test to a mon may be 'mon.*' for all mons daemon {type.id|path} Same as --admin-daemon, but auto-find admin socket daemonperf {type.id | path} [stat-pats] [priority] [] [] daemonperf {type.id | path} list|ls [stat-pats] [priority] Get selected perf stats from daemon/admin socket Optional shell-glob comma-delim match string stat-pats Optional selection priority (can abbreviate name): critical, interesting, useful, noninteresting, debug List shows a table of all available stats Run times (default forever), once per seconds (default 1) """, file=sys.stdout) def do_extended_help(parser, args, target, partial) -> int: def help_for_sigs(sigs, partial=None): try: while True: out = format_help(parse_json_funcsigs(sigs, 'cli'), partial=partial) if not out and partial: # shorten partial until we get at least one matching command prefix partial = ' '.join(partial.split()[:-1]) continue sys.stdout.write(out) break except BrokenPipeError: pass def help_for_target(target, partial=None): # wait for osdmap because we know this is sent after the mgrmap # and monmap (it's alphabetical). cluster_handle.wait_for_latest_osdmap() ret, outbuf, outs = json_command(cluster_handle, target=target, prefix='get_command_descriptions', timeout=10) if ret: if (ret == -errno.EPERM or ret == -errno.EACCES) and target[0] in ('osd', 'mds'): print("Permission denied. Check that your user has 'allow *' " "capabilities for the target daemon type.", file=sys.stderr) elif ret == -errno.EPERM: print("Permission denied. Check your user has proper " "capabilities configured", file=sys.stderr) else: print("couldn't get command descriptions for {0}: {1} ({2})". format(target, outs, ret), file=sys.stderr) return ret else: return help_for_sigs(outbuf.decode('utf-8'), partial) assert(cluster_handle.state == "connected") return help_for_target(target, partial) DONTSPLIT = string.ascii_letters + '{[<>]}' def wrap(s, width, indent): """ generator to transform s into a sequence of strings width or shorter, for wrapping text to a specific column width. Attempt to break on anything but DONTSPLIT characters. indent is amount to indent 2nd-through-nth lines. so "long string long string long string" width=11 indent=1 becomes 'long string', ' long string', ' long string' so that it can be printed as long string long string long string Consumes s. """ result = '' leader = '' while len(s): if len(s) <= width: # no splitting; just possibly indent result = leader + s s = '' yield result else: splitpos = width while (splitpos > 0) and (s[splitpos-1] in DONTSPLIT): splitpos -= 1 if splitpos == 0: splitpos = width if result: # prior result means we're mid-iteration, indent result = leader else: # first time, set leader and width for next leader = ' ' * indent width -= 1 # for subsequent space additions # remove any leading spaces in this chunk of s result += s[:splitpos].lstrip() s = s[splitpos:] yield result def format_help(cmddict, partial=None) -> str: """ Formats all the cmdsigs and helptexts from cmddict into a sorted-by- cmdsig 2-column display, with each column wrapped and indented to fit into (terminal_width / 2) characters. """ fullusage = '' for cmd in sorted(cmddict.values(), key=descsort_key): if not cmd['help']: continue flags = cmd.get('flags', 0) if flags & (Flag.OBSOLETE | Flag.DEPRECATED | Flag.HIDDEN): continue concise = concise_sig(cmd['sig']) if partial and not concise.startswith(partial): continue width = Termsize().cols - 1 # 1 for the line between sig and help sig_width = int(width / 2) # make sure width == sig_width + help_width, even (width % 2 > 0) help_width = int(width / 2) + (width % 2) siglines = [l for l in wrap(concise, sig_width, 1)] helplines = [l for l in wrap(cmd['help'], help_width, 1)] # make lists the same length maxlen = max(len(siglines), len(helplines)) siglines.extend([''] * (maxlen - len(siglines))) helplines.extend([''] * (maxlen - len(helplines))) # so we can zip them for output for s, h in zip(siglines, helplines): fullusage += '{s:{w}s} {h}\n'.format(s=s, h=h, w=sig_width) return fullusage def ceph_conf(parsed_args, field, name, pid=None): cmd = 'ceph-conf' bindir = os.path.dirname(__file__) if shutil.which(cmd): args = [cmd] elif shutil.which(cmd, path=bindir): args = [os.path.join(bindir, cmd)] else: raise RuntimeError('"ceph-conf" not found') if name: args.extend(['--name', name]) if pid: args.extend(['--pid', pid]) # add any args in GLOBAL_ARGS for key, val in GLOBAL_ARGS.items(): # ignore name in favor of argument name, if any if name and key == 'client_name': continue if getattr(parsed_args, key): args.extend([val, getattr(parsed_args, key)]) args.extend(['--show-config-value', field]) p = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) outdata, errdata = p.communicate() if p.returncode != 0: raise RuntimeError('unable to get conf option %s for %s: %s' % (field, name, errdata)) return outdata.rstrip() PROMPT = 'ceph> ' if sys.stdin.isatty(): def read_input(): while True: line = input(PROMPT).rstrip() if line in ['q', 'quit', 'Q', 'exit']: return None if line: return line else: def read_input(): while True: line = sys.stdin.readline() if not line: return None line = line.rstrip() if line: return line def do_command(parsed_args, target, cmdargs, sigdict, inbuf, verbose): ''' Validate a command, and handle the polling flag ''' valid_dict = validate_command(sigdict, cmdargs, verbose) # Validate input args against list of sigs if valid_dict: if parsed_args.output_format: valid_dict['format'] = parsed_args.output_format if verbose: print("Submitting command: ", valid_dict, file=sys.stderr) else: return -errno.EINVAL, '', 'invalid command' next_header_print = 0 # Set extra options for polling commands only: if valid_dict.get('poll', False): valid_dict['width'] = Termsize().cols while True: try: # Only print the header for polling commands if next_header_print == 0 and valid_dict.get('poll', False): valid_dict['print_header'] = True next_header_print = Termsize().rows - 3 next_header_print -= 1 ret, outbuf, outs = json_command(cluster_handle, target=target, argdict=valid_dict, inbuf=inbuf, verbose=verbose) if valid_dict.get('poll', False): valid_dict['print_header'] = False if not valid_dict.get('poll', False): # Don't print here if it's not a polling command break if ret: ret = abs(ret) print('Error: {0} {1}'.format(ret, errno.errorcode.get(ret, 'Unknown')), file=sys.stderr) break if outbuf: print(outbuf.decode('utf-8')) if outs: print(outs, file=sys.stderr) if parsed_args.period <= 0: break sleep(parsed_args.period) except KeyboardInterrupt: print('Interrupted') return errno.EINTR, '', '' if ret == errno.ETIMEDOUT: ret = -ret if not outs: outs = ("Connection timed out. Please check the client's " + "permission and connection.") return ret, outbuf, outs def new_style_command(parsed_args, cmdargs, target, sigdict, inbuf, verbose) -> Tuple[int, bytes, str]: """ Do new-style command dance. target: daemon to receive command: mon (any) or osd.N sigdict - the parsed output from the new monitor describing commands inbuf - any -i input file data verbose - bool """ if verbose: for cmdtag in sorted(sigdict.keys()): cmd = sigdict[cmdtag] sig = cmd['sig'] print('{0}: {1}'.format(cmdtag, concise_sig(sig))) if cmdargs: # Non interactive mode ret, outbuf, outs = do_command(parsed_args, target, cmdargs, sigdict, inbuf, verbose) else: # Interactive mode (ceph cli) if sys.stdin.isatty(): # do the command-interpreter looping # for input to do readline cmd editing import readline # noqa while True: try: interactive_input = read_input() except EOFError: # leave user an uncluttered prompt return 0, b'\n', '' if interactive_input is None: return 0, b'', '' cmdargs = parse_cmdargs(shlex.split(interactive_input))[2] try: target = find_cmd_target(cmdargs) except Exception as e: print('error handling command target: {0}'.format(e), file=sys.stderr) continue if len(cmdargs) and cmdargs[0] == 'tell': print('Can not use \'tell\' in interactive mode.', file=sys.stderr) continue ret, outbuf, outs = do_command(parsed_args, target, cmdargs, sigdict, inbuf, verbose) if ret < 0: ret = -ret errstr = errno.errorcode.get(ret, 'Unknown') print('Error {0}: {1}'.format(errstr, outs), file=sys.stderr) else: if outs: print(outs, file=sys.stderr) if outbuf: print(outbuf.decode('utf-8')) return ret, outbuf, outs def complete(sigdict, args, target): """ Command completion. Match as much of [args] as possible, and print every possible match separated by newlines. Return exitcode. """ # XXX this looks a lot like the front of validate_command(). Refactor? # Repulsive hack to handle tell: lop off 'tell' and target # and validate the rest of the command. 'target' is already # determined in our callers, so it's ok to remove it here. if len(args) and args[0] == 'tell': args = args[2:] # look for best match, accumulate possibles in bestcmds # (so we can maybe give a more-useful error message) match_count = 0 comps = [] for cmdtag, cmd in sigdict.items(): flags = cmd.get('flags', 0) if flags & (Flag.OBSOLETE | Flag.HIDDEN): continue sig = cmd['sig'] j = 0 # iterate over all arguments, except last one for arg in args[0:-1]: if j > len(sig)-1: # an out of argument definitions break found_match = arg in sig[j].complete(arg) if not found_match and sig[j].req: # no elements that match break if not sig[j].N: j += 1 else: # successfully matched all - except last one - arguments if j < len(sig) and len(args) > 0: comps += sig[j].complete(args[-1]) match_count += 1 match_cmd = cmd if match_count == 1 and len(comps) == 0: # only one command matched and no hints yet => add help comps = comps + [' ', '#'+match_cmd['help']] print('\n'.join(sorted(set(comps)))) return 0 def ping_monitor(cluster_handle, name, timeout): if 'mon.' not in name: print('"ping" expects a monitor to ping; try "ping mon."', file=sys.stderr) return 1 mon_id = name[len('mon.'):] if mon_id == '*': run_in_thread(cluster_handle.connect, timeout=timeout) for m in monids(): s = run_in_thread(cluster_handle.ping_monitor, m) if s is None: print("mon.{0}".format(m) + '\n' + "Error connecting to monitor.") else: print("mon.{0}".format(m) + '\n' + s) else: s = run_in_thread(cluster_handle.ping_monitor, mon_id) print(s) return 0 def get_admin_socket(parsed_args, name): path = ceph_conf(parsed_args, 'admin_socket', name) try: if stat.S_ISSOCK(os.stat(path).st_mode): return path except OSError: pass # try harder, probably the "name" option is in the form of # "${name}.${pid}"? parts = name.rsplit('.', 1) if len(parts) > 1 and parts[-1].isnumeric(): name, pid = parts return ceph_conf(parsed_args, 'admin_socket', name, pid) else: return path def maybe_daemon_command(parsed_args, childargs): """ Check if --admin-socket, daemon, or daemonperf command if it is, returns (boolean handled, return code if handled == True) """ daemon_perf = False sockpath = None if parsed_args.admin_socket: sockpath = parsed_args.admin_socket elif len(childargs) > 0 and childargs[0] in ["daemon", "daemonperf"]: daemon_perf = (childargs[0] == "daemonperf") # Treat "daemon " or "daemon " like --admin_daemon # Handle "daemonperf " the same but requires no trailing args require_args = 2 if daemon_perf else 3 if len(childargs) >= require_args: if childargs[1].find('/') >= 0: sockpath = childargs[1] else: # try resolve daemon name try: sockpath = get_admin_socket(parsed_args, childargs[1]) except Exception as e: print('Can\'t get admin socket path: ' + str(e), file=sys.stderr) return True, errno.EINVAL # for both: childargs = childargs[2:] else: print('{0} requires at least {1} arguments'.format(childargs[0], require_args), file=sys.stderr) return True, errno.EINVAL if sockpath and daemon_perf: return True, daemonperf(childargs, sockpath) elif sockpath: try: raw_write(admin_socket(sockpath, childargs, parsed_args.output_format)) except Exception as e: print('admin_socket: {0}'.format(e), file=sys.stderr) return True, errno.EINVAL return True, 0 return False, 0 def isnum(s): try: float(s) return True except ValueError: return False def daemonperf(childargs: Sequence[str], sockpath: str): """ Handle daemonperf command; returns errno or 0 daemonperf [priority string] [statpats] [interval] [count] daemonperf list|ls [statpats] """ interval = 1 count = None statpats = None priority = None do_list = False def prio_from_name(arg): PRIOMAP = { 'critical': PRIO_CRITICAL, 'interesting': PRIO_INTERESTING, 'useful': PRIO_USEFUL, 'uninteresting': PRIO_UNINTERESTING, 'debugonly': PRIO_DEBUGONLY, } if arg in PRIOMAP: return PRIOMAP[arg] # allow abbreviation for name, val in PRIOMAP.items(): if name.startswith(arg): return val return None # consume and analyze non-numeric args while len(childargs) and not isnum(childargs[0]): arg = childargs.pop(0) # 'list'? if arg in ['list', 'ls']: do_list = True continue # prio? prio = prio_from_name(arg) if prio is not None: priority = prio continue # statpats statpats = arg.split(',') if priority is None: priority = PRIO_DEFAULT if len(childargs) > 0: try: interval = float(childargs.pop(0)) if interval < 0: raise ValueError except ValueError: print('daemonperf: interval should be a positive number', file=sys.stderr) return errno.EINVAL if len(childargs) > 0: arg = childargs.pop(0) if (not isnum(arg)) or (int(arg) < 0): print('daemonperf: count should be a positive integer', file=sys.stderr) return errno.EINVAL count = int(arg) watcher = DaemonWatcher(sockpath, statpats, priority) if do_list: watcher.list() else: watcher.run(interval, count) return 0 def get_scrub_timestamps(childargs: Sequence[str]) -> Dict[str, Tuple[str, str]]: last_scrub_stamp = "last_" + childargs[1].replace('-', '_') + "_stamp" results = dict() scruball = False if childargs[2] in ['all', 'any', '*']: scruball = True devnull = open(os.devnull, 'w') out = subprocess.check_output(['ceph', 'pg', 'dump', '--format=json-pretty'], stderr=devnull) try: pgstats = json.loads(out)['pg_map']['pg_stats'] except KeyError: pgstats = json.loads(out)['pg_stats'] for stat in pgstats: if scruball or stat['up_primary'] == int(childargs[2]): scrub_tuple = (stat['up_primary'], stat[last_scrub_stamp]) results[stat['pgid']] = scrub_tuple return results def check_scrub_stamps(waitdata, currdata): for pg in waitdata.keys(): # Try to handle the case where a pg may not exist in current results if pg in currdata and waitdata[pg][1] == currdata[pg][1]: return False return True def waitscrub(childargs, waitdata): print('Waiting for {0} to complete...'.format(childargs[1]), file=sys.stdout) currdata = get_scrub_timestamps(childargs) while not check_scrub_stamps(waitdata, currdata): time.sleep(3) currdata = get_scrub_timestamps(childargs) print('{0} completed'.format(childargs[1]), file=sys.stdout) def wait(childargs: Sequence[str], waitdata): if childargs[1] in ['scrub', 'deep-scrub']: waitscrub(childargs, waitdata) def main(): ceph_args = os.environ.get('CEPH_ARGS') if ceph_args: if "injectargs" in sys.argv: i = sys.argv.index("injectargs") sys.argv = sys.argv[:i] + ceph_args.split() + sys.argv[i:] else: sys.argv.extend([arg for arg in ceph_args.split() if '--admin-socket' not in arg]) parser, parsed_args, childargs = parse_cmdargs() if parsed_args.version: print('ceph version {0} ({1}) {2} ({3})'.format( CEPH_GIT_NICE_VER, CEPH_GIT_VER, CEPH_RELEASE_NAME, CEPH_RELEASE_TYPE)) # noqa return 0 # --watch-channel|-W implies -w if parsed_args.watch_channel: parsed_args.watch = True elif parsed_args.watch and not parsed_args.watch_channel: parsed_args.watch_channel = 'cluster' global verbose verbose = parsed_args.verbose if verbose: print("parsed_args: {0}, childargs: {1}".format(parsed_args, childargs), file=sys.stderr) # pass on --id, --name, --conf name = 'client.admin' if parsed_args.client_id: name = 'client.' + parsed_args.client_id if parsed_args.client_name: name = parsed_args.client_name conffile = rados.Rados.DEFAULT_CONF_FILES if parsed_args.cephconf: conffile = parsed_args.cephconf # For now, --admin-daemon is handled as usual. Try it # first in case we can't connect() to the cluster done, ret = maybe_daemon_command(parsed_args, childargs) if done: return ret timeout = None if parsed_args.cluster_timeout: timeout = parsed_args.cluster_timeout # basic help if parsed_args.help: do_basic_help(parser, childargs) # handle any 'generic' ceph arguments that we didn't parse here global cluster_handle # rados.Rados() will call rados_create2, and then read the conf file, # and then set the keys from the dict. So we must do these # "pre-file defaults" first (see common_preinit in librados) conf_defaults = { 'log_to_stderr': 'true', 'err_to_stderr': 'true', 'log_flush_on_exit': 'true', } if 'injectargs' in childargs: position = childargs.index('injectargs') injectargs = childargs[position:] childargs = childargs[:position] if verbose: print('Separate childargs {0} from injectargs {1}'.format(childargs, injectargs), file=sys.stderr) else: injectargs = None clustername = None if parsed_args.cluster: clustername = parsed_args.cluster try: cluster_handle = run_in_thread(rados.Rados, name=name, clustername=clustername, conf_defaults=conf_defaults, conffile=conffile) retargs = run_in_thread(cluster_handle.conf_parse_argv, childargs) except rados.Error as e: print('Error initializing cluster client: {0!r}'.format(e), file=sys.stderr) return 1 childargs = retargs if not childargs: childargs = [] # -- means "stop parsing args", but we don't want to see it either if '--' in childargs: childargs.remove('--') if injectargs and '--' in injectargs: injectargs.remove('--') block = False waitdata = dict() if parsed_args.block: if (len(childargs) >= 2 and childargs[0] == 'osd' and childargs[1] in ['deep-scrub', 'scrub']): block = True waitdata = get_scrub_timestamps(childargs) if parsed_args.help: # short default timeout for -h if not timeout: timeout = 5 if childargs and childargs[0] == 'ping' and not parsed_args.help: if len(childargs) < 2: print('"ping" requires a monitor name as argument: "ping mon."', file=sys.stderr) return 1 if parsed_args.completion: # for completion let timeout be really small timeout = 3 try: if childargs and childargs[0] == 'ping' and not parsed_args.help: return ping_monitor(cluster_handle, childargs[1], timeout) result = run_in_thread(cluster_handle.connect, timeout=timeout) if type(result) is tuple and result[0] == -errno.EINTR: print('Cluster connection interrupted or timed out', file=sys.stderr) return 1 except KeyboardInterrupt: print('Cluster connection aborted', file=sys.stderr) return 1 except rados.PermissionDeniedError as e: print(str(e), file=sys.stderr) return errno.EACCES except Exception as e: print(str(e), file=sys.stderr) return 1 if parsed_args.help: target = None if len(childargs) >= 2 and childargs[0] == 'tell': target = childargs[1].split('.', 1) if not validate_target(target): print('target {0} doesn\'t exist; please pass correct target to tell command (e.g., mon.a, osd.1, mds.a, mgr)'.format(childargs[1]), file=sys.stderr) return 1 childargs = childargs[2:] hdr('Tell %s commands:' % target[0]) else: hdr('Monitor commands:') target = ('mon', '') if verbose: print('[Contacting monitor, timeout after %d seconds]' % timeout) return do_extended_help(parser, childargs, target, ' '.join(childargs)) # implement "tell service.id help" if len(childargs) >= 3 and childargs[0] == 'tell' and childargs[2] == 'help': target = childargs[1].split('.', 1) if validate_target(target): hdr('Tell %s commands' % target[0]) return do_extended_help(parser, childargs, target, None) else: print('target {0} doesn\'t exists, please pass correct target to tell command, such as mon.a/' 'osd.1/mds.a/mgr'.format(childargs[1]), file=sys.stderr) return 1 # implement -w/--watch_* # This is ugly, but Namespace() isn't quite rich enough. level = '' for k, v in parsed_args._get_kwargs(): if k.startswith('watch') and v: if k == 'watch': level = 'info' elif k != "watch_channel": level = k.replace('watch_', '') if level: # an awfully simple callback def watch_cb(arg, line, channel, name, who, stamp_sec, stamp_nsec, seq, level, msg): # Filter on channel channel = channel.decode('utf-8') if parsed_args.watch_channel in (channel, '*'): print(line.decode('utf-8')) sys.stdout.flush() # first do a ceph status ret, outbuf, outs = json_command(cluster_handle, prefix='status') if ret: print("status query failed: ", outs, file=sys.stderr) return ret print(outbuf.decode('utf-8')) # this instance keeps the watch connection alive, but is # otherwise unused run_in_thread(cluster_handle.monitor_log2, level, watch_cb, 0) # loop forever letting watch_cb print lines try: signal.pause() except KeyboardInterrupt: # or until ^C, at least return 0 # read input file, if any inbuf = b'' if parsed_args.input_file: try: if parsed_args.input_file == '-': inbuf = sys.stdin.buffer.read() else: with open(parsed_args.input_file, 'rb') as f: inbuf = f.read() except Exception as e: print('Can\'t open input file {0}: {1}'.format(parsed_args.input_file, e), file=sys.stderr) return 1 # prepare output file, if any if parsed_args.output_file: try: if parsed_args.output_file == '-': outf = sys.stdout.buffer else: outf = open(parsed_args.output_file, 'wb') except Exception as e: print('Can\'t open output file {0}: {1}'.format(parsed_args.output_file, e), file=sys.stderr) return 1 if parsed_args.setuser: try: ownerid = pwd.getpwnam(parsed_args.setuser).pw_uid os.fchown(outf.fileno(), ownerid, -1) except OSError as e: print('Failed to change user ownership of {0} to {1}: {2}'.format(outf, parsed_args.setuser, e)) return 1 if parsed_args.setgroup: try: groupid = grp.getgrnam(parsed_args.setgroup).gr_gid os.fchown(outf.fileno(), -1, groupid) except OSError as e: print('Failed to change group ownership of {0} to {1}: {2}'.format(outf, parsed_args.setgroup, e)) return 1 # -s behaves like a command (ceph status). if parsed_args.status: childargs.insert(0, 'status') try: target = find_cmd_target(childargs) except Exception as e: print('error handling command target: {0}'.format(e), file=sys.stderr) return 1 # Repulsive hack to handle tell: lop off 'tell' and target # and validate the rest of the command. 'target' is already # determined in our callers, so it's ok to remove it here. is_tell = False if len(childargs) and childargs[0] == 'tell': childargs = childargs[2:] is_tell = True if is_tell: if injectargs: childargs = injectargs if not len(childargs): print('"{0} tell" requires additional arguments.'.format(sys.argv[0]), 'Try "{0} tell [options...]" instead.'.format(sys.argv[0]), file=sys.stderr) return errno.EINVAL # fetch JSON sigs from command # each line contains one command signature (a placeholder name # of the form 'cmdNNN' followed by an array of argument descriptors) # as part of the validated argument JSON object if target[1] == '*': service = target[0] targets = [(service, o) for o in ids_by_service(service)] else: targets = [target] final_ret = 0 for target in targets: # prettify? prefix output with target, if there was a wildcard used prefix = '' suffix = '' if not parsed_args.output_file and len(targets) > 1: prefix = '{0}.{1}: '.format(*target) suffix = '\n' ret, outbuf, outs = json_command(cluster_handle, target=target, prefix='get_command_descriptions') if ret: where = '{0}.{1}'.format(*target) if ret > 0: raise RuntimeError('Unexpected return code from {0}: {1}'. format(where, ret)) outs = 'problem getting command descriptions from {0}'.format(where) else: sigdict = parse_json_funcsigs(outbuf.decode('utf-8'), 'cli') if parsed_args.completion: return complete(sigdict, childargs, target) ret, outbuf, outs = new_style_command(parsed_args, childargs, target, sigdict, inbuf, verbose) # debug tool: send any successful command *again* to # verify that it is idempotent. if not ret and 'CEPH_CLI_TEST_DUP_COMMAND' in os.environ: ret, outbuf, outs = new_style_command(parsed_args, childargs, target, sigdict, inbuf, verbose) if ret < 0: ret = -ret print(prefix + 'Second attempt of previously successful command ' 'failed with {0}: {1}'.format( errno.errorcode.get(ret, 'Unknown'), outs), file=sys.stderr) sys.stdout.flush() if parsed_args.output_file: outf.write(outbuf) else: # hack: old code printed status line before many json outputs # (osd dump, etc.) that consumers know to ignore. Add blank line # to satisfy consumers that skip the first line, but not annoy # consumers that don't. if parsed_args.output_format and \ parsed_args.output_format.startswith('json'): print() # if we are prettifying things, normalize newlines. sigh. if suffix: outbuf = outbuf.rstrip() if outbuf: try: print(prefix, end='') # Write directly to binary stdout raw_write(outbuf) print(suffix, end='') except IOError as e: if e.errno != errno.EPIPE: raise e final_e = None try: sys.stdout.flush() except IOError as e: if e.errno != errno.EPIPE: final_e = e if ret < 0: ret = -ret errstr = errno.errorcode.get(ret, 'Unknown') print('Error {0}: {1}'.format(errstr, outs), file=sys.stderr) final_ret = ret elif outs: print(prefix + outs, file=sys.stderr) if final_e: raise final_e # Block until command completion (currently scrub and deep scrub only) if block: wait(childargs, waitdata) if parsed_args.output_file and parsed_args.output_file != '-': outf.close() if final_ret: return final_ret return 0 if __name__ == '__main__': try: retval = main() # shutdown explicitly; Rados() does not if retval == 0 and cluster_handle: run_in_thread(cluster_handle.shutdown) except KeyboardInterrupt: print('Interrupted') retval = errno.EINTR if retval: # flush explicitly because we aren't exiting in the usual way sys.stdout.flush() sys.stderr.flush() os._exit(retval) else: sys.exit(retval)