From: Venky Shankar Date: Wed, 16 Sep 2020 04:23:00 +0000 (-0400) Subject: cephfs-mirror: spawn peer replayers on peer changes X-Git-Tag: v16.1.0~786^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=aaebc6c2b8fa6c274ac883dddc9672c6ef1e2513;p=ceph.git cephfs-mirror: spawn peer replayers on peer changes Signed-off-by: Venky Shankar --- diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc index 68cb5789b31e..b2952cfac07c 100644 --- a/src/tools/cephfs_mirror/FSMirror.cc +++ b/src/tools/cephfs_mirror/FSMirror.cc @@ -154,14 +154,15 @@ int FSMirror::connect(std::string_view client_name, std::string_view cluster_nam return 0; } -void FSMirror::run() { +void FSMirror::run(PeerReplayer *peer_replayer) { dout(20) << dendl; std::unique_lock locker(m_lock); while (true) { dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl; - m_cond.wait(locker, [this]{return m_directories.size() || is_stopping();}); - if (is_stopping()) { + m_cond.wait(locker, [this, peer_replayer]{return m_directories.size() || peer_replayer->is_stopping();}); + if (peer_replayer->is_stopping()) { + dout(5) << ": exiting" << dendl; break; } @@ -171,19 +172,35 @@ void FSMirror::run() { } } -void FSMirror::init_replayers() { - std::scoped_lock locker(m_lock); +void FSMirror::init_replayers(PeerReplayer *peer_replayer) { + ceph_assert(ceph_mutex_is_locked(m_lock)); - auto replayers = g_ceph_context->_conf.get_val<uint64_t>( + auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>( "cephfs_mirror_max_concurrent_directory_syncs"); - dout(20) << ": spawning " << replayers << " snapshot replayer(s)" << dendl; + dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl; - while (replayers-- > 0) { - std::unique_ptr<SnapshotReplayer> replayer(new SnapshotReplayer(this)); - std::string name("replayer-" + stringify(replayers)); + while (nr_replayers-- > 0) { + std::unique_ptr<SnapshotReplayerThread> replayer( + new SnapshotReplayerThread(this, 
peer_replayer)); + std::string name("replayer-" + stringify(nr_replayers)); replayer->create(name.c_str()); - m_snapshot_replayers.push_back(std::move(replayer)); + peer_replayer->replayers.push_back(std::move(replayer)); + } +} + +void FSMirror::shutdown_replayers(PeerReplayer *peer_replayer, + std::unique_lock<ceph::mutex> &locker) { + peer_replayer->stopping = true; + m_cond.notify_all(); + + locker.unlock(); + // safe to iterate unlocked + for (auto &replayer : peer_replayer->replayers) { + replayer->join(); } + locker.lock(); + + peer_replayer->replayers.clear(); } void FSMirror::init(Context *on_finish) { @@ -215,7 +232,7 @@ void FSMirror::shutdown(Context *on_finish) { dout(20) << dendl; { - std::scoped_lock locker(m_lock); + std::unique_lock locker(m_lock); m_stopping = true; m_cond.notify_all(); if (m_on_init_finish != nullptr) { @@ -232,9 +249,15 @@ void FSMirror::shutdown(Context *on_finish) { } m_on_shutdown_finish = on_finish; + + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(5) << ": shutting down replayer for peer=" << peer << dendl; + shutdown_replayers(&peer_replayer, locker); + } + m_peer_replayers.clear(); } - wait_for_replayers(); + shutdown_mirror_watcher(); } void FSMirror::init_instance_watcher(Context *on_finish) { @@ -305,17 +328,6 @@ void FSMirror::handle_init_mirror_watcher(int r) { shutdown_instance_watcher(); } -void FSMirror::wait_for_replayers() { - dout(20) << dendl; - - for (auto &replayer : m_snapshot_replayers) { - replayer->join(); - } - - m_snapshot_replayers.clear(); - shutdown_mirror_watcher(); -} - void FSMirror::shutdown_mirror_watcher() { dout(20) << dendl; @@ -382,22 +394,30 @@ void FSMirror::add_peer(const Peer &peer) { dout(10) << ": peer=" << peer << dendl; std::scoped_lock locker(m_lock); - m_peers.emplace(peer); - ceph_assert(m_peers.size() == 1); // support only a single peer + auto p = m_peer_replayers.emplace(peer, PeerReplayer()); + ceph_assert(m_peer_replayers.size() == 1); // support only a single peer + if 
(p.second) { + init_replayers(&p.first->second); + } } void FSMirror::remove_peer(const Peer &peer) { dout(10) << ": peer=" << peer << dendl; - std::scoped_lock locker(m_lock); - m_peers.erase(peer); + std::unique_lock locker(m_lock); + auto it = m_peer_replayers.find(peer); + if (it != m_peer_replayers.end()) { + dout(5) << ": shutting down replayers for peer=" << peer << dendl; + shutdown_replayers(&it->second, locker); + } + m_peer_replayers.erase(it); } void FSMirror::mirror_status(Formatter *f) { std::scoped_lock locker(m_lock); f->open_object_section("status"); f->open_object_section("peers"); - for (auto &peer : m_peers) { + for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) { peer.dump(f); } f->close_section(); // peers diff --git a/src/tools/cephfs_mirror/FSMirror.h b/src/tools/cephfs_mirror/FSMirror.h index 28e5d127a651..993b291482b4 100644 --- a/src/tools/cephfs_mirror/FSMirror.h +++ b/src/tools/cephfs_mirror/FSMirror.h @@ -56,19 +56,32 @@ private: } }; - class SnapshotReplayer : public Thread { + struct PeerReplayer; + class SnapshotReplayerThread : public Thread { public: - SnapshotReplayer(FSMirror *fs_mirror) - : m_fs_mirror(fs_mirror) { + SnapshotReplayerThread(FSMirror *fs_mirror, PeerReplayer *peer_replayer) + : m_fs_mirror(fs_mirror), + m_peer_replayer(peer_replayer) { } void *entry() override { - m_fs_mirror->run(); + m_fs_mirror->run(m_peer_replayer); return 0; } private: FSMirror *m_fs_mirror; + PeerReplayer *m_peer_replayer; + }; + + typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers; + struct PeerReplayer { + SnapshotReplayers replayers; + bool stopping = false; + + bool is_stopping() { + return stopping; + } }; std::string m_fs_name; @@ -79,9 +92,8 @@ private: ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror"); ceph::condition_variable m_cond; SnapListener m_snap_listener; - std::set<Peer> m_peers; std::set<std::string, std::less<>> m_directories; - std::vector<std::unique_ptr<SnapshotReplayer>> m_snapshot_replayers; + std::map<Peer, PeerReplayer> m_peer_replayers; RadosRef m_cluster; std::string 
m_addrs; @@ -91,14 +103,16 @@ private: int m_retval = 0; bool m_stopping = false; + bool m_init_failed = false; Context *m_on_init_finish = nullptr; Context *m_on_shutdown_finish = nullptr; MirrorAdminSocketHook *m_asok_hook = nullptr; - void run(); - void init_replayers(); - void wait_for_replayers(); + void run(PeerReplayer *peer_replayer); + void init_replayers(PeerReplayer *peer_replayer); + void shutdown_replayers(PeerReplayer *peer_replayer, + std::unique_lock<ceph::mutex> &locker); int connect(std::string_view cluster_name, std::string_view client_name, RadosRef *cluster);