def mdlog_autotrim(zone):
zone.cluster.admin(['mdlog', 'autotrim'])
-def meta_sync_status(zone):
- while True:
- cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
- meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
- if retcode == 0:
- break
- assert(retcode == 2) # ENOENT
- time.sleep(5)
-
+def parse_meta_sync_status(meta_sync_status_json):
meta_sync_status_json = meta_sync_status_json.decode('utf-8')
log.debug('current meta sync status=%s', meta_sync_status_json)
sync_status = json.loads(meta_sync_status_json)
return period, realm_epoch, num_shards, markers
+def meta_sync_status(zone): # poll 'metadata sync status' for zone; returns parse_meta_sync_status() of the first successful output
+ for _ in range(60): # bounded retry: at most 60 attempts instead of looping forever
+ cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
+ meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True) # check_retcode=False: the exit code is inspected here
+ if retcode == 0:
+ return parse_meta_sync_status(meta_sync_status_json)
+ assert retcode == 2, 'metadata sync status for zone=%s returned unexpected retcode=%s' % (zone.name, retcode) # only ENOENT (2) is retried
+ time.sleep(5) # wait 5 seconds before the next attempt
+
+ assert False, 'failed to read metadata sync status for zone=%s' % zone.name # every attempt returned ENOENT
+
def meta_master_log_status(master_zone):
cmd = ['mdlog', 'status'] + master_zone.zone_args()
mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
log.info('starting meta checkpoint for zone=%s', zone.name)
- while True:
+ for _ in range(60):
period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
if realm_epoch < current_realm_epoch:
log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
log.debug('log_status=%s', master_status)
log.debug('sync_status=%s', sync_status)
if compare_meta_status(zone, master_status, sync_status):
- break
+ log.info('finish meta checkpoint for zone=%s', zone.name)
+ return
time.sleep(5)
-
- log.info('finish meta checkpoint for zone=%s', zone.name)
+ assert False, 'failed meta checkpoint for zone=%s' % zone.name
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
if not meta_master_zone:
for zonegroup in realm.current_period.zonegroups:
zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
-def data_sync_status(target_zone, source_zone):
- if target_zone == source_zone:
- return None
-
- while True:
- cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
- cmd += ['--source-zone', source_zone.name]
- data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
- if retcode == 0:
- break
-
- assert(retcode == 2) # ENOENT
-
+def parse_data_sync_status(data_sync_status_json):
data_sync_status_json = data_sync_status_json.decode('utf-8')
log.debug('current data sync status=%s', data_sync_status_json)
sync_status = json.loads(data_sync_status_json)
return (num_shards, markers)
+def data_sync_status(target_zone, source_zone): # poll 'data sync status' on target_zone for source_zone; returns parse_data_sync_status() of the first successful output
+ if target_zone == source_zone:
+ return None # same zone on both sides: no status is queried
+
+ for _ in range(60): # bounded retry: at most 60 attempts instead of looping forever
+ cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
+ cmd += ['--source-zone', source_zone.name]
+ data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True) # check_retcode=False: the exit code is inspected here
+ if retcode == 0:
+ return parse_data_sync_status(data_sync_status_json)
+
+ assert retcode == 2, 'data sync status for target_zone=%s source_zone=%s returned unexpected retcode=%s' % (target_zone.name, source_zone.name, retcode) # only ENOENT (2) is retried
+ time.sleep(5) # wait 5 seconds before the next attempt
+
+ assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
+ (target_zone.name, source_zone.name)
+
def bucket_sync_status(target_zone, source_zone, bucket_name):
if target_zone == source_zone:
return None
if target_zone == source_zone:
return
+ log_status = data_source_log_status(source_zone)
log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
- while True:
- log_status = data_source_log_status(source_zone)
+ for _ in range(60):
num_shards, sync_status = data_sync_status(target_zone, source_zone)
log.debug('log_status=%s', log_status)
log.debug('sync_status=%s', sync_status)
if compare_data_status(target_zone, source_zone, log_status, sync_status):
- break
-
+ log.info('finished data checkpoint for target_zone=%s source_zone=%s',
+ target_zone.name, source_zone.name)
+ return
time.sleep(5)
- log.info('finished data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
+ assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
+ (target_zone.name, source_zone.name)
+
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
if target_zone == source_zone:
return
+ log_status = bucket_source_log_status(source_zone, bucket_name) # read the source log status once, before polling; every attempt compares against this value
log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
- while True:
- log_status = bucket_source_log_status(source_zone, bucket_name)
+ for _ in range(60): # bounded: at most 60 attempts, 5 seconds apart
sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
log.debug('log_status=%s', log_status)
log.debug('sync_status=%s', sync_status)
if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
- break
+ log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
+ return
time.sleep(5)
- log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
+ assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
+ (target_zone.name, source_zone.name, bucket_name)
def set_master_zone(zone):
zone.modify(zone.cluster, ['--master'])