From: Jos Collin Date: Fri, 14 Feb 2025 09:43:31 +0000 (+0530) Subject: cephfs_mirror: add --sync-latest-snapshot and --sync_from_snapshot options X-Git-Tag: testing/wip-vshankar-testing-20260219.125903^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=312572c62bb7375e67358032fa17dff9986a8191;p=ceph-ci.git cephfs_mirror: add --sync-latest-snapshot and --sync_from_snapshot options Fixes: https://tracker.ceph.com/issues/69966 Fixes: https://tracker.ceph.com/issues/69187 Fixes: https://tracker.ceph.com/issues/69188 Signed-off-by: Jos Collin --- diff --git a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py index c348ce82de1..3d4f88a6e0e 100644 --- a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py +++ b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py @@ -178,8 +178,16 @@ class FSPolicy: def process_updates(self): def acquire_message(dir_path): + lookup_info = self.policy.lookup(dir_path) + sync_latest_snapshot = False + sync_from_snapshot = None + if lookup_info: + sync_latest_snapshot = lookup_info['sync_latest_snapshot'] + sync_from_snapshot = lookup_info['sync_from_snapshot'] return json.dumps({'dir_path': dir_path, - 'mode': 'acquire' + 'mode': 'acquire', + 'sync_latest_snapshot': sync_latest_snapshot, + 'sync_from_snapshot': sync_from_snapshot }) def release_message(dir_path): return json.dumps({'dir_path': dir_path, @@ -202,7 +210,9 @@ class FSPolicy: # take care to not overwrite purge status update_map[dir_path] = {'version': 1, 'instance_id': lookup_info['instance_id'], - 'last_shuffled': lookup_info['mapped_time'] + 'last_shuffled': lookup_info['mapped_time'], + 'sync_latest_snapshot': lookup_info['sync_latest_snapshot'], + 'sync_from_snapshot': lookup_info['sync_from_snapshot'] } if lookup_info['purging']: update_map[dir_path]['purging'] = 1 @@ -219,7 +229,7 @@ class FSPolicy: self.notifier.notify(dir_path, message, self.handle_peer_ack) self.dir_paths.clear() - def add_dir(self, dir_path): + def add_dir(self, dir_path, sync_latest_snapshot, sync_from_snapshot): with self.lock: lookup_info = self.policy.lookup(dir_path) if lookup_info: @@ -230,7 +240,9 @@ class FSPolicy: schedule = self.policy.add_dir(dir_path) if not schedule: return - update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}} + update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0, + 'sync_latest_snapshot': sync_latest_snapshot, + 'sync_from_snapshot': sync_from_snapshot}} updated = False def update_safe(updates, removals, r): nonlocal updated @@ -689,7 +701,7 @@ class FSSnapshotMirror: raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path') return os.path.normpath(dir_path) - def add_dir(self, filesystem, dir_path): + def add_dir(self, filesystem, dir_path, sync_latest_snapshot, sync_from_snapshot): try: with self.lock: if not self.filesystem_exist(filesystem): @@ -699,7 +711,7 @@ class FSSnapshotMirror: raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored') dir_path = FSSnapshotMirror.norm_path(dir_path) log.debug(f'path normalized to {dir_path}') - fspolicy.add_dir(dir_path) + fspolicy.add_dir(dir_path, sync_latest_snapshot, sync_from_snapshot) return 0, json.dumps({}), '' except MirrorException as me: return me.args[0], '', me.args[1] diff --git a/src/pybind/mgr/mirroring/module.py b/src/pybind/mgr/mirroring/module.py index 2ea3e1b067c..744a69ba809 100644 --- a/src/pybind/mgr/mirroring/module.py +++ b/src/pybind/mgr/mirroring/module.py @@ -77,9 +77,12 @@ class Module(MgrModule): @CLIWriteCommand('fs snapshot mirror add') def snapshot_mirror_add_dir(self, fs_name: str, - path: str): + path: str, + sync_latest_snapshot: Optional[bool] = False, + sync_from_snapshot: Optional[str] = None): """Add a directory for snapshot mirroring""" - return self.fs_snapshot_mirror.add_dir(fs_name, path) + return self.fs_snapshot_mirror.add_dir(fs_name, path, sync_latest_snapshot, + sync_from_snapshot) @CLIWriteCommand('fs snapshot mirror remove') def snapshot_mirror_remove_dir(self, diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc index 4158361e7b9..f2bd5a3c18d 100644 --- a/src/tools/cephfs_mirror/FSMirror.cc +++ b/src/tools/cephfs_mirror/FSMirror.cc @@ -371,8 +371,11 @@ void FSMirror::handle_shutdown_instance_watcher(int r) { } } -void FSMirror::handle_acquire_directory(string_view dir_path) { - dout(5) << ": dir_path=" << dir_path << dendl; +void FSMirror::handle_acquire_directory(string_view dir_path, + bool sync_latest_snapshot, + string_view sync_from_snapshot) { + dout(5) << ": dir_path=" << dir_path << ", sync_latest_snapshot=" << sync_latest_snapshot + << ", sync_from_snapshot=" << sync_from_snapshot << dendl; { std::scoped_lock locker(m_lock); @@ -382,7 +385,7 @@ void FSMirror::handle_acquire_directory(string_view dir_path) { for (auto &[peer, peer_replayer] : m_peer_replayers) { dout(10) << ": peer=" << peer << dendl; - peer_replayer->add_directory(dir_path); + peer_replayer->add_directory(dir_path, sync_latest_snapshot, sync_from_snapshot); } } if (m_perf_counters) { diff --git a/src/tools/cephfs_mirror/FSMirror.h b/src/tools/cephfs_mirror/FSMirror.h index 17f0f82164b..216206d8ed7 100644 --- a/src/tools/cephfs_mirror/FSMirror.h +++ b/src/tools/cephfs_mirror/FSMirror.h @@ -117,8 +117,10 @@ private: : fs_mirror(fs_mirror) { } - void acquire_directory(std::string_view dir_path) override { - fs_mirror->handle_acquire_directory(dir_path); + void acquire_directory(std::string_view dir_path, + bool sync_latest_snapshot = false, + std::string_view sync_from_snapshot = "") override { + fs_mirror->handle_acquire_directory(dir_path, sync_latest_snapshot, sync_from_snapshot); } void release_directory(std::string_view dir_path) override { @@ -192,7 +194,8 @@ private: void shutdown_instance_watcher(); void handle_shutdown_instance_watcher(int r); - void handle_acquire_directory(std::string_view dir_path); + void handle_acquire_directory(std::string_view dir_path, bool sync_latest_snapshot, + std::string_view sync_from_snapshot); void handle_release_directory(std::string_view dir_path); }; diff --git a/src/tools/cephfs_mirror/InstanceWatcher.cc b/src/tools/cephfs_mirror/InstanceWatcher.cc index 3ea3906404c..c4bdd8c13fa 100644 --- a/src/tools/cephfs_mirror/InstanceWatcher.cc +++ b/src/tools/cephfs_mirror/InstanceWatcher.cc @@ -87,19 +87,29 @@ void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, std::string dir_path; std::string mode; + bool sync_latest_snapshot; + std::string sync_from_snapshot; + try { JSONDecoder jd(bl); JSONDecoder::decode_json("dir_path", dir_path, &jd.parser, true); JSONDecoder::decode_json("mode", mode, &jd.parser, true); + JSONDecoder::decode_json("sync_latest_snapshot", sync_latest_snapshot, &jd.parser, true); + JSONDecoder::decode_json("sync_from_snapshot", sync_from_snapshot, &jd.parser, true); } catch (const JSONDecoder::err &e) { derr << ": failed to decode notify json: " << e.what() << dendl; } dout(20) << ": notifier_id=" << notifier_id << ", dir_path=" << dir_path - << ", mode=" << mode << dendl; + << ", mode=" << mode << ", sync_latest_snapshot=" << sync_latest_snapshot + << ", sync_from_snapshot=" << sync_from_snapshot << dendl; if (mode == "acquire") { - m_listener.acquire_directory(dir_path); + if (sync_from_snapshot == "null") { + m_listener.acquire_directory(dir_path, sync_latest_snapshot); + } else { + m_listener.acquire_directory(dir_path, sync_latest_snapshot, sync_from_snapshot); + } } else if (mode == "release") { m_listener.release_directory(dir_path); } else { diff --git a/src/tools/cephfs_mirror/InstanceWatcher.h b/src/tools/cephfs_mirror/InstanceWatcher.h index 5a48085d28c..3b85bb7118d 100644 --- a/src/tools/cephfs_mirror/InstanceWatcher.h +++ b/src/tools/cephfs_mirror/InstanceWatcher.h @@ -26,7 +26,9 @@ public: virtual ~Listener() { } - virtual void acquire_directory(std::string_view dir_path) = 0; + virtual void acquire_directory(std::string_view dir_path, + bool sync_latest_snapshot = false, + std::string_view sync_from_snapshot = "") = 0; virtual void release_directory(std::string_view dir_path) = 0; }; diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 7ea41ec11ac..b9e4059d21f 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -174,11 +174,14 @@ PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror, m_local_cluster(local_cluster), m_filesystem(filesystem), m_peer(peer), - m_directories(directories.begin(), directories.end()), m_local_mount(mount), m_service_daemon(service_daemon), m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)), m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) { + std::for_each(directories.begin(), directories.end(), [this](const auto& dir_root) { + m_directories[dir_root] = SnapSyncFrom(); + }); + // reset sync stats sent via service daemon m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0); @@ -228,10 +231,12 @@ PeerReplayer::~PeerReplayer() { } int PeerReplayer::init() { - dout(20) << ": initial dir list=[" << m_directories << "]" << dendl; + std::string str_list = ": initial dir list=["; for (auto &dir_root : m_directories) { - m_snap_sync_stats.emplace(dir_root, SnapSyncStat()); + m_snap_sync_stats.emplace(dir_root.first, SnapSyncStat()); + str_list += dir_root.first + " "; } + dout(20) << str_list << "]" << dendl; auto &remote_client = m_peer.remote.client_name; auto &remote_cluster = m_peer.remote.cluster_name; @@ -322,11 +327,13 @@ void PeerReplayer::shutdown() { m_remote_cluster.reset(); } -void PeerReplayer::add_directory(string_view dir_root) { - dout(20) << ": dir_root=" << dir_root << dendl; +void PeerReplayer::add_directory(string_view dir_root, bool sync_latest_snapshot, + string_view sync_from_snapshot) { + dout(20) << ": dir_root=" << dir_root << ", sync_latest_snapshot=" << sync_latest_snapshot + << ", sync_from_snapshot=" << sync_from_snapshot << dendl; std::scoped_lock locker(m_lock); - m_directories.emplace_back(dir_root); + m_directories[std::string(dir_root)] = SnapSyncFrom(sync_latest_snapshot, sync_from_snapshot); m_snap_sync_stats.emplace(dir_root, SnapSyncStat()); m_cond.notify_all(); } @@ -336,7 +343,7 @@ void PeerReplayer::remove_directory(string_view dir_root) { auto _dir_root = std::string(dir_root); std::scoped_lock locker(m_lock); - auto it = std::find(m_directories.begin(), m_directories.end(), _dir_root); + auto it = m_directories.find(_dir_root); if (it != m_directories.end()) { m_directories.erase(it); } @@ -359,20 +366,19 @@ boost::optional PeerReplayer::pick_directory() { boost::optional candidate; for (auto &dir_root : m_directories) { - auto &sync_stat = m_snap_sync_stats.at(dir_root); + auto &sync_stat = m_snap_sync_stats.at(dir_root.first); if (sync_stat.failed) { std::chrono::duration d = now - *sync_stat.last_failed; if (d.count() < retry_timo) { continue; } } - if (!m_registered.count(dir_root)) { - candidate = dir_root; + if (!m_registered.count(dir_root.first)) { + candidate = dir_root.first; break; } } - std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end()); return candidate; } @@ -401,7 +407,7 @@ void PeerReplayer::unregister_directory(const std::string &dir_root) { unlock_directory(it->first, it->second); m_registered.erase(it); - if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) { + if (m_directories.find(dir_root) == m_directories.end()) { m_snap_sync_stats.erase(dir_root); } } @@ -480,6 +486,19 @@ void PeerReplayer::unlock_directory(const std::string &dir_root, const DirRegist dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl; } +int PeerReplayer::get_snap_id(const std::string &dir_root, const std::string& snap_name) { + snap_info info; + auto snap_dir = snapshot_dir_path(m_cct, dir_root); + auto snap_path = snapshot_path(snap_dir, snap_name); + int r = ceph_get_snap_info(m_local_mount, snap_path.c_str(), &info); + if (r < 0) { + derr << ": failed to fetch " << snap_name << ", snap info for snap_path=" << snap_path + << ": " << cpp_strerror(r) << dendl; + return r; + } + return info.id; +} + int PeerReplayer::build_snap_map(const std::string &dir_root, std::map *snap_map, bool is_remote) { auto snap_dir = snapshot_dir_path(m_cct, dir_root); @@ -500,6 +519,7 @@ int PeerReplayer::build_snap_map(const std::string &dir_root, return r; } + auto sync_stat = m_snap_sync_stats.at(dir_root); std::set snaps; auto entry = ceph_readdir(mnt, dirp); while (entry != NULL) { @@ -512,6 +532,17 @@ int PeerReplayer::build_snap_map(const std::string &dir_root, entry = ceph_readdir(mnt, dirp); } + uint64_t snap_id_in = 0; + std::string snap_name = m_directories.at(dir_root).sync_from_snapshot; + if (!is_remote && !snap_name.empty()) { + r = get_snap_id(dir_root, snap_name); + if (r < 0) { + derr << ": defaulting to first snapshot for syncing" << dendl; + } else { + snap_id_in = r; + } + } + int rv = 0; for (auto &snap : snaps) { snap_info info; @@ -551,7 +582,21 @@ int PeerReplayer::build_snap_map(const std::string &dir_root, if (rv != 0) { break; } - snap_map->emplace(snap_id, snap); + if (snap_id >= snap_id_in) { + snap_map->emplace(snap_id, snap); + } + } + + if (!is_remote && m_directories.at(dir_root).sync_latest_snapshot) { + // Now we know the highest snap_id object emplaced. + // Reset the snap_map to the highest snap_id element. + if (!sync_stat.current_syncing_snap) { // skip if there's a sync in progress + auto it = snap_map->rbegin(); + auto sid = it->first; + auto sname = it->second; + snap_map->clear(); + snap_map->emplace(sid, sname); + } } r = ceph_closedir(mnt, dirp); @@ -1956,10 +2001,12 @@ int PeerReplayer::do_sync_snaps(const std::string &dir_root) { } } - r = propagate_snap_deletes(dir_root, snaps_deleted); - if (r < 0) { - derr << ": failed to propgate deleted snapshots" << dendl; - return r; + if (!m_directories.at(dir_root).sync_latest_snapshot) { + r = propagate_snap_deletes(dir_root, snaps_deleted); + if (r < 0) { + derr << ": failed to propgate deleted snapshots" << dendl; + return r; + } } r = propagate_snap_renames(dir_root, snaps_renamed); diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index c99d75abafc..ffbc329540b 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -34,7 +34,8 @@ public: void shutdown(); // add a directory to mirror queue - void add_directory(std::string_view dir_root); + void add_directory(std::string_view dir_root, bool sync_latest_snapshot, + std::string_view sync_from_snapshot); // remove a directory from queue void remove_directory(std::string_view dir_root); @@ -242,19 +243,42 @@ private: }; struct SnapSyncStat { - uint64_t nr_failures = 0; // number of consecutive failures + uint64_t nr_failures; // number of consecutive failures boost::optional last_failed; // lat failed timestamp boost::optional last_failed_reason; - bool failed = false; // hit upper cap for consecutive failures + bool failed; // hit upper cap for consecutive failures boost::optional> last_synced_snap; boost::optional> current_syncing_snap; - uint64_t synced_snap_count = 0; - uint64_t deleted_snap_count = 0; - uint64_t renamed_snap_count = 0; - monotime last_synced = clock::zero(); + uint64_t synced_snap_count; + uint64_t deleted_snap_count; + uint64_t renamed_snap_count; + monotime last_synced; boost::optional last_sync_duration; boost::optional last_sync_bytes; //last sync bytes for display in status - uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync. + uint64_t sync_bytes; //sync bytes counter, independently for each directory sync. + + SnapSyncStat() { + nr_failures = 0; + failed = false; + synced_snap_count = 0; + deleted_snap_count = 0; + renamed_snap_count = 0; + last_synced = clock::zero(); + sync_bytes = 0; + } + }; + + struct SnapSyncFrom { + bool sync_latest_snapshot; + std::string sync_from_snapshot; + SnapSyncFrom() { + sync_latest_snapshot = false; + sync_from_snapshot = ""; + } + SnapSyncFrom(bool sl, const std::string_view& sf) { + sync_latest_snapshot = sl; + sync_from_snapshot = sf; + } }; void _inc_failed_count(const std::string &dir_root) { @@ -365,7 +389,7 @@ private: Peer m_peer; // probably need to be encapsulated when supporting cancelations std::map m_registered; - std::vector m_directories; + std::map m_directories; std::map m_snap_sync_stats; MountRef m_local_mount; ServiceDaemon *m_service_daemon; @@ -396,6 +420,7 @@ private: int build_snap_map(const std::string &dir_root, std::map *snap_map, bool is_remote=false); + int get_snap_id(const std::string &dir_root, const std::string& snap_name); int propagate_snap_deletes(const std::string &dir_root, const std::set &snaps); int propagate_snap_renames(const std::string &dir_root, const std::set> &snaps);