def disable_mirroring_module(self):
self.run_ceph_cmd("mgr", "module", "disable", TestMirroring.MODULE_NAME)
+ def is_mirroring_module_enabled(self):
+ modules = json.loads(
+ self.get_ceph_cmd_stdout('mgr', 'module', 'ls', '--format', 'json'))
+ return self.MODULE_NAME in modules.get('enabled_modules', [])
+
+ def wait_mirroring_module_disabled(self):
+ with safe_while(sleep=1, tries=30,
+ action='wait for mirroring module disable') as proceed:
+ while proceed():
+ if not self.is_mirroring_module_enabled():
+ return
+
+ def wait_mirroring_module_reload(self, fs_name, dir_name):
+ """Wait for mirroring module enable and directory re-acquire after reload."""
+ with safe_while(sleep=2, tries=60,
+ action='wait for mirroring module reload') as proceed:
+ while proceed():
+ if not self.is_mirroring_module_enabled():
+ continue
+ dirmap = self.mirror_dirmap(fs_name, dir_name)
+ if dirmap.get('state') == 'mapped':
+ return
+
+ def wait_directory_mapped(self, fs_name, dir_name):
+ with safe_while(sleep=2, tries=30,
+ action='wait for directory to be mapped') as proceed:
+ while proceed():
+ dirmap = self.mirror_dirmap(fs_name, dir_name)
+ if dirmap.get('state') == 'mapped':
+ return
+
def enable_mirroring(self, fs_name, fs_id):
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR][0]
log.debug(f'destination snapshot checksum {snap_name} {dest_res}')
self.assertTrue(source_res == dest_res)
+ def mirror_dirmap(self, fs_name, dir_name):
+ return json.loads(self.get_ceph_cmd_stdout(
+ 'fs', 'snapshot', 'mirror', 'dirmap', fs_name, dir_name))
+
+ @retry_assert(timeout=60, interval=5)
+ def assert_snapshot_not_synced(self, dir_name, snap_name):
+ """Assert a snapshot on the primary has not appeared on the secondary."""
+ try:
+ snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
+ except CommandFailedError:
+ return
+ self.assertNotIn(snap_name, snap_list)
+
@retry_assert(timeout=150, interval=5)
def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name):
peer_uuid = self.get_peer_uuid(peer_spec)
self.config_set('client.mirror', 'cephfs_mirror_distribute_datasync_threads', 'true')
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_duplicate_acquire_notify(self):
+ """Duplicate acquire notifies must not leave a ghost replayer directory.
+
+ Disabling and re-enabling the mirroring module reloads FSPolicy from omap
+ and re-sends acquire for mapped directories. Without an idempotent
+ PeerReplayer::add_directory(), a duplicate vector entry survives
+ remove_directory() (only one list entry is erased) and the replayer
+ keeps syncing after the directory is removed from mirroring.
+
+ Without the fix, a ghost entry may also crash the replayer thread when
+ pick_directory() calls m_snap_sync_stats.at() after remove erased the map
+ entry; this test catches spurious sync when that path still runs.
+ """
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'dup_acquire_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ self.wait_directory_mapped(self.primary_fs_name, f'/{dir_name}')
+
+ self.disable_mirroring_module()
+ self.wait_mirroring_module_disabled()
+ self.enable_mirroring_module()
+ self.wait_mirroring_module_reload(self.primary_fs_name, f'/{dir_name}')
+
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap_name = 'snap_after_remove'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+ self.assert_snapshot_not_synced(dir_name, snap_name)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)