#include "include/stringify.h"
#include "msg/Messenger.h"
#include "FSMirror.h"
+#include "PeerReplayer.h"
#include "aio_utils.h"
+#include "Utils.h"
#include "common/Cond.h"
FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
std::vector<const char*> args, ContextWQ *work_queue)
- : m_filesystem(filesystem),
+ : m_cct(cct),
+ m_filesystem(filesystem),
m_pool_id(pool_id),
m_args(args),
m_work_queue(work_queue),
delete m_asok_hook;
}
-int FSMirror::connect(std::string_view client_name, std::string_view cluster_name,
- RadosRef *cluster) {
- dout(20) << ": connecting to cluster=" << cluster_name << ", client=" << client_name
- << dendl;
-
- CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
- if (client_name.empty() || !iparams.name.from_str(client_name)) {
- derr << ": error initializing cluster handle for " << cluster_name << dendl;
- return -EINVAL;
- }
-
- CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
- cct->_conf->cluster = cluster_name;
-
- int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
- if (r < 0) {
- derr << ": could not read ceph conf: " << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- cct->_conf.parse_env(cct->get_module_type());
-
- std::vector<const char*> args;
- r = cct->_conf.parse_argv(args);
- if (r < 0) {
- derr << ": could not parse environment: " << cpp_strerror(r) << dendl;
- cct->put();
- return r;
- }
- cct->_conf.parse_env(cct->get_module_type());
-
- cluster->reset(new librados::Rados());
-
- r = (*cluster)->init_with_context(cct);
- ceph_assert(r == 0);
- cct->put();
-
- r = (*cluster)->connect();
- if (r < 0) {
- derr << ": error connecting to " << cluster_name << ": " << cpp_strerror(r)
- << dendl;
- return r;
- }
-
- dout(10) << ": connected to cluster=" << cluster_name << " using client="
- << client_name << dendl;
-
- return 0;
-}
-
-void FSMirror::run(PeerReplayer *peer_replayer) {
- dout(20) << dendl;
-
- std::unique_lock locker(m_lock);
- while (true) {
- dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
- m_cond.wait(locker, [this, peer_replayer]{return m_directories.size() || peer_replayer->is_stopping();});
- if (peer_replayer->is_stopping()) {
- dout(5) << ": exiting" << dendl;
- break;
- }
-
- locker.unlock();
- ::sleep(1);
- locker.lock();
- }
-}
-
-void FSMirror::init_replayers(PeerReplayer *peer_replayer) {
+int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
ceph_assert(ceph_mutex_is_locked(m_lock));
-
- auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
- "cephfs_mirror_max_concurrent_directory_syncs");
- dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
-
- while (nr_replayers-- > 0) {
- std::unique_ptr<SnapshotReplayerThread> replayer(
- new SnapshotReplayerThread(this, peer_replayer));
- std::string name("replayer-" + stringify(nr_replayers));
- replayer->create(name.c_str());
- peer_replayer->replayers.push_back(std::move(replayer));
- }
+ return peer_replayer->init();
}
-void FSMirror::shutdown_replayers(PeerReplayer *peer_replayer,
- std::unique_lock<ceph::mutex> &locker) {
- peer_replayer->stopping = true;
- m_cond.notify_all();
-
- locker.unlock();
- // safe to iterate unlocked
- for (auto &replayer : peer_replayer->replayers) {
- replayer->join();
- }
- locker.lock();
+void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
+ peer_replayer->shutdown();
+}
- peer_replayer->replayers.clear();
+void FSMirror::cleanup() {
+ dout(20) << dendl;
+ ceph_unmount(m_mount);
+ m_ioctx.close();
+ m_cluster.reset();
}
void FSMirror::init(Context *on_finish) {
return;
}
- m_addrs = m_cluster->get_addrs();
- dout(10) << ": rados addrs=" << m_addrs << dendl;
-
r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
if (r < 0) {
+ m_cluster.reset();
derr << ": error accessing local pool (id=" << m_pool_id << "): "
<< cpp_strerror(r) << dendl;
on_finish->complete(r);
return;
}
+ r = mount(m_cluster, m_filesystem, true, &m_mount);
+ if (r < 0) {
+ m_ioctx.close();
+ m_cluster.reset();
+ on_finish->complete(r);
+ return;
+ }
+
+ m_addrs = m_cluster->get_addrs();
+ dout(10) << ": rados addrs=" << m_addrs << dendl;
+
init_instance_watcher(on_finish);
}
dout(20) << dendl;
{
- std::unique_lock locker(m_lock);
+ std::scoped_lock locker(m_lock);
m_stopping = true;
- m_cond.notify_all();
if (m_on_init_finish != nullptr) {
dout(10) << ": delaying shutdown -- init in progress" << dendl;
m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+ cleanup();
if (r < 0) {
on_finish->complete(0);
return;
}
m_on_shutdown_finish = on_finish;
+ }
- for (auto &[peer, peer_replayer] : m_peer_replayers) {
+ for (auto &[peer, peer_replayer] : m_peer_replayers) {
dout(5) << ": shutting down replayer for peer=" << peer << dendl;
- shutdown_replayers(&peer_replayer, locker);
- }
- m_peer_replayers.clear();
+ shutdown_replayer(peer_replayer.get());
}
-
+ m_peer_replayers.clear();
shutdown_mirror_watcher();
}
std::scoped_lock locker(m_lock);
m_directories.emplace(dir_path);
- m_cond.notify_all();
+ for (auto &[peer, peer_replayer] : m_peer_replayers) {
+ dout(10) << ": peer=" << peer << dendl;
+ peer_replayer->add_directory(dir_path);
+ }
}
void FSMirror::handle_release_directory(string_view dir_path) {
auto it = m_directories.find(dir_path);
if (it != m_directories.end()) {
m_directories.erase(it);
+ for (auto &[peer, peer_replayer] : m_peer_replayers) {
+ dout(10) << ": peer=" << peer << dendl;
+ peer_replayer->remove_directory(dir_path);
+ }
}
}
std::scoped_lock locker(m_lock);
m_all_peers.emplace(peer);
- auto p = m_peer_replayers.emplace(peer, PeerReplayer());
- ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
- if (p.second) {
- init_replayers(&p.first->second);
+ if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
+ return;
+ }
+
+ auto replayer = std::make_unique<PeerReplayer>(
+ m_cct, this, m_filesystem, peer, m_directories, m_mount);
+ int r = init_replayer(replayer.get());
+ if (r < 0) {
+ return;
}
+ m_peer_replayers.emplace(peer, std::move(replayer));
+ ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
}
void FSMirror::remove_peer(const Peer &peer) {
dout(10) << ": peer=" << peer << dendl;
- std::unique_lock locker(m_lock);
- m_all_peers.erase(peer);
- auto it = m_peer_replayers.find(peer);
- if (it != m_peer_replayers.end()) {
+ std::unique_ptr<PeerReplayer> replayer;
+ {
+ std::scoped_lock locker(m_lock);
+ m_all_peers.erase(peer);
+ auto it = m_peer_replayers.find(peer);
+ if (it != m_peer_replayers.end()) {
+ replayer = std::move(it->second);
+ m_peer_replayers.erase(it);
+ }
+ }
+
+ if (replayer) {
dout(5) << ": shutting down replayers for peer=" << peer << dendl;
- shutdown_replayers(&it->second, locker);
- m_peer_replayers.erase(it);
+ shutdown_replayer(replayer.get());
}
}
namespace mirror {
class MirrorAdminSocketHook;
+class PeerReplayer;
// handle mirroring for a filesystem to a set of peers
}
};
- struct PeerReplayer;
- class SnapshotReplayerThread : public Thread {
- public:
- SnapshotReplayerThread(FSMirror *fs_mirror, PeerReplayer *peer_replayer)
- : m_fs_mirror(fs_mirror),
- m_peer_replayer(peer_replayer) {
- }
-
- void *entry() override {
- m_fs_mirror->run(m_peer_replayer);
- return 0;
- }
-
- private:
- FSMirror *m_fs_mirror;
- PeerReplayer *m_peer_replayer;
- };
-
- typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
- struct PeerReplayer {
- SnapshotReplayers replayers;
- bool stopping = false;
-
- bool is_stopping() {
- return stopping;
- }
- };
-
+ CephContext *m_cct;
Filesystem m_filesystem;
uint64_t m_pool_id;
std::vector<const char *> m_args;
ContextWQ *m_work_queue;
ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
- ceph::condition_variable m_cond;
SnapListener m_snap_listener;
std::set<std::string, std::less<>> m_directories;
Peers m_all_peers;
- std::map<Peer, PeerReplayer> m_peer_replayers;
+ std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers;
RadosRef m_cluster;
std::string m_addrs;
MirrorAdminSocketHook *m_asok_hook = nullptr;
- void run(PeerReplayer *peer_replayer);
- void init_replayers(PeerReplayer *peer_replayer);
- void shutdown_replayers(PeerReplayer *peer_replayer,
- std::unique_lock<ceph::mutex> &locker);
+ MountRef m_mount;
- int connect(std::string_view cluster_name, std::string_view client_name,
- RadosRef *cluster);
+ int init_replayer(PeerReplayer *peer_replayer);
+ void shutdown_replayer(PeerReplayer *peer_replayer);
+ void cleanup();
void init_instance_watcher(Context *on_finish);
void handle_init_instance_watcher(int r);