return 0;
}
-void FSMirror::run() {
+void FSMirror::run(PeerReplayer *peer_replayer) {
dout(20) << dendl;
std::unique_lock locker(m_lock);
while (true) {
dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
- m_cond.wait(locker, [this]{return m_directories.size() || is_stopping();});
- if (is_stopping()) {
+ m_cond.wait(locker, [this, peer_replayer]{return m_directories.size() || peer_replayer->is_stopping();});
+ if (peer_replayer->is_stopping()) {
+ dout(5) << ": exiting" << dendl;
break;
}
}
}
-void FSMirror::init_replayers() {
- std::scoped_lock locker(m_lock);
+void FSMirror::init_replayers(PeerReplayer *peer_replayer) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
- auto replayers = g_ceph_context->_conf.get_val<uint64_t>(
+ auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
"cephfs_mirror_max_concurrent_directory_syncs");
- dout(20) << ": spawning " << replayers << " snapshot replayer(s)" << dendl;
+ dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
- while (replayers-- > 0) {
- std::unique_ptr<SnapshotReplayer> replayer(new SnapshotReplayer(this));
- std::string name("replayer-" + stringify(replayers));
+ while (nr_replayers-- > 0) {
+ std::unique_ptr<SnapshotReplayerThread> replayer(
+ new SnapshotReplayerThread(this, peer_replayer));
+ std::string name("replayer-" + stringify(nr_replayers));
replayer->create(name.c_str());
- m_snapshot_replayers.push_back(std::move(replayer));
+ peer_replayer->replayers.push_back(std::move(replayer));
+ }
+}
+
+void FSMirror::shutdown_replayers(PeerReplayer *peer_replayer,
+ std::unique_lock<ceph::mutex> &locker) {
+ peer_replayer->stopping = true;
+ m_cond.notify_all();
+
+ locker.unlock();
+ // safe to iterate unlocked
+ for (auto &replayer : peer_replayer->replayers) {
+ replayer->join();
}
+ locker.lock();
+
+ peer_replayer->replayers.clear();
}
void FSMirror::init(Context *on_finish) {
dout(20) << dendl;
{
- std::scoped_lock locker(m_lock);
+ std::unique_lock locker(m_lock);
m_stopping = true;
m_cond.notify_all();
if (m_on_init_finish != nullptr) {
}
m_on_shutdown_finish = on_finish;
+
+ for (auto &[peer, peer_replayer] : m_peer_replayers) {
+ dout(5) << ": shutting down replayer for peer=" << peer << dendl;
+ shutdown_replayers(&peer_replayer, locker);
+ }
+ m_peer_replayers.clear();
}
- wait_for_replayers();
+ shutdown_mirror_watcher();
}
void FSMirror::init_instance_watcher(Context *on_finish) {
shutdown_instance_watcher();
}
-void FSMirror::wait_for_replayers() {
- dout(20) << dendl;
-
- for (auto &replayer : m_snapshot_replayers) {
- replayer->join();
- }
-
- m_snapshot_replayers.clear();
- shutdown_mirror_watcher();
-}
-
void FSMirror::shutdown_mirror_watcher() {
dout(20) << dendl;
dout(10) << ": peer=" << peer << dendl;
std::scoped_lock locker(m_lock);
- m_peers.emplace(peer);
- ceph_assert(m_peers.size() == 1); // support only a single peer
+ auto p = m_peer_replayers.emplace(peer, PeerReplayer());
+ ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
+ if (p.second) {
+ init_replayers(&p.first->second);
+ }
}
void FSMirror::remove_peer(const Peer &peer) {
dout(10) << ": peer=" << peer << dendl;
- std::scoped_lock locker(m_lock);
- m_peers.erase(peer);
+ std::unique_lock locker(m_lock);
+ auto it = m_peer_replayers.find(peer);
+ if (it != m_peer_replayers.end()) {
+ dout(5) << ": shutting down replayers for peer=" << peer << dendl;
+ shutdown_replayers(&it->second, locker);
+ }
+ m_peer_replayers.erase(it);
}
void FSMirror::mirror_status(Formatter *f) {
std::scoped_lock locker(m_lock);
f->open_object_section("status");
f->open_object_section("peers");
- for (auto &peer : m_peers) {
+ for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
peer.dump(f);
}
f->close_section(); // peers
}
};
- class SnapshotReplayer : public Thread {
+ struct PeerReplayer;
+ class SnapshotReplayerThread : public Thread {
public:
- SnapshotReplayer(FSMirror *fs_mirror)
- : m_fs_mirror(fs_mirror) {
+ SnapshotReplayerThread(FSMirror *fs_mirror, PeerReplayer *peer_replayer)
+ : m_fs_mirror(fs_mirror),
+ m_peer_replayer(peer_replayer) {
}
void *entry() override {
- m_fs_mirror->run();
+ m_fs_mirror->run(m_peer_replayer);
return 0;
}
private:
FSMirror *m_fs_mirror;
+ PeerReplayer *m_peer_replayer;
+ };
+
+ typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
+ struct PeerReplayer {
+ SnapshotReplayers replayers;
+ bool stopping = false;
+
+ bool is_stopping() {
+ return stopping;
+ }
};
std::string m_fs_name;
ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
ceph::condition_variable m_cond;
SnapListener m_snap_listener;
- std::set<Peer> m_peers;
std::set<std::string, std::less<>> m_directories;
- std::vector<std::unique_ptr<SnapshotReplayer>> m_snapshot_replayers;
+ std::map<Peer, PeerReplayer> m_peer_replayers;
RadosRef m_cluster;
std::string m_addrs;
int m_retval = 0;
bool m_stopping = false;
+ bool m_init_failed = false;
Context *m_on_init_finish = nullptr;
Context *m_on_shutdown_finish = nullptr;
MirrorAdminSocketHook *m_asok_hook = nullptr;
- void run();
- void init_replayers();
- void wait_for_replayers();
+ void run(PeerReplayer *peer_replayer);
+ void init_replayers(PeerReplayer *peer_replayer);
+ void shutdown_replayers(PeerReplayer *peer_replayer,
+ std::unique_lock<ceph::mutex> &locker);
int connect(std::string_view cluster_name, std::string_view client_name,
RadosRef *cluster);