log.debug(f'status: {status}')
return status
+ def check_sync_completed(self, fs_name, fs_id, peer_spec, dir_path, expected_snap_name,
+ expected_snap_count):
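+ """Wait for the snapshot sync of dir_path to complete by polling the
+ mirror daemon peer status until the state is idle, the last synced
+ snapshot is expected_snap_name and snaps_synced equals expected_snap_count."""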
+ peer_uuid = self.get_peer_uuid(peer_spec)
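+ # poll instead of sleeping for a fixed interval; safe_while raises
+ # MaxWhileTries if the sync does not complete within the allotted tries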
+ with safe_while(sleep=5, tries=20, action='wait for sync completion') as proceed:
+ while proceed():
+ try:
+ # verify via asok
+ peer_status = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{fs_name}@{fs_id}', peer_uuid)
+ if dir_path in peer_status:
+ res = peer_status[dir_path]
+ if res['state'] == "idle" and 'last_synced_snap' in res and \
+ res['last_synced_snap']['name'] == expected_snap_name and \
+ res['snaps_synced'] == expected_snap_count:
+ log.debug(f"{expected_snap_name} sync completed.")
+ break
+ except Exception as e:
+ raise RuntimeError('Error getting peer status') from e
+
def test_basic_mirror_commands(self):
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
self.mount_a.run_shell(["mkdir", repo_dir])
clone_repo()
+ peer_spec = "client.mirror_remote@ceph"
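+ # remote peer in "<client>@<cluster>" form, reused below for peer status queries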
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
- self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
-
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
- self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
- # full copy, takes time
- time.sleep(500)
- self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
- "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
+ # take the first snapshot; this triggers a full copy
+ self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
+ self.check_sync_completed(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{repo_path}', 'snap_a', 1)
self.verify_snapshot(repo_path, 'snap_a')
# create some diff
log.debug(f'resetting to HEAD~{num}')
exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])
+ # take the second snapshot
self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
- time.sleep(15)
- self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])
+ # wait for the snap_b sync to start with sync_type == local_scan
+ with safe_while(sleep=1, tries=50, action='wait for local_scan start') as proceed:
+ while proceed():
+ try:
+ # verify via asok
+ res = self.mirror_daemon_command(f'peer status for fs: {self.primary_fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{self.primary_fs_name}@{self.primary_fs_id}',
+ self.get_peer_uuid(peer_spec))[f'/{repo_path}']
+ if res['state'] == "syncing" and 'sync_type' in res:
+ if res['sync_type'] == "local_scan":
+ log.debug("sync_type == local_scan")
+ break
+ except Exception as e:
+ raise RuntimeError('Error getting peer status') from e
+ # remove the previous snapshot while the sync is in progress
+ self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])
# incremental copy but based on remote dir_root
- time.sleep(300)
- self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
- "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
+ # confirm that the sync completed
+ self.check_sync_completed(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{repo_path}', 'snap_b', 2)
self.verify_snapshot(repo_path, 'snap_b')
-
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
def test_cephfs_mirror_peer_add_primary(self):