}
};
+struct Mirror::C_RestartMirroring : Context {
+ Mirror *mirror;
+ Filesystem filesystem;
+ uint64_t pool_id;
+ Peers peers;
+
+ C_RestartMirroring(Mirror *mirror, const Filesystem &filesystem,
+ uint64_t pool_id, const Peers &peers)
+ : mirror(mirror),
+ filesystem(filesystem),
+ pool_id(pool_id),
+ peers(peers) {
+ }
+
+ void finish(int r) override {
+ disable_mirroring();
+ }
+
+ void disable_mirroring() {
+ Context *ctx = new C_CallbackAdapter<C_RestartMirroring,
+ &C_RestartMirroring::handle_disable_mirroring>(this);
+ mirror->disable_mirroring(filesystem, ctx);
+ }
+
+ void handle_disable_mirroring(int r) {
+ enable_mirroring();
+ }
+
+ void enable_mirroring() {
+ std::scoped_lock locker(mirror->m_lock);
+ Context *ctx = new C_CallbackAdapter<C_RestartMirroring,
+ &C_RestartMirroring::handle_enable_mirroring>(this);
+ mirror->enable_mirroring(filesystem, pool_id, ctx, true);
+ }
+
+ void handle_enable_mirroring(int r) {
+ mirror->handle_enable_mirroring(filesystem, peers, r);
+ delete this;
+ }
+
+ // context needs to live post completion
+ void complete(int r) override {
+ finish(r);
+ }
+};
+
Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
MonClient *monc, Messenger *msgr)
: m_cct(cct),
m_args(args),
m_monc(monc),
m_msgr(msgr),
- m_listener(this) {
+ m_listener(this),
+ m_last_blocklist_check(ceph_clock_now()) {
auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
"cephfs::mirror::thread_pool", false, cct));
auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
::exit(0);
}
-void Mirror::handle_mirroring_enabled(const Filesystem &filesystem, int r) {
+void Mirror::handle_enable_mirroring(const Filesystem &filesystem,
+ const Peers &peers, int r) {
+ dout(20) << ": filesystem=" << filesystem << ", peers=" << peers
+ << ", r=" << r << dendl;
+
+ std::scoped_lock locker(m_lock);
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ ceph_assert(mirror_action.action_in_progress);
+
+ mirror_action.action_in_progress = false;
+ m_cond.notify_all();
+ if (r < 0) {
+ derr << ": failed to initialize FSMirror for filesystem=" << filesystem
+ << ": " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ for (auto &peer : peers) {
+ mirror_action.fs_mirror->add_peer(peer);
+ }
+
+ dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
+}
+
+void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) {
dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl;
std::scoped_lock locker(m_lock);
}
void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
- Context *on_finish) {
+ Context *on_finish, bool is_restart) {
ceph_assert(ceph_mutex_is_locked(m_lock));
auto &mirror_action = m_mirror_actions.at(filesystem);
+ if (is_restart) {
+ ceph_assert(mirror_action.action_in_progress);
+ mirror_action.fs_mirror.reset();
+ } else {
+ ceph_assert(!mirror_action.action_in_progress);
+ }
+
ceph_assert(!mirror_action.fs_mirror);
- ceph_assert(!mirror_action.action_in_progress);
dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl;
return;
}
- auto p = m_mirror_actions.emplace(filesystem, MirrorAction());
+ auto p = m_mirror_actions.emplace(filesystem, MirrorAction(local_pool_id));
auto &mirror_action = p.first->second;
mirror_action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id));
}
void Mirror::update_fs_mirrors() {
dout(20) << dendl;
+ double blocklist_interval = g_ceph_context->_conf.get_val<std::chrono::seconds>
+ ("cephfs_mirror_restart_mirror_on_blocklist_interval").count();
+ auto now = ceph_clock_now();
+ bool check_blocklist = blocklist_interval > 0 && ((now - m_last_blocklist_check) >= blocklist_interval);
+
{
std::scoped_lock locker(m_lock);
for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+ if (check_blocklist && !mirror_action.action_in_progress
+ && mirror_action.fs_mirror && mirror_action.fs_mirror->is_blocklisted()) {
+ // about to restart blocklisted mirror instance -- nothing
+ // should interfere
+ dout(5) << ": filesystem=" << filesystem << " is blocklisted -- restarting" << dendl;
+ auto peers = mirror_action.fs_mirror->get_peers();
+ mirror_action.action_ctxs.push_front(
+ new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers));
+ }
if (!mirror_action.action_ctxs.empty() && !mirror_action.action_in_progress) {
auto ctx = std::move(mirror_action.action_ctxs.front());
mirror_action.action_ctxs.pop_front();
ctx->complete(0);
}
}
+
+ if (check_blocklist) {
+ m_last_blocklist_check = now;
+ }
}
schedule_mirror_update_task();
struct C_EnableMirroring;
struct C_DisableMirroring;
struct C_PeerUpdate;
+ struct C_RestartMirroring;
struct ClusterListener : ClusterWatcher::Listener {
Mirror *mirror;
};
struct MirrorAction {
+ MirrorAction(uint64_t pool_id) :
+ pool_id(pool_id) {
+ }
+
+ uint64_t pool_id; // for restarting blocklisted mirror instance
bool action_in_progress = false;
std::list<Context *> action_ctxs;
std::unique_ptr<FSMirror> fs_mirror;
std::unique_ptr<ClusterWatcher> m_cluster_watcher;
std::map<Filesystem, MirrorAction> m_mirror_actions;
+ utime_t m_last_blocklist_check;
+
int init_mon_client();
// called via listener
// mirror enable callback
void enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
- Context *on_finish);
+ Context *on_finish, bool is_restart=false);
void handle_enable_mirroring(const Filesystem &filesystem, int r);
+ void handle_enable_mirroring(const Filesystem &filesystem, const Peers &peers, int r);
// mirror disable callback
void disable_mirroring(const Filesystem &filesystem, Context *on_finish);