]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw: use bucket for data checkpoint instead of data 8190/head
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 21 Mar 2016 23:52:14 +0000 (16:52 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 23 Mar 2016 17:35:28 +0000 (10:35 -0700)
data checkpoint is not accurate, as data log might not show repeating changes.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/test/rgw/test_multi.py

index dce6c0ebf683a553c570d9271bd48df52bfd4d41..92460d74f05128b002fe4c13cc81422a557faf43 100644 (file)
@@ -304,6 +304,30 @@ class RGWRealm:
 
         return (num_shards, markers)
 
+    def bucket_sync_status(self, target_zone, source_zone, bucket_name):
+        if target_zone.zone_name == source_zone.zone_name:
+            return None
+
+        while True:
+            (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm +
+                                                                                ' bucket sync status --source-zone=' + source_zone.zone_name +
+                                                                                ' --bucket=' + bucket_name, check_retcode = False)
+            if retcode == 0:
+                break
+
+            assert(retcode == 2) # ENOENT
+
+        log(20, 'current bucket sync status=', bucket_sync_status_json)
+        sync_status = json.loads(bucket_sync_status_json)
+
+        markers={}
+        for entry in sync_status:
+            val = entry['val']
+            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
+            markers[entry['key']] = pos
+
+        return markers
+
     def data_source_log_status(self, source_zone):
         source_cluster = source_zone.cluster
         (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' datalog status')
@@ -319,6 +343,27 @@ class RGWRealm:
 
         return markers
 
+    def bucket_source_log_status(self, source_zone, bucket_name):
+        source_cluster = source_zone.cluster
+        (bilog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' bilog status --bucket=' + bucket_name)
+        bilog_status = json.loads(bilog_status_json)
+
+        m={}
+        markers={}
+        try:
+            m = bilog_status['markers']
+        except:
+            pass
+
+        for s in m:
+            key = s['key']
+            val = s['val']
+            markers[key] = val
+
+        log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers)
+
+        return markers
+
     def compare_data_status(self, target_zone, source_zone, log_status, sync_status):
         if len(log_status) != len(sync_status):
             log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
@@ -337,6 +382,24 @@ class RGWRealm:
 
         return True
 
+    def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status):
+        if len(log_status) != len(sync_status):
+            log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
+            return False
+
+        msg =  ''
+        for i, l, s in zip(log_status, log_status.itervalues(), sync_status.itervalues()):
+            if l > s:
+                if len(s) != 0:
+                    msg += ', '
+                msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
+
+        if len(msg) > 0:
+            log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
+            return False
+
+        return True
+
     def zone_data_checkpoint(self, target_zone, source_zone):
         if target_zone.zone_name == source_zone.zone_name:
             return
@@ -357,6 +420,26 @@ class RGWRealm:
 
         log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
 
+    def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name):
+        if target_zone.zone_name == source_zone.zone_name:
+            return
+
+        log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
+
+        while True:
+            log_status = self.bucket_source_log_status(source_zone, bucket_name)
+            sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name)
+
+            log(20, 'log_status=', log_status)
+            log(20, 'sync_status=', sync_status)
+
+            if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
+                break
+
+            time.sleep(5)
+
+        log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
+
 
     def create_user(self, user, wait_meta = True):
         log(5, 'creating user uid=', user.uid)
@@ -595,14 +678,14 @@ def test_object_sync():
 
     realm.meta_checkpoint()
 
-    for source_zone, bucket_name in zone_bucket.iteritems():
+    for source_zone, bucket in zone_bucket.iteritems():
         for target_zone in all_zones:
             if source_zone.zone_name == target_zone.zone_name:
                 continue
 
-            realm.zone_data_checkpoint(target_zone, source_zone)
+            realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
 
-            check_bucket_eq(source_zone, target_zone, bucket_name)
+            check_bucket_eq(source_zone, target_zone, bucket)
 
 def test_object_delete():
     buckets, zone_bucket = create_bucket_per_zone()
@@ -615,33 +698,33 @@ def test_object_delete():
     content = 'asdasd'
 
     # don't wait for meta sync just yet
-    for zone, bucket_name in zone_bucket.iteritems():
-        k = new_key(zone, bucket_name, objname)
+    for zone, bucket in zone_bucket.iteritems():
+        k = new_key(zone, bucket, objname)
         k.set_contents_from_string(content)
 
     realm.meta_checkpoint()
 
     # check object exists
-    for source_zone, bucket_name in zone_bucket.iteritems():
+    for source_zone, bucket in zone_bucket.iteritems():
         for target_zone in all_zones:
             if source_zone.zone_name == target_zone.zone_name:
                 continue
 
-            realm.zone_data_checkpoint(target_zone, source_zone)
+            realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
 
-            check_bucket_eq(source_zone, target_zone, bucket_name)
+            check_bucket_eq(source_zone, target_zone, bucket)
 
     # check object removal
-    for source_zone, bucket_name in zone_bucket.iteritems():
-        k = get_key(source_zone, bucket_name, objname)
+    for source_zone, bucket in zone_bucket.iteritems():
+        k = get_key(source_zone, bucket, objname)
         k.delete()
         for target_zone in all_zones:
             if source_zone.zone_name == target_zone.zone_name:
                 continue
 
-            realm.zone_data_checkpoint(target_zone, source_zone)
+            realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
 
-            check_bucket_eq(source_zone, target_zone, bucket_name)
+            check_bucket_eq(source_zone, target_zone, bucket)
 
 def test_multi_period_incremental_sync():
     if len(realm.clusters) < 3:
@@ -692,14 +775,14 @@ def test_multi_period_incremental_sync():
     realm.meta_checkpoint()
 
     # verify that we end up with the same objects
-    for source_zone, bucket_name in zone_bucket.iteritems():
+    for source_zone, bucket in zone_bucket.iteritems():
         for target_zone in all_zones:
             if source_zone.zone_name == target_zone.zone_name:
                 continue
 
-            realm.zone_data_checkpoint(target_zone, source_zone)
+            realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
 
-            check_bucket_eq(source_zone, target_zone, bucket_name)
+            check_bucket_eq(source_zone, target_zone, bucket)
 
 
 def init(parse_args):