scrub_waiting.push_back(&obj->item_scrub);
}
-void ScrubStack::remove_from_waiting(MDSCacheObject *obj)
+void ScrubStack::remove_from_waiting(MDSCacheObject *obj, bool kick) // kick=false lets a caller process several objects and call kick_off_scrubs() once itself
{
scrubs_in_progress--;
if (obj->item_scrub.is_on_list()) {
obj->item_scrub.remove_myself();
scrub_stack.push_front(&obj->item_scrub);
- kick_off_scrubs();
+ if (kick) // only reached when obj was on a list and has just been pushed to the front of scrub_stack
+ kick_off_scrubs();
}
}
assert(state == STATE_RUNNING || state == STATE_IDLE);
set_state(STATE_RUNNING);
-
if (CInode *in = dynamic_cast<CInode*>(*it)) {
dout(20) << __func__ << " examining " << *in << dendl;
++it;
+
+ if (!validate_inode_auth(in))
+ continue;
+
if (!in->is_dir()) {
// it's a regular file, symlink, or hard link
dequeue(in); // we only touch it this once, so remove from stack
}
}
+bool ScrubStack::validate_inode_auth(CInode *in) // Returns true only when this rank is auth for `in` and it can be auth-pinned right now.
+{ // Every false return has first arranged a follow-up: a C_RetryScrub callback, or a QUEUEINO forward to the auth rank.
+ if (in->is_auth()) {
+ if (!in->can_auth_pin()) {
+ dout(10) << __func__ << " can't auth pin" << dendl;
+ in->add_waiter(CInode::WAIT_UNFREEZE, new C_RetryScrub(this, in));
+ return false;
+ }
+ return true;
+ } else {
+ MDSRank *mds = mdcache->mds;
+ if (in->is_ambiguous_auth()) {
+ dout(10) << __func__ << " ambiguous auth" << dendl;
+ in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_RetryScrub(this, in));
+ } else if (mds->is_cluster_degraded()) {
+ dout(20) << __func__ << " cluster degraded" << dendl;
+ mds->wait_for_cluster_recovered(new C_RetryScrub(this, in));
+ } else { // authority is unambiguous and the cluster is not degraded: hand the inode to its auth rank
+ ScrubHeaderRef header = in->get_scrub_header();
+ ceph_assert(header);
+
+ auto ret = remote_scrubs.emplace(std::piecewise_construct, // remote_scrubs entry records the tag and the ranks whose ACK is still outstanding
+ std::forward_as_tuple(in),
+ std::forward_as_tuple());
+ ceph_assert(ret.second); // FIXME: parallel scrubs?
+ auto &scrub_r = ret.first->second;
+ scrub_r.tag = header->get_tag();
+
+ mds_rank_t auth = in->authority().first;
+ dout(10) << __func__ << " forward to mds." << auth << dendl;
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO, in->ino(),
+ std::move(in->scrub_queued_frags()), // NOTE(review): presumably leaves the inode's own queued-frag set moved-from -- confirm scrub_queued_frags() returns a non-const reference
+ header->get_tag(), header->is_internal_tag(),
+ header->get_force(), header->get_recursive(),
+ header->get_repair());
+ mdcache->mds->send_message_mds(r, auth);
+
+ scrub_r.gather_set.insert(auth); // exactly one rank is expected to ACK a QUEUEINO
+ // wait for ACK
+ add_to_waiting(in);
+ }
+ return false; // all non-auth paths: the caller must not continue with `in` in this pass
+ }
+}
+
void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
{
dout(10) << __func__ << " " << *in << dendl;
+ ceph_assert(in->is_auth());
+ MDSRank *mds = mdcache->mds;
ScrubHeaderRef header = in->get_scrub_header();
ceph_assert(header);
MDSGatherBuilder gather(g_ceph_context);
+ auto &queued = in->scrub_queued_frags();
+ std::map<mds_rank_t, fragset_t> scrub_remote;
+
frag_vec_t frags;
in->dirfragtree.get_leaves(frags);
dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
-
for (auto &fg : frags) {
+ if (queued.contains(fg))
+ continue;
CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
if (!dir->is_auth()) {
- dout(20) << __func__ << " not auth " << *dir << dendl;
- // no-op
+ if (dir->is_ambiguous_auth()) {
+ dout(20) << __func__ << " ambiguous auth " << *dir << dendl;
+ dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
+ } else if (mds->is_cluster_degraded()) {
+ dout(20) << __func__ << " cluster degraded" << dendl;
+ mds->wait_for_cluster_recovered(gather.new_sub());
+ } else {
+ mds_rank_t auth = dir->authority().first;
+ scrub_remote[auth].insert_raw(fg);
+ }
} else if (!dir->can_auth_pin()) {
dout(20) << __func__ << " freezing/frozen " << *dir << dendl;
dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
} else if (dir->get_version() == 0) {
dout(20) << __func__ << " barebones " << *dir << dendl;
dir->fetch(gather.new_sub());
+ } else {
+ _enqueue(dir, header, nullptr, true);
+ queued.insert_raw(dir->get_frag());
+ *added_children = true;
}
}
+
+ queued.simplify();
+
if (gather.has_subs()) {
gather.set_finisher(new C_RetryScrub(this, in));
gather.activate();
return;
}
- std::vector<CDir*> dfs;
- in->get_dirfrags(dfs);
- for (auto &dir : dfs) {
- if (dir->is_auth()){
- _enqueue(dir, header, nullptr, true);
- *added_children = true;
- } else {
- // FIXME: ask auth mds to scrub
+ if (!scrub_remote.empty()) {
+ auto ret = remote_scrubs.emplace(std::piecewise_construct,
+ std::forward_as_tuple(in),
+ std::forward_as_tuple());
+ ceph_assert(ret.second); // FIXME: parallel scrubs?
+ auto &scrub_r = ret.first->second;
+ scrub_r.tag = header->get_tag();
+
+ for (auto& p : scrub_remote) {
+ p.second.simplify();
+ dout(20) << __func__ << " forward " << p.second << " to mds." << p.first << dendl;
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
+ std::move(p.second), header->get_tag(),
+ header->is_internal_tag(), header->get_force(),
+ header->get_recursive(), header->get_repair());
+ mds->send_message_mds(r, p.first);
+ scrub_r.gather_set.insert(p.first);
}
+ // wait for ACKs
+ add_to_waiting(in);
+ return;
}
scrub_dir_inode_final(in);
if (in == header->get_origin()) {
scrub_origins.erase(in);
clog_scrub_summary(in);
- if (!header->get_recursive()) {
+ if (!header->get_recursive() && header->get_formatter()) {
if (r >= 0) { // we got into the scrubbing dump it
- result.dump(&(header->get_formatter()));
+ result.dump(header->get_formatter());
} else { // we failed the lookup or something; dump ourselves
- header->get_formatter().open_object_section("results");
- header->get_formatter().dump_int("return_code", r);
- header->get_formatter().close_section(); // results
+ header->get_formatter()->open_object_section("results");
+ header->get_formatter()->dump_int("return_code", r);
+ header->get_formatter()->close_section(); // results
}
}
}
stack_size = 0;
scrub_stack.clear();
scrub_waiting.clear();
+
+ for (auto& p : remote_scrubs)
+ remove_from_waiting(p.first, false);
+ remote_scrubs.clear();
+
clear_stack = false;
}
clog->info() << "scrub summary: " << scrub_summary();
}
+
+// Entry point for messages routed to the scrub stack.  MSG_MDS_SCRUB is
+// the only type handled; any other type is logged and aborts the daemon.
+void ScrubStack::dispatch(const cref_t<Message> &m)
+{
+  const auto msg_type = m->get_type();
+
+  if (msg_type == MSG_MDS_SCRUB) {
+    handle_scrub(ref_cast<MMDSScrub>(m));
+    return;
+  }
+
+  derr << " scrub stack unknown message " << msg_type << dendl_impl;
+  ceph_abort_msg("scrub stack unknown message");
+}
+
+void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m) // Handles one MMDSScrub from a peer rank: queue requests (QUEUEDIR/QUEUEINO) and their ACKs; aborts on an unknown op.
+{
+
+ mds_rank_t from = mds_rank_t(m->get_source().num()); // rank of the sender; ACKs are sent back to it
+ dout(10) << __func__ << " " << *m << " from mds." << from << dendl;
+
+ switch (m->get_op()) {
+ case MMDSScrub::OP_QUEUEDIR: // peer asks us to scrub the listed dirfrags of inode m->get_ino()
+ {
+ CInode *diri = mdcache->get_inode(m->get_ino());
+ ceph_assert(diri);
+
+ std::vector<CDir*> dfs; // dirfrags that exist here, are auth here, and can be auth-pinned
+ MDSGatherBuilder gather(g_ceph_context);
+ for (const auto& fg : m->get_frags()) {
+ CDir *dir = diri->get_dirfrag(fg);
+ if (!dir) {
+ dout(10) << __func__ << " no frag " << fg << dendl;
+ continue; // skipped frags are simply absent from the ACK's frag set
+ }
+ if (!dir->is_auth()) {
+ dout(10) << __func__ << " not auth " << *dir << dendl;
+ continue;
+ }
+ if (!dir->can_auth_pin()) {
+ dout(10) << __func__ << " can't auth pin " << *dir << dendl;
+ dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
+ continue;
+ }
+ dfs.push_back(dir);
+ }
+
+ if (gather.has_subs()) { // at least one dirfrag could not be auth-pinned: enqueue nothing and send no ACK yet
+ gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m)); // presumably re-delivers this same message once all waiters fire -- confirm
+ gather.activate();
+ return;
+ }
+
+ fragset_t queued;
+ if (!dfs.empty()) {
+ ScrubHeaderRef header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(), // local header rebuilt from the options carried in the message
+ m->is_force(), m->is_recursive(),
+ m->is_repair(), nullptr);
+ for (auto dir : dfs) {
+ queued.insert_raw(dir->get_frag());
+ _enqueue(dir, header, nullptr, true);
+ }
+ queued.simplify();
+ kick_off_scrubs();
+ }
+
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(), // ACK is always sent; its frag set lists what was actually enqueued (may be empty)
+ std::move(queued), m->get_tag());
+ mdcache->mds->send_message_mds(r, from);
+ }
+ break;
+ case MMDSScrub::OP_QUEUEDIR_ACK: // a peer reports which dirfrags it enqueued for us
+ {
+ CInode *diri = mdcache->get_inode(m->get_ino());
+ ceph_assert(diri);
+ auto it = remote_scrubs.find(diri);
+ if (it != remote_scrubs.end() && // ACKs for an untracked inode or a different tag are ignored
+ m->get_tag() == it->second.tag) {
+ if (it->second.gather_set.erase(from)) { // erase() yields 0 when this rank was not pending, so such ACKs are ignored too
+ auto &queued = diri->scrub_queued_frags();
+ for (auto &fg : m->get_frags())
+ queued.insert_raw(fg);
+ queued.simplify();
+
+ if (it->second.gather_set.empty()) { // last outstanding ACK: stop tracking and take the inode off the waiting list
+ remote_scrubs.erase(it);
+ remove_from_waiting(diri); // one-argument call: relies on a default for `kick` declared outside this view
+ }
+ }
+ }
+ }
+ break;
+ case MMDSScrub::OP_QUEUEINO: // peer asks us to scrub inode m->get_ino()
+ {
+ CInode *in = mdcache->get_inode(m->get_ino());
+ ceph_assert(in);
+
+ ScrubHeaderRef header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
+ m->is_force(), m->is_recursive(),
+ m->is_repair(), nullptr);
+
+ _enqueue(in, header, nullptr, true);
+ in->scrub_queued_frags() = m->get_frags(); // adopt the frag set carried in the request
+ kick_off_scrubs();
+
+ fragset_t queued; // left empty: the QUEUEINO ACK carries no frags
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
+ std::move(queued), m->get_tag());
+ mdcache->mds->send_message_mds(r, from);
+ }
+ break;
+ case MMDSScrub::OP_QUEUEINO_ACK: // the peer accepted an inode we forwarded
+ {
+ CInode *in = mdcache->get_inode(m->get_ino());
+ ceph_assert(in);
+ auto it = remote_scrubs.find(in);
+ if (it != remote_scrubs.end() && // same filtering as QUEUEDIR_ACK: tracked inode, matching tag, sender still pending
+ m->get_tag() == it->second.tag &&
+ it->second.gather_set.erase(from)) {
+ ceph_assert(it->second.gather_set.empty()); // a QUEUEINO forward inserts a single rank into gather_set
+ remote_scrubs.erase(it);
+
+ remove_from_waiting(in, false); // no kick here; kick_off_scrubs() is called once at the end of this branch
+ dequeue(in);
+
+ if (in == in->scrub_info()->header->get_origin()) {
+ scrub_origins.erase(in);
+ clog_scrub_summary(in);
+ }
+ MDSContext *c = nullptr;
+ in->scrub_finished(&c); // may hand back a context through `c`
+ if (c)
+ finisher->queue(new MDSIOContextWrapper(mdcache->mds, c), 0); // run that context via the finisher rather than inline
+
+ kick_off_scrubs();
+ }
+ }
+ break;
+ default:
+ derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
+ ceph_abort_msg("scrub stack unknown scrub operation");
+ }
+}
+
+// Called when rank `mds` has failed: stop waiting for its ACKs.  Every
+// remote scrub whose last outstanding ACK was owed by that rank is dropped
+// from remote_scrubs and taken off the waiting list; scrubbing is kicked
+// once at the end if anything was released.
+void ScrubStack::handle_mds_failure(mds_rank_t mds)
+{
+  bool released_any = false;
+
+  auto p = remote_scrubs.begin();
+  while (p != remote_scrubs.end()) {
+    auto &pending = p->second.gather_set;
+    // Keep the entry if the failed rank was not pending, or if other
+    // ranks still owe an ACK.
+    if (!pending.erase(mds) || !pending.empty()) {
+      ++p;
+      continue;
+    }
+
+    CInode *in = p->first;
+    remote_scrubs.erase(p++);  // advance before the erased node is invalidated
+    remove_from_waiting(in, false);
+    released_any = true;
+  }
+
+  if (released_any)
+    kick_off_scrubs();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MMDSSCRUB_H
+#define CEPH_MMDSSCRUB_H
+
+#include "messages/MMDSOp.h"
+
+#include "include/types.h"
+#include "include/frag.h"
+
+/**
+ * Scrub coordination message exchanged between MDS ranks (MSG_MDS_SCRUB).
+ *
+ * Carries an operation code, the target inode number, a set of directory
+ * fragments, the scrub tag, and a bitmask of scrub options.  Each request
+ * op has a positive value and its ACK uses the negated value.
+ */
+class MMDSScrub : public MMDSOp {
+public:
+  static constexpr int OP_QUEUEDIR = 1;
+  static constexpr int OP_QUEUEDIR_ACK = -1;
+  static constexpr int OP_QUEUEINO = 2;
+  static constexpr int OP_QUEUEINO_ACK = -2;
+
+  /// Human-readable name for an op code; aborts on any value not listed above.
+  static const char *get_opname(int o) {
+    switch (o) {
+    case OP_QUEUEDIR: return "queue_dir";
+    case OP_QUEUEDIR_ACK: return "queue_dir_ack";
+    case OP_QUEUEINO: return "queue_ino";
+    case OP_QUEUEINO_ACK: return "queue_ino_ack";
+    default: ceph_abort(); return nullptr;
+    }
+  }
+
+  std::string_view get_type_name() const override { return "mds_scrub"; }
+
+  /// Prints op name, ino, frags, tag and the force/recursive/repair flags
+  /// (the internal-tag flag is not printed).
+  void print(std::ostream& out) const override {
+    out << "mds_scrub(" << get_opname(op) << " "
+        << ino << " " << frags << " " << tag;
+    if (is_force()) out << " force";
+    if (is_recursive()) out << " recursive";
+    if (is_repair()) out << " repair";
+    out << ")";
+  }
+
+  /// Wire order: op, ino, frags, tag, flags.  decode_payload() must match.
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(op, payload);
+    encode(ino, payload);
+    encode(frags, payload);
+    encode(tag, payload);
+    encode(flags, payload);
+  }
+  void decode_payload() override {
+    using ceph::decode;
+    auto p = payload.cbegin();
+    decode(op, p);
+    decode(ino, p);
+    decode(frags, p);
+    decode(tag, p);
+    decode(flags, p);
+  }
+
+  inodeno_t get_ino() const {
+    return ino;
+  }
+  const fragset_t& get_frags() const {
+    return frags;
+  }
+  const std::string& get_tag() const {
+    return tag;
+  }
+  int get_op() const {
+    return op;
+  }
+  bool is_internal_tag() const {
+    return flags & FLAG_INTERNAL_TAG;
+  }
+  bool is_force() const {
+    return flags & FLAG_FORCE;
+  }
+  bool is_recursive() const {
+    return flags & FLAG_RECURSIVE;
+  }
+  bool is_repair() const {
+    return flags & FLAG_REPAIR;
+  }
+
+protected:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+
+  // The constructors initialise MMDSOp, the class's direct base.  They
+  // previously named SafeMessage, which is not a direct base of this class;
+  // a mem-initializer may only name a direct or virtual base.
+  // NOTE(review): assumes MMDSOp accepts (type, head_version, compat_version)
+  // -- confirm against messages/MMDSOp.h.
+  MMDSScrub() : MMDSOp(MSG_MDS_SCRUB, HEAD_VERSION, COMPAT_VERSION) {}
+
+  /// @param o       one of the OP_* constants
+  /// @param i       inode the operation applies to
+  /// @param _frags  fragment set, moved into the message
+  /// @param _tag    scrub tag, copied into the message
+  /// The four booleans are folded into the `flags` bitmask.
+  MMDSScrub(int o, inodeno_t i, fragset_t&& _frags, std::string_view _tag,
+            bool internal_tag=false, bool force=false,
+            bool recursive=false, bool repair=false)
+    : MMDSOp(MSG_MDS_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+      op(o), ino(i), frags(std::move(_frags)), tag(_tag) {
+    if (internal_tag) flags |= FLAG_INTERNAL_TAG;
+    if (force) flags |= FLAG_FORCE;
+    if (recursive) flags |= FLAG_RECURSIVE;
+    if (repair) flags |= FLAG_REPAIR;
+  }
+
+  ~MMDSScrub() override {}
+private:
+  // Constructors are protected; instances are created through make_message.
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+
+  static constexpr unsigned FLAG_INTERNAL_TAG = 1<<0;
+  static constexpr unsigned FLAG_FORCE = 1<<1;
+  static constexpr unsigned FLAG_RECURSIVE = 1<<2;
+  static constexpr unsigned FLAG_REPAIR = 1<<3;
+
+  // 0 is not a valid op (get_opname() aborts on it); the initializer only
+  // keeps a default-constructed message from holding an indeterminate value
+  // before decode_payload() runs.
+  int op = 0;
+  inodeno_t ino;
+  fragset_t frags;
+  std::string tag;
+  unsigned flags = 0;
+};
+#endif // CEPH_MMDSSCRUB_H