qa: add validation stage for deduplication.py
author Sungmin Lee <ssdohammer@gmail.com>
Tue, 23 Aug 2022 04:51:31 +0000 (13:51 +0900)
committer myoungwon oh <ohmyoungwon@gmail.com>
Mon, 19 Sep 2022 04:21:00 +0000 (13:21 +0900)
To validate that sample-dedup actually works, validate() runs in a
thread separate from sample-dedup and verifies the following two
things:
1. check that sample-dedup starts properly.
2. check that the references of all chunk objects in the chunk tier
   exist in the designated base pool.
This routine repeats max_validation_cnt times while sample-dedup is
running. If no failure is raised during the loop, we can assume that
sample-dedup works correctly. If one is, assert() stops the test.

In case a reference of a chunk object doesn't exist in the base pool,
validate() gives it a second chance after repairing it (via the
chunk-repair op) to deal with false-positive reference inconsistency.
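
For reference, a minimal standalone sketch of the two external
commands validate() leans on. The helper names are illustrative and
not part of the patch; the CLI flags match the ones used in
deduplication.py below:

    import subprocess

    def ref_exists(base_pool, oid):
        # 'rados stat' exits non-zero when the object does not exist
        return subprocess.run(
            ['rados', '-p', base_pool, 'stat', oid],
            capture_output=True).returncode == 0

    def repair_ref(chunk_pool, chunk_obj, oid, pool_id):
        # chunk-repair fixes the reference so a false positive can be
        # re-checked instead of failing the test outright
        subprocess.run(
            ['ceph-dedup-tool', '--op', 'chunk-repair',
             '--chunk-pool', chunk_pool, '--object', chunk_obj,
             '--target-ref', oid, '--target-ref-pool-id', pool_id],
            check=True)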

Signed-off-by: Sungmin Lee <sung_min.lee@samsung.com>
qa/tasks/deduplication.py

index 056c91f4391706c36f6ace72317744ae2a04e2de..ace0c86e144a65e4f3f423315be11b84e79eaf0e 100644 (file)
@@ -5,6 +5,9 @@ import contextlib
 import logging
 import gevent
 from teuthology import misc as teuthology
+import json
+import time
+from io import StringIO
 
 
 from teuthology.orchestra import run
@@ -15,9 +18,7 @@ log = logging.getLogger(__name__)
 def task(ctx, config):
     """
     Run ceph-dedup-tool.
-
     The config should be as follows::
-
         ceph-dedup-tool:
           clients: [client list]
           op: <operation name>
@@ -26,13 +27,10 @@ def task(ctx, config):
           chunk_size: <chunk size>
           chunk_algorithm: <chunk algorithm, fixed|fastcdc>
           fingerprint_algorithm: <fingerprint algorithm, sha1|sha256|sha512>
-          object_dedup_threashold: <#% of duplicate chunks within an object to trigger object dedup>
           chunk_dedup_threshold: <the number of duplicate chunks to trigger chunk dedup>
           max_thread: <the number of threads>
           wakeup_period: <duration>
-
     For example::
-
         tasks:
         - exec:
             client.0:
@@ -42,14 +40,13 @@ def task(ctx, config):
             op: 'sample-dedup'
             pool: 'default.rgw.buckets.data'
             chunk_pool: 'low_tier'
-            chunk_size: 8192
+            chunk_size: 131072
             chunk_algorithm: 'fastcdc'
             fingerprint_algorithm: 'sha1'
-            object_dedup_threshold: 15
             chunk_dedup_threshold: 5
             max_thread: 2
             wakeup_period: 20
-
+            sampling_ratio: 100
     """
     log.info('Beginning deduplication...')
     assert isinstance(config, dict), \
@@ -58,59 +55,171 @@ def task(ctx, config):
     #assert hasattr(ctx, 'rgw')
     testdir = teuthology.get_testdir(ctx)
     args = [
-        'adjust-ulimits',
-        'ceph-coverage',
-        '{tdir}/archive/coverage'.format(tdir=testdir),
         'ceph-dedup-tool']
     if config.get('op', None):
         args.extend(['--op', config.get('op', None)])
     if config.get('chunk_pool', None):
         args.extend(['--chunk-pool', config.get('chunk_pool', None)])
     if config.get('chunk_size', False):
-        args.extend(['--chunk-size', str(config.get('chunk_size', 8192))])
+        args.extend(['--chunk-size', str(config.get('chunk_size', 131072))])
     if config.get('chunk_algorithm', False):
         args.extend(['--chunk-algorithm', config.get('chunk_algorithm', None)] )
     if config.get('fingerprint_algorithm', False):
         args.extend(['--fingerprint-algorithm', config.get('fingerprint_algorithm', None)] )
-    if config.get('object_dedup_threshold', False):
-        args.extend(['--object-dedup-threshold', str(config.get('object_dedup_threshold', 50))])
     if config.get('chunk_dedup_threshold', False):
-        args.extend(['--chunk-dedup-threshold', str(config.get('chunk_dedup_threshold', 5))])
+        args.extend(['--chunk-dedup-threshold', str(config.get('chunk_dedup_threshold', 1))])
     if config.get('max_thread', False):
         args.extend(['--max-thread', str(config.get('max_thread', 2))])
+    if config.get('sampling_ratio', False):
+        args.extend(['--sampling-ratio', str(config.get('sampling_ratio', 100))])
     if config.get('wakeup_period', False):
-        args.extend(['"--wakeup-period"', str(config.get('wakeup_period', 30))])
+        args.extend(['--wakeup-period', str(config.get('wakeup_period', 20))])
     if config.get('pool', False):
         args.extend(['--pool', config.get('pool', None)])
 
     args.extend([
         '--debug',
-        '--deamon',
-        '--iterative'])
+        '--daemon',
+        '--loop'])
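+    # run sample-dedup repeatedly in daemon mode so the validate()
+    # thread below can observe it while it runs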
 
     def thread():
+        run_remote(args, False, 0)
+
+    def run_remote(args, need_wait, client_num):
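+        # Run args on client.{client_num} under adjust-ulimits/ceph-coverage.
+        # When need_wait is True, block and retry until the command exits 0
+        # (up to 30 attempts, 30 secs apart) and return its stdout.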
         clients = ['client.{id}'.format(id=id_) for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
         log.info('clients are %s' % clients)
-        manager = ctx.managers['ceph']
-        tests = {}
-        log.info("args %s", args)
-        for role in config.get('clients', clients):
-            assert isinstance(role, str)
-            PREFIX = 'client.'
-            assert role.startswith(PREFIX)
-            id_ = role[len(PREFIX):]
+        role = 'client.{id}'.format(id=client_num)
+        if role not in clients:
+            raise Exception('wrong client {c}'.format(c=role))
+        assert isinstance(role, str)
+        PREFIX = 'client.'
+        assert role.startswith(PREFIX)
+        testdir = teuthology.get_testdir(ctx)
+        cmd_args = [
+                'adjust-ulimits',
+                'ceph-coverage',
+                '{tdir}/archive/coverage'.format(tdir=testdir)]
+        cmd_args.extend(args)
+        log.info("cmd: %s", cmd_args)
+        tries = 0
+        while True:
             (remote,) = ctx.cluster.only(role).remotes.keys()
             proc = remote.run(
-                args=args,
-                stdin=run.PIPE,
-                wait=False
+                args=cmd_args,
+                wait=need_wait, check_status=False,
+                stdout=StringIO(),
                 )
-            tests[id_] = proc
+            log.info('exitstatus {r}'.format(r=proc.exitstatus))
+            if proc.exitstatus == 0 or not need_wait:
+                log.info('proc stdout %s', proc.stdout.getvalue())
+                return proc.stdout.getvalue().strip()
+            tries += 1
+            if tries > 30:
+                raise Exception('timed out getting correct exitstatus')
+            time.sleep(30)
+
+    def get_chunk_objs(chunk_pool):
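+        # list the objects currently stored in the chunk pool via 'rados ls'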
+        chunk_obj_list = run_remote(('rados ls -p ' + chunk_pool).split(), True, 1).split()
+        if not chunk_obj_list:
+            return None
+        return chunk_obj_list
+
+    def get_ref_list(chunk_pool, chunk_obj):
+        # get reference list of chunk object
+        dump_str = run_remote(
+            ('ceph-dedup-tool --op dump-chunk-refs --chunk-pool '
+            + chunk_pool + ' --object ' + chunk_obj).split(),
+            True, 1
+        )
+        # fail if the reference object has not been written
+        assert len(dump_str) > 0
+        log.info('{0} obj has {1} refs'
+            .format(chunk_obj, json.loads(dump_str)['count']))
+
+        # check if chunk object's reference object exists in base-tier
+        ref_list = json.loads(dump_str)['refs']
+        return ref_list
+
+    # To validate that the sample-dedup operation works, this function checks that
+    #   1. sample-dedup has been started and
+    #   2. the references of chunk objects exist in the correct base pool
+    def validate():
+        log.info('start validating sample-dedup')
+        base_pool = config.get('pool', None)
+        chunk_pool = config.get('chunk_pool', None)
+        max_validation_cnt = 15
+        retry_cnt = 0
+        # chunk objs for re-validation after chunk-repair
+        retry_chunk_objs = list()
+
+        # check whether sample-dedup has been started
+        chunk_obj_list = get_chunk_objs(chunk_pool)
+        while not chunk_obj_list and retry_cnt < max_validation_cnt:
+            # retry getting chunk objs after 30 secs of sleep
+            time.sleep(30)
+            chunk_obj_list = get_chunk_objs(chunk_pool)
+            retry_cnt += 1
+            log.info('chunk pool empty. retry %d', retry_cnt)
+        assert retry_cnt < max_validation_cnt
+
+        log.info('sample-dedup started successfully')
+
+        retry_cnt = 0
+        max_validation_cnt = 5
+        # validate chunk pool for max_validation_cnt times
+        while retry_cnt < max_validation_cnt:
+            for chunk_obj in chunk_obj_list:
+                ref_list = get_ref_list(chunk_pool, chunk_obj)
+                for ref in ref_list:
+                    ret = run_remote(
+                        ('rados -p ' + base_pool + ' stat ' + ref['oid'])
+                        .split(), True, 1
+                    )
+                    # check if ref exists in base pool
+                    if not ret:
+                        # if the ref does not exist in the base pool, try repair
+                        # to avoid a false-positive reference inconsistency
+                        ret = run_remote(('ceph osd pool stats ' + base_pool).split(), True, 1)
+                        assert len(ret) > 0
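+                        # 'ceph osd pool stats <pool>' output begins with
+                        # "pool <name> id <id>"; token 3 is the pool id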
+                        base_pool_id = ret.split()[3]
+                        ret = run_remote(
+                            ('ceph-dedup-tool --op chunk-repair --chunk-pool '
+                            + chunk_pool + ' --object ' + chunk_obj + ' --target-ref ' 
+                            + ref['oid'] + ' --target-ref-pool-id ' + base_pool_id)
+                            .split(), True, 1
+                        )
+                        retry_chunk_objs.append(chunk_obj)
+                    else:
+                        log.info('{0} obj exists in {1}'.format(ref['oid'], base_pool))
+
+            # retry validation for repaired objects
+            for chunk_obj in retry_chunk_objs:
+                ref_list = get_ref_list(chunk_pool, chunk_obj)
+                for ref in ref_list:
+                    ret = run_remote(
+                        ('rados -p ' + base_pool + ' stat ' + ref['oid'])
+                        .split(), True, 1
+                    )
+                    assert len(ret) > 0
+                    log.info(
+                        '{0} obj exists in {1} after repair'.format(
+                            ref['oid'], base_pool)
+                    )
+            retry_chunk_objs = list()
+
+            # get chunk objects for the next loop
+            chunk_obj_list = get_chunk_objs(chunk_pool)
+            retry_cnt += 1
+            time.sleep(30)
+        return True
+
 
     running = gevent.spawn(thread)
+    checker = gevent.spawn(validate)
 
     try:
         yield
     finally:
         log.info('joining ceph-dedup-tool')
         running.get()
+        checker.get()