]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/contrib: Add tasksetcpu.py to show the CPU allocation grid
authorJose J Palacios-Perez <perezjos@uk.ibm.com>
Wed, 27 Nov 2024 11:38:27 +0000 (11:38 +0000)
committerJose J Palacios-Perez <perezjos@uk.ibm.com>
Thu, 30 Jan 2025 10:51:37 +0000 (10:51 +0000)
Signed-off-by: Jose J Palacios-Perez <perezjos@uk.ibm.com>
src/tools/contrib/README.rst
src/tools/contrib/tasksetcpu.py [new file with mode: 0644]

index d7655e208b9d6712a3bfcf513d9deabe7845c98d..fe36cf959e764d7e3571a607fd0a5648499f7135 100644 (file)
@@ -8,3 +8,21 @@ Please do not assume any level of support.  Your mileage may vary.
 
 Each file's header must include a tracker number and an author signed-off-by
 line.
+
+
+- balance-cpu.py. An utility to distribute the Seastar reactor threads over the
+  (physical) CPU cores, according to two strategies:
+  - OSD-based (default): allocates all the reactors of the same OSD in the same 
+    NUMA socket,
+  - NUMA socket: distributes the reactors of each OSD evenly in the NUMA sockets
+    (normally two), so every OSD ends up with reactors running on both NUMA sockets.
+
+- lscpu.py. A Python module to parse the output of  ``lscpu --json`` into a dictionary
+  which is used by balance-cpu and tasksetcpu.py.
+
+- tasksetcpu.py. an utility to print a grid showing the current CPU core allocation
+  of Seastar reactors. Useful to validate that the allocation strategy is correct.
+
+For further details, please see *BalanceCPUCrimson.md* in doc/dev/crimson.
+
+
diff --git a/src/tools/contrib/tasksetcpu.py b/src/tools/contrib/tasksetcpu.py
new file mode 100644 (file)
index 0000000..28b24a6
--- /dev/null
@@ -0,0 +1,575 @@
+#!/usr/bin/python
+"""
+This script traverses the ouput from taskset and ps to produce a .JSON
+to generate an ascii grid for visualisation.
+Returns the suggested CPu cores for the FIO client.
+"""
+
+import argparse
+import logging
+import os
+import sys
+import re
+import json
+import tempfile
+from pprint import pformat
+
+# import pprint
+from lscpu import LsCpuJson
+
+__author__ = "Jose J Palacios-Perez"
+
+logger = logging.getLogger(__name__)
+
+
+def to_color(string, color):
+    """
+    Simple basic color ascii coding
+    """
+    color_code = {
+        "blue": "\033[34m",
+        "yellow": "\033[33m",
+        "green": "\033[32m",
+        "red": "\033[31m",
+    }
+    return color_code[color] + str(string) + "\033[0m"
+
+
+def serialize_sets(obj):
+    """
+    Serialise sets
+    """
+    if isinstance(obj, set):
+        return list(obj)
+
+    return obj
+
+
+class CpuCell(object):
+    """
+    Single cell representing a CPU core
+    Essentially a list of tuples, each tuple is a str (actually a letter) of the type of thread
+    running in this Cpu core. The type indicates the color code that it should be printed.
+    We only support three types: Reactors, Alien, and Bluestore threads.
+    Probably need to refactor the proc_groups dict as a member here, or a class of its own (?)
+    """
+
+    def __init__(self, cpuid, atype):
+        self.type = atype
+        self.cpuid = cpuid
+
+
+class CpuGrid(object):
+    """
+    Grid for a Single CPU socket
+    Each cell is a CpuCell, which contains a set of threads initials (single letter as per the field 'name')
+    and a color code.
+    """
+
+    ROWS = 8
+    COLS = 7
+    # width       = len(str(max(rows,cols)+1))
+    WIDTH = 5
+
+    def __init__(self, id, socket):
+        """
+        This class expects a single CPU socket, which has two sections:
+        physical of size num_cores, and a HT-sibling section with the same number
+        """
+        self.id = id
+        self.socket = socket
+        # Or more generally, a list of tuples -- these should be CpuCell
+        self.grid = [["."] * self.COLS for _ in range(self.ROWS)]
+        self.str_lines = []
+
+    def get_cell_coord(self, cpuid, is_phys):
+        """
+        Return the tuple row,col for this cpuid
+        """
+        if is_phys:
+            row = (cpuid - self.socket["phy_start"]) // self.COLS
+            col = (cpuid - self.socket["phy_start"]) % self.COLS
+        else:
+            row = (
+                (self.socket["num_cores"] // self.COLS)  # should be 4
+                + (cpuid - self.socket["ht_start"]) // self.COLS
+            )
+            col = (cpuid - self.socket["ht_start"]) % self.COLS
+        return (row, col)
+
+    def set_cell(self, cpuid, is_phys, vstr):
+        """
+        Fill the cell for cpuid with the values vstr
+        """
+        # vlen = len(vstr) // 10 # due to control chars
+        row, col = self.get_cell_coord(cpuid, is_phys)
+        self.grid[row][col] = " " + vstr + " "  # use a new object instead?
+        # self.grid[row][col] = " " * (self.WIDTH - vlen) + vstr
+
+    def set_header(self):
+        """
+        Set this socket grid header
+        """
+        header = " " * self.WIDTH + f" Socket {self.id} ".center(
+            (self.WIDTH + 1) * self.COLS, "-"
+        )
+        self.str_lines = [header]
+
+    def show_grid(self):
+        """
+        Debug show grid
+        """
+        logger.debug(f"Grid: {pformat(self.grid)}")
+        # logger.debug(f"{pformat(self.str_lines)}")
+
+    def make_rows(self, is_phys):
+        """
+        Construct the rows body of the Grid
+        Can be the physical section or HT section
+        """
+        width = self.WIDTH
+        cols = self.COLS
+        contentLine = "# | values |"
+
+        if is_phys:
+            _startp = self.socket["phy_start"]
+            _row = 0
+            _endp = 4  # self.get_cell_coord(self.socket["num_cores"]+1,is_phys)
+        else:
+            _startp = self.socket["ht_start"]
+            _row = 4  # self.get_cell_coord(self.socket["ht_start"],is_phys) # 4
+            _endp = 8  # self.get_cell_coord( self.socket["ht_start"]+self.socket["num_cores"]+1,is_phys) # 8
+
+        logger.debug(f"make_rows: {_row}, {_endp}")
+        grid_slice = self.grid[_row:_endp]
+        for i, row in enumerate(grid_slice):
+            values = "+".join(f"{v}".center(width, " ") for v in row)
+            line = contentLine.replace("values", values)
+            line = line.replace("#", f"{_startp + cols*i:>{width}d}")
+            # This separates the Physical from the HT section
+            self.str_lines.append(line)
+
+    def make_grid(self):
+        """
+        Construct the grid for this socket line by line
+        """
+        width = self.WIDTH
+        cols = self.COLS
+
+        # build frame
+        contentLine = "# | values |"
+
+        dashes = "+".join("-" * width for _ in range(cols))
+        frameLine = contentLine.replace("values", dashes)
+        frameLine = frameLine.replace("#", " " * width)
+        frameLine = frameLine.replace("| ", "+-").replace(" |", "-+")
+
+        # x-axis numbers:
+        numLine = contentLine.replace("|", " ")
+        numLine = numLine.replace("#", " " * width)
+        colNums = " ".join(f"{i:<{width}d}" for i in range(cols))
+        numLine = numLine.replace("values", colNums)
+        self.str_lines.append(numLine)
+
+        self.str_lines.append(frameLine)
+        self.make_rows(True)  # physical section
+        self.str_lines.append(frameLine)
+        self.make_rows(False)  # HT section
+        self.str_lines.append(frameLine)
+
+        # construct an iterator for this grid
+        self.itlines = iter(self.str_lines)
+
+    def get_num_lines(self):
+        """
+        Return the number of lines of the grid
+        """
+        return len(self.str_lines)
+
+    def next(self):
+        """
+        Calls iterator to return next line
+        """
+        return next(self.itlines, "")
+
+
+class TasksetEntry(object):
+    """
+    Process a sequence of taskset_ps _thread.out files to
+    produce a CpuGrid per socket and .JSON
+    """
+
+    # Only for OSD/Crimson
+    proc_groups = {
+        # TODO: log are valid thread names for both reactor and aliens
+        "reactor": {
+            "regex": re.compile(r"(crimson|reactor|syscall|log).*"),
+            "color": "red",
+            "name": "R",
+        },
+        "alien": {
+            "regex": re.compile(r"(alien-store-tp)"),
+            "color": "green",
+            "name": "A",
+        },
+        "bluestore": {
+            "regex": re.compile(r"(bstore|rocksdb|cfin).*"),
+            "color": "blue",
+            "name": "B",
+        },
+    }
+    proc_groups_set = set()
+
+    # Formmat from the _threads.out files:
+    # 1368714 1368714 crimson-osd       0     pid 1368714's current affinity list: 0
+    # 1368714 1368720 reactor-1         1     pid 1368720's current affinity list: 1
+    LINE_REGEX = re.compile(
+        r"""
+        ^\d+\s+ # PID
+        \d+\s+     # TID
+        ([^\s]+)\s+   # thread name
+        (\d+)\s+   # CPU id
+        pid\s+(\d+)[']s\s+current\s+affinity\s+list:\s+(\d+)$""",
+        re.VERBOSE,
+    )  # |re.DEBUG)
+    FILE_SUFFIX_LST = re.compile(r"_list$")  # ,(_list|.out)re.DEBUG)
+
+    def __init__(self, config, directory, num_cpu_client, lscpu):
+        """
+        This class expects either:
+        a list of result files to process into a grid (suffix _list)
+        or a single _threads.out file
+        the number of CPU intended for the clients (eg. FIO)
+        the .json from lscpu
+        """
+        self.config = config
+        m = self.FILE_SUFFIX_LST.search(config)
+        if m:
+            self.mode = "list"
+            self.jsonName = config.replace("_list", ".json")
+        else:
+            self.mode = "single"
+            self.jsonName = config.replace(".out", ".json")
+
+        self.directory = directory
+        self.num_cpu_client = num_cpu_client
+        self.osd_num = 0
+        # This should be an array of dicts, size of num OSD
+        self.entries = []  # {}
+        # From lscpu we can get the ranges
+        # self.sockets = [ Cpugrid(_i, ) for _i in range(NUM_SOCKETS)] # array of CpuGrid
+        self.sockets = []
+        self.lscpu = LsCpuJson(lscpu)
+        self.proc_groups_set.update(self.proc_groups.keys())
+
+    def traverse_dir(self):
+        """
+        Traverse the given list (.JSON) use .tex template to generate document
+        """
+        pass
+
+    def find(self, name, path):
+        """
+        find a name file in path
+        """
+        for root, dirs, files in os.walk(path):
+            if name in files:
+                return os.path.join(root, name)
+
+    def _get_str(self, cpuset, osd_id):
+        """
+        Transform a cpu set into a string of chars to indicate
+        the thread allocation
+        """
+        _result = ""
+        logger.debug(f"Got cpuset: {cpuset} for {osd_id}:")
+        for item in cpuset:
+            logger.debug(f"Got {item}:")
+            if item not in self.proc_groups:
+                logger.error(f"{item} not in proc_groups")
+                return _result
+            _id = self.proc_groups[item]["name"]
+            logger.debug(f"Got {_id}.{osd_id}")
+            _result += to_color(
+                f"{_id}.{osd_id}",
+                self.proc_groups[item]["color"],
+                # self.proc_groups[item]["name"], self.proc_groups[item]["color"]
+            )
+        return _result
+
+    def save_grid_json(self):
+        """
+        Save the grid into a .JSON
+        Shall we use the same name as the config list replaced extension
+        """
+        if self.jsonName:
+            with open(self.jsonName, "w", encoding="utf-8") as f:
+                json.dump(
+                    self.entries, f, indent=4, sort_keys=True, default=serialize_sets
+                )
+                f.close()
+
+    def _get_tgroup(self, tname: str):
+        """
+        Get the proc_groups from the thread name
+        """
+        for k in self.proc_groups:
+            if self.proc_groups[k]["regex"].match(tname):
+                return k
+
+        logger.debug(f"{tname}: not registered in groups")
+        return tname
+
+    def _get_cpu_range(self, cpu_uid: str, cpu_range: str):
+        """
+        Get the cpu id range provided by taskset (if exist)
+        The first arg is the cpuid from ps field PSR
+        Returns the corresponding list as a set
+        """
+        cpu_list = []
+        regex = re.compile(r"(\d+)([,-](\d+))?")
+        m = regex.search(cpu_range)
+        if m:
+            start = int(m.group(1))
+            if m.group(2):
+                end = int(m.group(3))
+                cpu_list = list(range(start, end + 1))
+            else:
+                cpu_list = [start]
+        cpu_set = set(cpu_list)
+        cpu_set.update({int(cpu_uid)})
+        return cpu_set
+
+    def _parse_via_regex(self, line: str):
+        """
+        Bug in the REGEx, alternative working fine
+        """
+        logger.debug(f"Parsing: {line}")
+        match = self.LINE_REGEX.search(line)
+        if match:
+            groups = match.groups()
+            logger.debug(f"Got groups: {groups}")
+            tname = self._get_tgroup(groups[0])
+            cpuid = str(groups[1])
+            return tname, cpuid
+
+    def parse(self, fname: str):
+        """
+        Parses individual _thread.out file
+        Returns a dict whose keys are cpuid, values are dicts
+        with the threads names, process group association (Reactor, Alien, Bluestore)
+        represented as a set (idempotent, we can later look at add info such as occurrences)
+        """
+        entry = {}
+        with open(fname, "r") as _data:
+            f_info = os.fstat(_data.fileno())
+            if f_info.st_size == 0:
+                print(f"input file {fname} is empty")
+                return entry
+            lines = _data.read().splitlines()
+            _data.close()
+            for line in lines:
+                lista = line.split()
+                tid = lista[1]
+                tname = self._get_tgroup(lista[2])
+                cpu_range = self._get_cpu_range(lista[3], lista[9])
+                for cpuid in cpu_range:
+                    if cpuid not in entry:
+                        entry.update({cpuid: {tname: []}})
+                    if tname not in entry[cpuid]:
+                        entry[cpuid].update({tname: []})
+                    entry[cpuid][tname].append(tid)
+
+        return entry
+
+    def merge_entries(self, new_entry: dict, osd_id: int):
+        """
+        Merges (via set union) with the new entry (eg. OSD num)
+        keys of the new_entry are cpuid
+        """
+        for k in new_entry.keys():
+            entry = self.entries[osd_id]
+            if k not in entry:
+                entry[k] = new_entry[k]
+            else:
+                entry[k].update(new_entry[k])
+
+    def set_cpu_in_grid(self, cpuid: int, cpuset, osd_id: int):
+        """
+        Given a cpuid and its contents, set the corresponding CpuGrid
+        for given OSD
+        """
+        vstr = self._get_str(cpuset, osd_id)
+        sindex, is_phys = self.lscpu.get_socket(cpuid)
+        grid = self.sockets[sindex]
+        grid.set_cell(cpuid, is_phys, vstr)
+
+    def get_osd_id(self, setup: str):
+        """
+        Extract the OSD number from the filename (which should follow the expected convention)
+        """
+        osd_id = 0  # default
+        regex = re.compile(r"^osd_(\d+).*$")
+        m = regex.search(setup)
+        if m:
+            osd_id = int(m.group(1))
+        return osd_id
+
+    def update_grid(self, setup: str, osd_id: int):
+        """
+        Update the sockets grid for the given setup
+        """
+        print(f"== {setup} ==")  # OSD process
+
+        for _s in self.sockets:
+            _s.set_header()
+
+        entry = self.entries[osd_id]
+        for cpuid, cpuset in entry.items():
+            self.set_cpu_in_grid(int(cpuid), cpuset, osd_id)
+
+    def show_grid(self):
+        """
+        Traverse the array of sockets to join them by row to produce
+        each line.
+        """
+        for _s in self.sockets:
+            _s.make_grid()
+            _s.show_grid()
+
+        # join the grid lines per socket: this should be done in a more Pythonic way...
+        for i in range(self.sockets[0].get_num_lines()):
+            line = " + ".join(_s.next() for _s in self.sockets)
+            print(line)
+
+    def traverse_files(self):
+        """
+        Traverses the _thread.out files given in the config
+        """
+        if self.mode == "single":
+            out_files = [self.config]
+        else:
+            try:
+                config_file = open(self.config, "r")
+            except IOError as e:
+                raise argparse.ArgumentTypeError(str(e))
+            out_files = config_file.read().splitlines()
+            print(out_files)
+            config_file.close()
+
+        self.osd_num = len(out_files)
+        for entry in range(self.osd_num):
+            self.entries.append({})
+
+        print(f"loading {len(out_files)} .out files ...")
+
+        # pp = pprint.PrettyPrinter(width=41, compact=True)
+        # The number of files should be the same as the number of OSDs
+        for fname in out_files:
+            # Extract the OSD id from fname
+            cpuNodeList = self.parse(fname)
+            osd_id = self.get_osd_id(fname)
+            # pp.pprint(cpuNodeList)# Ok
+            # merged = {**self.entries, **cpuNodeList }
+            # Show the grid for this fname
+            self.merge_entries(cpuNodeList, osd_id)
+            self.update_grid(fname, osd_id)
+        # logger.debug(f"Got entries: {self.entries}:")
+
+    def run(self):
+        """
+        Entry point: processes the input files, then produces the grid
+        """
+        self.lscpu.load_json()
+        self.lscpu.get_ranges()
+        for s in range(self.lscpu.get_num_sockets()):
+            self.sockets.append(
+                CpuGrid(
+                    s,
+                    {
+                        "phy_start": self.lscpu.get_physical_start(s),
+                        "ht_start": self.lscpu.get_ht_start(s),
+                        "num_cores": self.lscpu.get_num_physical(s),
+                    },
+                )
+            )
+        self.traverse_files()
+        self.show_grid()
+        self.save_grid_json()
+
+
+def main(argv):
+    examples = """
+    Examples:
+    # Produce a CPU distribution visualisation grid for a single file:
+        %prog -c osd_0_crimson_1osd_16reactor_256at_8fio_lt_disable_ht_threads.out
+
+    # Produce a CPU distribution visualisation grid for a _list _of files:
+        %prog -c crimson_1osd_16reactor_lt_disable_list
+    """
+    parser = argparse.ArgumentParser(
+        description="""This tool is used to parse output from the combined taskset and ps commands""",
+        epilog=examples,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+
+    parser.add_argument(
+        "-c",
+        "--config",
+        type=str,
+        required=True,
+        help="Input file: either containing a _list_ of _threads.out files, or a single .out file",
+        default=None,
+    )
+    # load the NUMA summary -- or lscpu --json
+    parser.add_argument(
+        "-u",
+        "--lscpu",
+        type=str,
+        required=True,
+        help="Input file: .json file produced by lscpu --json",
+        default=None,
+    )
+    parser.add_argument(
+        "-i",
+        "--client",
+        type=int,
+        required=False,
+        help="Number of CPU cores required for the FIO client",
+        default=8,
+    )
+
+    parser.add_argument(
+        "-d", "--directory", type=str, help="Directory to examine", default="./"
+    )
+    parser.add_argument(
+        "-v",
+        "--verbose",
+        action="store_true",
+        help="True to enable verbose logging mode",
+        default=False,
+    )
+
+    # parser.set_defaults(numosd=1)
+    options = parser.parse_args(argv)
+
+    if options.verbose:
+        logLevel = logging.DEBUG
+    else:
+        logLevel = logging.INFO
+
+    with tempfile.NamedTemporaryFile(dir="/tmp", delete=False) as tmpfile:
+        logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel)
+        # print(f"logname: {tmpfile.name}")
+
+    logger.debug(f"Got options: {options}")
+
+    os.chdir(options.directory)
+    grid = TasksetEntry(
+        options.config, options.directory, options.client, options.lscpu
+    )
+    grid.run()
+
+
+if __name__ == "__main__":
+    main(sys.argv[1:])