From: Jose J Palacios-Perez Date: Wed, 27 Nov 2024 11:42:27 +0000 (+0000) Subject: tools/contrib: Add lscpu.py auxiliary module to parse the output of lscpu, rebased... X-Git-Tag: v20.0.0~208^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8b264e6c4123ecf1fe9f0508339ea8f013cff7ef;p=ceph.git tools/contrib: Add lscpu.py auxiliary module to parse the output of lscpu, rebased with updates Signed-off-by: Jose J Palacios-Perez --- diff --git a/src/tools/contrib/balance_cpu.py b/src/tools/contrib/balance_cpu.py index 13b917db955..0f7d4b7e3f1 100644 --- a/src/tools/contrib/balance_cpu.py +++ b/src/tools/contrib/balance_cpu.py @@ -8,19 +8,155 @@ Two strategies of balancing reactors over CPU cores: 1) OSD based: all the reactors of each OSD run in the same CPU NUMA socket (default), 2) Socket based: reactors for the same OSD are distributed evenly across CPU NUMA sockets. + +Some auxiliaries: +- given a taskset cpu_set bitmask, identify those active physical CPU core ids and their + HT siblings, +- for a given OSD id, identify the corresponding CPU core ids to set. +- convert (decimal) comma-separated intervals into a cpu_set bitmask + +Apply a bitwise operator over each pair of bytes variables: +result=bytes(map (lambda a,b: a ^ b, bytes_all_cpu, bytes_fio_cpu)) + +Given the list extracted from lscpu, apply the cpu_set bitmask from the taskset argument, +hence disabling some core ids. For each OSD, produce the corresponding bitmask. 
""" import argparse import logging import sys +import os +import re import tempfile +from pprint import pformat +# from typing import Dict, List, Any + from lscpu import LsCpuJson __author__ = "Jose J Palacios-Perez" logger = logging.getLogger(__name__) -# Defaults + +# Some generic bitwise functions to use from the taskset data +def get_bit(value, bit_index): + """Get a power of 2 if the bit is on, 0 otherwise""" + return value & (1 << bit_index) + + +def get_normalized_bit(value, bit_index): + """Return 1/0 whenever the bit is on""" + return (value >> bit_index) & 1 + + +def set_bit(value, bit_index): + """As it says on the tin""" + return value | (1 << bit_index) + + +def clear_bit(value, bit_index): + """As it says on the tin""" + return value & ~(1 << bit_index) + + +# Generic functions to query whether a CPU id is enabled/available or not +def is_cpu_avail(bytes_mask, cpuid): + """ + Return true if the cpuid is on + CPU id 0 is at the last end of the bytes_mask, the max_cpu is at bit 0 + """ + try: + return get_normalized_bit(bytes_mask[-1 - (cpuid // 8)], cpuid % 8) + except IndexError: + return False + + +def set_cpu(bytes_mask, cpuid): + """Set cpuid on bytes_mask""" + try: + bytes_mask[-1 - (cpuid // 8)] = set_bit(bytes_mask[-1 - (cpuid // 8)], cpuid % 8) + except IndexError: + pass + return bytes_mask + + +def get_range(bytes_mask, start, length): + """ + Given a bytes_mask, return a new bytes_mask with the range of CPU ids + starting from start and of length, skipping those that are not available + """ + result = bytearray(b"\x00" * len(bytes_mask)) + max_cpu = 8 * len(bytes_mask) + while length > 0 and start < max_cpu: + if is_cpu_avail(bytes_mask, start): + set_cpu(result, start) + length -= 1 + start += 1 + + return result + + +def set_range(bytes_mask, start, end): + """ + Set a range of CPU ids in bytes_mask from start to end + """ + ba = bytearray(bytes_mask) + for i in range(start, end): + set_cpu(ba, i) + return ba + + +def 
set_all_ht_siblings(bytes_mask): + """ + Set all the HT sibling of the enabled physical CPU ids specified in bytes_mask. + + Physical cores are in the range [-half_length_bytes_mask:] + HT siblings are in the range [0:half_length_bytes_mask-1] + + Notes: + result=bytes(map (lambda a,b: a | b, bytes_ht, bytes_phys)) + # result=bytes(map (lambda a,b: a | b, result[0:half_indx], bytes_mask[-half_indx:])) + # partial = [a | b for a, b in zip(empty[0:half_indx], bytes_mask[-half_indx:])] + # result = bytes( partial + bytes_mask[-half_indx:] ) + # result = bytearray(b"\x00" * len(bytes_mask)) + """ + result = bytearray(bytes_mask) + half_indx = len(bytes_mask) // 2 + for i in range(0, half_indx): + result[i] |= bytes_mask[half_indx + i] + return result + + +def count_bits(bytes_mask: bytearray) -> int: + """ + Using Python 3.9 way + Python 3.10: i.bit_count() + """ + count = 0 + for x in bytes_mask: + count += bin(x).count("1") + return count + + +def count_phys_cpus(bytes_mask: bytearray) -> int: + """ + Count the number of physical CPU from the bitmask + """ + count = 0 + half_indx = len(bytes_mask) // 2 + count = count_bits(bytes_mask[half_indx:]) + return count + + +def is_hexadecimal_str(s: str) -> bool: + try: + int(s, 16) + return True + except ValueError: + return False + + +# Defaults to declare, which are values that can be given as options for the script NUM_OSD = 8 NUM_REACTORS = 3 @@ -33,6 +169,7 @@ class CpuCoreAllocator(object): { "lscpu": [ { + d: { "field": "CPU(s):", "data": "112",} d: {'field': 'NUMA node(s):', 'data': '2'} d: {'field': 'NUMA node0 CPU(s):', 'data': '0-27,56-83'} d: {'field': 'NUMA node1 CPU(s):', 'data': '28-55,84-111'} @@ -41,50 +178,204 @@ class CpuCoreAllocator(object): } """ - def __init__(self, json_file: str, num_osd: int, num_react: int): + def __init__( + self, + lscpu: str = "", + num_osd: int = NUM_OSD, + num_react: int = NUM_REACTORS, + hex_or_range_str: str = "", + out_hex: bool = False, + ) -> None: """ This class expects 
the output from lscpu --json, from there it works out a list of physical CPU uids to allocate Seastar reactors """ - self.json_file = json_file self.num_osd = num_osd self.num_react = num_react - self._dict = {} - self.lscpu = LsCpuJson(json_file) - # self.socket_lst = LsCpuJson(json_file) + self.out_hex = out_hex + self.bytes_avail_cpus = bytes([]) + self.hex_or_range_str = hex_or_range_str + self.lscpu = LsCpuJson(lscpu) + assert self.lscpu, f"Invalid {lscpu}" + # Output to produce: either hex bitmask CPU set or decimal ranges + self.osds_cpu_out = {"dec_ranges": {}, "hex_cpu_mask": {}} + + def set_cpu_default(self) -> None: + """ + From lscpu we set the max num CPUs value to indicate how many hex digits + a valid taskset string should have + """ + # Number of hex digits required for the cpu_set bitmask + self.num_hex_digits = self.lscpu.get_num_logical_cpus() // 8 + # Default bitmask: all CPUs available + self.ALL_CPUS = "ff" * self.num_hex_digits + self.bytes_all_cpu = bytes.fromhex(self.ALL_CPUS) + + def parse_taskset_arg(self, cpu_range: str) -> str: + """ + The taskset arg can be an hexstring describing a cpu_set bitmask, or can be + decimal ranges comma separated. This method parses the second case. + - split the ',' tuples (or singletons) + extract the decimal values -- validate they are within max CPU uid. Produce a + valid hexstring cpu_set bitmask. 
+ # result_ba = bytearray(b"\x00" * len(self.bytes_all_cpu)) + # cpu_list_int = list(range(start, end + 1)) + # cpu_list_int = [start] + # cpu_set.update(set(cpu_list_int)) + # Convert the set to an hex string for a cpu_set bitmask + # for item in cpu_set: + # set_cpu(result_ba, item) + # Compare both approaches match: + # logging.debug(f"result_ba:{result_ba}, bytes_mask:{bytes_mask}") + """ + cpu_list_str = cpu_range.split(",") + regex = re.compile(r"(\d+)([-](\d+))?") + bytes_mask = bytearray(b"\x00" * len(self.bytes_all_cpu)) + for item in cpu_list_str: + m = regex.search(item) + if m: + start = int(m.group(1)) + if m.group(2): + end = int(m.group(3)) + bytes_mask = set_range(bytes_mask, start, end) + else: + bytes_mask = set_cpu(bytes_mask, start) + return bytes(bytes_mask).hex() + + def set_available_cpus(self): + """ + Set the instance attribute self.bytes_avail_cpus + If valid taskset hex_cpu_mask, use it, otherwise use all CPUs + """ + # Validate the hex_string/bytes size for the cpuset bitmask + if self.hex_cpu_mask: + try: + self.bytes_avail_cpus = bytes.fromhex(self.hex_cpu_mask) + except ValueError: + print(f"Ignoring invalid hex string, using default {self.ALL_CPUS}") + logger.error(f"Invalid taskset arg: {self.hex_cpu_mask} ") + self.bytes_avail_cpus = self.bytes_all_cpu + assert self.num_hex_digits >= len( + self.bytes_avail_cpus + ), "Invalid taskset hexstring size" + + def validate_cpu_for_osd(self): + """ + Validate whether there are enough CPU cores for the required OSD + Count the number of bits from the physical section of the bytes_avail_cpus + + Note: we could use up to the maximum possible num of OSD instead of the number asked. 
+ """ + total_phys_cores = self.lscpu.get_total_physical() + self.num_avail_phys_cores = count_phys_cpus(bytearray(self.bytes_avail_cpus)) + + logger.debug( + f"total_phys_cores: {total_phys_cores}, avail_phys_cores: {self.num_avail_phys_cores}" + ) + assert ( + total_phys_cores >= self.num_avail_phys_cores + ), "Invalid available physical CPU cores" + max_osd_num = self.num_avail_phys_cores // self.num_react + assert max_osd_num > self.num_osd, "Not enough physical CPU cores" + + + def setup(self): + """ + Preparation and validation of available CPU ids + """ + self.lscpu.load_json() + self.lscpu.get_ranges() + self.set_cpu_default() + if is_hexadecimal_str(self.hex_or_range_str): + self.hex_cpu_mask = self.hex_or_range_str + else: + self.hex_cpu_mask = self.parse_taskset_arg(self.hex_or_range_str) + self.set_available_cpus() + logger.debug(f"self.bytes_avail_cpus: {self.bytes_avail_cpus}") + self.validate_cpu_for_osd() + + + def bitmask_to_range(self, bytes_mask) -> str: + """ + Produce a list of decimal ranges from the bitmask cpuset + """ + lista = [] + start = -1 + end = start + i = 0 + # Do we need to check max_cpu < self.lscpu.get_num_logical_cpus(): + max_cpu = len(bytes_mask) * 8 + logger.debug(f"max_cpu : {max_cpu}") + while i None: + """ + Updates the internal attributes to trace the CPUs assigned to the OSD process + """ + osd_cpu_s = bytes(cpuset_ba).hex() + # Update the bitset mask: + if osd in self.osds_cpu_out["hex_cpu_mask"]: + self.osds_cpu_out["hex_cpu_mask"][osd] += f",{osd_cpu_s}" + else: + self.osds_cpu_out["hex_cpu_mask"].update({osd: osd_cpu_s}) + + osd_cpu_str = self.bitmask_to_range(cpuset_ba) + self.osds_cpu_out["dec_ranges"].update({osd: osd_cpu_str}) + logger.debug(f"self.osds_cpu_out: {pformat(self.osds_cpu_out)}") + def do_distrib_socket_based(self): """ Distribution criteria: the reactors of each OSD are distributed across the available NUMA sockets evenly. - Each OSD uses step cores from each NUMA socket. 
- Produces a list of ranges to use for the ceph config set CLI. + Each OSD uses step cores from each NUMA socket. Each socket is a pair of ranges (_start,_end) + for physical and HT. On each allocation we update the physical_start, so the next iteration + picks the CPU uid accordingly. + Produces a bitmask cpuset and list of cpu id ranges to use for the ceph config set CLI. + + This method and next definitely can be refactored, possibly by defining a dictionary with callbacks + for each stage where differ, both use the same general algorithm. """ - # Init: + # Init: common to both strategies control = [] - cores_to_disable = set([]) num_sockets = self.lscpu.get_num_sockets() - # step = self.num_react - total_phys_cores = self.lscpu.get_total_physical() - # Max num of OSD that can be allocated - max_osd_num = total_phys_cores // self.num_react # Each OSD uses num reactor//sockets cores step = self.num_react // num_sockets reminder = self.num_react % num_sockets - logger.debug( - f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}" - ) - assert max_osd_num > self.num_osd, "Not enough physical CPU cores" + logger.debug(f"do_distrib_socket_based: step:{step}") # Copy the original physical ranges to the control dict - for socket in self.lscpu.get_sockets(): # socket_lst["sockets"]: + for socket in self.lscpu.get_sockets(): control.append(socket) + + # This byte array will be transformed for each OSD + cpu_avail_ba = bytearray(self.bytes_avail_cpus) + avail_s = bytes(cpu_avail_ba).hex() + # This dict would hold a bitsetmask in hex per OSD + osds_ba = {} # Traverse the OSD to produce an allocation - # f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}, rem:{reminder} " for osd in range(self.num_osd): - osds = [] for socket in control: _start = socket["physical_start"] _step = step @@ -99,34 +390,46 @@ class CpuCoreAllocator(object): logger.debug( f"osd: {osd}, socket:{_so_id}, _start:{_start}, _end:{_end - 1}" ) - 
osds.append(f"{_start}-{_end - 1}") + # Verify this range is valid, otherwise shift as appropriate + cpuset_ba = get_range(cpu_avail_ba, _start, _step) + # Associate their HT siblings of this range + cpuset_ba = set_all_ht_siblings(cpuset_ba) + # Update the list of bitmask of this OSD + if osd in osds_ba: + merged = bytes(map(lambda a, b: a | b, osds_ba[osd], cpuset_ba)) + osds_ba[osd] = merged + else: + osds_ba.update({osd: cpuset_ba}) + # Disable this OSD bitmaskset from the cpu_avail_ba + cpu_avail_ba = bytes(map(lambda a, b: a & ~b, cpu_avail_ba, cpuset_ba)) + osd_cpu_s = bytes(cpuset_ba).hex() + osd_cpu_s = bytes(osds_ba[osd]).hex() + avail_s = bytes(cpu_avail_ba).hex() + logger.debug(f"-- OSD: {osd}: {osd_cpu_s}, avail:{avail_s}") + # Update the bitset mask + self.set_osd_cpuset(osd, osds_ba[osd]) if _end <= socket["physical_end"]: socket["physical_start"] = _end - # Produce the HT sibling list to disable - # Consider to use sets to avoid dupes + _ht_start = socket["ht_sibling_start"] + _ht_end = socket["ht_sibling_start"] + step plist = list( range( - socket["ht_sibling_start"], - (socket["ht_sibling_start"] + _step), + _ht_start, + _ht_end, 1, ) ) logger.debug(f"plist: {plist}") - pset = set(plist) - # _to_disable=pset.union(cores_to_disable) - cores_to_disable = pset.union(cores_to_disable) - logger.debug(f"cores_to_disable: {list(cores_to_disable)}") socket["ht_sibling_start"] += _step else: # bail out _sops = socket["physical_start"] + step logger.debug(f"out of range: {_sops}") break - print(",".join(osds)) - _to_disable = sorted(list(cores_to_disable)) - logger.debug(f"Cores to disable: {_to_disable}") - print(" ".join(map(str, _to_disable))) + # Set the reminder available CPU + self.set_osd_cpuset('available', bytes(cpu_avail_ba) ) + self.osds_ba = osds_ba def do_distrib_osd_based(self): """ @@ -136,25 +439,24 @@ class CpuCoreAllocator(object): Produces a list of ranges to use for the ceph config set CLI. 
""" control = [] - cores_to_disable = set([]) # Each OSD uses num reactor cores from the same NUMA socket num_sockets = self.lscpu.get_num_sockets() step = self.num_react - total_phys_cores = self.lscpu.get_total_physical() - # Max num of OSD that can be allocated - max_osd_num = total_phys_cores // self.num_react - - logger.debug( - f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}" - ) - assert max_osd_num > self.num_osd, "Not enough physical CPU cores" # Copy the original physical ranges to the control dict for socket in self.lscpu.get_sockets(): control.append(socket) + + # This byte array will be transformed for each OSD + cpu_avail_ba = bytearray(self.bytes_avail_cpus) + avail_s = bytes(cpu_avail_ba).hex() + logger.debug(f"cpu_avail_ba : {avail_s}") + # This dict would hold a bitsetmask per OSD + osds_ba = {} # Traverse the OSD to produce an allocation # even OSD num uses socket0, odd OSD number uses socket 1 for osd in range(self.num_osd): + #osds = [] # List of ranges as string _so_id = osd % num_sockets socket = control[_so_id] _start = socket["physical_start"] @@ -163,32 +465,69 @@ class CpuCoreAllocator(object): logger.debug( f"osd: {osd}, socket:{_so_id}, _start:{_start}, _end:{_end - 1}" ) - print(f"{_start}-{_end - 1}") + # Verify this range is valid, skipping unavailable CPU ids as appropriate + cpuset_ba = get_range(cpu_avail_ba, _start, step) + # Associate their HT siblings of this range -- what if some of these are disabled? 
+ cpuset_ba = set_all_ht_siblings(cpuset_ba) + # Update the list of bitmask of this OSD + if osd in osds_ba: + merged = bytes(map(lambda a, b: a | b, osds_ba[osd], cpuset_ba)) + osds_ba[osd] = merged + else: + osds_ba.update({osd: cpuset_ba}) + #osds.append(f"{_start}-{_end - 1}") + # Disable this OSD bitmaskset from the cpu_avail_ba + cpu_avail_ba = bytes(map(lambda a, b: a & ~b, cpu_avail_ba, cpuset_ba)) + osd_cpu_s = bytes(cpuset_ba).hex() + avail_s = bytes(cpu_avail_ba).hex() + logger.debug(f"-- OSD: {osd}: {osd_cpu_s}, avail:{avail_s}") + # Update the bitset mask + self.set_osd_cpuset(osd, osds_ba[osd]) + if _end <= socket["physical_end"]: socket["physical_start"] = _end - # Produce the HT sibling list to disable - # Consider to use sets to avoid dupes + _ht_start = socket["ht_sibling_start"] + _ht_end = socket["ht_sibling_start"] + step plist = list( range( - socket["ht_sibling_start"], - (socket["ht_sibling_start"] + step), + _ht_start, + _ht_end, 1, ) ) logger.debug(f"plist: {plist}") - pset = set(plist) - # _to_disable = pset.union(cores_to_disable) - cores_to_disable = pset.union(cores_to_disable) - logger.debug(f"cores_to_disable: {list(cores_to_disable)}") socket["ht_sibling_start"] += step else: # bail out _sops = socket["physical_start"] + step logger.debug(f"Out of range: {_sops}") break - _to_disable = sorted(list(cores_to_disable)) - logger.debug(f"Cores to disable: {_to_disable}") - print(" ".join(map(str, _to_disable))) + # Set the reminder available CPU + self.set_osd_cpuset('available', bytes(cpu_avail_ba) ) + # Set the following to exercise the unit tests + self.osds_ba = osds_ba + + def output_cpusets(self): + """ + Generic print of the cpuset to use per OSD and the remaining list of CPU available + to use for everything else, eg Alien threads + + # Convert a bytesarrys back to hex string + # hex_string = "".join("%02x" % b for b in array_alpha) + # print(bytes(bytes_array).hex()) + """ + # Output: either hex or decimal ranges + if 
self.out_hex: + for cpuset in self.osds_cpu_out["hex_cpu_mask"].values(): + # one line per OSD-- ensure its sorted, last one must be the available + print(cpuset) + else: + for cpuset in self.osds_cpu_out["dec_ranges"].values(): + print(cpuset) + #print(" ".join(map(str, self._to_disable))) + + logger.debug(f"osds_cpu_out: {pformat(self.osds_cpu_out)}") + def run(self, distribute_strat): """ @@ -196,21 +535,22 @@ class CpuCoreAllocator(object): produce the corresponding balance, print the balance as a list intended to be consumed by vstart.sh -- a dictionary will be used for cephadm. """ - self.lscpu.load_json() - self.lscpu.get_ranges() + self.setup() if distribute_strat == "socket": self.do_distrib_socket_based() else: self.do_distrib_osd_based() + self.output_cpusets() + def main(argv): examples = """ Examples: # Produce a balanced CPU distribution of physical CPU cores intended for the Seastar reactor threads - %prog -u [-b ] [-d] [-v] - [-o ] [-r ] + %prog [-u |-t ] [-b ] [-d] [-v] + [-o ] [-r ] # such a list can be used for vstart.sh/cephadm to issue ceph conf set commands. """ @@ -219,21 +559,27 @@ def main(argv): epilog=examples, formatter_class=argparse.RawDescriptionHelpFormatter, ) + parser.add_argument( + "-o", + "--num_osd", + type=int, + required=False, + help="Number of OSDs", + default=NUM_OSD, + ) parser.add_argument( "-u", "--lscpu", type=str, - required=True, help="Input file: .json file produced by lscpu --json", default=None, ) parser.add_argument( - "-o", - "--num_osd", - type=int, - required=False, - help="Number of OSDs", - default=NUM_OSD, + "-t", + "--taskset", + type=str, + help="The taskset argument of the parent process (eg. 
vstart)", + default=None, ) parser.add_argument( "-r", @@ -252,7 +598,7 @@ def main(argv): type=str, required=False, help="CPU balance strategy: osd (default), socket (NUMA)", - default="osd", + default=False, ) parser.add_argument( "-v", @@ -261,6 +607,13 @@ def main(argv): help="True to enable verbose logging mode", default=False, ) + parser.add_argument( + "-x", + "--hexcpuset", + action="store_true", + help="True to enable hexadecimal cpuset bitmask output", + default=False, + ) # parser.set_defaults(numosd=1) options = parser.parse_args(argv) @@ -274,8 +627,15 @@ def main(argv): logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel) logger.debug(f"Got options: {options}") + os.chdir(options.directory) - cpu_cores = CpuCoreAllocator(options.lscpu, options.num_osd, options.num_reactor) + cpu_cores = CpuCoreAllocator( + options.lscpu, + options.num_osd, + options.num_reactor, + options.taskset, + options.hexcpuset, + ) cpu_cores.run(options.balance) diff --git a/src/tools/contrib/lscpu.py b/src/tools/contrib/lscpu.py new file mode 100644 index 00000000000..32dd2d96d01 --- /dev/null +++ b/src/tools/contrib/lscpu.py @@ -0,0 +1,159 @@ +#!/usr/bin/python +""" +This module gets the output from lscpu and produces a list of CPU uids +corresponding to physical cores. 
+""" + +# import logging +import os +import re +import json +# import tempfile +# import pprint + +__author__ = "Jose J Palacios-Perez" + +# logger = logging.getLogger(__name__) + + +class LsCpuJson(object): + """ + Process a sequence of CPU core ids + + # lscpu --json + { + "lscpu": [ + { + d: { "field": "CPU(s):", "data": "112"} + d: {"field": "Core(s) per socket:", "data": "28"} + d: {'field': 'NUMA node(s):', 'data': '2'} + d: {'field': 'NUMA node0 CPU(s):', 'data': '0-27,56-83'} + d: {'field': 'NUMA node1 CPU(s):', 'data': '28-55,84-111'} + } + : + } + """ + + def __init__(self, json_file: str): + """ + This class expects the output from lscpu --json + """ + self.json_file = json_file + self._dict = {} + self.socket_lst = { + "num_sockets": 0, + "num_logical_cpus": 0, + "num_cores_per_socket": 0, + # or more general, an array, index is the socket number + "sockets": [], + } + + def load_json(self): + """ + Load the lscpu --json output + """ + json_file = self.json_file + with open(json_file, "r") as json_data: + # check for empty file + f_info = os.fstat(json_data.fileno()) + if f_info.st_size == 0: + print(f"JSON input file {json_file} is empty") + return # Should assert + self._dict = json.load(json_data) + json_data.close() + # logger.debug(f"_dict: {self._dict}") + + def get_num_sockets(self): + """ + Accessor + """ + return self.socket_lst["num_sockets"] + + def get_physical_start(self, sindex): + """ + Accessor: cpu core id start physical + """ + return self.socket_lst["sockets"][sindex]["physical_start"] + + def get_ht_start(self, sindex): + """ + Accessor: cpu core id start ht-sibling + """ + return self.socket_lst["sockets"][sindex]["ht_sibling_start"] + + def get_num_physical(self): + """ + Accessor: num physical cpu core ids + """ + return self.socket_lst["num_cores_per_socket"] + + def get_num_logical_cpus(self): + """ + Accessor: num CPU ids + """ + return self.socket_lst["num_logical_cpus"] + + def get_total_physical(self): + """ + Accessor: sum 
of the physical cores for all sockets + """ + return self.get_num_sockets() * self.get_num_physical() + + + def get_socket(self, cpuid: int): + """ + Accessor: given cpuid returns which socket number and + whether is a physical (True) or ht-sibling (False) + """ + for _i, s in enumerate(self.socket_lst["sockets"]): + if s["physical_start"] <= cpuid and cpuid <= s["physical_end"]: + return (_i, True) + if s["ht_sibling_start"] <= cpuid and cpuid <= s["ht_sibling_end"]: + return (_i, False) + + def get_ranges(self): + """ + Parse the .json from lscpu + (we might extend this to parse either version: normal or .json) + """ + ncpu_re = re.compile(r"^CPU\(s\):$") + numa_re = re.compile(r"NUMA node\(s\):") + node_re = re.compile(r"NUMA node(\d+) CPU\(s\):") + ranges_re = re.compile(r"(\d+)-(\d+),(\d+)-(\d+)") + cores_re = re.compile(r"^Core\(s\) per socket:$") + socket_lst = self.socket_lst + for d in self._dict["lscpu"]: + # logger.debug(f"d: {d}") + m = numa_re.search(d["field"]) + if m: + socket_lst["num_sockets"] = int(d["data"]) + continue + m = node_re.search(d["field"]) + if m: + socket = m.group(1) + m = ranges_re.search(d["data"]) + if m: + drange = { + "socket": int(socket), + "physical_start": int(m.group(1)), + "physical_end": int(m.group(2)), + "ht_sibling_start": int(m.group(3)), + "ht_sibling_end": int(m.group(4)), + } + socket_lst["sockets"].append(drange) + continue + m = ncpu_re.search(d["field"]) + if m: + socket_lst["num_logical_cpus"] = int(d["data"]) + continue + m = cores_re.search(d["field"]) + if m: + socket_lst["num_cores_per_socket"] = int(d["data"]) + # logger.debug(f"result: {socket_lst}") + assert self.socket_lst["num_sockets"] > 0, "Failed to parse lscpu" + + def get_sockets(self): + """ + Return the socket_lst["sockets"] + """ + return self.socket_lst["sockets"] diff --git a/src/tools/contrib/tasksetcpu.py b/src/tools/contrib/tasksetcpu.py index 28b24a6dcdf..2d238d18ec2 100644 --- a/src/tools/contrib/tasksetcpu.py +++ 
b/src/tools/contrib/tasksetcpu.py @@ -2,7 +2,6 @@ """ This script traverses the ouput from taskset and ps to produce a .JSON to generate an ascii grid for visualisation. -Returns the suggested CPu cores for the FIO client. """ import argparse @@ -12,9 +11,8 @@ import sys import re import json import tempfile +from typing import Dict, List, Any, Set from pprint import pformat - -# import pprint from lscpu import LsCpuJson __author__ = "Jose J Palacios-Perez" @@ -22,9 +20,9 @@ __author__ = "Jose J Palacios-Perez" logger = logging.getLogger(__name__) -def to_color(string, color): +def to_color(string: str, color: str) -> str: """ - Simple basic color ascii coding + Simple basic color ANSI/ASCII Coding """ color_code = { "blue": "\033[34m", @@ -35,7 +33,19 @@ def to_color(string, color): return color_code[color] + str(string) + "\033[0m" -def serialize_sets(obj): +def ljust_color(text: str, padding: int, char=" ") -> str: + """ + Find all matching ANSI sequences, then get the total length + of all matches to serve as our offset when we add it to the padding value + """ + pattern = r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]" + + matches = re.findall(pattern, text) + offset = sum(len(match) for match in matches) + return text.ljust(padding + offset, char[0]) + + +def serialize_sets(obj) -> Any: """ Serialise sets """ @@ -45,44 +55,121 @@ def serialize_sets(obj): return obj +THREAD_TYPES = { + # log is a valid thread name for both reactor and aliens + "reactor": { + "regex": re.compile(r"(crimson|reactor|syscall|log).*"), + "color": "red", + "name": "R", + }, + "alien": { + "regex": re.compile(r"(alien-store-tp)"), + "color": "green", + "name": "A", + }, + "bluestore": { + "regex": re.compile(r"(bstore|rocksdb|cfin).*"), + "color": "blue", + "name": "B", + }, +} + + class CpuCell(object): """ - Single cell representing a CPU core - Essentially a list of tuples, each tuple is a str (actually a letter) of the type of thread - running in this Cpu core. 
The type indicates the color code that it should be printed. - We only support three types: Reactors, Alien, and Bluestore threads. - Probably need to refactor the proc_groups dict as a member here, or a class of its own (?) + Single cell representing the threads allocated to a CPU core. + Each cell is a set of thread types (Reactor, Alien, Bluestore). + The Reactor type should be mutually exclusive to the other types. + The type indicates the color code that the thread set should be printed. """ - def __init__(self, cpuid, atype): - self.type = atype + def __init__(self, cpuid=0): + """ + Construct an empty CpuCell: + Might need extending to + { OSD: set(thread_types) } + """ + self.osd_id = -1 + self.cpuid = cpuid + self._set = set([]) + + def update(self, cpuid, cpuset: Dict[str, Any], osd_id: str) -> None: + """ + Update the contents of a CpuCell. + """ + _tlist = [] + self.osd_id = osd_id self.cpuid = cpuid + for thread_id in cpuset: + if thread_id not in THREAD_TYPES: + logger.error(f"{thread_id} not in THREAD_TYPES") + else: + _tlist.append(thread_id) + self._set.update(set(_tlist)) + logger.debug(f"--{self._set}--") + + def __str__(self) -> str: + """ + Method to be called by str() on instances of this class. + """ + _str = "".join([THREAD_TYPES[item]["name"] for item in self._set]) + return f"{self.osd_id}.{_str}" + + def __repr__(self) -> str: + """ + Method to represent an instance of this class. + Used implicitly by eg. pformat() + """ + _str = "".join([THREAD_TYPES[item]["name"] for item in self._set]) + return f"{self.osd_id}.{_str}" + # return f"{self.cpuid}.{_str}" + + def print(self, width=0) -> str: + """ + Print the cell in color coded. + """ + _str = "." 
+ _tlen = 1 + for _id in self._set: + _name = THREAD_TYPES[_id]["name"] + _color = THREAD_TYPES[_id]["color"] + _item = f"{_name}{self.osd_id}" + _tlen += len(_item) + _str += to_color( + _item, + _color, + ) + return ljust_color(_str, width) class CpuGrid(object): """ - Grid for a Single CPU socket - Each cell is a CpuCell, which contains a set of threads initials (single letter as per the field 'name') - and a color code. + Grid for a Single CPU socket, basicaly a matrix of CpuCells. """ + NUM_CPUS = 112 ROWS = 8 COLS = 7 # width = len(str(max(rows,cols)+1)) - WIDTH = 5 + WIDTH = 6 - def __init__(self, id, socket): + def __init__( + self, id: int, socket: Dict[str, int], num_cpus: int = NUM_CPUS + ) -> None: """ This class expects a single CPU socket, which has two sections: - physical of size num_cores, and a HT-sibling section with the same number + physical of size num_cores, and a HT-sibling section with the same number. """ self.id = id self.socket = socket - # Or more generally, a list of tuples -- these should be CpuCell - self.grid = [["."] * self.COLS for _ in range(self.ROWS)] - self.str_lines = [] + self.ROWS = num_cpus // (self.COLS * 2) + self.grid = [ + [CpuCell(self.COLS * row + col) for col in range(self.COLS)] + for row in range(self.ROWS) + ] + self.str_lines: List[str] = [] - def get_cell_coord(self, cpuid, is_phys): + def get_cell_coord(self, cpuid: int, is_phys: bool): """ Return the tuple row,col for this cpuid """ @@ -90,42 +177,36 @@ class CpuGrid(object): row = (cpuid - self.socket["phy_start"]) // self.COLS col = (cpuid - self.socket["phy_start"]) % self.COLS else: - row = ( - (self.socket["num_cores"] // self.COLS) # should be 4 - + (cpuid - self.socket["ht_start"]) // self.COLS - ) + row = (self.ROWS // 2) + (cpuid - self.socket["ht_start"]) // self.COLS col = (cpuid - self.socket["ht_start"]) % self.COLS return (row, col) - def set_cell(self, cpuid, is_phys, vstr): + def set_cell( + self, cpuid: int, osd_id, cpuset: Dict[str, 
List[Any]], is_phys: bool + ) -> None: """ Fill the cell for cpuid with the values vstr """ - # vlen = len(vstr) // 10 # due to control chars row, col = self.get_cell_coord(cpuid, is_phys) - self.grid[row][col] = " " + vstr + " " # use a new object instead? - # self.grid[row][col] = " " * (self.WIDTH - vlen) + vstr + logger.debug(f"cpu{cpuid}: {row},{col}") + try: + self.grid[row][col].update(cpuid, cpuset, osd_id) + except IndexError: + logger.error(f"{is_phys}-index_out_of_range for {cpuid}: {row},{col}") def set_header(self): """ - Set this socket grid header + Set this socket grid header. """ header = " " * self.WIDTH + f" Socket {self.id} ".center( (self.WIDTH + 1) * self.COLS, "-" ) self.str_lines = [header] - def show_grid(self): - """ - Debug show grid - """ - logger.debug(f"Grid: {pformat(self.grid)}") - # logger.debug(f"{pformat(self.str_lines)}") - def make_rows(self, is_phys): """ - Construct the rows body of the Grid - Can be the physical section or HT section + Construct the rows body of the Grid. + Can be the physical section or HT section. 
""" width = self.WIDTH cols = self.COLS @@ -134,16 +215,15 @@ class CpuGrid(object): if is_phys: _startp = self.socket["phy_start"] _row = 0 - _endp = 4 # self.get_cell_coord(self.socket["num_cores"]+1,is_phys) + _endp = self.ROWS // 2 # 4 else: _startp = self.socket["ht_start"] - _row = 4 # self.get_cell_coord(self.socket["ht_start"],is_phys) # 4 - _endp = 8 # self.get_cell_coord( self.socket["ht_start"]+self.socket["num_cores"]+1,is_phys) # 8 + _row = self.ROWS // 2 # 4 + _endp = self.ROWS # 8 - logger.debug(f"make_rows: {_row}, {_endp}") grid_slice = self.grid[_row:_endp] for i, row in enumerate(grid_slice): - values = "+".join(f"{v}".center(width, " ") for v in row) + values = "+".join(f"{v.print(width)}" for v in row) line = contentLine.replace("values", values) line = line.replace("#", f"{_startp + cols*i:>{width}d}") # This separates the Physical from the HT section @@ -151,7 +231,7 @@ class CpuGrid(object): def make_grid(self): """ - Construct the grid for this socket line by line + Construct the grid for this socket line by line. 
""" width = self.WIDTH cols = self.COLS @@ -192,37 +272,23 @@ class CpuGrid(object): """ return next(self.itlines, "") + def show_grid(self): + """ + Debug show grid + """ + logger.debug(f"Socket: {self.id} Grid: {pformat(self.grid)}") + class TasksetEntry(object): """ Process a sequence of taskset_ps _thread.out files to produce a CpuGrid per socket and .JSON - """ - - # Only for OSD/Crimson - proc_groups = { - # TODO: log are valid thread names for both reactor and aliens - "reactor": { - "regex": re.compile(r"(crimson|reactor|syscall|log).*"), - "color": "red", - "name": "R", - }, - "alien": { - "regex": re.compile(r"(alien-store-tp)"), - "color": "green", - "name": "A", - }, - "bluestore": { - "regex": re.compile(r"(bstore|rocksdb|cfin).*"), - "color": "blue", - "name": "B", - }, - } - proc_groups_set = set() - # Formmat from the _threads.out files: + # Format from the _threads.out files: # 1368714 1368714 crimson-osd 0 pid 1368714's current affinity list: 0 # 1368714 1368720 reactor-1 1 pid 1368720's current affinity list: 1 + """ + LINE_REGEX = re.compile( r""" ^\d+\s+ # PID @@ -231,16 +297,18 @@ class TasksetEntry(object): (\d+)\s+ # CPU id pid\s+(\d+)[']s\s+current\s+affinity\s+list:\s+(\d+)$""", re.VERBOSE, - ) # |re.DEBUG) - FILE_SUFFIX_LST = re.compile(r"_list$") # ,(_list|.out)re.DEBUG) + ) + FILE_SUFFIX_LST = re.compile(r"_list$") - def __init__(self, config, directory, num_cpu_client, lscpu): + def __init__( + self, config, directory, num_cpu_client, lscpu: str = "", taskset=None + ): """ - This class expects either: - a list of result files to process into a grid (suffix _list) - or a single _threads.out file - the number of CPU intended for the clients (eg. FIO) - the .json from lscpu + This class expects: + - either a list of result files to process into a grid (suffix _list) + or a single _threads.out file + - the number of CPU intended for the clients (eg. FIO) + - the .json from lscpu. 
""" self.config = config m = self.FILE_SUFFIX_LST.search(config) @@ -254,13 +322,13 @@ class TasksetEntry(object): self.directory = directory self.num_cpu_client = num_cpu_client self.osd_num = 0 - # This should be an array of dicts, size of num OSD - self.entries = [] # {} - # From lscpu we can get the ranges - # self.sockets = [ Cpugrid(_i, ) for _i in range(NUM_SOCKETS)] # array of CpuGrid - self.sockets = [] - self.lscpu = LsCpuJson(lscpu) - self.proc_groups_set.update(self.proc_groups.keys()) + # This is a dict with keys OSD num and values dicts w/keys CPU uid, values list of thread id + self.entries = {} + self.sockets: List[CpuGrid] = [] + # We probably might not need this attribute + self.taskset = taskset + if lscpu: + self.lscpu = LsCpuJson(lscpu) def traverse_dir(self): """ @@ -272,59 +340,43 @@ class TasksetEntry(object): """ find a name file in path """ - for root, dirs, files in os.walk(path): + for root, _, files in os.walk(path): if name in files: return os.path.join(root, name) - def _get_str(self, cpuset, osd_id): - """ - Transform a cpu set into a string of chars to indicate - the thread allocation - """ - _result = "" - logger.debug(f"Got cpuset: {cpuset} for {osd_id}:") - for item in cpuset: - logger.debug(f"Got {item}:") - if item not in self.proc_groups: - logger.error(f"{item} not in proc_groups") - return _result - _id = self.proc_groups[item]["name"] - logger.debug(f"Got {_id}.{osd_id}") - _result += to_color( - f"{_id}.{osd_id}", - self.proc_groups[item]["color"], - # self.proc_groups[item]["name"], self.proc_groups[item]["color"] - ) - return _result - def save_grid_json(self): """ Save the grid into a .JSON Shall we use the same name as the config list replaced extension """ if self.jsonName: + # Ensure the struct is OSD.id: [array of CPU entries] + # Sorts the CPU entries by numeric order: + # int_docs_info = {int(k) : v for k, v in dc.items()} + # sorted_dict = dict(sorted(int_docs_info.items())) with open(self.jsonName, "w", 
encoding="utf-8") as f: json.dump( self.entries, f, indent=4, sort_keys=True, default=serialize_sets ) f.close() - def _get_tgroup(self, tname: str): + def _get_tgroup(self, tname: str) -> str: """ - Get the proc_groups from the thread name + Get the THREAD_TYPES from the thread name. + Return the thread name if not registered as part of Crimson OSD process. """ - for k in self.proc_groups: - if self.proc_groups[k]["regex"].match(tname): + for k in THREAD_TYPES: + if THREAD_TYPES[k]["regex"].match(tname): return k - logger.debug(f"{tname}: not registered in groups") return tname - def _get_cpu_range(self, cpu_uid: str, cpu_range: str): + def _get_cpu_range(self, cpu_uid: str, cpu_range: str) -> Set: """ Get the cpu id range provided by taskset (if exist) The first arg is the cpuid from ps field PSR - Returns the corresponding list as a set + Returns the corresponding list as a set. + We might produce a bitmask for the corresponding range. """ cpu_list = [] regex = re.compile(r"(\d+)([,-](\d+))?") @@ -342,29 +394,28 @@ class TasksetEntry(object): def _parse_via_regex(self, line: str): """ - Bug in the REGEx, alternative working fine + Bug in the REGEx, alternative working fine, left for reference if required in the future. """ - logger.debug(f"Parsing: {line}") match = self.LINE_REGEX.search(line) if match: groups = match.groups() - logger.debug(f"Got groups: {groups}") tname = self._get_tgroup(groups[0]) cpuid = str(groups[1]) return tname, cpuid - def parse(self, fname: str): + def parse(self, fname: str) -> Dict: """ - Parses individual _thread.out file + Parses an individual _thread.out file. Returns a dict whose keys are cpuid, values are dicts with the threads names, process group association (Reactor, Alien, Bluestore) - represented as a set (idempotent, we can later look at add info such as occurrences) + represented as a set. 
""" - entry = {} + entry: Dict[int, Dict[str, List[Any]]] = {} with open(fname, "r") as _data: f_info = os.fstat(_data.fileno()) if f_info.st_size == 0: - print(f"input file {fname} is empty") + print(f"Input file {fname} is empty") + logger.error(f"Input file {fname} is empty") return entry lines = _data.read().splitlines() _data.close() @@ -382,51 +433,63 @@ class TasksetEntry(object): return entry - def merge_entries(self, new_entry: dict, osd_id: int): + def merge_entries(self, new_entry: dict, osd_id): """ Merges (via set union) with the new entry (eg. OSD num) keys of the new_entry are cpuid """ - for k in new_entry.keys(): + for k in new_entry.keys(): # cpuid entry = self.entries[osd_id] if k not in entry: entry[k] = new_entry[k] else: entry[k].update(new_entry[k]) - def set_cpu_in_grid(self, cpuid: int, cpuset, osd_id: int): + def set_cpu_in_grid(self, cpuid: int, cpuset, osd_id): """ Given a cpuid and its contents, set the corresponding CpuGrid - for given OSD + for a given OSD. """ - vstr = self._get_str(cpuset, osd_id) sindex, is_phys = self.lscpu.get_socket(cpuid) grid = self.sockets[sindex] - grid.set_cell(cpuid, is_phys, vstr) + osd = self.get_osd_num(osd_id) + logger.debug(f"set_cpu_in_grid:{sindex}, {is_phys}") + grid.set_cell(cpuid, osd, cpuset, is_phys) + + def get_osd_num(self, osd_id): + """ + Extract the OSD number from the OSD id. + """ + num = 0 # default + regex = re.compile(r"^osd_(\d+)$") + m = regex.search(osd_id) + if m: + num = m.group(1) + return num def get_osd_id(self, setup: str): """ - Extract the OSD number from the filename (which should follow the expected convention) + Extract the OSD number from the filename (which should follow the expected convention). 
""" - osd_id = 0 # default - regex = re.compile(r"^osd_(\d+).*$") + osd_id = "osd_0" # default + regex = re.compile(r"^(osd_\d+).*$") m = regex.search(setup) if m: - osd_id = int(m.group(1)) + osd_id = m.group(1) return osd_id - def update_grid(self, setup: str, osd_id: int): + def update_grid(self, setup: str, osd_id): """ - Update the sockets grid for the given setup + Update the sockets grid for the given setup that is indicated per OSD process. """ - print(f"== {setup} ==") # OSD process - - for _s in self.sockets: - _s.set_header() + print(f"== {setup} ==") + logger.debug(f"== {setup} ==") entry = self.entries[osd_id] for cpuid, cpuset in entry.items(): self.set_cpu_in_grid(int(cpuid), cpuset, osd_id) + for _s in self.sockets: + _s.show_grid() def show_grid(self): """ @@ -434,11 +497,11 @@ class TasksetEntry(object): each line. """ for _s in self.sockets: + _s.set_header() _s.make_grid() - _s.show_grid() # join the grid lines per socket: this should be done in a more Pythonic way... 
- for i in range(self.sockets[0].get_num_lines()): + for _ in range(self.sockets[0].get_num_lines()): line = " + ".join(_s.next() for _s in self.sockets) print(line) @@ -458,23 +521,17 @@ class TasksetEntry(object): config_file.close() self.osd_num = len(out_files) - for entry in range(self.osd_num): - self.entries.append({}) - print(f"loading {len(out_files)} .out files ...") - # pp = pprint.PrettyPrinter(width=41, compact=True) # The number of files should be the same as the number of OSDs for fname in out_files: # Extract the OSD id from fname - cpuNodeList = self.parse(fname) + cpuNodeDict = self.parse(fname) osd_id = self.get_osd_id(fname) - # pp.pprint(cpuNodeList)# Ok - # merged = {**self.entries, **cpuNodeList } - # Show the grid for this fname - self.merge_entries(cpuNodeList, osd_id) + if osd_id not in self.entries: + self.entries[osd_id] = {} + self.merge_entries(cpuNodeDict, osd_id) self.update_grid(fname, osd_id) - # logger.debug(f"Got entries: {self.entries}:") def run(self): """ @@ -489,8 +546,10 @@ class TasksetEntry(object): { "phy_start": self.lscpu.get_physical_start(s), "ht_start": self.lscpu.get_ht_start(s), - "num_cores": self.lscpu.get_num_physical(s), + "num_cores": self.lscpu.get_num_physical(), }, + # Use this to calculate the size of grid + self.lscpu.get_num_logical_cpus(), ) ) self.traverse_files() @@ -504,8 +563,10 @@ def main(argv): # Produce a CPU distribution visualisation grid for a single file: %prog -c osd_0_crimson_1osd_16reactor_256at_8fio_lt_disable_ht_threads.out - # Produce a CPU distribution visualisation grid for a _list _of files: - %prog -c crimson_1osd_16reactor_lt_disable_list + # Produce a CPU distribution visualisation grid for a _list _of files (located in /tmp) and the output + from ps and taskset: + %prog -v -c crimson_1osd_16reactor_lt_disable_list -d /tmp -u numa_nodes.json + """ parser = argparse.ArgumentParser( description="""This tool is used to parse output from the combined taskset and ps commands""", @@ 
-521,15 +582,23 @@ def main(argv): help="Input file: either containing a _list_ of _threads.out files, or a single .out file", default=None, ) - # load the NUMA summary -- or lscpu --json - parser.add_argument( + # Load the NUMA CPU summary from lscpu --json + cmd_grp = parser.add_mutually_exclusive_group() + cmd_grp.add_argument( "-u", "--lscpu", type=str, - required=True, help="Input file: .json file produced by lscpu --json", default=None, ) + cmd_grp.add_argument( + "-t", + "--taskset", + type=str, + help="The taskset argument of the parent process (eg. vstart)", + default=None, + ) + parser.add_argument( "-i", "--client", @@ -560,13 +629,17 @@ def main(argv): with tempfile.NamedTemporaryFile(dir="/tmp", delete=False) as tmpfile: logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel) - # print(f"logname: {tmpfile.name}") logger.debug(f"Got options: {options}") os.chdir(options.directory) + grid = TasksetEntry( - options.config, options.directory, options.client, options.lscpu + options.config, + options.directory, + options.client, + options.lscpu, + options.taskset, ) grid.run()