From 4867fc1cd189724bf4be1ab3eb96a86c378ede8a Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Mon, 16 Nov 2020 09:02:24 -0500 Subject: [PATCH] cephfs-top: top(1) like utility for Ceph Filesystem Signed-off-by: Venky Shankar --- src/tools/cephfs/top/CMakeLists.txt | 7 + src/tools/cephfs/top/cephfs-top | 313 ++++++++++++++++++++++++++++ src/tools/cephfs/top/setup.py | 25 +++ src/tools/cephfs/top/tox.ini | 7 + 4 files changed, 352 insertions(+) create mode 100644 src/tools/cephfs/top/CMakeLists.txt create mode 100755 src/tools/cephfs/top/cephfs-top create mode 100644 src/tools/cephfs/top/setup.py create mode 100644 src/tools/cephfs/top/tox.ini diff --git a/src/tools/cephfs/top/CMakeLists.txt b/src/tools/cephfs/top/CMakeLists.txt new file mode 100644 index 0000000000000..49750c850af04 --- /dev/null +++ b/src/tools/cephfs/top/CMakeLists.txt @@ -0,0 +1,7 @@ +include(Distutils) +distutils_install_module(cephfs-top) + +if(WITH_TESTS) + include(AddCephTest) + add_tox_test(cephfs-top) +endif() diff --git a/src/tools/cephfs/top/cephfs-top b/src/tools/cephfs/top/cephfs-top new file mode 100755 index 0000000000000..f5a74b8264544 --- /dev/null +++ b/src/tools/cephfs/top/cephfs-top @@ -0,0 +1,313 @@ +#!/usr/bin/python3 + +import argparse +import sys +import curses +import errno +import json +import signal +import time + +from collections import OrderedDict +from datetime import datetime +from enum import Enum, unique + +import rados + + +class FSTopException(Exception): + def __init__(self, msg=''): + self.error_msg = msg + + def get_error_msg(self): + return self.error_msg + + +@unique +class MetricType(Enum): + METRIC_TYPE_NONE = 0 + METRIC_TYPE_PERCENTAGE = 1 + METRIC_TYPE_LATENCY = 2 + + +FS_TOP_PROG_STR = 'cephfs-top' + +# version match b/w fstop and stats emitted by mgr/stats +FS_TOP_SUPPORTED_VER = 1 + +ITEMS_PAD_LEN = 1 +ITEMS_PAD = " " * ITEMS_PAD_LEN + +# metadata provided by mgr/stats +FS_TOP_MAIN_WINDOW_COL_CLIENT_ID = "CLIENT_ID" +FS_TOP_MAIN_WINDOW_COL_MNT_ROOT = "MOUNT_ROOT" +FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR = "MOUNT_POINT@HOST/ADDR" + +MAIN_WINDOW_TOP_LINE_ITEMS_START = [ITEMS_PAD, + FS_TOP_MAIN_WINDOW_COL_CLIENT_ID, + FS_TOP_MAIN_WINDOW_COL_MNT_ROOT] +MAIN_WINDOW_TOP_LINE_ITEMS_END = [FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR] + +# adjust this map according to stats version and maintain order +# as emitted by mgr/stast +MAIN_WINDOW_TOP_LINE_METRICS = OrderedDict([ + ("CAP_HIT", MetricType.METRIC_TYPE_PERCENTAGE), + ("READ_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("WRITE_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("METADATA_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("DENTRY_LEASE", MetricType.METRIC_TYPE_PERCENTAGE), +]) +MGR_STATS_COUNTERS = list(MAIN_WINDOW_TOP_LINE_METRICS.keys()) + +FS_TOP_VERSION_HEADER_FMT = '{prog_name} - {now}' +FS_TOP_CLIENT_HEADER_FMT = 'Client(s): {num_clients} - {num_mounts} FUSE, '\ + '{num_kclients} kclient, {num_libs} libcephfs' + +CLIENT_METADATA_KEY = "client_metadata" +CLIENT_METADATA_MOUNT_POINT_KEY = "mount_point" +CLIENT_METADATA_MOUNT_ROOT_KEY = "root" +CLIENT_METADATA_IP_KEY = "IP" +CLIENT_METADATA_HOSTNAME_KEY = "hostname" + +GLOBAL_METRICS_KEY = "global_metrics" +GLOBAL_COUNTERS_KEY = "global_counters" + + +def calc_perc(c): + if c[0] == 0 and c[1] == 0: + return 0.0 + return round((c[0] / (c[0] + c[1])) * 100, 2) + + +def calc_lat(c): + return round(c[0] + c[1] / 1000000000, 2) + + +def wrap(s, sl): + """return a '+' suffixed wrapped string""" + if len(s) < sl: + return s + return f'{s[0:sl-1]}+' + + +class FSTop(object): + def __init__(self, args): + self.rados = None + self.stop = False + self.stdscr = None # curses instance + self.client_name = args.id + self.cluster_name = args.cluster + self.conffile = args.conffile + + def handle_signal(self, signum, _): + self.stop = True + + def init(self): + try: + if self.conffile: + r_rados = rados.Rados(rados_id=self.client_name, clustername=self.cluster_name, + conffile=self.conffile) + else: + r_rados = rados.Rados(rados_id=self.client_name, clustername=self.cluster_name) + r_rados.conf_read_file() + r_rados.connect() + self.rados = r_rados + except rados.Error as e: + if e.errno == errno.ENOENT: + raise FSTopException(f'cluster {self.cluster_name} does not exist') + else: + raise FSTopException(f'error connecting to cluster: {e}') + self.verify_perf_stats_support() + signal.signal(signal.SIGTERM, self.handle_signal) + signal.signal(signal.SIGINT, self.handle_signal) + + def fini(self): + if self.rados: + self.rados.shutdown() + self.rados = None + + def selftest(self): + stats_json = self.perf_stats_query() + if not stats_json['version'] == FS_TOP_SUPPORTED_VER: + raise FSTopException('perf stats version mismatch!') + + def setup_curses(self): + self.stdscr = curses.initscr() + + # coordinate constants for windowing -- (height, width, y, x) + # NOTE: requires initscr() call before accessing COLS, LINES. + HEADER_WINDOW_COORD = (2, curses.COLS - 1, 0, 0) + TOPLINE_WINDOW_COORD = (1, curses.COLS - 1, 3, 0) + MAIN_WINDOW_COORD = (curses.LINES - 4, curses.COLS - 1, 4, 0) + + self.header = curses.newwin(*HEADER_WINDOW_COORD) + self.topl = curses.newwin(*TOPLINE_WINDOW_COORD) + self.mainw = curses.newwin(*MAIN_WINDOW_COORD) + curses.wrapper(self.display) + + def verify_perf_stats_support(self): + mon_cmd = {'prefix': 'mgr module ls', 'format': 'json'} + try: + ret, buf, out = self.rados.mon_command(json.dumps(mon_cmd), b'') + except Exception as e: + raise FSTopException(f'error checking \'stats\' module: {e}') + if ret != 0: + raise FSTopException(f'error checking \'stats\' module: {out}') + if 'stats' not in json.loads(buf.decode('utf-8'))['enabled_modules']: + raise FSTopException('\'stats\' module not enabled. Use \'ceph mgr module ' + 'enable stats\' to enable') + + def perf_stats_query(self): + mgr_cmd = {'prefix': 'fs perf stats', 'format': 'json'} + try: + ret, buf, out = self.rados.mgr_command(json.dumps(mgr_cmd), b'') + except Exception as e: + raise FSTopException(f'error in \'perf stats\' query: {e}') + if ret != 0: + raise FSTopException(f'error in \'perf stats\' query: {out}') + return json.loads(buf.decode('utf-8')) + + def mtype(self, typ): + if typ == MetricType.METRIC_TYPE_PERCENTAGE: + return "(%)" + elif typ == MetricType.METRIC_TYPE_LATENCY: + return "(s)" + else: + return '' + + def refresh_top_line_and_build_coord(self): + xp = 0 + x_coord_map = {} + + heading = [] + for item in MAIN_WINDOW_TOP_LINE_ITEMS_START: + heading.append(item) + nlen = len(item) + len(ITEMS_PAD) + x_coord_map[item] = (xp, nlen) + xp += nlen + + for item, typ in MAIN_WINDOW_TOP_LINE_METRICS.items(): + it = f'{item}{self.mtype(typ)}' + heading.append(it) + nlen = len(it) + len(ITEMS_PAD) + x_coord_map[item] = (xp, nlen) + xp += nlen + + for item in MAIN_WINDOW_TOP_LINE_ITEMS_END: + heading.append(item) + nlen = len(item) + len(ITEMS_PAD) + x_coord_map[item] = (xp, nlen) + xp += nlen + self.topl.addstr(0, 0, ITEMS_PAD.join(heading), curses.A_STANDOUT | curses.A_BOLD) + return x_coord_map + + def refresh_client(self, client_id, metrics, counters, client_meta, x_coord_map, y_coord): + for item in MAIN_WINDOW_TOP_LINE_ITEMS_END: + coord = x_coord_map[item] + if item == FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR: + self.mainw.addstr(y_coord, coord[0], + f'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}@' + f'{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/' + f'{client_meta[CLIENT_METADATA_IP_KEY]}') + for item in MAIN_WINDOW_TOP_LINE_ITEMS_START: + coord = x_coord_map[item] + hlen = coord[1] - len(ITEMS_PAD) + if item == FS_TOP_MAIN_WINDOW_COL_CLIENT_ID: + self.mainw.addstr(y_coord, coord[0], + wrap(client_id.split('.')[1], hlen)) + elif item == FS_TOP_MAIN_WINDOW_COL_MNT_ROOT: + self.mainw.addstr(y_coord, coord[0], + wrap(client_meta[CLIENT_METADATA_MOUNT_ROOT_KEY], hlen)) + cidx = 0 + for item in counters: + coord = x_coord_map[item] + m = metrics[cidx] + typ = MAIN_WINDOW_TOP_LINE_METRICS[MGR_STATS_COUNTERS[cidx]] + if item.lower() in client_meta['valid_metrics']: + if typ == MetricType.METRIC_TYPE_PERCENTAGE: + self.mainw.addstr(y_coord, coord[0], f'{calc_perc(m)}') + elif typ == MetricType.METRIC_TYPE_LATENCY: + self.mainw.addstr(y_coord, coord[0], f'{calc_lat(m)}') + else: + self.mainw.addstr(y_coord, coord[0], "N/A") + cidx += 1 + + def refresh_clients(self, x_coord_map, stats_json): + counters = [m.upper() for m in stats_json[GLOBAL_COUNTERS_KEY]] + y_coord = 0 + for client_id, metrics in stats_json[GLOBAL_METRICS_KEY].items(): + self.refresh_client(client_id, + metrics, + counters, + stats_json[CLIENT_METADATA_KEY][client_id], + x_coord_map, + y_coord) + y_coord += 1 + + def refresh_main_window(self, x_coord_map, stats_json): + self.refresh_clients(x_coord_map, stats_json) + + def refresh_header(self, stats_json): + if not stats_json['version'] == FS_TOP_SUPPORTED_VER: + self.header.addstr(0, 0, 'perf stats version mismatch!') + return False + client_metadata = stats_json[CLIENT_METADATA_KEY] + num_clients = len(client_metadata) + num_mounts = len([client for client, metadata in client_metadata.items() if not + metadata[CLIENT_METADATA_MOUNT_POINT_KEY] == 'N/A']) + num_kclients = len([client for client, metadata in client_metadata.items() if + "kernel_version" in metadata]) + num_libs = num_clients - (num_mounts + num_kclients) + now = datetime.now().ctime() + self.header.addstr(0, 0, + FS_TOP_VERSION_HEADER_FMT.format(prog_name=FS_TOP_PROG_STR, now=now), + curses.A_STANDOUT | curses.A_BOLD) + self.header.addstr(1, 0, FS_TOP_CLIENT_HEADER_FMT.format(num_clients=num_clients, + num_mounts=num_mounts, + num_kclients=num_kclients, + num_libs=num_libs)) + return True + + def display(self, _): + x_coord_map = self.refresh_top_line_and_build_coord() + self.topl.refresh() + while not self.stop: + stats_json = self.perf_stats_query() + self.header.clear() + self.mainw.clear() + if self.refresh_header(stats_json): + self.refresh_main_window(x_coord_map, stats_json) + self.header.refresh() + self.mainw.refresh() + time.sleep(1) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Ceph Filesystem top utility') + parser.add_argument('--cluster', nargs='?', const='ceph', default='ceph', + help='Ceph cluster to connect (defualt: ceph)') + parser.add_argument('--id', nargs='?', const='fstop', default='fstop', + help='Ceph user to use to connection (default: fstop)') + parser.add_argument('--conffile', nargs='?', default=None, + help='Path to cluster configuration file') + parser.add_argument('--selftest', dest='selftest', action='store_true', + help='run in selftest mode') + args = parser.parse_args() + err = False + ft = FSTop(args) + try: + ft.init() + if args.selftest: + ft.selftest() + sys.stdout.write("selftest ok\n") + else: + ft.setup_curses() + except FSTopException as fst: + err = True + sys.stderr.write(f'{fst.get_error_msg()}\n') + except Exception as e: + err = True + sys.stderr.write(f'exception: {e}\n') + finally: + ft.fini() + sys.exit(0 if not err else -1) diff --git a/src/tools/cephfs/top/setup.py b/src/tools/cephfs/top/setup.py new file mode 100644 index 0000000000000..92fbd964c57ee --- /dev/null +++ b/src/tools/cephfs/top/setup.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- + +from setuptools import setup + +__version__ = '0.0.1' + +setup( + name='cephfs-top', + version=__version__, + description='top(1) like utility for Ceph Filesystem', + keywords='cephfs, top', + scripts=['cephfs-top'], + install_requires=[ + 'rados', + ], + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Environment :: Console', + 'Intended Audience :: System Administrators', + 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python :: 3' + ], + license='LGPLv2+', +) diff --git a/src/tools/cephfs/top/tox.ini b/src/tools/cephfs/top/tox.ini new file mode 100644 index 0000000000000..b125c0bc8eac8 --- /dev/null +++ b/src/tools/cephfs/top/tox.ini @@ -0,0 +1,7 @@ +[tox] +envlist = py3 +skipsdist = true + +[testenv:py3] +deps = flake8 +commands = flake8 --ignore=W503 --max-line-length=100 cephfs-top -- 2.39.5