import json
import errno
import logging
+import random
import time
from io import StringIO
+from collections import deque
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
self.assertTrue(res['state'], 'mapped')
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_incremental_sync(self):
+ """ Test incremental snapshot synchronization (based on mtime differences)."""
+ log.debug('reconfigure client auth caps')
+ self.mds_cluster.mon_manager.raw_cluster_cmd_result(
+ 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
+ 'mds', 'allow rw',
+ 'mon', 'allow r',
+ 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
+ self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
+ log.debug(f'mounting filesystem {self.secondary_fs_name}')
+ self.mount_b.umount_wait()
+ self.mount_b.mount(cephfs_name=self.secondary_fs_name)
+
+ repo = 'ceph-qa-suite'
+ repo_dir = 'ceph_repo'
+ repo_path = f'{repo_dir}/{repo}'
+
+ def clone_repo():
+ self.mount_a.run_shell([
+ 'git', 'clone', '--branch', 'giant',
+ f'http://github.com/ceph/{repo}', repo_path])
+
+ def exec_git_cmd(cmd_list):
+ self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])
+
+ self.mount_a.run_shell(["mkdir", repo_dir])
+ clone_repo()
+
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
+
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
+ self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
+
+ # full copy, takes time
+ time.sleep(500)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
+ self.verify_snapshot(repo_path, 'snap_a')
+
+ # create some diff
+ num = random.randint(5, 20)
+ log.debug(f'resetting to HEAD~{num}')
+ exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])
+
+ self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
+ # incremental copy, should be fast
+ time.sleep(180)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
+ self.verify_snapshot(repo_path, 'snap_b')
+
+ # diff again, this time back to HEAD
+ log.debug('resetting to HEAD')
+ exec_git_cmd(["pull"])
+
+ self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c'])
+ # incremental copy, should be fast
+ time.sleep(180)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3)
+ self.verify_snapshot(repo_path, 'snap_c')
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_incremental_sync_with_type_mixup(self):
+ """ Test incremental snapshot synchronization with file type changes.
+
+ The same filename exist as a different type in subsequent snapshot.
+ This verifies if the mirror daemon can identify file type mismatch and
+ sync snapshots.
+
+ \ snap_0 snap_1 snap_2 snap_3
+ \-----------------------------------------------
+ file_x | reg sym dir reg
+ |
+ file_y | dir reg sym dir
+ |
+ file_z | sym dir reg sym
+ """
+ log.debug('reconfigure client auth caps')
+ self.mds_cluster.mon_manager.raw_cluster_cmd_result(
+ 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
+ 'mds', 'allow rw',
+ 'mon', 'allow r',
+ 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
+ self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
+ log.debug(f'mounting filesystem {self.secondary_fs_name}')
+ self.mount_b.umount_wait()
+ self.mount_b.mount(cephfs_name=self.secondary_fs_name)
+
+ typs = deque(['reg', 'dir', 'sym'])
+ def cleanup_and_create_with_type(dirname, fnames):
+ self.mount_a.run_shell_payload(f"rm -rf {dirname}/*")
+ fidx = 0
+ for t in typs:
+ fname = f'{dirname}/{fnames[fidx]}'
+ log.debug(f'file: {fname} type: {t}')
+ if t == 'reg':
+ self.mount_a.run_shell(["touch", fname])
+ self.mount_a.write_file(fname, data=fname)
+ elif t == 'dir':
+ self.mount_a.run_shell(["mkdir", fname])
+ elif t == 'sym':
+ # verify ELOOP in mirror daemon
+ self.mount_a.run_shell(["ln", "-s", "..", fname])
+ fidx += 1
+
+ def verify_types(dirname, fnames, snap_name):
+ tidx = 0
+ for fname in fnames:
+ t = self.mount_b.run_shell_payload(f"stat -c %F {dirname}/.snap/{snap_name}/{fname}").stdout.getvalue().strip()
+ if typs[tidx] == 'reg':
+ self.assertEquals('regular file', t)
+ elif typs[tidx] == 'dir':
+ self.assertEquals('directory', t)
+ elif typs[tidx] == 'sym':
+ self.assertEquals('symbolic link', t)
+ tidx += 1
+
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
+
+ self.mount_a.run_shell(["mkdir", "d0"])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+
+ fnames = ['file_x', 'file_y', 'file_z']
+ turns = 0
+ while turns != len(typs):
+ snapname = f'snap_{turns}'
+ cleanup_and_create_with_type('d0', fnames)
+ self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}'])
+ time.sleep(30)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", '/d0', snapname, turns+1)
+ verify_types('d0', fnames, snapname)
+ # next type
+ typs.rotate(1)
+ turns += 1
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_sync_with_purged_snapshot(self):
+ """Test snapshot synchronization in midst of snapshot deletes.
+
+ Deleted the previous snapshot when the mirror daemon is figuring out
+ incremental differences between current and previous snaphot. The
+ mirror daemon should identify the purge and switch to using remote
+ comparison to sync the snapshot (in the next iteration of course).
+ """
+
+ log.debug('reconfigure client auth caps')
+ self.mds_cluster.mon_manager.raw_cluster_cmd_result(
+ 'auth', 'caps', "client.{0}".format(self.mount_b.client_id),
+ 'mds', 'allow rw',
+ 'mon', 'allow r',
+ 'osd', 'allow rw pool={0}, allow rw pool={1}'.format(
+ self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name()))
+ log.debug(f'mounting filesystem {self.secondary_fs_name}')
+ self.mount_b.umount_wait()
+ self.mount_b.mount(cephfs_name=self.secondary_fs_name)
+
+ repo = 'ceph-qa-suite'
+ repo_dir = 'ceph_repo'
+ repo_path = f'{repo_dir}/{repo}'
+
+ def clone_repo():
+ self.mount_a.run_shell([
+ 'git', 'clone', '--branch', 'giant',
+ f'http://github.com/ceph/{repo}', repo_path])
+
+ def exec_git_cmd(cmd_list):
+ self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list])
+
+ self.mount_a.run_shell(["mkdir", repo_dir])
+ clone_repo()
+
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
+
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}')
+ self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
+
+ # full copy, takes time
+ time.sleep(500)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
+ self.verify_snapshot(repo_path, 'snap_a')
+
+ # create some diff
+ num = random.randint(60, 100)
+ log.debug(f'resetting to HEAD~{num}')
+ exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])
+
+ self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
+
+ time.sleep(15)
+ self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])
+
+ # incremental copy but based on remote dir_root
+ time.sleep(300)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
+ self.verify_snapshot(repo_path, 'snap_b')
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)