From: Jose J Palacios-Perez Date: Wed, 27 Nov 2024 11:38:27 +0000 (+0000) Subject: tools/contrib: Add tasksetcpu.py to show the CPU allocation grid X-Git-Tag: v20.0.0~208^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f83110a9adde8dab0b90d834968435f8838b9df4;p=ceph.git tools/contrib: Add tasksetcpu.py to show the CPU allocation grid Signed-off-by: Jose J Palacios-Perez --- diff --git a/src/tools/contrib/README.rst b/src/tools/contrib/README.rst index d7655e208b9..fe36cf959e7 100644 --- a/src/tools/contrib/README.rst +++ b/src/tools/contrib/README.rst @@ -8,3 +8,21 @@ Please do not assume any level of support. Your mileage may vary. Each file's header must include a tracker number and an author signed-off-by line. + + +- balance-cpu.py. An utility to distribute the Seastar reactor threads over the + (physical) CPU cores, according to two strategies: + - OSD-based (default): allocates all the reactors of the same OSD in the same + NUMA socket, + - NUMA socket: distributes the reactors of each OSD evenly in the NUMA sockets + (normally two), so every OSD ends up with reactors running on both NUMA sockets. + +- lscpu.py. A Python module to parse the output of ``lscpu --json`` into a dictionary + which is used by balance-cpu and tasksetcpu.py. + +- tasksetcpu.py. an utility to print a grid showing the current CPU core allocation + of Seastar reactors. Useful to validate that the allocation strategy is correct. + +For further details, please see *BalanceCPUCrimson.md* in doc/dev/crimson. + + diff --git a/src/tools/contrib/tasksetcpu.py b/src/tools/contrib/tasksetcpu.py new file mode 100644 index 00000000000..28b24a6dcdf --- /dev/null +++ b/src/tools/contrib/tasksetcpu.py @@ -0,0 +1,575 @@ +#!/usr/bin/python +""" +This script traverses the ouput from taskset and ps to produce a .JSON +to generate an ascii grid for visualisation. +Returns the suggested CPu cores for the FIO client. +""" + +import argparse +import logging +import os +import sys +import re +import json +import tempfile +from pprint import pformat + +# import pprint +from lscpu import LsCpuJson + +__author__ = "Jose J Palacios-Perez" + +logger = logging.getLogger(__name__) + + +def to_color(string, color): + """ + Simple basic color ascii coding + """ + color_code = { + "blue": "\033[34m", + "yellow": "\033[33m", + "green": "\033[32m", + "red": "\033[31m", + } + return color_code[color] + str(string) + "\033[0m" + + +def serialize_sets(obj): + """ + Serialise sets + """ + if isinstance(obj, set): + return list(obj) + + return obj + + +class CpuCell(object): + """ + Single cell representing a CPU core + Essentially a list of tuples, each tuple is a str (actually a letter) of the type of thread + running in this Cpu core. The type indicates the color code that it should be printed. + We only support three types: Reactors, Alien, and Bluestore threads. + Probably need to refactor the proc_groups dict as a member here, or a class of its own (?) + """ + + def __init__(self, cpuid, atype): + self.type = atype + self.cpuid = cpuid + + +class CpuGrid(object): + """ + Grid for a Single CPU socket + Each cell is a CpuCell, which contains a set of threads initials (single letter as per the field 'name') + and a color code. + """ + + ROWS = 8 + COLS = 7 + # width = len(str(max(rows,cols)+1)) + WIDTH = 5 + + def __init__(self, id, socket): + """ + This class expects a single CPU socket, which has two sections: + physical of size num_cores, and a HT-sibling section with the same number + """ + self.id = id + self.socket = socket + # Or more generally, a list of tuples -- these should be CpuCell + self.grid = [["."] * self.COLS for _ in range(self.ROWS)] + self.str_lines = [] + + def get_cell_coord(self, cpuid, is_phys): + """ + Return the tuple row,col for this cpuid + """ + if is_phys: + row = (cpuid - self.socket["phy_start"]) // self.COLS + col = (cpuid - self.socket["phy_start"]) % self.COLS + else: + row = ( + (self.socket["num_cores"] // self.COLS) # should be 4 + + (cpuid - self.socket["ht_start"]) // self.COLS + ) + col = (cpuid - self.socket["ht_start"]) % self.COLS + return (row, col) + + def set_cell(self, cpuid, is_phys, vstr): + """ + Fill the cell for cpuid with the values vstr + """ + # vlen = len(vstr) // 10 # due to control chars + row, col = self.get_cell_coord(cpuid, is_phys) + self.grid[row][col] = " " + vstr + " " # use a new object instead? + # self.grid[row][col] = " " * (self.WIDTH - vlen) + vstr + + def set_header(self): + """ + Set this socket grid header + """ + header = " " * self.WIDTH + f" Socket {self.id} ".center( + (self.WIDTH + 1) * self.COLS, "-" + ) + self.str_lines = [header] + + def show_grid(self): + """ + Debug show grid + """ + logger.debug(f"Grid: {pformat(self.grid)}") + # logger.debug(f"{pformat(self.str_lines)}") + + def make_rows(self, is_phys): + """ + Construct the rows body of the Grid + Can be the physical section or HT section + """ + width = self.WIDTH + cols = self.COLS + contentLine = "# | values |" + + if is_phys: + _startp = self.socket["phy_start"] + _row = 0 + _endp = 4 # self.get_cell_coord(self.socket["num_cores"]+1,is_phys) + else: + _startp = self.socket["ht_start"] + _row = 4 # self.get_cell_coord(self.socket["ht_start"],is_phys) # 4 + _endp = 8 # self.get_cell_coord( self.socket["ht_start"]+self.socket["num_cores"]+1,is_phys) # 8 + + logger.debug(f"make_rows: {_row}, {_endp}") + grid_slice = self.grid[_row:_endp] + for i, row in enumerate(grid_slice): + values = "+".join(f"{v}".center(width, " ") for v in row) + line = contentLine.replace("values", values) + line = line.replace("#", f"{_startp + cols*i:>{width}d}") + # This separates the Physical from the HT section + self.str_lines.append(line) + + def make_grid(self): + """ + Construct the grid for this socket line by line + """ + width = self.WIDTH + cols = self.COLS + + # build frame + contentLine = "# | values |" + + dashes = "+".join("-" * width for _ in range(cols)) + frameLine = contentLine.replace("values", dashes) + frameLine = frameLine.replace("#", " " * width) + frameLine = frameLine.replace("| ", "+-").replace(" |", "-+") + + # x-axis numbers: + numLine = contentLine.replace("|", " ") + numLine = numLine.replace("#", " " * width) + colNums = " ".join(f"{i:<{width}d}" for i in range(cols)) + numLine = numLine.replace("values", colNums) + self.str_lines.append(numLine) + + self.str_lines.append(frameLine) + self.make_rows(True) # physical section + self.str_lines.append(frameLine) + self.make_rows(False) # HT section + self.str_lines.append(frameLine) + + # construct an iterator for this grid + self.itlines = iter(self.str_lines) + + def get_num_lines(self): + """ + Return the number of lines of the grid + """ + return len(self.str_lines) + + def next(self): + """ + Calls iterator to return next line + """ + return next(self.itlines, "") + + +class TasksetEntry(object): + """ + Process a sequence of taskset_ps _thread.out files to + produce a CpuGrid per socket and .JSON + """ + + # Only for OSD/Crimson + proc_groups = { + # TODO: log are valid thread names for both reactor and aliens + "reactor": { + "regex": re.compile(r"(crimson|reactor|syscall|log).*"), + "color": "red", + "name": "R", + }, + "alien": { + "regex": re.compile(r"(alien-store-tp)"), + "color": "green", + "name": "A", + }, + "bluestore": { + "regex": re.compile(r"(bstore|rocksdb|cfin).*"), + "color": "blue", + "name": "B", + }, + } + proc_groups_set = set() + + # Formmat from the _threads.out files: + # 1368714 1368714 crimson-osd 0 pid 1368714's current affinity list: 0 + # 1368714 1368720 reactor-1 1 pid 1368720's current affinity list: 1 + LINE_REGEX = re.compile( + r""" + ^\d+\s+ # PID + \d+\s+ # TID + ([^\s]+)\s+ # thread name + (\d+)\s+ # CPU id + pid\s+(\d+)[']s\s+current\s+affinity\s+list:\s+(\d+)$""", + re.VERBOSE, + ) # |re.DEBUG) + FILE_SUFFIX_LST = re.compile(r"_list$") # ,(_list|.out)re.DEBUG) + + def __init__(self, config, directory, num_cpu_client, lscpu): + """ + This class expects either: + a list of result files to process into a grid (suffix _list) + or a single _threads.out file + the number of CPU intended for the clients (eg. FIO) + the .json from lscpu + """ + self.config = config + m = self.FILE_SUFFIX_LST.search(config) + if m: + self.mode = "list" + self.jsonName = config.replace("_list", ".json") + else: + self.mode = "single" + self.jsonName = config.replace(".out", ".json") + + self.directory = directory + self.num_cpu_client = num_cpu_client + self.osd_num = 0 + # This should be an array of dicts, size of num OSD + self.entries = [] # {} + # From lscpu we can get the ranges + # self.sockets = [ Cpugrid(_i, ) for _i in range(NUM_SOCKETS)] # array of CpuGrid + self.sockets = [] + self.lscpu = LsCpuJson(lscpu) + self.proc_groups_set.update(self.proc_groups.keys()) + + def traverse_dir(self): + """ + Traverse the given list (.JSON) use .tex template to generate document + """ + pass + + def find(self, name, path): + """ + find a name file in path + """ + for root, dirs, files in os.walk(path): + if name in files: + return os.path.join(root, name) + + def _get_str(self, cpuset, osd_id): + """ + Transform a cpu set into a string of chars to indicate + the thread allocation + """ + _result = "" + logger.debug(f"Got cpuset: {cpuset} for {osd_id}:") + for item in cpuset: + logger.debug(f"Got {item}:") + if item not in self.proc_groups: + logger.error(f"{item} not in proc_groups") + return _result + _id = self.proc_groups[item]["name"] + logger.debug(f"Got {_id}.{osd_id}") + _result += to_color( + f"{_id}.{osd_id}", + self.proc_groups[item]["color"], + # self.proc_groups[item]["name"], self.proc_groups[item]["color"] + ) + return _result + + def save_grid_json(self): + """ + Save the grid into a .JSON + Shall we use the same name as the config list replaced extension + """ + if self.jsonName: + with open(self.jsonName, "w", encoding="utf-8") as f: + json.dump( + self.entries, f, indent=4, sort_keys=True, default=serialize_sets + ) + f.close() + + def _get_tgroup(self, tname: str): + """ + Get the proc_groups from the thread name + """ + for k in self.proc_groups: + if self.proc_groups[k]["regex"].match(tname): + return k + + logger.debug(f"{tname}: not registered in groups") + return tname + + def _get_cpu_range(self, cpu_uid: str, cpu_range: str): + """ + Get the cpu id range provided by taskset (if exist) + The first arg is the cpuid from ps field PSR + Returns the corresponding list as a set + """ + cpu_list = [] + regex = re.compile(r"(\d+)([,-](\d+))?") + m = regex.search(cpu_range) + if m: + start = int(m.group(1)) + if m.group(2): + end = int(m.group(3)) + cpu_list = list(range(start, end + 1)) + else: + cpu_list = [start] + cpu_set = set(cpu_list) + cpu_set.update({int(cpu_uid)}) + return cpu_set + + def _parse_via_regex(self, line: str): + """ + Bug in the REGEx, alternative working fine + """ + logger.debug(f"Parsing: {line}") + match = self.LINE_REGEX.search(line) + if match: + groups = match.groups() + logger.debug(f"Got groups: {groups}") + tname = self._get_tgroup(groups[0]) + cpuid = str(groups[1]) + return tname, cpuid + + def parse(self, fname: str): + """ + Parses individual _thread.out file + Returns a dict whose keys are cpuid, values are dicts + with the threads names, process group association (Reactor, Alien, Bluestore) + represented as a set (idempotent, we can later look at add info such as occurrences) + """ + entry = {} + with open(fname, "r") as _data: + f_info = os.fstat(_data.fileno()) + if f_info.st_size == 0: + print(f"input file {fname} is empty") + return entry + lines = _data.read().splitlines() + _data.close() + for line in lines: + lista = line.split() + tid = lista[1] + tname = self._get_tgroup(lista[2]) + cpu_range = self._get_cpu_range(lista[3], lista[9]) + for cpuid in cpu_range: + if cpuid not in entry: + entry.update({cpuid: {tname: []}}) + if tname not in entry[cpuid]: + entry[cpuid].update({tname: []}) + entry[cpuid][tname].append(tid) + + return entry + + def merge_entries(self, new_entry: dict, osd_id: int): + """ + Merges (via set union) with the new entry (eg. OSD num) + keys of the new_entry are cpuid + """ + for k in new_entry.keys(): + entry = self.entries[osd_id] + if k not in entry: + entry[k] = new_entry[k] + else: + entry[k].update(new_entry[k]) + + def set_cpu_in_grid(self, cpuid: int, cpuset, osd_id: int): + """ + Given a cpuid and its contents, set the corresponding CpuGrid + for given OSD + """ + vstr = self._get_str(cpuset, osd_id) + sindex, is_phys = self.lscpu.get_socket(cpuid) + grid = self.sockets[sindex] + grid.set_cell(cpuid, is_phys, vstr) + + def get_osd_id(self, setup: str): + """ + Extract the OSD number from the filename (which should follow the expected convention) + """ + osd_id = 0 # default + regex = re.compile(r"^osd_(\d+).*$") + m = regex.search(setup) + if m: + osd_id = int(m.group(1)) + return osd_id + + def update_grid(self, setup: str, osd_id: int): + """ + Update the sockets grid for the given setup + """ + print(f"== {setup} ==") # OSD process + + for _s in self.sockets: + _s.set_header() + + entry = self.entries[osd_id] + for cpuid, cpuset in entry.items(): + self.set_cpu_in_grid(int(cpuid), cpuset, osd_id) + + def show_grid(self): + """ + Traverse the array of sockets to join them by row to produce + each line. + """ + for _s in self.sockets: + _s.make_grid() + _s.show_grid() + + # join the grid lines per socket: this should be done in a more Pythonic way... + for i in range(self.sockets[0].get_num_lines()): + line = " + ".join(_s.next() for _s in self.sockets) + print(line) + + def traverse_files(self): + """ + Traverses the _thread.out files given in the config + """ + if self.mode == "single": + out_files = [self.config] + else: + try: + config_file = open(self.config, "r") + except IOError as e: + raise argparse.ArgumentTypeError(str(e)) + out_files = config_file.read().splitlines() + print(out_files) + config_file.close() + + self.osd_num = len(out_files) + for entry in range(self.osd_num): + self.entries.append({}) + + print(f"loading {len(out_files)} .out files ...") + + # pp = pprint.PrettyPrinter(width=41, compact=True) + # The number of files should be the same as the number of OSDs + for fname in out_files: + # Extract the OSD id from fname + cpuNodeList = self.parse(fname) + osd_id = self.get_osd_id(fname) + # pp.pprint(cpuNodeList)# Ok + # merged = {**self.entries, **cpuNodeList } + # Show the grid for this fname + self.merge_entries(cpuNodeList, osd_id) + self.update_grid(fname, osd_id) + # logger.debug(f"Got entries: {self.entries}:") + + def run(self): + """ + Entry point: processes the input files, then produces the grid + """ + self.lscpu.load_json() + self.lscpu.get_ranges() + for s in range(self.lscpu.get_num_sockets()): + self.sockets.append( + CpuGrid( + s, + { + "phy_start": self.lscpu.get_physical_start(s), + "ht_start": self.lscpu.get_ht_start(s), + "num_cores": self.lscpu.get_num_physical(s), + }, + ) + ) + self.traverse_files() + self.show_grid() + self.save_grid_json() + + +def main(argv): + examples = """ + Examples: + # Produce a CPU distribution visualisation grid for a single file: + %prog -c osd_0_crimson_1osd_16reactor_256at_8fio_lt_disable_ht_threads.out + + # Produce a CPU distribution visualisation grid for a _list _of files: + %prog -c crimson_1osd_16reactor_lt_disable_list + """ + parser = argparse.ArgumentParser( + description="""This tool is used to parse output from the combined taskset and ps commands""", + epilog=examples, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument( + "-c", + "--config", + type=str, + required=True, + help="Input file: either containing a _list_ of _threads.out files, or a single .out file", + default=None, + ) + # load the NUMA summary -- or lscpu --json + parser.add_argument( + "-u", + "--lscpu", + type=str, + required=True, + help="Input file: .json file produced by lscpu --json", + default=None, + ) + parser.add_argument( + "-i", + "--client", + type=int, + required=False, + help="Number of CPU cores required for the FIO client", + default=8, + ) + + parser.add_argument( + "-d", "--directory", type=str, help="Directory to examine", default="./" + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="True to enable verbose logging mode", + default=False, + ) + + # parser.set_defaults(numosd=1) + options = parser.parse_args(argv) + + if options.verbose: + logLevel = logging.DEBUG + else: + logLevel = logging.INFO + + with tempfile.NamedTemporaryFile(dir="/tmp", delete=False) as tmpfile: + logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel) + # print(f"logname: {tmpfile.name}") + + logger.debug(f"Got options: {options}") + + os.chdir(options.directory) + grid = TasksetEntry( + options.config, options.directory, options.client, options.lscpu + ) + grid.run() + + +if __name__ == "__main__": + main(sys.argv[1:])