]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/contrib: Add lscpu.py auxiliary module to parse the output of lscpu, rebased... 60822/head
authorJose J Palacios-Perez <perezjos@uk.ibm.com>
Wed, 27 Nov 2024 11:42:27 +0000 (11:42 +0000)
committerJose J Palacios-Perez <perezjos@uk.ibm.com>
Thu, 30 Jan 2025 10:51:37 +0000 (10:51 +0000)
Signed-off-by: Jose J Palacios-Perez <perezjos@uk.ibm.com>
src/tools/contrib/balance_cpu.py
src/tools/contrib/lscpu.py [new file with mode: 0644]
src/tools/contrib/tasksetcpu.py

index 13b917db955be24b0e0edcf080a29f63c61fcd76..0f7d4b7e3f1ae34184d32fa3d0560c6af2dbdff3 100644 (file)
@@ -8,19 +8,155 @@ Two strategies of balancing reactors over CPU cores:
 
 1) OSD based: all the reactors of each OSD run in the same CPU NUMA socket (default),
 2) Socket based: reactors for the same OSD are distributed evenly across CPU NUMA sockets.
+
+Some auxiliaries:
+- given a taskset cpu_set bitmask, identify those active physical CPU core ids and their
+  HT siblings,
+- for a gfiven OSD id, identify the corresponding CPU core ids to set.
+- convert a (decimal) comma separated intervals into a cpu_set bitmask
+
+Apply bitwise operator over each bytes variables:
+result=bytes(map (lambda a,b: a ^ b, bytes_all_cpu, bytes_fio_cpu))
+
+Given the list extracted from lscpu, apply the cpu_set bitmask from the taskset argument,
+hence disabling some core ids. For each OSD, produce the corresponding bitmask.
 """
 
 import argparse
 import logging
 import sys
+import os
+import re
 import tempfile
+from pprint import pformat
+# from typing import Dict, List, Any
+
 from lscpu import LsCpuJson
 
 __author__ = "Jose J Palacios-Perez"
 
 logger = logging.getLogger(__name__)
 
-# Defaults
+
+# Some generic bitwise functions to use from the taskset data
+def get_bit(value, bit_index):
+    """Get a power of 2 if the bit is on, 0 otherwise"""
+    return value & (1 << bit_index)
+
+
+def get_normalized_bit(value, bit_index):
+    """Return 1/0 whenever the bit is on"""
+    return (value >> bit_index) & 1
+
+
+def set_bit(value, bit_index):
+    """As it says on the tin"""
+    return value | (1 << bit_index)
+
+
+def clear_bit(value, bit_index):
+    """As it says on the tin"""
+    return value & ~(1 << bit_index)
+
+
+# Generic functions to query whether a CPU id is enabled/available or not
+def is_cpu_avail(bytes_mask, cpuid):
+    """
+    Return true if the cpuid is on
+    CPU id 0 is at the last end of the bytes_mask, the max_cpu is at bit 0
+    """
+    try:
+        return get_normalized_bit(bytes_mask[-1 - (cpuid // 8)], cpuid % 8)
+    except IndexError:
+        return False
+
+
+def set_cpu(bytes_mask, cpuid):
+    """Set cpuid on bytes_mask"""
+    try:
+        bytes_mask[-1 - (cpuid // 8)] = set_bit(bytes_mask[-1 - (cpuid // 8)], cpuid % 8)
+    except IndexError:
+        pass
+    return bytes_mask
+
+
+def get_range(bytes_mask, start, length):
+    """
+    Given a bytes_mask, return a new bytes_mask with the range of CPU ids
+    starting from start and of length, skipping those that are not available
+    """
+    result = bytearray(b"\x00" * len(bytes_mask))
+    max_cpu = 8 * len(bytes_mask)
+    while length > 0 and start < max_cpu:
+        if is_cpu_avail(bytes_mask, start):
+            set_cpu(result, start)
+            length -= 1
+        start += 1
+
+    return result
+
+
+def set_range(bytes_mask, start, end):
+    """
+    Set a range of CPU ids in bytes_mask from start to end
+    """
+    ba = bytearray(bytes_mask)
+    for i in range(start, end):
+        set_cpu(ba, i)
+    return ba
+
+
+def set_all_ht_siblings(bytes_mask):
+    """
+    Set all the HT sibling of the enabled physical CPU ids specified in bytes_mask.
+
+    Physical cores are in the range [-half_length_bytes_mask:]
+    HT siblings are in the range [0:half_length_bytes_mask-1]
+
+    Notes:
+    result=bytes(map (lambda a,b: a | b, bytes_ht, bytes_phys))
+    # result=bytes(map (lambda a,b: a | b, result[0:half_indx], bytes_mask[-half_indx:]))
+    # partial = [a | b for a, b in zip(empty[0:half_indx], bytes_mask[-half_indx:])]
+    # result = bytes( partial + bytes_mask[-half_indx:] )
+    # result = bytearray(b"\x00" * len(bytes_mask))
+    """
+    result = bytearray(bytes_mask)
+    half_indx = len(bytes_mask) // 2
+    for i in range(0, half_indx):
+        result[i] |= bytes_mask[half_indx + i]
+    return result
+
+
+def count_bits(bytes_mask: bytearray) -> int:
+    """
+    Using Python 3.9 way
+    Python 3.10: i.bit_count()
+    """
+    count = 0
+    for x in bytes_mask:
+        count += bin(x).count("1")
+    return count
+
+
+def count_phys_cpus(bytes_mask: bytearray) -> int:
+    """
+    Count the number of physical CPU from the bitmask
+    """
+    count = 0
+    half_indx = len(bytes_mask) // 2
+    count = count_bits(bytes_mask[half_indx:])
+    return count
+
+
+def is_hexadecimal_str(s: str) -> bool:
+    try:
+        int(s, 16)
+        return True
+    except ValueError:
+        return False
+
+
+# Defaults to declare, which are values that can be given as options for the script
 NUM_OSD = 8
 NUM_REACTORS = 3
 
@@ -33,6 +169,7 @@ class CpuCoreAllocator(object):
     {
     "lscpu": [
       {
+        d: { "field": "CPU(s):", "data": "112",}
         d: {'field': 'NUMA node(s):', 'data': '2'}
         d: {'field': 'NUMA node0 CPU(s):', 'data': '0-27,56-83'}
         d: {'field': 'NUMA node1 CPU(s):', 'data': '28-55,84-111'}
@@ -41,50 +178,204 @@ class CpuCoreAllocator(object):
     }
     """
 
-    def __init__(self, json_file: str, num_osd: int, num_react: int):
+    def __init__(
+        self,
+        lscpu: str = "",
+        num_osd: int = NUM_OSD,
+        num_react: int = NUM_REACTORS,
+        hex_or_range_str: str = "",
+        out_hex: bool = False,
+    ) -> None:
         """
         This class expects the output from lscpu --json, from there
         it works out a list of physical CPU uids to allocate Seastar reactors
         """
-        self.json_file = json_file
         self.num_osd = num_osd
         self.num_react = num_react
-        self._dict = {}
-        self.lscpu = LsCpuJson(json_file)
-        # self.socket_lst = LsCpuJson(json_file)
+        self.out_hex = out_hex
+        self.bytes_avail_cpus = bytes([])
+        self.hex_or_range_str = hex_or_range_str
+        self.lscpu = LsCpuJson(lscpu)
+        assert self.lscpu, f"Invalid {lscpu}"
+        # Output to produce: either hex bitmask CPU set or decimal ranges
+        self.osds_cpu_out = {"dec_ranges": {}, "hex_cpu_mask": {}}
+
+    def set_cpu_default(self) -> None:
+        """
+        From lscpu we set the max num CPUs value to indicate how many hex digits
+        a valid taskset string should have
+        """
+        # Number of hex digits required for the cpu_set bitmask
+        self.num_hex_digits = self.lscpu.get_num_logical_cpus() // 8
+        # Default bitmask: all CPUs available
+        self.ALL_CPUS = "ff" * self.num_hex_digits
+        self.bytes_all_cpu = bytes.fromhex(self.ALL_CPUS)
+
+    def parse_taskset_arg(self, cpu_range: str) -> str:
+        """
+        The taskset arg can be an hexstring describing a cpu_set bitmask, or can be
+        decimal ranges comma separated. This method parses the second case.
+        - split the ',' tuples (or singletons)
+        extract the decimal values -- validate they are within max CPU uid. Produce a
+        valid hexstring cpu_set bitmask.
+        # result_ba = bytearray(b"\x00" * len(self.bytes_all_cpu))
+        # cpu_list_int = list(range(start, end + 1))
+        # cpu_list_int = [start]
+        # cpu_set.update(set(cpu_list_int))
+        # Convert the set to an hex string for a cpu_set bitmask
+        # for item in cpu_set:
+        #    set_cpu(result_ba, item)
+        # Compare both approaches match:
+        # logging.debug(f"result_ba:{result_ba}, bytes_mask:{bytes_mask}")
+        """
+        cpu_list_str = cpu_range.split(",")
+        regex = re.compile(r"(\d+)([-](\d+))?")
+        bytes_mask = bytearray(b"\x00" * len(self.bytes_all_cpu))
+        for item in cpu_list_str:
+            m = regex.search(item)
+            if m:
+                start = int(m.group(1))
+                if m.group(2):
+                    end = int(m.group(3))
+                    bytes_mask = set_range(bytes_mask, start, end)
+                else:
+                    bytes_mask = set_cpu(bytes_mask, start)
+        return bytes(bytes_mask).hex()
+
+    def set_available_cpus(self):
+        """
+        Set the instance attribute self.bytes_avail_cpus
+        If valid taskset hex_cpu_mask, use it, otherwise use all CPUs
+        """
+        # Validate the hex_string/bytes size for the cpuset bitmask
+        if self.hex_cpu_mask:
+            try:
+                self.bytes_avail_cpus = bytes.fromhex(self.hex_cpu_mask)
+            except ValueError:
+                print(f"Ignoring invalid hex string, using default {self.ALL_CPUS}")
+                logger.error(f"Invalid taskset arg: {self.hex_cpu_mask} ")
+                self.bytes_avail_cpus = self.bytes_all_cpu
+            assert self.num_hex_digits >= len(
+                self.bytes_avail_cpus
+            ), "Invalid taskset hexstring size"
+
+    def validate_cpu_for_osd(self):
+        """
+        Validate whether there are enough CPU cores for the required OSD
+        Count the number of bits from the physical section of the bytes_avail_cpus
+
+        Note: we could use up to the maximum possible num of OSD instead of the number asked.
+        """
+        total_phys_cores = self.lscpu.get_total_physical()
+        self.num_avail_phys_cores = count_phys_cpus(bytearray(self.bytes_avail_cpus))
+
+        logger.debug(
+            f"total_phys_cores: {total_phys_cores}, avail_phys_cores: {self.num_avail_phys_cores}"
+        )
+        assert (
+            total_phys_cores >= self.num_avail_phys_cores
+        ), "Invalid available physical CPU cores"
+        max_osd_num = self.num_avail_phys_cores // self.num_react
+        assert max_osd_num > self.num_osd, "Not enough physical CPU cores"
+
+
+    def setup(self):
+        """
+        Preparation and validation of available CPU ids
+        """
+        self.lscpu.load_json()
+        self.lscpu.get_ranges()
+        self.set_cpu_default()
+        if is_hexadecimal_str(self.hex_or_range_str):
+            self.hex_cpu_mask = self.hex_or_range_str
+        else:
+            self.hex_cpu_mask = self.parse_taskset_arg(self.hex_or_range_str)
+        self.set_available_cpus()
+        logger.debug(f"self.bytes_avail_cpus: {self.bytes_avail_cpus}")
+        self.validate_cpu_for_osd()
+
+
+    def bitmask_to_range(self, bytes_mask) -> str:
+        """
+        Produce a list of decimal ranges from the bitmask cpuset
+        """
+        lista = []
+        start = -1
+        end = start
+        i = 0
+        # Do we need to check max_cpu < self.lscpu.get_num_logical_cpus():
+        max_cpu = len(bytes_mask) * 8
+        logger.debug(f"max_cpu : {max_cpu}")
+        while i<max_cpu:
+            flag = is_cpu_avail(bytes_mask, i)
+            if flag:
+                if start == -1:
+                    start = i
+                logger.debug(f"i: {i}, lista:{pformat(lista)}")
+                while is_cpu_avail(bytes_mask,i) and i<max_cpu:
+                    end = i
+                    i += 1
+                if start == end:
+                    lista.append(f"{start}")
+                else:
+                    lista.append(f"{start}-{end}")
+                start = -1
+            else:
+                logger.debug(f"not i: {i}, lista:{pformat(lista)}")
+                while not is_cpu_avail(bytes_mask, i) and i<max_cpu:
+                    i += 1
+        return ",".join(lista)
+
+
+    def set_osd_cpuset(self, osd, cpuset_ba:bytes) -> None:
+        """
+        Updates the internal attributes to trace the CPUs assigned to the OSD process
+        """
+        osd_cpu_s = bytes(cpuset_ba).hex()
+        # Update the bitset mask:
+        if osd in self.osds_cpu_out["hex_cpu_mask"]:
+            self.osds_cpu_out["hex_cpu_mask"][osd] += f",{osd_cpu_s}"
+        else:
+            self.osds_cpu_out["hex_cpu_mask"].update({osd: osd_cpu_s})
+
+        osd_cpu_str = self.bitmask_to_range(cpuset_ba)
+        self.osds_cpu_out["dec_ranges"].update({osd: osd_cpu_str})
+        logger.debug(f"self.osds_cpu_out: {pformat(self.osds_cpu_out)}")
+
 
     def do_distrib_socket_based(self):
         """
         Distribution criteria: the reactors of each OSD are distributed across the available
         NUMA sockets evenly.
-        Each OSD uses step cores from each NUMA socket.
-        Produces a list of ranges to use for the ceph config set CLI.
+        Each OSD uses step cores from each NUMA socket. Each socket is a pair of ranges (_start,_end)
+        for physical and HT. On each allocation we update the physical_start, so the next iteration
+        picks the CPU uid accordingly.
+        Produces a bitmask cpuset and list of cpu id ranges to use for the ceph config set CLI.
+
+        This method and next definitely can be refactored, possibly by defining a dictionary with callbacks
+        for each stage where differ, both use the same general algorithm.
         """
-        # Init:
+        # Init: common to both strategies
         control = []
-        cores_to_disable = set([])
         num_sockets = self.lscpu.get_num_sockets()
-        # step = self.num_react
-        total_phys_cores = self.lscpu.get_total_physical()
-        # Max num of OSD that can be allocated
-        max_osd_num = total_phys_cores // self.num_react
 
         # Each OSD uses num reactor//sockets cores
         step = self.num_react // num_sockets
         reminder = self.num_react % num_sockets
 
-        logger.debug(
-            f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}"
-        )
-        assert max_osd_num > self.num_osd, "Not enough physical CPU cores"
+        logger.debug(f"do_distrib_socket_based: step:{step}")
 
         # Copy the original physical ranges to the control dict
-        for socket in self.lscpu.get_sockets():  # socket_lst["sockets"]:
+        for socket in self.lscpu.get_sockets():
             control.append(socket)
+
+        # This byte array will be transformed for each OSD
+        cpu_avail_ba = bytearray(self.bytes_avail_cpus)
+        avail_s = bytes(cpu_avail_ba).hex()
+        # This dict would hold a bitsetmask in hex per OSD
+        osds_ba = {}
         # Traverse the OSD to produce an allocation
-        #  f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}, rem:{reminder} "
         for osd in range(self.num_osd):
-            osds = []
             for socket in control:
                 _start = socket["physical_start"]
                 _step = step
@@ -99,34 +390,46 @@ class CpuCoreAllocator(object):
                 logger.debug(
                     f"osd: {osd}, socket:{_so_id}, _start:{_start}, _end:{_end - 1}"
                 )
-                osds.append(f"{_start}-{_end - 1}")
+                # Verify this range is valid, otherwise shift as appropriate
+                cpuset_ba = get_range(cpu_avail_ba, _start, _step)
+                # Associate their HT siblings of this range
+                cpuset_ba = set_all_ht_siblings(cpuset_ba)
+                # Update the list of bitmask of this OSD
+                if osd in osds_ba:
+                    merged = bytes(map(lambda a, b: a | b, osds_ba[osd], cpuset_ba))
+                    osds_ba[osd] = merged
+                else:
+                    osds_ba.update({osd: cpuset_ba})
+                # Disable this OSD bitmaskset from the cpu_avail_ba
+                cpu_avail_ba = bytes(map(lambda a, b: a & ~b, cpu_avail_ba, cpuset_ba))
+                osd_cpu_s = bytes(cpuset_ba).hex()
+                osd_cpu_s = bytes(osds_ba[osd]).hex()
+                avail_s = bytes(cpu_avail_ba).hex()
+                logger.debug(f"-- OSD: {osd}: {osd_cpu_s}, avail:{avail_s}")
+                # Update the bitset mask
+                self.set_osd_cpuset(osd, osds_ba[osd])
 
                 if _end <= socket["physical_end"]:
                     socket["physical_start"] = _end
-                    # Produce the HT sibling list to disable
-                    # Consider to use sets to avoid dupes
+                    _ht_start = socket["ht_sibling_start"]
+                    _ht_end = socket["ht_sibling_start"] + step
                     plist = list(
                         range(
-                            socket["ht_sibling_start"],
-                            (socket["ht_sibling_start"] + _step),
+                            _ht_start,
+                            _ht_end,
                             1,
                         )
                     )
                     logger.debug(f"plist: {plist}")
-                    pset = set(plist)
-                    # _to_disable=pset.union(cores_to_disable)
-                    cores_to_disable = pset.union(cores_to_disable)
-                    logger.debug(f"cores_to_disable: {list(cores_to_disable)}")
                     socket["ht_sibling_start"] += _step
                 else:
                     # bail out
                     _sops = socket["physical_start"] + step
                     logger.debug(f"out of range: {_sops}")
                     break
-            print(",".join(osds))
-        _to_disable = sorted(list(cores_to_disable))
-        logger.debug(f"Cores to disable: {_to_disable}")
-        print(" ".join(map(str, _to_disable)))
+        # Set the reminder available CPU
+        self.set_osd_cpuset('available', bytes(cpu_avail_ba) )
+        self.osds_ba = osds_ba
 
     def do_distrib_osd_based(self):
         """
@@ -136,25 +439,24 @@ class CpuCoreAllocator(object):
         Produces a list of ranges to use for the ceph config set CLI.
         """
         control = []
-        cores_to_disable = set([])
         # Each OSD uses num reactor cores from the same NUMA socket
         num_sockets = self.lscpu.get_num_sockets()
         step = self.num_react
-        total_phys_cores = self.lscpu.get_total_physical()
-        # Max num of OSD that can be allocated
-        max_osd_num = total_phys_cores // self.num_react
-
-        logger.debug(
-            f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}"
-        )
-        assert max_osd_num > self.num_osd, "Not enough physical CPU cores"
 
         # Copy the original physical ranges to the control dict
         for socket in self.lscpu.get_sockets():
             control.append(socket)
+
+        # This byte array will be transformed for each OSD
+        cpu_avail_ba = bytearray(self.bytes_avail_cpus)
+        avail_s = bytes(cpu_avail_ba).hex()
+        logger.debug(f"cpu_avail_ba : {avail_s}")
+        # This dict would hold a bitsetmask per OSD
+        osds_ba = {}
         # Traverse the OSD to produce an allocation
         # even OSD num uses socket0, odd OSD number uses socket 1
         for osd in range(self.num_osd):
+            #osds = []  # List of ranges as string
             _so_id = osd % num_sockets
             socket = control[_so_id]
             _start = socket["physical_start"]
@@ -163,32 +465,69 @@ class CpuCoreAllocator(object):
             logger.debug(
                 f"osd: {osd}, socket:{_so_id}, _start:{_start}, _end:{_end - 1}"
             )
-            print(f"{_start}-{_end - 1}")
+            # Verify this range is valid, skipping unavailable CPU ids as appropriate
+            cpuset_ba = get_range(cpu_avail_ba, _start, step)
+            # Associate their HT siblings of this range -- what if some of these are disabled?
+            cpuset_ba = set_all_ht_siblings(cpuset_ba)
+            # Update the list of bitmask of this OSD
+            if osd in osds_ba:
+                merged = bytes(map(lambda a, b: a | b, osds_ba[osd], cpuset_ba))
+                osds_ba[osd] = merged
+            else:
+                osds_ba.update({osd: cpuset_ba})
+            #osds.append(f"{_start}-{_end - 1}")
+            # Disable this OSD bitmaskset from the cpu_avail_ba
+            cpu_avail_ba = bytes(map(lambda a, b: a & ~b, cpu_avail_ba, cpuset_ba))
+            osd_cpu_s = bytes(cpuset_ba).hex()
+            avail_s = bytes(cpu_avail_ba).hex()
+            logger.debug(f"-- OSD: {osd}: {osd_cpu_s}, avail:{avail_s}")
+            # Update the bitset mask
+            self.set_osd_cpuset(osd, osds_ba[osd])
+
             if _end <= socket["physical_end"]:
                 socket["physical_start"] = _end
-                # Produce the HT sibling list to disable
-                # Consider to use sets to avoid dupes
+                _ht_start = socket["ht_sibling_start"]
+                _ht_end = socket["ht_sibling_start"] + step
                 plist = list(
                     range(
-                        socket["ht_sibling_start"],
-                        (socket["ht_sibling_start"] + step),
+                        _ht_start,
+                        _ht_end,
                         1,
                     )
                 )
                 logger.debug(f"plist: {plist}")
-                pset = set(plist)
-                # _to_disable = pset.union(cores_to_disable)
-                cores_to_disable = pset.union(cores_to_disable)
-                logger.debug(f"cores_to_disable: {list(cores_to_disable)}")
                 socket["ht_sibling_start"] += step
             else:
                 # bail out
                 _sops = socket["physical_start"] + step
                 logger.debug(f"Out of range: {_sops}")
                 break
-        _to_disable = sorted(list(cores_to_disable))
-        logger.debug(f"Cores to disable: {_to_disable}")
-        print(" ".join(map(str, _to_disable)))
+        # Set the reminder available CPU
+        self.set_osd_cpuset('available', bytes(cpu_avail_ba) )
+        # Set the following to exercise the unit tests
+        self.osds_ba = osds_ba
+
+    def output_cpusets(self):
+        """
+        Generic print of the cpuset to use per OSD and the remaining list of CPU available
+        to use for everything else, eg Alien threads
+
+        # Convert a bytesarrys back to hex string
+        # hex_string = "".join("%02x" % b for b in array_alpha)
+        # print(bytes(bytes_array).hex())
+        """
+        # Output: either hex or decimal ranges
+        if self.out_hex:
+            for cpuset in self.osds_cpu_out["hex_cpu_mask"].values():
+                # one line per OSD-- ensure its sorted, last one must be the available
+                print(cpuset)
+        else:
+            for cpuset in self.osds_cpu_out["dec_ranges"].values():
+                print(cpuset)
+            #print(" ".join(map(str, self._to_disable)))
+
+        logger.debug(f"osds_cpu_out: {pformat(self.osds_cpu_out)}")
+
 
     def run(self, distribute_strat):
         """
@@ -196,21 +535,22 @@ class CpuCoreAllocator(object):
         produce the corresponding balance, print the balance as a list intended to be
         consumed by vstart.sh -- a dictionary will be used for cephadm.
         """
-        self.lscpu.load_json()
-        self.lscpu.get_ranges()
+        self.setup()
         if distribute_strat == "socket":
             self.do_distrib_socket_based()
         else:
             self.do_distrib_osd_based()
 
+        self.output_cpusets()
+
 
 def main(argv):
     examples = """
     Examples:
     # Produce a balanced CPU distribution of physical CPU cores intended for the Seastar
         reactor threads
-        %prog -u <lscpu.json> [-b <osd|socket>] [-d<dir>] [-v]
-                 [-o <num_OSDs>] [-r <num_reactors>]
+        %prog [-u <lscpu.json>|-t <taskset_mask>] [-b <osd|socket>] [-d<dir>] [-v]
+              [-o <num_OSDs>] [-r <num_reactors>]
 
     # such a list can be used for vstart.sh/cephadm to issue ceph conf set commands.
     """
@@ -219,21 +559,27 @@ def main(argv):
         epilog=examples,
         formatter_class=argparse.RawDescriptionHelpFormatter,
     )
+    parser.add_argument(
+        "-o",
+        "--num_osd",
+        type=int,
+        required=False,
+        help="Number of OSDs",
+        default=NUM_OSD,
+    )
     parser.add_argument(
         "-u",
         "--lscpu",
         type=str,
-        required=True,
         help="Input file: .json file produced by lscpu --json",
         default=None,
     )
     parser.add_argument(
-        "-o",
-        "--num_osd",
-        type=int,
-        required=False,
-        help="Number of OSDs",
-        default=NUM_OSD,
+        "-t",
+        "--taskset",
+        type=str,
+        help="The taskset argument of the parent process (eg. vstart)",
+        default=None,
     )
     parser.add_argument(
         "-r",
@@ -252,7 +598,7 @@ def main(argv):
         type=str,
         required=False,
         help="CPU balance strategy: osd (default), socket (NUMA)",
-        default="osd",
+        default=False,
     )
     parser.add_argument(
         "-v",
@@ -261,6 +607,13 @@ def main(argv):
         help="True to enable verbose logging mode",
         default=False,
     )
+    parser.add_argument(
+        "-x",
+        "--hexcpuset",
+        action="store_true",
+        help="True to enable hexadecimal cpuset bitmask output",
+        default=False,
+    )
 
     # parser.set_defaults(numosd=1)
     options = parser.parse_args(argv)
@@ -274,8 +627,15 @@ def main(argv):
         logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel)
 
     logger.debug(f"Got options: {options}")
+    os.chdir(options.directory)
 
-    cpu_cores = CpuCoreAllocator(options.lscpu, options.num_osd, options.num_reactor)
+    cpu_cores = CpuCoreAllocator(
+        options.lscpu,
+        options.num_osd,
+        options.num_reactor,
+        options.taskset,
+        options.hexcpuset,
+    )
     cpu_cores.run(options.balance)
 
 
diff --git a/src/tools/contrib/lscpu.py b/src/tools/contrib/lscpu.py
new file mode 100644 (file)
index 0000000..32dd2d9
--- /dev/null
@@ -0,0 +1,159 @@
+#!/usr/bin/python
+"""
+This module gets the output from lscpu and produces a list of CPU uids
+corresponding to physical cores.
+"""
+
+# import logging
+import os
+import re
+import json
+# import tempfile
+# import pprint
+
+__author__ = "Jose J Palacios-Perez"
+
+# logger = logging.getLogger(__name__)
+
+
+class LsCpuJson(object):
+    """
+    Process a sequence of CPU core ids
+
+    # lscpu --json
+    {
+    "lscpu": [
+      {
+        d: { "field": "CPU(s):", "data": "112"}
+        d: {"field": "Core(s) per socket:", "data": "28"}
+        d: {'field': 'NUMA node(s):', 'data': '2'}
+        d: {'field': 'NUMA node0 CPU(s):', 'data': '0-27,56-83'}
+        d: {'field': 'NUMA node1 CPU(s):', 'data': '28-55,84-111'}
+      }
+      :
+    }
+    """
+
+    def __init__(self, json_file: str):
+        """
+        This class expects the output from lscpu --json
+        """
+        self.json_file = json_file
+        self._dict = {}
+        self.socket_lst = {
+            "num_sockets": 0,
+            "num_logical_cpus": 0,
+            "num_cores_per_socket": 0,
+            # or more general, an array, index is the socket number
+            "sockets": [],
+        }
+
+    def load_json(self):
+        """
+        Load the lscpu --json output
+        """
+        json_file = self.json_file
+        with open(json_file, "r") as json_data:
+            # check for empty file
+            f_info = os.fstat(json_data.fileno())
+            if f_info.st_size == 0:
+                print(f"JSON input file {json_file} is empty")
+                return  # Should assert
+            self._dict = json.load(json_data)
+            json_data.close()
+        # logger.debug(f"_dict: {self._dict}")
+
+    def get_num_sockets(self):
+        """
+        Accessor
+        """
+        return self.socket_lst["num_sockets"]
+
+    def get_physical_start(self, sindex):
+        """
+        Accessor: cpu core id start physical
+        """
+        return self.socket_lst["sockets"][sindex]["physical_start"]
+
+    def get_ht_start(self, sindex):
+        """
+        Accessor: cpu core id start ht-sibling
+        """
+        return self.socket_lst["sockets"][sindex]["ht_sibling_start"]
+
+    def get_num_physical(self):
+        """
+        Accessor: num physical cpu core ids
+        """
+        return self.socket_lst["num_cores_per_socket"]
+
+    def get_num_logical_cpus(self):
+        """
+        Accessor: num CPU ids
+        """
+        return self.socket_lst["num_logical_cpus"]
+
+    def get_total_physical(self):
+        """
+        Accessor: sum of the physical cores for all sockets
+        """
+        return self.get_num_sockets() * self.get_num_physical()
+        
+
+    def get_socket(self, cpuid: int):
+        """
+        Accessor: given cpuid returns which socket number and
+        whether is a physical (True) or ht-sibling (False)
+        """
+        for _i, s in enumerate(self.socket_lst["sockets"]):
+            if s["physical_start"] <= cpuid and cpuid <= s["physical_end"]:
+                return (_i, True)
+            if s["ht_sibling_start"] <= cpuid and cpuid <= s["ht_sibling_end"]:
+                return (_i, False)
+
+    def get_ranges(self):
+        """
+        Parse the .json from lscpu
+        (we might extend this to parse either version: normal or .json)
+        """
+        ncpu_re = re.compile(r"^CPU\(s\):$")
+        numa_re = re.compile(r"NUMA node\(s\):")
+        node_re = re.compile(r"NUMA node(\d+) CPU\(s\):")
+        ranges_re = re.compile(r"(\d+)-(\d+),(\d+)-(\d+)")
+        cores_re = re.compile(r"^Core\(s\) per socket:$")
+        socket_lst = self.socket_lst
+        for d in self._dict["lscpu"]:
+            # logger.debug(f"d: {d}")
+            m = numa_re.search(d["field"])
+            if m:
+                socket_lst["num_sockets"] = int(d["data"])
+                continue
+            m = node_re.search(d["field"])
+            if m:
+                socket = m.group(1)
+                m = ranges_re.search(d["data"])
+                if m:
+                    drange = {
+                        "socket": int(socket),
+                        "physical_start": int(m.group(1)),
+                        "physical_end": int(m.group(2)),
+                        "ht_sibling_start": int(m.group(3)),
+                        "ht_sibling_end": int(m.group(4)),
+                    }
+                    socket_lst["sockets"].append(drange)
+                    continue
+            m = ncpu_re.search(d["field"])
+            if m:
+                socket_lst["num_logical_cpus"] = int(d["data"])
+                continue
+            m = cores_re.search(d["field"]) 
+            if m:
+                socket_lst["num_cores_per_socket"] = int(d["data"])
+        # logger.debug(f"result: {socket_lst}")
+        assert self.socket_lst["num_sockets"] > 0, "Failed to parse lscpu"
+
+    def get_sockets(self):
+        """
+        Return the socket_lst["sockets"]
+        """
+        return self.socket_lst["sockets"]
index 28b24a6dcdfad8637b0124aaae30f0d860cdf06b..2d238d18ec2da11219a1b654e6b8a3a705b0ad42 100644 (file)
@@ -2,7 +2,6 @@
 """
 This script traverses the ouput from taskset and ps to produce a .JSON
 to generate an ascii grid for visualisation.
-Returns the suggested CPu cores for the FIO client.
 """
 
 import argparse
@@ -12,9 +11,8 @@ import sys
 import re
 import json
 import tempfile
+from typing import Dict, List, Any, Set
 from pprint import pformat
-
-# import pprint
 from lscpu import LsCpuJson
 
 __author__ = "Jose J Palacios-Perez"
@@ -22,9 +20,9 @@ __author__ = "Jose J Palacios-Perez"
 logger = logging.getLogger(__name__)
 
 
-def to_color(string, color):
+def to_color(string: str, color: str) -> str:
     """
-    Simple basic color ascii coding
+    Simple basic color ANSI/ASCII Coding
     """
     color_code = {
         "blue": "\033[34m",
@@ -35,7 +33,19 @@ def to_color(string, color):
     return color_code[color] + str(string) + "\033[0m"
 
 
-def serialize_sets(obj):
+def ljust_color(text: str, padding: int, char=" ") -> str:
+    """
+    Find all matching ANSI sequences, then get the total length
+    of all matches to serve as our offset when we add it to the padding value
+    """
+    pattern = r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]"
+
+    matches = re.findall(pattern, text)
+    offset = sum(len(match) for match in matches)
+    return text.ljust(padding + offset, char[0])
+
+
+def serialize_sets(obj) -> Any:
     """
     Serialise sets
     """
@@ -45,44 +55,121 @@ def serialize_sets(obj):
     return obj
 
 
+THREAD_TYPES = {
+    # log is a valid thread name for both reactor and aliens
+    "reactor": {
+        "regex": re.compile(r"(crimson|reactor|syscall|log).*"),
+        "color": "red",
+        "name": "R",
+    },
+    "alien": {
+        "regex": re.compile(r"(alien-store-tp)"),
+        "color": "green",
+        "name": "A",
+    },
+    "bluestore": {
+        "regex": re.compile(r"(bstore|rocksdb|cfin).*"),
+        "color": "blue",
+        "name": "B",
+    },
+}
+
+
 class CpuCell(object):
     """
-    Single cell representing a CPU core
-    Essentially a list of tuples, each tuple is a str (actually a letter) of the type of thread
-    running in this Cpu core. The type indicates the color code that it should be printed.
-    We only support three types: Reactors, Alien, and Bluestore threads.
-    Probably need to refactor the proc_groups dict as a member here, or a class of its own (?)
+    Single cell representing the threads allocated to a CPU core.
+    Each cell is a set of thread types (Reactor, Alien, Bluestore).
+    The Reactor type should be mutually exclusive to the other types.
+    The type indicates the color code that the thread set should be printed.
     """
 
-    def __init__(self, cpuid, atype):
-        self.type = atype
+    def __init__(self, cpuid=0):
+        """
+        Construct an empty CpuCell:
+        Might need extending to
+        { OSD: set(thread_types) }
+        """
+        self.osd_id = -1
+        self.cpuid = cpuid
+        self._set = set([])
+
+    def update(self, cpuid, cpuset: Dict[str, Any], osd_id: str) -> None:
+        """
+        Update the contents of a CpuCell.
+        """
+        _tlist = []
+        self.osd_id = osd_id
         self.cpuid = cpuid
+        for thread_id in cpuset:
+            if thread_id not in THREAD_TYPES:
+                logger.error(f"{thread_id} not in THREAD_TYPES")
+            else:
+                _tlist.append(thread_id)
+        self._set.update(set(_tlist))
+        logger.debug(f"--{self._set}--")
+
+    def __str__(self) -> str:
+        """
+        Method to be called by str() on instances of this class.
+        """
+        _str = "".join([THREAD_TYPES[item]["name"] for item in self._set])
+        return f"{self.osd_id}.{_str}"
+
+    def __repr__(self) -> str:
+        """
+        Method to represent an instance of this class.
+        Used implicitly by eg. pformat()
+        """
+        _str = "".join([THREAD_TYPES[item]["name"] for item in self._set])
+        return f"{self.osd_id}.{_str}"
+        # return f"{self.cpuid}.{_str}"
+
+    def print(self, width=0) -> str:
+        """
+        Print the cell in color coded.
+        """
+        _str = "."
+        _tlen = 1
+        for _id in self._set:
+            _name = THREAD_TYPES[_id]["name"]
+            _color = THREAD_TYPES[_id]["color"]
+            _item = f"{_name}{self.osd_id}"
+            _tlen += len(_item)
+            _str += to_color(
+                _item,
+                _color,
+            )
+        return ljust_color(_str, width)
 
 
 class CpuGrid(object):
     """
-    Grid for a Single CPU socket
-    Each cell is a CpuCell, which contains a set of threads initials (single letter as per the field 'name')
-    and a color code.
+    Grid for a Single CPU socket, basicaly a matrix of CpuCells.
     """
 
+    NUM_CPUS = 112
     ROWS = 8
     COLS = 7
     # width       = len(str(max(rows,cols)+1))
-    WIDTH = 5
+    WIDTH = 6
 
-    def __init__(self, id, socket):
+    def __init__(
+        self, id: int, socket: Dict[str, int], num_cpus: int = NUM_CPUS
+    ) -> None:
         """
         This class expects a single CPU socket, which has two sections:
-        physical of size num_cores, and a HT-sibling section with the same number
+        physical of size num_cores, and a HT-sibling section with the same number.
         """
         self.id = id
         self.socket = socket
-        # Or more generally, a list of tuples -- these should be CpuCell
-        self.grid = [["."] * self.COLS for _ in range(self.ROWS)]
-        self.str_lines = []
+        self.ROWS = num_cpus // (self.COLS * 2)
+        self.grid = [
+            [CpuCell(self.COLS * row + col) for col in range(self.COLS)]
+            for row in range(self.ROWS)
+        ]
+        self.str_lines: List[str] = []
 
-    def get_cell_coord(self, cpuid, is_phys):
+    def get_cell_coord(self, cpuid: int, is_phys: bool):
         """
         Return the tuple row,col for this cpuid
         """
@@ -90,42 +177,36 @@ class CpuGrid(object):
             row = (cpuid - self.socket["phy_start"]) // self.COLS
             col = (cpuid - self.socket["phy_start"]) % self.COLS
         else:
-            row = (
-                (self.socket["num_cores"] // self.COLS)  # should be 4
-                + (cpuid - self.socket["ht_start"]) // self.COLS
-            )
+            row = (self.ROWS // 2) + (cpuid - self.socket["ht_start"]) // self.COLS
             col = (cpuid - self.socket["ht_start"]) % self.COLS
         return (row, col)
 
-    def set_cell(self, cpuid, is_phys, vstr):
+    def set_cell(
+        self, cpuid: int, osd_id, cpuset: Dict[str, List[Any]], is_phys: bool
+    ) -> None:
         """
         Fill the cell for cpuid with the values vstr
         """
-        # vlen = len(vstr) // 10 # due to control chars
         row, col = self.get_cell_coord(cpuid, is_phys)
-        self.grid[row][col] = " " + vstr + " "  # use a new object instead?
-        # self.grid[row][col] = " " * (self.WIDTH - vlen) + vstr
+        logger.debug(f"cpu{cpuid}: {row},{col}")
+        try:
+            self.grid[row][col].update(cpuid, cpuset, osd_id)
+        except IndexError:
+            logger.error(f"{is_phys}-index_out_of_range for {cpuid}: {row},{col}")
 
     def set_header(self):
         """
-        Set this socket grid header
+        Set this socket grid header.
         """
         header = " " * self.WIDTH + f" Socket {self.id} ".center(
             (self.WIDTH + 1) * self.COLS, "-"
         )
         self.str_lines = [header]
 
-    def show_grid(self):
-        """
-        Debug show grid
-        """
-        logger.debug(f"Grid: {pformat(self.grid)}")
-        # logger.debug(f"{pformat(self.str_lines)}")
-
     def make_rows(self, is_phys):
         """
-        Construct the rows body of the Grid
-        Can be the physical section or HT section
+        Construct the rows body of the Grid.
+        Can be the physical section or HT section.
         """
         width = self.WIDTH
         cols = self.COLS
@@ -134,16 +215,15 @@ class CpuGrid(object):
         if is_phys:
             _startp = self.socket["phy_start"]
             _row = 0
-            _endp = 4  # self.get_cell_coord(self.socket["num_cores"]+1,is_phys)
+            _endp = self.ROWS // 2  # 4
         else:
             _startp = self.socket["ht_start"]
-            _row = 4  # self.get_cell_coord(self.socket["ht_start"],is_phys) # 4
-            _endp = 8  # self.get_cell_coord( self.socket["ht_start"]+self.socket["num_cores"]+1,is_phys) # 8
+            _row = self.ROWS // 2  # 4
+            _endp = self.ROWS  # 8
 
-        logger.debug(f"make_rows: {_row}, {_endp}")
         grid_slice = self.grid[_row:_endp]
         for i, row in enumerate(grid_slice):
-            values = "+".join(f"{v}".center(width, " ") for v in row)
+            values = "+".join(f"{v.print(width)}" for v in row)
             line = contentLine.replace("values", values)
             line = line.replace("#", f"{_startp + cols*i:>{width}d}")
             # This separates the Physical from the HT section
@@ -151,7 +231,7 @@ class CpuGrid(object):
 
     def make_grid(self):
         """
-        Construct the grid for this socket line by line
+        Construct the grid for this socket line by line.
         """
         width = self.WIDTH
         cols = self.COLS
@@ -192,37 +272,23 @@ class CpuGrid(object):
         """
         return next(self.itlines, "")
 
+    def show_grid(self):
+        """
+        Debug show grid
+        """
+        logger.debug(f"Socket: {self.id} Grid: {pformat(self.grid)}")
+
 
 class TasksetEntry(object):
     """
     Process a sequence of taskset_ps _thread.out files to
     produce a CpuGrid per socket and .JSON
-    """
-
-    # Only for OSD/Crimson
-    proc_groups = {
-        # TODO: log are valid thread names for both reactor and aliens
-        "reactor": {
-            "regex": re.compile(r"(crimson|reactor|syscall|log).*"),
-            "color": "red",
-            "name": "R",
-        },
-        "alien": {
-            "regex": re.compile(r"(alien-store-tp)"),
-            "color": "green",
-            "name": "A",
-        },
-        "bluestore": {
-            "regex": re.compile(r"(bstore|rocksdb|cfin).*"),
-            "color": "blue",
-            "name": "B",
-        },
-    }
-    proc_groups_set = set()
 
-    # Formmat from the _threads.out files:
+    # Format from the _threads.out files:
     # 1368714 1368714 crimson-osd       0     pid 1368714's current affinity list: 0
     # 1368714 1368720 reactor-1         1     pid 1368720's current affinity list: 1
+    """
+
     LINE_REGEX = re.compile(
         r"""
         ^\d+\s+ # PID
@@ -231,16 +297,18 @@ class TasksetEntry(object):
         (\d+)\s+   # CPU id
         pid\s+(\d+)[']s\s+current\s+affinity\s+list:\s+(\d+)$""",
         re.VERBOSE,
-    )  # |re.DEBUG)
-    FILE_SUFFIX_LST = re.compile(r"_list$")  # ,(_list|.out)re.DEBUG)
+    )
+    FILE_SUFFIX_LST = re.compile(r"_list$")
 
-    def __init__(self, config, directory, num_cpu_client, lscpu):
+    def __init__(
+        self, config, directory, num_cpu_client, lscpu: str = "", taskset=None
+    ):
         """
-        This class expects either:
-        a list of result files to process into a grid (suffix _list)
-        or a single _threads.out file
-        the number of CPU intended for the clients (eg. FIO)
-        the .json from lscpu
+        This class expects:
+        -   either a list of result files to process into a grid (suffix _list)
+            or a single _threads.out file
+        -   the number of CPU intended for the clients (eg. FIO)
+        -   the .json from lscpu.
         """
         self.config = config
         m = self.FILE_SUFFIX_LST.search(config)
@@ -254,13 +322,13 @@ class TasksetEntry(object):
         self.directory = directory
         self.num_cpu_client = num_cpu_client
         self.osd_num = 0
-        # This should be an array of dicts, size of num OSD
-        self.entries = []  # {}
-        # From lscpu we can get the ranges
-        # self.sockets = [ Cpugrid(_i, ) for _i in range(NUM_SOCKETS)] # array of CpuGrid
-        self.sockets = []
-        self.lscpu = LsCpuJson(lscpu)
-        self.proc_groups_set.update(self.proc_groups.keys())
+        # This is a dict with keys OSD num and values dicts w/keys CPU uid, values list of thread id
+        self.entries = {}
+        self.sockets: List[CpuGrid] = []
+        # We probably might not need this attribute
+        self.taskset = taskset
+        if lscpu:
+            self.lscpu = LsCpuJson(lscpu)
 
     def traverse_dir(self):
         """
@@ -272,59 +340,43 @@ class TasksetEntry(object):
         """
         find a name file in path
         """
-        for root, dirs, files in os.walk(path):
+        for root, _, files in os.walk(path):
             if name in files:
                 return os.path.join(root, name)
 
-    def _get_str(self, cpuset, osd_id):
-        """
-        Transform a cpu set into a string of chars to indicate
-        the thread allocation
-        """
-        _result = ""
-        logger.debug(f"Got cpuset: {cpuset} for {osd_id}:")
-        for item in cpuset:
-            logger.debug(f"Got {item}:")
-            if item not in self.proc_groups:
-                logger.error(f"{item} not in proc_groups")
-                return _result
-            _id = self.proc_groups[item]["name"]
-            logger.debug(f"Got {_id}.{osd_id}")
-            _result += to_color(
-                f"{_id}.{osd_id}",
-                self.proc_groups[item]["color"],
-                # self.proc_groups[item]["name"], self.proc_groups[item]["color"]
-            )
-        return _result
-
     def save_grid_json(self):
         """
         Save the grid into a .JSON
         Shall we use the same name as the config list replaced extension
         """
         if self.jsonName:
+            # Ensure the struct is OSD.id: [array of CPU entries]
+            # Sorts the CPU entries by numeric order:
+            # int_docs_info = {int(k) : v for k, v in dc.items()}
+            # sorted_dict = dict(sorted(int_docs_info.items()))
             with open(self.jsonName, "w", encoding="utf-8") as f:
                 json.dump(
                     self.entries, f, indent=4, sort_keys=True, default=serialize_sets
                 )
                 f.close()
 
-    def _get_tgroup(self, tname: str):
+    def _get_tgroup(self, tname: str) -> str:
         """
-        Get the proc_groups from the thread name
+        Get the THREAD_TYPES from the thread name.
+        Return the thread name if not registered as part of Crimson OSD process.
         """
-        for k in self.proc_groups:
-            if self.proc_groups[k]["regex"].match(tname):
+        for k in THREAD_TYPES:
+            if THREAD_TYPES[k]["regex"].match(tname):
                 return k
 
-        logger.debug(f"{tname}: not registered in groups")
         return tname
 
-    def _get_cpu_range(self, cpu_uid: str, cpu_range: str):
+    def _get_cpu_range(self, cpu_uid: str, cpu_range: str) -> Set:
         """
         Get the cpu id range provided by taskset (if exist)
         The first arg is the cpuid from ps field PSR
-        Returns the corresponding list as a set
+        Returns the corresponding list as a set.
+        We might produce a bitmask for the corresponding range.
         """
         cpu_list = []
         regex = re.compile(r"(\d+)([,-](\d+))?")
@@ -342,29 +394,28 @@ class TasksetEntry(object):
 
     def _parse_via_regex(self, line: str):
         """
-        Bug in the REGEx, alternative working fine
+        Bug in the REGEx, alternative working fine, left for reference if required in the future.
         """
-        logger.debug(f"Parsing: {line}")
         match = self.LINE_REGEX.search(line)
         if match:
             groups = match.groups()
-            logger.debug(f"Got groups: {groups}")
             tname = self._get_tgroup(groups[0])
             cpuid = str(groups[1])
             return tname, cpuid
 
-    def parse(self, fname: str):
+    def parse(self, fname: str) -> Dict:
         """
-        Parses individual _thread.out file
+        Parses an individual _thread.out file.
         Returns a dict whose keys are cpuid, values are dicts
         with the threads names, process group association (Reactor, Alien, Bluestore)
-        represented as a set (idempotent, we can later look at add info such as occurrences)
+        represented as a set.
         """
-        entry = {}
+        entry: Dict[int, Dict[str, List[Any]]] = {}
         with open(fname, "r") as _data:
             f_info = os.fstat(_data.fileno())
             if f_info.st_size == 0:
-                print(f"input file {fname} is empty")
+                print(f"Input file {fname} is empty")
+                logger.error(f"Input file {fname} is empty")
                 return entry
             lines = _data.read().splitlines()
             _data.close()
@@ -382,51 +433,63 @@ class TasksetEntry(object):
 
         return entry
 
-    def merge_entries(self, new_entry: dict, osd_id: int):
+    def merge_entries(self, new_entry: dict, osd_id):
         """
         Merges (via set union) with the new entry (eg. OSD num)
         keys of the new_entry are cpuid
         """
-        for k in new_entry.keys():
+        for k in new_entry.keys():  # cpuid
             entry = self.entries[osd_id]
             if k not in entry:
                 entry[k] = new_entry[k]
             else:
                 entry[k].update(new_entry[k])
 
-    def set_cpu_in_grid(self, cpuid: int, cpuset, osd_id: int):
+    def set_cpu_in_grid(self, cpuid: int, cpuset, osd_id):
         """
         Given a cpuid and its contents, set the corresponding CpuGrid
-        for given OSD
+        for a given OSD.
         """
-        vstr = self._get_str(cpuset, osd_id)
         sindex, is_phys = self.lscpu.get_socket(cpuid)
         grid = self.sockets[sindex]
-        grid.set_cell(cpuid, is_phys, vstr)
+        osd = self.get_osd_num(osd_id)
+        logger.debug(f"set_cpu_in_grid:{sindex}, {is_phys}")
+        grid.set_cell(cpuid, osd, cpuset, is_phys)
+
+    def get_osd_num(self, osd_id):
+        """
+        Extract the OSD number from the OSD id.
+        """
+        num = 0  # default
+        regex = re.compile(r"^osd_(\d+)$")
+        m = regex.search(osd_id)
+        if m:
+            num = m.group(1)
+        return num
 
     def get_osd_id(self, setup: str):
         """
-        Extract the OSD number from the filename (which should follow the expected convention)
+        Extract the OSD number from the filename (which should follow the expected convention).
         """
-        osd_id = 0  # default
-        regex = re.compile(r"^osd_(\d+).*$")
+        osd_id = "osd_0"  # default
+        regex = re.compile(r"^(osd_\d+).*$")
         m = regex.search(setup)
         if m:
-            osd_id = int(m.group(1))
+            osd_id = m.group(1)
         return osd_id
 
-    def update_grid(self, setup: str, osd_id: int):
+    def update_grid(self, setup: str, osd_id):
         """
-        Update the sockets grid for the given setup
+        Update the sockets grid for the given setup that is indicated per OSD process.
         """
-        print(f"== {setup} ==")  # OSD process
-
-        for _s in self.sockets:
-            _s.set_header()
+        print(f"== {setup} ==")
+        logger.debug(f"== {setup} ==")
 
         entry = self.entries[osd_id]
         for cpuid, cpuset in entry.items():
             self.set_cpu_in_grid(int(cpuid), cpuset, osd_id)
+        for _s in self.sockets:
+            _s.show_grid()
 
     def show_grid(self):
         """
@@ -434,11 +497,11 @@ class TasksetEntry(object):
         each line.
         """
         for _s in self.sockets:
+            _s.set_header()
             _s.make_grid()
-            _s.show_grid()
 
         # join the grid lines per socket: this should be done in a more Pythonic way...
-        for i in range(self.sockets[0].get_num_lines()):
+        for _ in range(self.sockets[0].get_num_lines()):
             line = " + ".join(_s.next() for _s in self.sockets)
             print(line)
 
@@ -458,23 +521,17 @@ class TasksetEntry(object):
             config_file.close()
 
         self.osd_num = len(out_files)
-        for entry in range(self.osd_num):
-            self.entries.append({})
-
         print(f"loading {len(out_files)} .out files ...")
 
-        # pp = pprint.PrettyPrinter(width=41, compact=True)
         # The number of files should be the same as the number of OSDs
         for fname in out_files:
             # Extract the OSD id from fname
-            cpuNodeList = self.parse(fname)
+            cpuNodeDict = self.parse(fname)
             osd_id = self.get_osd_id(fname)
-            # pp.pprint(cpuNodeList)# Ok
-            # merged = {**self.entries, **cpuNodeList }
-            # Show the grid for this fname
-            self.merge_entries(cpuNodeList, osd_id)
+            if osd_id not in self.entries:
+                self.entries[osd_id] = {}
+            self.merge_entries(cpuNodeDict, osd_id)
             self.update_grid(fname, osd_id)
-        # logger.debug(f"Got entries: {self.entries}:")
 
     def run(self):
         """
@@ -489,8 +546,10 @@ class TasksetEntry(object):
                     {
                         "phy_start": self.lscpu.get_physical_start(s),
                         "ht_start": self.lscpu.get_ht_start(s),
-                        "num_cores": self.lscpu.get_num_physical(s),
+                        "num_cores": self.lscpu.get_num_physical(),
                     },
+                    # Use this to calculate the size of grid
+                    self.lscpu.get_num_logical_cpus(),
                 )
             )
         self.traverse_files()
@@ -504,8 +563,10 @@ def main(argv):
     # Produce a CPU distribution visualisation grid for a single file:
         %prog -c osd_0_crimson_1osd_16reactor_256at_8fio_lt_disable_ht_threads.out
 
-    # Produce a CPU distribution visualisation grid for a _list _of files:
-        %prog -c crimson_1osd_16reactor_lt_disable_list
+    # Produce a CPU distribution visualisation grid for a _list _of files (located in /tmp) and the output
+        from ps and taskset:
+        %prog -v -c crimson_1osd_16reactor_lt_disable_list -d /tmp -u numa_nodes.json 
+
     """
     parser = argparse.ArgumentParser(
         description="""This tool is used to parse output from the combined taskset and ps commands""",
@@ -521,15 +582,23 @@ def main(argv):
         help="Input file: either containing a _list_ of _threads.out files, or a single .out file",
         default=None,
     )
-    # load the NUMA summary -- or lscpu --json
-    parser.add_argument(
+    # Load the NUMA CPU summary from lscpu --json
+    cmd_grp = parser.add_mutually_exclusive_group()
+    cmd_grp.add_argument(
         "-u",
         "--lscpu",
         type=str,
-        required=True,
         help="Input file: .json file produced by lscpu --json",
         default=None,
     )
+    cmd_grp.add_argument(
+        "-t",
+        "--taskset",
+        type=str,
+        help="The taskset argument of the parent process (eg. vstart)",
+        default=None,
+    )
+
     parser.add_argument(
         "-i",
         "--client",
@@ -560,13 +629,17 @@ def main(argv):
 
     with tempfile.NamedTemporaryFile(dir="/tmp", delete=False) as tmpfile:
         logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel)
-        # print(f"logname: {tmpfile.name}")
 
     logger.debug(f"Got options: {options}")
 
     os.chdir(options.directory)
+
     grid = TasksetEntry(
-        options.config, options.directory, options.client, options.lscpu
+        options.config,
+        options.directory,
+        options.client,
+        options.lscpu,
+        options.taskset,
     )
     grid.run()