qa/tasks: Add a task which performs an offline check of the consistency of parity... 59903/head
author Connor Fawcett <connorfa@uk.ibm.com>
Mon, 9 Dec 2024 17:02:11 +0000 (17:02 +0000)
committer Connor Fawcett <connorfa@uk.ibm.com>
Tue, 27 May 2025 16:18:36 +0000 (17:18 +0100)
Add a Python script which can be used to scan a Ceph cluster, find any erasure-coded data objects and
check them for consistency. This is achieved by reading the data shards for a given object, running them
through the existing EC tool and verifying that the output matches the parity shards stored on the OSDs.

This commit adds a new teuthology task but does not yet add it to any YAMLs; this work will be
expanded on in future commits.
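
For illustration, a teuthology fragment enabling the task might look like the following (hypothetical,
since no in-tree YAML references the task yet; the option names are those documented in the task's
docstring, the values are made up):

    tasks:
    - ec_parity_consistency:
        assert_on_mismatch: true
        max_run_time: 600
        pools_to_check: ['ec-pool']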

Signed-off-by: Connor Fawcett <connorfa@uk.ibm.com>
Fixes: https://tracker.ceph.com/issues/71412
qa/tasks/ec_parity_consistency.py [new file with mode: 0755]
qa/tasks/tests/requirements.txt [new file with mode: 0644]
qa/tasks/tests/test_parity_consistency.py [new file with mode: 0644]

diff --git a/qa/tasks/ec_parity_consistency.py b/qa/tasks/ec_parity_consistency.py
new file mode 100755 (executable)
index 0000000..aa7207c
--- /dev/null
@@ -0,0 +1,730 @@
+"""
+Use this task to check that parity shards in an EC pool
+match the output produced by the Ceph Erasure Code Tool.
+"""
+
+import logging
+import json
+import os
+import shutil
+import time
+import atexit
+from io import StringIO
+from io import BytesIO
+from typing import Dict, List, Any, Optional
+from tasks import ceph_manager
+from teuthology import misc as teuthology
+
+log = logging.getLogger(__name__)
+DATA_SHARD_FILENAME = 'ec-obj'
+
+
+class ErasureCodeObject:
+    """
+    Store data relating to an RBD erasure code object,
+    including the object's erasure code profile as well as
+    the data for k + m shards.
+    """
+
+    def __init__(self, oid: str, snapid: int, ec_profile: Dict[str, Any]):
+        self.oid = oid
+        self.snapid = snapid
+        self.uid = oid + '_' + str(snapid)
+        self.ec_profile = ec_profile
+        self.k = int(ec_profile["k"])
+        self.m = int(ec_profile["m"])
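+        # Slots 0..k-1 hold data shards, k..k+m-1 hold parity shards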
+        self.shards: List[Optional[bytearray]] = [None] * (self.k + self.m)
+        self.jsons: List[str] = [""] * (self.k + self.m)
+        self.osd_ids: List[int] = [-1] * (self.k + self.m)
+        self.object_size = 0
+
+    def get_ec_tool_profile(self) -> str:
+        """
+        Return the erasure code profile associated with the object
+        in string format suitable to be fed into the erasure code tool
+        """
+        return ','.join(str(key) + '=' + str(value)
+                        for key, value in self.ec_profile.items())
+
+    def get_shards_to_encode_str(self) -> str:
+        """
+        Return a comma-separated string of the shard indexes to be
+        encoded by the EC tool, e.g. '0,1,2,3,4,5' for k=4, m=2.
+        This includes all k + m shards as the tool also produces
+        the data shards.
+        """
+        return ','.join(str(i) for i in range(self.k + self.m))
+
+    def update_shard(self, index: int, data: bytearray):
+        """
+        Update a shard at the specified index
+        """
+        self.shards[index] = data
+
+    def get_data_shards(self) -> List[bytearray]:
+        """
+        Return a list of the data shards.
+        Shards will always be returned in the same order they are
+        arranged in within the corresponding ceph object.
+        """
+        return self.shards[:self.k]
+
+    def get_parity_shards(self) -> List[bytearray]:
+        """
+        Return a list of the parity shards.
+        Shards will always be returned in the same order they are
+        arranged in within the corresponding ceph object.
+        """
+        return self.shards[self.k:self.k + self.m]
+
+    def write_data_shards_to_file(self, filepath: str, remote: Any = None):
+        """
+        Write the data shards to files for
+        consumption by Erasure Code tool.
+        Write to remote if remote is specified.
+        """
+        shards = self.get_data_shards()
+        assert None not in shards, "Object is missing data shards"
+        data = bytearray()
+        for shard in shards:
+            data += shard
+        if remote:
+            remote.write_file(filepath, BytesIO(bytes(data)),
+                              mkdir=True, append=False)
+        else:
+            with open(filepath, "wb") as binary_file:
+                binary_file.write(data)
+
+    def delete_shards(self):
+        """
+        Free up memory used by the shards for this object
+        """
+        self.shards = [None] * (self.k + self.m)
+
+    def does_shard_match_file(self, index: int, filepath: str,
+                              remote: Any = None) -> bool:
+        """
+        Compare shard at specified index with contents of the supplied file
+        If remote is specified fetch the file and make a local copy
+        Return True if they match, False otherwise
+        """
+        shard_data = self.shards[index]
+        file_content = bytearray()
+        if remote:
+            remote.get_file(filepath, False, os.path.dirname(filepath))
+        with open(filepath, "rb") as binary_file:
+            b = binary_file.read()
+            file_content.extend(b)
+        return shard_data == file_content
+
+    def compare_parity_shards_to_files(self, filepath: str,
+                                       remote: Any = None) -> bool:
+        """
+        Check the object's parity shards match the files generated
+        by the erasure code tool. Return True if they match, False otherwise.
+        """
+        do_all_shards_match = True
+        for i in range(self.k, self.k + self.m):
+            shard_filename = filepath + '.' + str(i)
+            match = self.does_shard_match_file(i, shard_filename, remote)
+            if match:
+                log.debug("Shard %i in object %s matches file content",
+                          i,
+                          self.uid)
+            else:
+                log.debug("MISMATCH: Shard %i in object "
+                          "%s does not match file content",
+                          i,
+                          self.uid)
+                do_all_shards_match = False
+        return do_all_shards_match
+
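+# Illustrative usage of ErasureCodeObject (hypothetical values, mirroring
+# the unit tests in qa/tasks/tests/test_parity_consistency.py):
+#   obj = ErasureCodeObject('rbd_data.1.abc', -2,
+#                           {'k': 4, 'm': 2, 'plugin': 'jerasure'})
+#   obj.update_shard(0, bytearray(b'\x00' * 4096))
+#   obj.get_ec_tool_profile()  # -> 'k=4,m=2,plugin=jerasure'
+#   obj.get_shards_to_encode_str()  # -> '0,1,2,3,4,5'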
+
+class ErasureCodeObjects:
+    """
+    Class for managing objects of type ErasureCodeObject
+    Constuctor
+    Takes an optional list of oids to check if specified; any
+    objects not on the list will not be checked
+    """
+    def __init__(self, manager: ceph_manager.CephManager,
+                 config: Dict[str, Any]):
+        self.manager = manager
+        self.os_tool = ObjectStoreTool(manager)
+        self.pools_json = self.manager.get_osd_dump_json()["pools"]
+        self.objects_to_include = config.get('object_list', None)
+        self.pools_to_check = config.get('pools_to_check', None)
+        self.ec_profiles: Dict[int, Any] = {}
+        self.objects: List[ErasureCodeObject] = []
+
+    def has_object_with_uid(self, object_id: str) -> bool:
+        """
+        Return true if an object with the supplied object ID is found.
+        """
+        for obj in self.objects:
+            if obj.uid == object_id:
+                return True
+        return False
+
+    def get_object_by_uid(self, object_id: str) -> ErasureCodeObject:
+        """
+        Return the ErasureCodeObject corresponding to the supplied
+        UID if it exists. Assert if no object is found.
+        """
+        for obj in self.objects:
+            if obj.uid == object_id:
+                return obj
+        assert False, "Error: Object with UID not found"
+
+    def create_ec_object(self, oid: str, snapid: int,
+                         ec_profile: Dict[str, Any]) -> ErasureCodeObject:
+        """
+        Create a new ErasureCodeObject and add it to the list
+        """
+        ec_object = ErasureCodeObject(oid, snapid, ec_profile)
+        self.objects.append(ec_object)
+        return ec_object
+
+    def update_object_shard(self, object_id: str,
+                            shard_id: int, data: bytearray):
+        """
+        Update a shard of an existing ErasureCodeObject
+        """
+        ec_object = self.get_object_by_uid(object_id)
+        ec_object.update_shard(shard_id, data)
+
+    def get_object_uid(self, info_dump: Dict[str, Any]) -> str:
+        """
+        Returns a unique ID for an object, a combination of the oid and snapid
+        """
+        return info_dump["oid"] + "_" + str(info_dump["snapid"])
+
+    def is_object_in_pool_to_be_checked(self, object_json: Dict[str, Any]) -> bool:
+        """
+        Check if an object is a member of a pool
+        that is to be checked for consistency.
+        """
+        if not self.pools_to_check:
+            return True  # All pools to be checked
+        shard_pool_id = object_json["pool"]
+        for pool_json in self.pools_json:
+            if shard_pool_id == pool_json["pool"]:
+                if pool_json["pool_name"] in self.pools_to_check:
+                    return True
+        return False
+
+    def is_object_in_ec_pool(self, object_json: Dict[str, Any]):
+        """
+        Check if an object is a member of an EC pool or not.
+        """
+        is_object_in_ec_pool = False
+        shard_pool_id = object_json["pool"]
+        for pool_json in self.pools_json:
+            if shard_pool_id == pool_json["pool"]:
+                pool_type = pool_json['type']  # 1 for rep, 3 for ec
+                if pool_type == 3:
+                    is_object_in_ec_pool = True
+                    break
+        return is_object_in_ec_pool
+
+    def is_ec_plugin_supported(self, object_json: Dict[str, Any]) -> bool:
+        """
+        Return true if the plugin in the EC profile for the object's pool is
+        supported by consistency checker, otherwise false.
+        """
+        shard_pool_id = object_json["pool"]
+        ec_profile = self.get_ec_profile_for_pool(shard_pool_id)
+        supported_plugins = ['jerasure', 'isa']
+        return ec_profile['plugin'] in supported_plugins
+
+    def get_ec_profile_for_pool(self,
+                                pool_id: int) -> Dict[str, Any]:
+        """
+        Find and return the EC profile for a given pool.
+        Cache it locally if not already stored.
+        """
+        ec_profile_json = ""
+        if pool_id in self.ec_profiles:
+            return self.ec_profiles[pool_id]
+        for pool_json in self.pools_json:
+            if pool_id == pool_json["pool"]:
+                ec_profile_name = self.manager.get_pool_property(
+                    pool_json["pool_name"], "erasure_code_profile")
+                ec_profile_json = self.manager.raw_cluster_cmd(
+                    "osd",
+                    "erasure-code-profile",
+                    "get",
+                    ec_profile_name,
+                    "--format=json"
+                )
+                break
+        try:
+            ec_profile = json.loads(ec_profile_json)
+            self.ec_profiles[pool_id] = ec_profile
+        except ValueError as e:
+            log.error("Failed to parse object dump to JSON: %s", e)
+        return self.ec_profiles[pool_id]
+
+    def process_object_shard_data(self, ec_object: ErasureCodeObject) -> bool:
+        """
+        Use the objectstore tool to fetch the object info and the byte
+        data for all shards in an object. Return True if successful.
+        for (json_str, osd_id) in zip(ec_object.jsons, ec_object.osd_ids):
+            shard_info = self.os_tool.get_shard_info_dump(osd_id, json_str)
+            shard_data = self.os_tool.get_shard_bytes(osd_id, json_str)
+            shard_index = shard_info["id"]["shard_id"]
+            shard_whited_out = ("whiteout" in shard_info["info"]["flags"])
+            if shard_whited_out:
+                log.info("Found whiteout shard, skipping.")
+                return False
+            ec_object.object_size = shard_info["hinfo"]["total_chunk_size"]
+            ec_object.update_shard(shard_index, shard_data)
+        return True
+
+    def process_object_json(self, osd_id: int, object_json: List[Any]):
+        """
+        Create an ErasureCodeObject from JSON list output
+        Don't populate data and other info yet as that requires
+        slow calls to the ObjectStore tool
+        """
+        json_str = json.dumps(object_json)
+        json_section = object_json[1]
+        object_oid = json_section["oid"]
+        object_snapid = json_section["snapid"]
+        object_uid = object_oid + '_' + str(object_snapid)
+        if (self.objects_to_include and
+                object_oid not in self.objects_to_include):
+            return
+        if self.has_object_with_uid(object_uid):
+            ec_object = self.get_object_by_uid(object_uid)
+        else:
+            shard_pool_id = json_section["pool"]
+            ec_profile = self.get_ec_profile_for_pool(shard_pool_id)
+            ec_object = self.create_ec_object(object_oid,
+                                              object_snapid, ec_profile)
+        shard_id = json_section["shard_id"]
+        ec_object.osd_ids[shard_id] = osd_id
+        ec_object.jsons[shard_id] = json_str
+
+
+class ObjectStoreTool:
+    """
+    Interface for running the objectstore tool; contains functions
+    for retrieving information and data from OSDs.
+    """
+    def __init__(self, manager: ceph_manager.CephManager):
+        self.manager = manager
+        self.fspath = self.manager.get_filepath()
+
+    def run_objectstore_tool(self, osd_id: int, cmd: List[str],
+                             string_out: bool = True):
+        """
+        Run the ceph objectstore tool.
+        Execute the objectstore tool with the supplied arguments
+        in cmd on the machine where the specified OSD lives.
+        """
+        remote = self.manager.find_remote("osd", osd_id)
+        data_path = self.fspath.format(id=osd_id)
+        if self.manager.cephadm:
+            return shell(
+                self.manager.ctx,
+                self.manager.cluster,
+                remote,
+                args=[
+                    "ceph-objectstore-tool",
+                    "--err-to-stderr",
+                    "--no-mon-config",
+                    "--data-path",
+                    data_path
+                ]
+                + cmd,
+                name="osd" + str(osd_id),
+                wait=True,
+                check_status=False,
+                stdout=StringIO() if string_out else BytesIO(),
+                stderr=StringIO()
+            )
+        elif self.manager.rook:
+            assert False, "not implemented"
+        else:
+            return remote.run(
+                args=[
+                    "sudo",
+                    "adjust-ulimits",
+                    "ceph-objectstore-tool",
+                    "--err-to-stderr",
+                    "--no-mon-config",
+                    "--data-path",
+                    data_path
+                ]
+                + cmd,
+                wait=True,
+                check_status=False,
+                stdout=StringIO() if string_out else BytesIO(),
+                stderr=StringIO()
+            )
+
+    def get_ec_data_objects(self, osd_id: int) -> List[Any]:
+        """
+        Return list of erasure code objects living on this OSD.
+        """
+        objects = []
+        proc = self.run_objectstore_tool(osd_id, ["--op", "list"])
+        stdout = proc.stdout.getvalue()
+        if not stdout:
+            log.error("Objectstore tool failed with error "
+                      "when retrieving list of data objects")
+        else:
+            for line in stdout.split('\n'):
+                if line:
+                    try:
+                        shard = json.loads(line)
+                        if self.is_shard_part_of_ec_object(shard):
+                            objects.append(shard)
+                    except ValueError as e:
+                        log.error("Failed to parse shard list to JSON: %s", e)
+        return objects
+
+    def get_shard_info_dump(self, osd_id: int,
+                            json_str: str) -> Dict[str, Any]:
+        """
+        Return the JSON formatted shard information living on specified OSD.
+        json_str is the line of the string produced by the OS tool 'list'
+        command which corresponds to a given shard
+        """
+        shard_info = {}
+        proc = self.run_objectstore_tool(osd_id, ["--json", json_str, "dump"])
+        stdout = proc.stdout.getvalue()
+        if not stdout:
+            log.error("Objectstore tool failed with error "
+                      "when dumping object info.")
+        else:
+            try:
+                shard_info = json.loads(stdout)
+            except ValueError as e:
+                log.error("Failed to parse object dump to JSON: %s", e)
+        return shard_info
+
+    def get_shard_bytes(self, osd_id: int, object_id: str) -> bytearray:
+        """
+        Return the contents of the shard living on the specified OSD as bytes.
+        """
+        shard_bytes = bytearray()
+        proc = self.run_objectstore_tool(osd_id,
+                                         [object_id, "get-bytes"], False)
+        stdout = proc.stdout.getvalue()
+        if not stdout:
+            log.error("Objectstore tool failed to get shard bytes.")
+        else:
+            shard_bytes = bytearray(stdout)
+        return shard_bytes
+
+    def is_shard_part_of_ec_object(self, shard: List[Any]) -> bool:
+        """
+        Perform some checks on a shard to determine if it's actually a shard
+        in a valid EC object that should be checked for consistency. Attempts
+        to exclude scrub objects, trash and other various metadata objects.
+        """
+        pgid = shard[0]
+        shard_info = shard[1]
+        object_is_sharded = 's' in pgid
+        shard_has_oid = shard_info["oid"] != ''
+        shard_has_pool = shard_info["pool"] >= 0
+        shard_is_not_trash = "trash" not in shard_info["oid"]
+        shard_is_not_info = "info" not in shard_info["oid"]
+        return (object_is_sharded and shard_has_oid and
+                shard_has_pool and shard_is_not_trash and
+                shard_is_not_info)
+
+
+class ErasureCodeTool:
+    """
+    Interface for running the Ceph Erasure Code Tool
+    """
+    def __init__(self, manager: ceph_manager.CephManager, remote: Any):
+        self.manager = manager
+        self.remote = remote
+
+    def run_erasure_code_tool(self, cmd: List[str]):
+        """
+        Run the ceph erasure code tool with the arguments in the supplied list
+        """
+        args = ["sudo", "adjust-ulimits", "ceph-erasure-code-tool"] + cmd
+        if self.manager.cephadm:
+            return shell(
+                self.manager.ctx,
+                self.manager.cluster,
+                self.remote,
+                args=args,
+                name="",
+                wait=True,
+                check_status=False,
+                stdout=StringIO(),
+                stderr=StringIO(),
+            )
+        elif self.manager.rook:
+            assert False, "not implemented"
+        else:
+            return self.remote.run(
+                args=args,
+                wait=True,
+                check_status=False,
+                stdout=StringIO(),
+                stderr=StringIO(),
+            )
+
+    def calc_chunk_size(self, profile: str, object_size: str) -> int:
+        """
+        Return the chunk size for the given profile and object size
+        """
+        cmd = ["calc-chunk-size", profile, object_size]
+        proc = self.run_erasure_code_tool(cmd)
+        stdout = proc.stdout.getvalue()
+        if not stdout:
+            log.error("Erasure Code tool failed to calculate chunk size: %s",
+                      proc.stderr.getvalue())
+            return 0
+        return int(stdout)
+
+    def encode(self, profile: str, stripe_unit: int,
+               file_nums: str, filepath: str):
+        """
+        Encode the specified file using the erasure code tool
+        Output will be written to files in the same directory
+        """
+        cmd = ["encode", profile, str(stripe_unit), file_nums, filepath]
+        proc = self.run_erasure_code_tool(cmd)
+        if proc.returncode != 0:
+            log.error("Erasure Code tool failed to encode: %s",
+                      proc.stderr.getvalue())
+
+    def decode(self, profile: str, stripe_unit: int,
+               file_nums: str, filepath: str):
+        """
+        Decode the specified file using the erasure code tool
+        Output will be written to files in the same directory
+        """
+        cmd = ["decode", profile, str(stripe_unit), file_nums, filepath]
+        proc = self.run_erasure_code_tool(cmd)
+        if proc.returncode != 0:
+            log.error("Erasure Code tool failed to decode: %s",
+                      proc.stderr.getvalue())
+
+
+def shell(ctx: Any, cluster_name: str, remote: Any,
+          args: List[str], name: str = "", **kwargs: Any):
+    """
+    Interface for running commands on cephadm clusters
+    """
+    extra_args = []
+    if name:
+        extra_args = ['-n', name]
+    return remote.run(
+        args=[
+            'sudo',
+            ctx.cephadm,
+            '--image', ctx.ceph[cluster_name].image,
+            'shell',
+        ] + extra_args + [
+            '--fsid', ctx.ceph[cluster_name].fsid,
+            '--',
+        ] + args,
+        **kwargs
+    )
+
+
+def get_tmp_directory():
+    """
+    Returns a temporary directory name that will be used to store shard data
+    Includes the PID so different instances can be run in parallel
+    """
+    tmpdir = '/var/tmp/consistency-check-' + str(os.getpid()) + '/'
+    return tmpdir
+
+
+def handle_mismatch(assert_on_mismatch: bool):
+    """
+    Raise a RuntimeError if assert_on_mismatch is set,
+    otherwise just log an error.
+    """
+    err = "Shard mismatch detected."
+    if assert_on_mismatch:
+        raise RuntimeError(err)
+    log.error(err)
+
+
+def revive_osds(manager: ceph_manager.CephManager, osds: List[Dict[str, Any]]):
+    """
+    Revive any OSDs that were killed during the task and
+    clean up any temporary files (temp files both locally and on remote).
+    Optionally retain files for debug if specified by the config
+    """
+    for osd in osds:
+        osd_id = osd["osd"]
+        manager.revive_osd(osd_id, skip_admin_check=False)
+        manager.mark_in_osd(osd_id)
+        manager.wait_till_osd_is_up(osd_id)
+
+
+def clean_up_test_files(remote: Any = None):
+    """
+    Clean any test files that were created
+    both locally and on a remote if specified
+    """
+    tmp_dir = get_tmp_directory()
+    local_dir_exists = os.path.isdir(tmp_dir)
+    if local_dir_exists:
+        shutil.rmtree(tmp_dir)
+    if remote:
+        remote.run(args=["rm", "-rf", tmp_dir])
+
+
+def print_summary(consistent: List[str],
+                  inconsistent: List[str],
+                  skipped: List[str]):
+    """
+    Print a summary including counts of objects checked
+    and a JSON-formatted lists of consistent and inconsistent objects.
+    """
+    log.info("Consistent objects counted: %i", len(consistent))
+    log.info("Inconsistent objects counted %i", len(inconsistent))
+    log.info("Objects skipped: %i", len(skipped))
+    log.info("Total objects checked: %i", len(consistent) + len(inconsistent))
+    if consistent:
+        out = '[' + ','.join("'" + str(o) + "'" for o in consistent) + ']'
+        log.info("Consistent objects: %s", out)
+    if inconsistent:
+        out = '[' + ','.join("'" + str(o) + "'" for o in inconsistent) + ']'
+        log.info("Objects with a mismatch: %s", out)
+    if skipped:
+        out = '[' + ','.join("'" + str(o) + "'" for o in skipped) + ']'
+        log.info("Objects skipped: %s", out)
+
+
+def task(ctx, config: Dict[str, Any]):
+    """
+    Gathers information about EC objects living on the OSDs, then
+    gathers the shard data using the ObjectStore tool.
+    Runs the data shards through the EC tool and verifies the encoded
+    output matches the parity shards on the OSDs.
+
+    Only Jerasure and ISA are supported at this stage.
+    Other plugins may work with some small tweaks but have not been tested.
+
+    Accepts the following optional config options:
+
+    ec_parity_consistency:
+        retain_files: <bool> - Keep files gathered during the test in /var/tmp/
+        assert_on_mismatch: <bool> - Whether to count a mismatch as a failure
+        max_run_time: <int> - Max amount of time to run the tool for in seconds
+        object_list: <List[str]> - OID list of which objects to check
+        pools_to_check: <List[str]> - List of pool names to check for objects
+    """
+
+    if config is None:
+        config = {}
+
+    log.info("Python Process ID: %i", os.getpid())
+    first_mon = teuthology.get_first_mon(ctx, config)
+    (mon,) = ctx.cluster.only(first_mon).remotes.keys()
+
+    manager = ceph_manager.CephManager(
+        mon,
+        ctx=ctx,
+        logger=log.getChild("ceph_manager")
+    )
+
+    cephadm_not_supported = "Tool not supported for use with cephadm clusters"
+    assert not manager.cephadm, cephadm_not_supported
+
+    retain_files = config.get('retain_files', False)
+    max_time = config.get('max_run_time', None)
+    assert_on_mismatch = config.get('assert_on_mismatch', True)
+
+    osds = manager.get_osd_dump()
+    ec_remote = manager.find_remote("osd", osds[0]["osd"])
+
+    os_tool = ObjectStoreTool(manager)
+    ec_tool = ErasureCodeTool(manager, ec_remote)
+    ec_objects = ErasureCodeObjects(manager, config)
+    start_time = time.time()
+    consistent: List[str] = []
+    inconsistent: List[str] = []
+    skipped: List[str] = []
+
+    atexit.register(revive_osds, manager, osds)
+    atexit.register(print_summary, consistent, inconsistent, skipped)
+    if not retain_files:
+        atexit.register(clean_up_test_files, ec_remote)
+
+    # Loop through every OSD, storing each object shard in an EC object
+    # Objects not in EC pools or the object_list will be ignored
+    for osd in osds:
+        osd_id = osd["osd"]
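+        # The objectstore tool requires the OSD to be offline, so stop it first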
+        manager.kill_osd(osd_id)
+        manager.mark_down_osd(osd_id)
+        manager.mark_out_osd(osd_id)
+        data_objects = os_tool.get_ec_data_objects(osd_id)
+        if data_objects:
+            for full_obj_json in data_objects:
+                obj_json = full_obj_json[1]
+                if (ec_objects.is_object_in_ec_pool(obj_json) and
+                        ec_objects.is_object_in_pool_to_be_checked(obj_json)
+                        and ec_objects.is_ec_plugin_supported(obj_json)):
+                    ec_objects.process_object_json(osd_id, full_obj_json)
+                else:
+                    log.debug("Object not in pool to be checked, skipping.")
+
+    # Now compute the parities for each object
+    # and verify they match what the EC tool produces
+    for ec_object in ec_objects.objects:
+        time_elapsed = time.time() - start_time
+        if max_time is not None and time_elapsed > max_time:
+            log.info("%i seconds elapsed, stopping "
+                     "due to time limit.", time_elapsed)
+            break
+
+        # Try to process the object shards, skip if
+        # something goes wrong e.g. deleted object
+        object_uid = ec_object.uid
+        if not ec_objects.process_object_shard_data(ec_object):
+            skipped.append(object_uid)
+            continue
+
+        # Create dir and write out shards
+        object_dir = get_tmp_directory() + object_uid + '/'
+        object_filepath = object_dir + DATA_SHARD_FILENAME
+        try:
+            os.makedirs(object_dir)
+        except OSError as e:
+            log.error("Directory '%s' can not be created: %s", object_dir, e)
+        ec_object.write_data_shards_to_file(object_filepath, ec_remote)
+
+        # Encode the shards and output to the object dir
+        want_to_encode = ec_object.get_shards_to_encode_str()
+        ec_profile = ec_object.get_ec_tool_profile()
+        object_size = ec_object.object_size
+        ec_tool.encode(ec_profile,
+                       object_size,
+                       want_to_encode,
+                       object_filepath)
+        # Compare stored parities to EC tool output
+        match = ec_object.compare_parity_shards_to_files(object_filepath,
+                                                         ec_remote)
+        if match:
+            consistent.append(object_uid)
+        else:
+            inconsistent.append(object_uid)
+        # Free up memory consumed by shards
+        ec_object.delete_shards()
+
+    if len(inconsistent) > 0:
+        handle_mismatch(assert_on_mismatch)
diff --git a/qa/tasks/tests/requirements.txt b/qa/tasks/tests/requirements.txt
new file mode 100644 (file)
index 0000000..0a10409
--- /dev/null
@@ -0,0 +1,6 @@
+boto
+gevent
+httplib2
+pytest
+python-dateutil
+teuthology
diff --git a/qa/tasks/tests/test_parity_consistency.py b/qa/tasks/tests/test_parity_consistency.py
new file mode 100644 (file)
index 0000000..81e84c3
--- /dev/null
@@ -0,0 +1,246 @@
+"""
+Unit tests for ec_parity_consistency.py
+
+Tests can be run by invoking pytest from the qa directory:
+
+cd ceph/qa
+python3 -m venv tasks/tests/venv
+source tasks/tests/venv/bin/activate
+pip install -r tasks/tests/requirements.txt
+pytest tasks/tests/test_parity_consistency.py
+deactivate (when done)
+
+"""
+import os
+import random
+import tempfile
+from typing import List
+from unittest.mock import Mock, MagicMock
+from tasks.ec_parity_consistency import ErasureCodeObject, ErasureCodeObjects
+
+
+class TestParityConsistency:
+    """
+    Tests for the classes contributing to ec_parity_consistency.py
+    Mostly covers basic functionality of the ErasureCodeObject class;
+    the other classes rely heavily on a functioning Ceph cluster
+    and would be difficult to test meaningfully with unit tests.
+    """
+    def create_dummy_ec_obj(self, k: int,
+                            m: int,
+                            shards: List[bytearray]) -> ErasureCodeObject:
+        """
+        Return an ErasureCodeObject created with the specified k + m
+        values and the supplied shards
+        """
+        rnd = random.randint(0, 999)
+        oid = 'rbd_data.4.1061d2ba6457.000000000000' + str(rnd)
+        snapid = -2
+        ec_obj = ErasureCodeObject(oid, snapid, {'k': k, 'm': m})
+        for i in range(0, k + m):
+            ec_obj.update_shard(i, shards[i])
+        return ec_obj
+
+    def create_random_shards(self, count: int) -> List[bytearray]:
+        """
+        Return a List of shards (bytearrays) filled with random data
+        """
+        shards = []
+        chunk_size = random.randint(1024, 1048576)
+        for _ in range(0, count):
+            shards.append(bytearray(os.urandom(chunk_size)))
+        return shards
+
+    def test_ec_profile_jerasure(self):
+        """
+        Test the EC profile dict is processed and parsed properly for Jerasure pools
+        """
+        profile_dict = {
+            'crush-device-class': '',
+            'crush-failure-domain': 'osd',
+            'crush-num-failure-domains': 3,
+            'crush-osds-per-failure-domain': 3,
+            'crush-root': 'default',
+            'jerasure-per-chunk-alignment': 'false',
+            'k': 4,
+            'm': 2,
+            'plugin': 'jerasure',
+            'technique': '',
+            'w': 8
+        }
+        supported_techniques = ['reed_sol_van', 'reed_sol_r6_op',
+                                'cauchy_orig', 'cauchy_good', 'liberation',
+                                'blaum_roth', 'liber8tion']
+        for technique in supported_techniques:
+            profile_dict['technique'] = technique
+            ec_obj = ErasureCodeObject('', 0, profile_dict)
+            profile_str = ec_obj.get_ec_tool_profile()
+            expected_str = ('crush-device-class=,crush-failure-domain=osd,'
+                            'crush-num-failure-domains=3,'
+                            'crush-osds-per-failure-domain=3,'
+                            'crush-root=default,'
+                            'jerasure-per-chunk-alignment=false,k=4,m=2,'
+                            'plugin=jerasure,technique=' + technique + ',w=8')
+            assert profile_str == expected_str
+
+    def test_ec_profile_isa(self):
+        """
+        Test the EC profile dict is processed and parsed properly for ISA-L pools
+        """
+        profile_dict = {
+            'crush-device-class': '',
+            'crush-failure-domain': 'host',
+            'crush-num-failure-domains': 2,
+            'crush-osds-per-failure-domain': 1,
+            'crush-root': 'default',
+            'k': 8,
+            'm': 3,
+            'plugin': 'isa',
+            'technique': ''
+        }
+        supported_techniques = ['reed_sol_van', 'cauchy']
+        for technique in supported_techniques:
+            profile_dict['technique'] = technique
+            ec_obj = ErasureCodeObject('', 0, profile_dict)
+            profile_str = ec_obj.get_ec_tool_profile()
+            expected_str = ('crush-device-class=,crush-failure-domain=host,'
+                            'crush-num-failure-domains=2,'
+                            'crush-osds-per-failure-domain=1,'
+                            'crush-root=default,k=8,m=3,'
+                            'plugin=isa,technique=' + technique)
+            assert profile_str == expected_str
+
+    def test_want_to_encode_str(self):
+        """
+        Test the want to encode string for the EC tool is formatted correctly
+        """
+        ec_obj = ErasureCodeObject('test', 0, {'k': 1, 'm': 0})
+        strout = ec_obj.get_shards_to_encode_str()
+        assert strout == '0'
+        ec_obj = ErasureCodeObject('test', 0, {'k': 8, 'm': 3})
+        strout = ec_obj.get_shards_to_encode_str()
+        assert strout == '0,1,2,3,4,5,6,7,8,9,10'
+        ec_obj = ErasureCodeObject('test', 0, {'k': 5, 'm': 1})
+        strout = ec_obj.get_shards_to_encode_str()
+        assert strout == '0,1,2,3,4,5'
+        ec_obj = ErasureCodeObject('test', 0, {'k': 12, 'm': 0})
+        strout = ec_obj.get_shards_to_encode_str()
+        assert strout == '0,1,2,3,4,5,6,7,8,9,10,11'
+        ec_obj = ErasureCodeObject('test', 0, {'k': 2, 'm': 0})
+        strout = ec_obj.get_shards_to_encode_str()
+        assert strout == '0,1'
+
+    def test_shard_updates(self):
+        """
+        Check that shards are correctly updated and returned
+        for an EC object.
+        """
+        m = random.randint(1, 3)
+        k = random.randint(m + 1, 12)
+        data_shards = self.create_random_shards(k)
+        parity_shards = self.create_random_shards(m)
+        shards = data_shards + parity_shards
+        ec_obj = self.create_dummy_ec_obj(k, m, shards)
+        obj_data_shards = ec_obj.get_data_shards()
+        obj_parity_shards = ec_obj.get_parity_shards()
+        assert len(obj_data_shards) == k
+        assert len(obj_parity_shards) == m
+        assert data_shards == obj_data_shards
+        assert parity_shards == obj_parity_shards
+
+    def test_delete_shards(self):
+        """
+        Test that the delete shards function sets the
+        shard list elements back to None.
+        """
+        m = random.randint(1, 3)
+        k = random.randint(m + 1, 12)
+        shards = self.create_random_shards(k + m)
+        ec_obj = self.create_dummy_ec_obj(k, m, shards)
+        shards = ec_obj.get_data_shards() + ec_obj.get_parity_shards()
+        for shard in shards:
+            assert shard is not None
+        ec_obj.delete_shards()
+        shards = ec_obj.get_data_shards() + ec_obj.get_parity_shards()
+        for shard in shards:
+            assert shard is None
+
+    def test_comparing_parity_shards(self):
+        """
+        Test the compare parity shards function
+        """
+        m = random.randint(1, 3)
+        k = random.randint(m + 1, 12)
+        shards = self.create_random_shards(k + m)
+        ec_obj = self.create_dummy_ec_obj(k, m, shards)
+        parity_shards = ec_obj.get_parity_shards()
+        tmpdir = tempfile.gettempdir()
+        tmpname = 'ec_obj'
+        for i, shard in enumerate(parity_shards):
+            filepath = tmpdir + '/' + tmpname + '.' + str(i + k)
+            with open(filepath, "wb") as binary_file:
+                binary_file.write(shard)
+        filepath = tmpdir + '/' + tmpname
+        shards_match = ec_obj.compare_parity_shards_to_files(filepath)
+        assert shards_match is True
+        shards = self.create_random_shards(1)
+        ec_obj.update_shard(k, shards[0])
+        shards_match = ec_obj.compare_parity_shards_to_files(filepath)
+        assert shards_match is False
+
+    def test_writing_data_shards(self):
+        """
+        Test the data shard write function produces a single file
+        of an object's data shards in order
+        """
+        m = random.randint(1, 3)
+        k = random.randint(m + 1, 12)
+        shards = self.create_random_shards(k + m)
+        data_shard = bytearray()
+        for i, shard in enumerate(shards):
+            if i < k:
+                data_shard += shard
+        ec_obj = self.create_dummy_ec_obj(k, m, shards)
+        tmpdir = tempfile.gettempdir()
+        filepath = tmpdir + '/ec_obj'
+        ec_obj.write_data_shards_to_file(filepath)
+        with open(filepath, "rb") as binary_file:
+            contents = binary_file.read()
+        assert bytearray(contents) == data_shard
+        shards = self.create_random_shards(k)
+        data_shard = bytearray()
+        for shard in shards:
+            data_shard += shard
+        assert bytearray(contents) != data_shard
+
+    def test_get_object_by_uid(self):
+        """
+        Test an object can be retrieved from ErasureCodeObjects
+        by its UID
+        """
+        manager = Mock()
+        manager.get_filepath = MagicMock(return_value='/tmp/')
+        manager.get_osd_dump_json = MagicMock(return_value={'pools': ''})
+        ec_objs = ErasureCodeObjects(manager, {})
+        ec_objs.create_ec_object('test', -2, {'k': 2, 'm': 2, 'test_key': 20})
+        ec_obj = ec_objs.get_object_by_uid('test_-2')
+        assert ec_obj.ec_profile['test_key'] == 20
+
+    def test_only_isa_and_jerasure_pools_are_supported(self):
+        """
+        Test only Jerasure and ISA are supported by the consistency checker
+        """
+        manager = Mock()
+        manager.get_filepath = MagicMock(return_value='/tmp/')
+        manager.get_osd_dump_json = MagicMock(return_value={'pools': ''})
+        ec_objs = ErasureCodeObjects(manager, {})
+        supported_plugins = ['jerasure', 'isa']
+        for plugin in supported_plugins:
+            ec_objs.get_ec_profile_for_pool = MagicMock(return_value={'plugin': plugin})
+            is_supported = ec_objs.is_ec_plugin_supported({'pool': 3})
+            assert is_supported is True
+        unsupported_plugins = ['lrc', 'shec', 'clay']
+        for plugin in unsupported_plugins:
+            ec_objs.get_ec_profile_for_pool = MagicMock(return_value={'plugin': plugin})
+            is_supported = ec_objs.is_ec_plugin_supported({'pool': 3})
+            assert is_supported is False