class SafeTimerSingleton : public SafeTimer {
public:
- SafeTimer *timer;
ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
explicit SafeTimerSingleton(CephContext *cct)
} // anonymous namespace
+struct Mirror::C_EnableMirroring : Context {
+ Mirror *mirror;
+ Filesystem filesystem;
+ uint64_t pool_id;
+
+ C_EnableMirroring(Mirror *mirror, const Filesystem &filesystem, uint64_t pool_id)
+ : mirror(mirror),
+ filesystem(filesystem),
+ pool_id(pool_id) {
+ }
+
+ void finish(int r) override {
+ enable_mirroring();
+ }
+
+ void enable_mirroring() {
+ Context *ctx = new C_CallbackAdapter<C_EnableMirroring,
+ &C_EnableMirroring::handle_enable_mirroring>(this);
+ mirror->enable_mirroring(filesystem, pool_id, ctx);
+ }
+
+ void handle_enable_mirroring(int r) {
+ mirror->handle_enable_mirroring(filesystem, r);
+ delete this;
+ }
+
+ // context needs to live post completion
+ void complete(int r) override {
+ finish(r);
+ }
+};
+
+struct Mirror::C_DisableMirroring : Context {
+ Mirror *mirror;
+ Filesystem filesystem;
+
+ C_DisableMirroring(Mirror *mirror, const Filesystem &filesystem)
+ : mirror(mirror),
+ filesystem(filesystem) {
+ }
+
+ void finish(int r) override {
+ disable_mirroring();
+ }
+
+ void disable_mirroring() {
+ Context *ctx = new C_CallbackAdapter<C_DisableMirroring,
+ &C_DisableMirroring::handle_disable_mirroring>(this);
+ mirror->disable_mirroring(filesystem, ctx);
+ }
+
+ void handle_disable_mirroring(int r) {
+ mirror->handle_disable_mirroring(filesystem, r);
+ delete this;
+ }
+
+ // context needs to live post completion
+ void complete(int r) override {
+ finish(r);
+ }
+};
+
+struct Mirror::C_PeerUpdate : Context {
+ Mirror *mirror;
+ Filesystem filesystem;
+ Peer peer;
+ bool remove = false;
+
+ C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
+ const Peer &peer)
+ : mirror(mirror),
+ filesystem(filesystem),
+ peer(peer) {
+ }
+ C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
+ const Peer &peer, bool remove)
+ : mirror(mirror),
+ filesystem(filesystem),
+ peer(peer),
+ remove(remove) {
+ }
+
+ void finish(int r) override {
+ if (remove) {
+ mirror->remove_peer(filesystem, peer);
+ } else {
+ mirror->add_peer(filesystem, peer);
+ }
+ }
+};
+
Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
MonClient *monc, Messenger *msgr)
: m_cct(cct),
auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
"cephfs::mirror::safe_timer", false, cct));
m_work_queue = thread_pool->work_queue;
- m_timer = safe_timer->timer;
+ m_timer = safe_timer;
+ m_timer_lock = &safe_timer->timer_lock;
+ std::scoped_lock timer_lock(*m_timer_lock);
+ schedule_mirror_update_task();
}
Mirror::~Mirror() {
dout(20) << dendl;
std::unique_lock locker(m_lock);
- if (m_fs_mirrors.empty()) {
+ if (m_mirror_actions.empty()) {
return;
}
::exit(0);
}
-void Mirror::handle_mirroring_enabled(const std::string &fs_name, int r) {
- dout(20) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+void Mirror::handle_mirroring_enabled(const Filesystem &filesystem, int r) {
+ dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl;
std::scoped_lock locker(m_lock);
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ ceph_assert(mirror_action.action_in_progress);
+
+ mirror_action.action_in_progress = false;
+ m_cond.notify_all();
if (r < 0) {
- derr << ": failed to initialize FSMirror for filesystem=" << fs_name
+ derr << ": failed to initialize FSMirror for filesystem=" << filesystem
<< ": " << cpp_strerror(r) << dendl;
- if (!m_stopping) {
- m_fs_mirrors.erase(fs_name);
- }
-
return;
}
- dout(10) << ": Initialized FSMirror for filesystem=" << fs_name << dendl;
+ dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
+}
+
+void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
+ Context *on_finish) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ ceph_assert(!mirror_action.fs_mirror);
+ ceph_assert(!mirror_action.action_in_progress);
+
+ dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl;
+
+ mirror_action.action_in_progress = true;
+ mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id,
+ m_args, m_work_queue);
+ mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
}
-void Mirror::mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id) {
- dout(10) << ": fs_name=" << fs_name << ", pool_id=" << local_pool_id << dendl;
+void Mirror::mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id) {
+ dout(10) << ": filesystem=" << filesystem << ", pool_id=" << local_pool_id << dendl;
std::scoped_lock locker(m_lock);
if (m_stopping) {
return;
}
- // TODO: handle consecutive overlapping enable/disable calls
- ceph_assert(m_fs_mirrors.find(fs_name) == m_fs_mirrors.end());
-
- dout(10) << ": starting FSMirror: fs_name=" << fs_name << dendl;
- std::unique_ptr<FSMirror> fs_mirror(new FSMirror(m_cct, fs_name, local_pool_id,
- m_args, m_work_queue));
- Context *on_finish = new LambdaContext([this, fs_name](int r) {
- handle_mirroring_enabled(fs_name, r);
- });
- fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
- m_fs_mirrors.emplace(fs_name, std::move(fs_mirror));
+ auto p = m_mirror_actions.emplace(filesystem, MirrorAction());
+ auto &mirror_action = p.first->second;
+ mirror_action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id));
}
-void Mirror::mirroring_disabled(const std::string &fs_name) {
- dout(10) << ": fs_name=" << fs_name << dendl;
+void Mirror::handle_disable_mirroring(const Filesystem &filesystem, int r) {
+ dout(10) << ": filesystem=" << filesystem << ", r=" << r << dendl;
std::scoped_lock locker(m_lock);
- if (!m_fs_mirrors.count(fs_name)) {
- dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
- << dendl;
- return;
- }
+ auto &mirror_action = m_mirror_actions.at(filesystem);
- if (m_stopping) {
- dout(5) << "shutting down" << dendl;
- return;
+ if (!mirror_action.fs_mirror->is_failed()) {
+ ceph_assert(mirror_action.action_in_progress);
+ mirror_action.action_in_progress = false;
+ m_cond.notify_all();
}
- auto &fs_mirror = m_fs_mirrors.at(fs_name);
- if (!fs_mirror->is_stopping()) {
- Context *on_finish = new LambdaContext([this, fs_name](int r) {
- handle_shutdown(fs_name, r);
- });
- fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+ if (!m_stopping) {
+ mirror_action.fs_mirror.reset();
+ if (mirror_action.action_ctxs.empty()) {
+ dout(10) << ": no pending actions for filesystem=" << filesystem << dendl;
+ m_mirror_actions.erase(filesystem);
+ }
}
}
-void Mirror::peer_added(const std::string &fs_name, const Peer &peer) {
- dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+void Mirror::disable_mirroring(const Filesystem &filesystem, Context *on_finish) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
- std::scoped_lock locker(m_lock);
- if (!m_fs_mirrors.count(fs_name)) {
- dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
- << dendl;
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ ceph_assert(mirror_action.fs_mirror);
+ ceph_assert(!mirror_action.action_in_progress);
+
+ if (mirror_action.fs_mirror->is_failed()) {
+ dout(10) << ": init failed for filesystem=" << filesystem << dendl;
+ m_work_queue->queue(on_finish, -EINVAL);
return;
}
+ mirror_action.action_in_progress = true;
+ mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+}
+
+void Mirror::mirroring_disabled(const Filesystem &filesystem) {
+ dout(10) << ": filesystem=" << filesystem << dendl;
+
+ std::scoped_lock locker(m_lock);
if (m_stopping) {
dout(5) << "shutting down" << dendl;
return;
}
- auto &fs_mirror = m_fs_mirrors.at(fs_name);
- fs_mirror->add_peer(peer);
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ mirror_action.action_ctxs.push_back(new C_DisableMirroring(this, filesystem));
+}
+
+void Mirror::add_peer(const Filesystem &filesystem, const Peer &peer) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ ceph_assert(mirror_action.fs_mirror);
+ ceph_assert(!mirror_action.action_in_progress);
+
+ mirror_action.fs_mirror->add_peer(peer);
}
-void Mirror::peer_removed(const std::string &fs_name, const Peer &peer) {
- dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+void Mirror::peer_added(const Filesystem &filesystem, const Peer &peer) {
+ dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
std::scoped_lock locker(m_lock);
- if (!m_fs_mirrors.count(fs_name)) {
- dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
- << dendl;
+ if (m_stopping) {
+ dout(5) << "shutting down" << dendl;
return;
}
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer));
+}
+
+void Mirror::remove_peer(const Filesystem &filesystem, const Peer &peer) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ ceph_assert(mirror_action.fs_mirror);
+ ceph_assert(!mirror_action.action_in_progress);
+
+ mirror_action.fs_mirror->remove_peer(peer);
+}
+
+void Mirror::peer_removed(const Filesystem &filesystem, const Peer &peer) {
+ dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
+
+ std::scoped_lock locker(m_lock);
if (m_stopping) {
dout(5) << "shutting down" << dendl;
return;
}
- auto &fs_mirror = m_fs_mirrors.at(fs_name);
- fs_mirror->remove_peer(peer);
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer, true));
}
-void Mirror::handle_shutdown(const std::string &fs_name, int r) {
- dout(10) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+void Mirror::update_fs_mirrors() {
+ dout(20) << dendl;
- std::scoped_lock locker(m_lock);
- m_fs_mirrors.erase(fs_name);
- m_cond.notify_all();
+ {
+ std::scoped_lock locker(m_lock);
+ for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+ if (!mirror_action.action_ctxs.empty() && !mirror_action.action_in_progress) {
+ auto ctx = std::move(mirror_action.action_ctxs.front());
+ mirror_action.action_ctxs.pop_front();
+ ctx->complete(0);
+ }
+ }
+ }
+
+ schedule_mirror_update_task();
+}
+
+void Mirror::schedule_mirror_update_task() {
+ ceph_assert(m_timer_task == nullptr);
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+
+ m_timer_task = new LambdaContext([this](int _) {
+ m_timer_task = nullptr;
+ update_fs_mirrors();
+ });
+ double after = g_ceph_context->_conf.get_val<std::chrono::seconds>
+ ("cephfs_mirror_mirror_action_update_interval").count();
+ dout(20) << ": scheduling fs mirror update (" << m_timer_task << ") after "
+ << after << " seconds" << dendl;
+ m_timer->add_event_after(after, m_timer_task);
}
void Mirror::run() {
m_cluster_watcher->init();
m_cond.wait(locker, [this]{return m_stopping;});
- for (auto &[fs_name, fs_mirror] : m_fs_mirrors) {
- dout(10) << ": shutting down mirror for fs_name=" << fs_name << dendl;
- if (fs_mirror->is_stopping()) {
- dout(10) << ": fs_name=" << fs_name << " is under shutdown" << dendl;
- continue;
+ locker.unlock();
+ {
+ std::scoped_lock timer_lock(*m_timer_lock);
+ if (m_timer_task != nullptr) {
+ dout(10) << ": canceling timer task=" << m_timer_task << dendl;
+ m_timer->cancel_event(m_timer_task);
+ m_timer_task = nullptr;
+ }
+ }
+ locker.lock();
+
+ for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+ dout(10) << ": trying to shutdown filesystem=" << filesystem << dendl;
+ // wait for in-progress action and shutdown
+ m_cond.wait(locker, [this, &mirror_action] {return !mirror_action.action_in_progress;});
+ if (mirror_action.fs_mirror &&
+ !mirror_action.fs_mirror->is_stopping() &&
+ !mirror_action.fs_mirror->is_failed()) {
+ C_SaferCond cond;
+ mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, &cond));
+ int r = cond.wait();
+ dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl;
}
-
- Context *on_finish = new LambdaContext([this, fs_name = fs_name](int r) {
- handle_shutdown(fs_name, r);
- });
- fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
}
-
- m_cond.wait(locker, [this] {return m_fs_mirrors.empty();});
m_stopped = true;
m_cond.notify_all();
private:
static constexpr std::string_view MIRRORING_MODULE = "mirroring";
+ struct C_EnableMirroring;
+ struct C_DisableMirroring;
+ struct C_PeerUpdate;
+
struct ClusterListener : ClusterWatcher::Listener {
Mirror *mirror;
}
void handle_mirroring_enabled(const FilesystemSpec &spec) override {
- mirror->mirroring_enabled(spec.fs_name, spec.pool_id);
+ mirror->mirroring_enabled(spec.filesystem, spec.pool_id);
}
- void handle_mirroring_disabled(const std::string &fs_name) override {
- mirror->mirroring_disabled(fs_name);
+ void handle_mirroring_disabled(const Filesystem &filesystem) override {
+ mirror->mirroring_disabled(filesystem);
}
- void handle_peers_added(const std::string &fs_name, const Peer &peer) override {
- mirror->peer_added(fs_name, peer);
+ void handle_peers_added(const Filesystem &filesystem, const Peer &peer) override {
+ mirror->peer_added(filesystem, peer);
}
- void handle_peers_removed(const std::string &fs_name, const Peer &peer) override {
- mirror->peer_removed(fs_name, peer);
+ void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) override {
+ mirror->peer_removed(filesystem, peer);
}
};
+ struct MirrorAction {
+ bool action_in_progress = false;
+ std::list<Context *> action_ctxs;
+ std::unique_ptr<FSMirror> fs_mirror;
+ };
+
ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror");
ceph::condition_variable m_cond;
ContextWQ *m_work_queue = nullptr;
SafeTimer *m_timer = nullptr;
+ ceph::mutex *m_timer_lock = nullptr;
+ Context *m_timer_task = nullptr;
bool m_stopping = false;
bool m_stopped = false;
std::unique_ptr<ClusterWatcher> m_cluster_watcher;
- std::map<std::string, std::unique_ptr<FSMirror>> m_fs_mirrors;
+ std::map<Filesystem, MirrorAction> m_mirror_actions;
int init_mon_client();
- void handle_mirroring_enabled(const std::string &fs_name, int r);
- void mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id);
- void mirroring_disabled(const std::string &fs_name);
+ // called via listener
+ void mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id);
+ void mirroring_disabled(const Filesystem &filesystem);
+ void peer_added(const Filesystem &filesystem, const Peer &peer);
+ void peer_removed(const Filesystem &filesystem, const Peer &peer);
+
+ // mirror enable callback
+ void enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
+ Context *on_finish);
+ void handle_enable_mirroring(const Filesystem &filesystem, int r);
+
+ // mirror disable callback
+ void disable_mirroring(const Filesystem &filesystem, Context *on_finish);
+ void handle_disable_mirroring(const Filesystem &filesystem, int r);
- void peer_added(const std::string &fs_name, const Peer &peer);
- void peer_removed(const std::string &fs_name, const Peer &peer);
+ // peer update callback
+ void add_peer(const Filesystem &filesystem, const Peer &peer);
+ void remove_peer(const Filesystem &filesystem, const Peer &peer);
- void handle_shutdown(const std::string &fs_name, int r);
+ void schedule_mirror_update_task();
+ void update_fs_mirrors();
};
} // namespace mirror