return (num_shards, markers)
+ def bucket_sync_status(self, target_zone, source_zone, bucket_name):
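+        # returns a per-shard map of incremental sync markers for bucket_name on
+        # the target zone; retries while 'bucket sync status' fails with ENOENT
+        # (sync status not yet initialized)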
+ if target_zone.zone_name == source_zone.zone_name:
+ return None
+
+ while True:
+ (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm +
+ ' bucket sync status --source-zone=' + source_zone.zone_name +
+ ' --bucket=' + bucket_name, check_retcode = False)
+ if retcode == 0:
+ break
+
+            assert retcode == 2  # ENOENT
+
+ log(20, 'current bucket sync status=', bucket_sync_status_json)
+ sync_status = json.loads(bucket_sync_status_json)
+
+        markers = {}
+        for entry in sync_status:
+            val = entry['val']
+            # strip the shard id prefix, e.g. 6#00000000002.132.3 -> 00000000002.132.3
+            pos = val['inc_marker']['position'].split('#')[-1]
+            markers[entry['key']] = pos
+
+ return markers
+
def data_source_log_status(self, source_zone):
source_cluster = source_zone.cluster
(datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' datalog status')
return markers
+ def bucket_source_log_status(self, source_zone, bucket_name):
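+        # returns the bucket index log (bilog) markers per shard on the source zone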
+ source_cluster = source_zone.cluster
+ (bilog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' bilog status --bucket=' + bucket_name)
+ bilog_status = json.loads(bilog_status_json)
+
+        markers = {}
+        # the 'markers' section is absent when the bucket index log is empty
+        for entry in bilog_status.get('markers', []):
+            markers[entry['key']] = entry['val']
+
+ log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers)
+
+ return markers
+
def compare_data_status(self, target_zone, source_zone, log_status, sync_status):
if len(log_status) != len(sync_status):
log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
return True
+ def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status):
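+        # returns True only if no shard's source (master) marker is ahead of the
+        # target's sync marker, i.e. the target has caught up on this bucket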
+ if len(log_status) != len(sync_status):
+ log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
+ return False
+
+ msg = ''
+        for shard, l in log_status.iteritems():
+            # look up the target's sync position for the same shard key instead of
+            # zipping the two maps, whose iteration orders need not match
+            s = sync_status.get(shard, '')
+            if l > s:
+                if len(msg) != 0:
+                    msg += ', '
+                msg += 'shard=' + str(shard) + ' master=' + l + ' target=' + s
+
+ if len(msg) > 0:
+ log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
+ return False
+
+ return True
+
def zone_data_checkpoint(self, target_zone, source_zone):
if target_zone.zone_name == source_zone.zone_name:
return
log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
+ def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name):
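+        # poll until target_zone's bucket sync status catches up with the
+        # source zone's bucket index log for this bucket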
+ if target_zone.zone_name == source_zone.zone_name:
+ return
+
+ log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
+
+ while True:
+ log_status = self.bucket_source_log_status(source_zone, bucket_name)
+ sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name)
+
+ log(20, 'log_status=', log_status)
+ log(20, 'sync_status=', sync_status)
+
+ if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
+ break
+
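+            # not caught up yet; wait before polling again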
+ time.sleep(5)
+
+ log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
+
def create_user(self, user, wait_meta = True):
log(5, 'creating user uid=', user.uid)
realm.meta_checkpoint()
- for source_zone, bucket_name in zone_bucket.iteritems():
+ for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_data_checkpoint(target_zone, source_zone)
+ realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket_name)
+ check_bucket_eq(source_zone, target_zone, bucket)
def test_object_delete():
buckets, zone_bucket = create_bucket_per_zone()
content = 'asdasd'
# don't wait for meta sync just yet
- for zone, bucket_name in zone_bucket.iteritems():
- k = new_key(zone, bucket_name, objname)
+ for zone, bucket in zone_bucket.iteritems():
+ k = new_key(zone, bucket, objname)
k.set_contents_from_string(content)
realm.meta_checkpoint()
# check object exists
- for source_zone, bucket_name in zone_bucket.iteritems():
+ for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_data_checkpoint(target_zone, source_zone)
+ realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket_name)
+ check_bucket_eq(source_zone, target_zone, bucket)
# check object removal
- for source_zone, bucket_name in zone_bucket.iteritems():
- k = get_key(source_zone, bucket_name, objname)
+ for source_zone, bucket in zone_bucket.iteritems():
+ k = get_key(source_zone, bucket, objname)
k.delete()
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_data_checkpoint(target_zone, source_zone)
+ realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket_name)
+ check_bucket_eq(source_zone, target_zone, bucket)
def test_multi_period_incremental_sync():
if len(realm.clusters) < 3:
realm.meta_checkpoint()
# verify that we end up with the same objects
- for source_zone, bucket_name in zone_bucket.iteritems():
+ for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_data_checkpoint(target_zone, source_zone)
+ realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket_name)
+ check_bucket_eq(source_zone, target_zone, bucket)
def init(parse_args):