#include "messages/MMDSSlaveRequest.h"
#include "messages/MMDSFragmentNotify.h"
+#include "messages/MMDSFragmentNotifyAck.h"
#include "messages/MGatherCaps.h"
p != fragments.end(); ) {
dirfrag_t df = p->first;
fragment_info_t& info = p->second;
- ++p;
- if (info.is_fragmenting())
+
+ if (info.is_fragmenting()) {
+ if (info.notify_ack_waiting.erase(who) &&
+ info.notify_ack_waiting.empty()) {
+ fragment_drop_locks(info);
+ fragment_maybe_finish(p++);
+ } else {
+ ++p;
+ }
continue;
+ }
+
+ ++p;
dout(10) << "cancelling fragment " << df << " bit " << info.bits << dendl;
list<CDir*> dirs;
info.dirs.swap(dirs);
case MSG_MDS_FRAGMENTNOTIFY:
handle_fragment_notify(static_cast<MMDSFragmentNotify*>(m));
break;
+ case MSG_MDS_FRAGMENTNOTIFYACK:
+ handle_fragment_notify_ack(static_cast<MMDSFragmentNotifyAck*>(m));
+ break;
case MSG_MDS_FINDINO:
handle_find_ino(static_cast<MMDSFindIno *>(m));
class C_MDC_FragmentCommit : public MDCacheLogContext {
dirfrag_t basedirfrag;
- list<CDir*> resultfrags;
+ MDRequestRef mdr;
public:
- C_MDC_FragmentCommit(MDCache *m, dirfrag_t df, list<CDir*>& l) :
- MDCacheLogContext(m), basedirfrag(df), resultfrags(l) {}
+ C_MDC_FragmentCommit(MDCache *m, dirfrag_t df, const MDRequestRef& r) :
+ MDCacheLogContext(m), basedirfrag(df), mdr(r) {}
void finish(int r) override {
- mdcache->_fragment_committed(basedirfrag, resultfrags);
+ mdcache->_fragment_committed(basedirfrag, mdr);
}
};
-class C_IO_MDC_FragmentFinish : public MDCacheIOContext {
+class C_IO_MDC_FragmentPurgeOld : public MDCacheIOContext {
dirfrag_t basedirfrag;
- list<CDir*> resultfrags;
+ int bits;
+ MDRequestRef mdr;
public:
- C_IO_MDC_FragmentFinish(MDCache *m, dirfrag_t f, list<CDir*>& l) :
- MDCacheIOContext(m), basedirfrag(f) {
- resultfrags.swap(l);
- }
+ C_IO_MDC_FragmentPurgeOld(MDCache *m, dirfrag_t f, int b,
+ const MDRequestRef& r) :
+ MDCacheIOContext(m), basedirfrag(f), bits(b), mdr(r) {}
void finish(int r) override {
assert(r == 0 || r == -ENOENT);
- mdcache->_fragment_finish(basedirfrag, resultfrags);
+ mdcache->_fragment_old_purged(basedirfrag, bits, mdr);
}
void print(ostream& out) const override {
- out << "dirfrags_commit(" << basedirfrag << ")";
+ out << "fragment_purge_old(" << basedirfrag << ")";
}
};
void MDCache::_fragment_logged(MDRequestRef& mdr)
{
dirfrag_t basedirfrag = mdr->more()->fragment_base;
- map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
- assert(it != fragments.end());
- fragment_info_t &info = it->second;
+ auto& info = fragments.at(basedirfrag);
CInode *diri = info.resultfrags.front()->get_inode();
dout(10) << "fragment_logged " << basedirfrag << " bits " << info.bits
<< " on " << *diri << dendl;
+ mdr->mark_event("prepare logged");
if (diri->is_auth())
diri->pop_and_dirty_projected_inode(mdr->ls);
void MDCache::_fragment_stored(MDRequestRef& mdr)
{
dirfrag_t basedirfrag = mdr->more()->fragment_base;
- map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
- assert(it != fragments.end());
- fragment_info_t &info = it->second;
- CInode *diri = info.resultfrags.front()->get_inode();
+ fragment_info_t &info = fragments.at(basedirfrag);
+ CDir *first = info.resultfrags.front();
+ CInode *diri = first->get_inode();
dout(10) << "fragment_stored " << basedirfrag << " bits " << info.bits
<< " on " << *diri << dendl;
+ mdr->mark_event("new frags stored");
// tell peers
- CDir *first = *info.resultfrags.begin();
+ mds_rank_t diri_auth = (first->is_subtree_root() && !diri->is_auth()) ?
+ diri->authority().first : CDIR_AUTH_UNKNOWN;
for (const auto &p : first->get_replicas()) {
if (mds->mdsmap->get_state(p.first) < MDSMap::STATE_REJOIN ||
(mds->mdsmap->get_state(p.first) == MDSMap::STATE_REJOIN &&
rejoin_gather.count(p.first)))
continue;
- MMDSFragmentNotify *notify = new MMDSFragmentNotify(basedirfrag, info.bits);
+ auto notify = new MMDSFragmentNotify(basedirfrag, info.bits, mdr->reqid.tid);
+ if (diri_auth != CDIR_AUTH_UNKNOWN && // subtree root
+ diri_auth != p.first) { // not auth mds of diri
+ /*
+ * In the nornal case, mds does not trim dir inode whose child dirfrags
+ * are likely being fragmented (see trim_inode()). But when fragmenting
+ * subtree roots, following race can happen:
+ *
+ * - mds.a (auth mds of dirfrag) sends fragment_notify message to
+ * mds.c and drops wrlock on dirfragtreelock.
+ * - mds.b (auth mds of dir inode) changes dirfragtreelock state to
+ * SYNC and send lock message mds.c
+ * - mds.c receives the lock message and changes dirfragtreelock state
+ * to SYNC
+ * - mds.c trim dirfrag and dir inode from its cache
+ * - mds.c receives the fragment_notify message
+ *
+ * So we need to ensure replicas have received the notify, then unlock
+ * the dirfragtreelock.
+ */
+ notify->mark_ack_wanted();
+ info.notify_ack_waiting.insert(p.first);
+ }
// freshly replicate new dirs to peers
for (list<CDir*>::iterator q = info.resultfrags.begin();
// journal commit
EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, basedirfrag, info.bits);
- mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, basedirfrag,
- info.resultfrags));
+ mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, basedirfrag, mdr));
- mds->locker->drop_locks(mdr.get());
// unfreeze resulting frags
for (list<CDir*>::iterator p = info.resultfrags.begin();
dir->unfreeze_dir();
}
- fragments.erase(it);
- request_finish(mdr);
+ if (info.notify_ack_waiting.empty()) {
+ fragment_drop_locks(info);
+ } else {
+ mds->locker->drop_locks_for_fragment_unfreeze(mdr.get());
+ }
}
-void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+void MDCache::_fragment_committed(dirfrag_t basedirfrag, const MDRequestRef& mdr)
{
dout(10) << "fragment_committed " << basedirfrag << dendl;
- map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
- assert(it != uncommitted_fragments.end());
- ufragment &uf = it->second;
+ if (mdr)
+ mdr->mark_event("commit logged");
+
+ ufragment &uf = uncommitted_fragments.at(basedirfrag);
// remove old frags
C_GatherBuilder gather(
g_ceph_context,
new C_OnFinisher(
- new C_IO_MDC_FragmentFinish(this, basedirfrag, resultfrags),
+ new C_IO_MDC_FragmentPurgeOld(this, basedirfrag, uf.bits, mdr),
mds->finisher));
SnapContext nullsnapc;
gather.activate();
}
-void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+void MDCache::_fragment_old_purged(dirfrag_t basedirfrag, int bits, const MDRequestRef& mdr)
{
- dout(10) << "fragment_finish " << basedirfrag << "resultfrags.size="
- << resultfrags.size() << dendl;
- map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
- assert(it != uncommitted_fragments.end());
- ufragment &uf = it->second;
+ dout(10) << "fragment_old_purged " << basedirfrag << dendl;
+ if (mdr)
+ mdr->mark_event("old frags purged");
+
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH, basedirfrag, bits);
+ mds->mdlog->start_submit_entry(le);
+
+ finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH);
+
+ if (mds->logger) {
+ if (bits > 0) {
+ mds->logger->inc(l_mds_dir_split);
+ } else {
+ mds->logger->inc(l_mds_dir_merge);
+ }
+ }
+
+ if (mdr) {
+ auto it = fragments.find(basedirfrag);
+ ceph_assert(it != fragments.end());
+ it->second.finishing = true;
+ if (it->second.notify_ack_waiting.empty())
+ fragment_maybe_finish(it);
+ else
+ mdr->mark_event("wating for notify acks");
+ }
+}
+
+void MDCache::fragment_drop_locks(fragment_info_t& info)
+{
+ mds->locker->drop_locks(info.mdr.get());
+ request_finish(info.mdr);
+ //info.mdr.reset();
+}
+
+void MDCache::fragment_maybe_finish(const fragment_info_iterator& it)
+{
+ if (!it->second.finishing)
+ return;
// unmark & auth_unpin
- for (const auto &dir : resultfrags) {
+ for (const auto &dir : it->second.resultfrags) {
dir->state_clear(CDir::STATE_FRAGMENTING);
dir->auth_unpin(this);
mds->balancer->maybe_fragment(dir, false);
}
- if (mds->logger) {
- if (resultfrags.size() > 1) {
- mds->logger->inc(l_mds_dir_split);
- } else {
- mds->logger->inc(l_mds_dir_merge);
- }
+ fragments.erase(it);
+}
+
+
+void MDCache::handle_fragment_notify_ack(MMDSFragmentNotifyAck *ack)
+{
+ dout(10) << "handle_fragment_notify_ack " << *ack << " from " << ack->get_source() << dendl;
+ mds_rank_t from = mds_rank_t(ack->get_source().num());
+
+ if (mds->get_state() < MDSMap::STATE_ACTIVE) {
+ ack->put();
+ return;
}
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH, basedirfrag, uf.bits);
- mds->mdlog->start_submit_entry(le);
+ auto it = fragments.find(ack->get_base_dirfrag());
+ if (it == fragments.end() ||
+ it->second.get_tid() != ack->get_tid()) {
+ dout(10) << "handle_fragment_notify_ack obsolete message, dropping" << dendl;
+ ack->put();
+ return;
+ }
- finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH);
+ if (it->second.notify_ack_waiting.erase(from) &&
+ it->second.notify_ack_waiting.empty()) {
+ fragment_drop_locks(it->second);
+ fragment_maybe_finish(it);
+ }
+ ack->put();
}
/* This function DOES put the passed message before returning */
void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
{
dout(10) << "handle_fragment_notify " << *notify << " from " << notify->get_source() << dendl;
+ mds_rank_t from = mds_rank_t(notify->get_source().num());
if (mds->get_state() < MDSMap::STATE_REJOIN) {
notify->put();
// add new replica dirs values
bufferlist::iterator p = notify->basebl.begin();
while (!p.end())
- add_replica_dir(p, diri, mds_rank_t(notify->get_source().num()), waiters);
+ add_replica_dir(p, diri, from, waiters);
mds->queue_waiters(waiters);
} else {
ceph_abort();
}
+ if (notify->is_ack_wanted()) {
+ auto ack = new MMDSFragmentNotifyAck(notify->get_base_dirfrag(),
+ notify->get_bits(), notify->get_tid());
+ mds->send_message_mds(ack, from);
+ }
notify->put();
}
assert(diri);
if (uf.committed) {
- list<CDir*> frags;
- diri->get_dirfrags_under(p->first.frag, frags);
- for (list<CDir*>::iterator q = frags.begin(); q != frags.end(); ++q) {
- CDir *dir = *q;
- dir->auth_pin(this);
- dir->state_set(CDir::STATE_FRAGMENTING);
- }
- _fragment_committed(p->first, frags);
+ _fragment_committed(p->first, MDRequestRef());
continue;
}
for (list<frag_t>::iterator q = old_frags.begin(); q != old_frags.end(); ++q)
assert(!diri->dirfragtree.is_leaf(*q));
- for (list<CDir*>::iterator q = resultfrags.begin(); q != resultfrags.end(); ++q) {
- CDir *dir = *q;
- dir->auth_pin(this);
- dir->state_set(CDir::STATE_FRAGMENTING);
- }
-
mds->mdlog->submit_entry(le);
uf.old_frags.swap(old_frags);
- _fragment_committed(p->first, resultfrags);
+ _fragment_committed(p->first, MDRequestRef());
}
}
struct MClientSnap;
class MMDSFragmentNotify;
+class MMDSFragmentNotifyAck;
class ESubtreeMap;
list<CDir*> dirs;
list<CDir*> resultfrags;
MDRequestRef mdr;
+ set<mds_rank_t> notify_ack_waiting;
+ bool finishing = false;
+
// for deadlock detection
- bool all_frozen;
+ bool all_frozen = false;
utime_t last_cum_auth_pins_change;
- int last_cum_auth_pins;
- int num_remote_waiters; // number of remote authpin waiters
- fragment_info_t() : bits(0), all_frozen(false), last_cum_auth_pins(0), num_remote_waiters(0) {}
+ int last_cum_auth_pins = 0;
+ int num_remote_waiters = 0; // number of remote authpin waiters
+ fragment_info_t() {}
bool is_fragmenting() { return !resultfrags.empty(); }
+ uint64_t get_tid() { return mdr ? mdr->reqid.tid : 0; }
};
map<dirfrag_t,fragment_info_t> fragments;
+ typedef map<dirfrag_t,fragment_info_t>::iterator fragment_info_iterator;
void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
list<CDir*>& frags, list<MDSInternalContextBase*>& waiters, bool replay);
void fragment_mark_and_complete(MDRequestRef& mdr);
void fragment_frozen(MDRequestRef& mdr, int r);
void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
+ void fragment_drop_locks(fragment_info_t &info);
+ void fragment_maybe_finish(const fragment_info_iterator& it);
void dispatch_fragment_dir(MDRequestRef& mdr);
void _fragment_logged(MDRequestRef& mdr);
void _fragment_stored(MDRequestRef& mdr);
- void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags);
- void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags);
+ void _fragment_committed(dirfrag_t f, const MDRequestRef& mdr);
+ void _fragment_old_purged(dirfrag_t f, int bits, const MDRequestRef& mdr);
friend class EFragment;
friend class C_MDC_FragmentFrozen;
friend class C_MDC_FragmentPrep;
friend class C_MDC_FragmentStore;
friend class C_MDC_FragmentCommit;
- friend class C_IO_MDC_FragmentFinish;
+ friend class C_IO_MDC_FragmentPurgeOld;
void handle_fragment_notify(MMDSFragmentNotify *m);
+ void handle_fragment_notify_ack(MMDSFragmentNotifyAck *m);
void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag,
LogSegment *ls, bufferlist *rollback=NULL);
#include "msg/Message.h"
class MMDSFragmentNotify : public Message {
- inodeno_t ino;
- frag_t basefrag;
- int8_t bits;
+ static constexpr int HEAD_VERSION = 2;
+ static constexpr int COMPAT_VERSION = 1;
+
+ dirfrag_t base_dirfrag;
+ int8_t bits = 0;
+ bool ack_wanted = false;
public:
- inodeno_t get_ino() { return ino; }
- frag_t get_basefrag() { return basefrag; }
+ inodeno_t get_ino() { return base_dirfrag.ino; }
+ frag_t get_basefrag() { return base_dirfrag.frag; }
+ dirfrag_t get_base_dirfrag() const { return base_dirfrag; }
int get_bits() { return bits; }
+ bool is_ack_wanted() const { return ack_wanted; }
+ void mark_ack_wanted() { ack_wanted = true; }
bufferlist basebl;
- MMDSFragmentNotify() : Message(MSG_MDS_FRAGMENTNOTIFY) {}
- MMDSFragmentNotify(dirfrag_t df, int b) :
- Message(MSG_MDS_FRAGMENTNOTIFY),
- ino(df.ino), basefrag(df.frag), bits(b) { }
+ MMDSFragmentNotify() :
+ Message(MSG_MDS_FRAGMENTNOTIFY, HEAD_VERSION, COMPAT_VERSION) {}
+ MMDSFragmentNotify(dirfrag_t df, int b, uint64_t tid) :
+ Message(MSG_MDS_FRAGMENTNOTIFY, HEAD_VERSION, COMPAT_VERSION),
+ base_dirfrag(df), bits(b) {
+ set_tid(tid);
+ }
private:
~MMDSFragmentNotify() override {}
public:
const char *get_type_name() const override { return "fragment_notify"; }
void print(ostream& o) const override {
- o << "fragment_notify(" << ino << "." << basefrag
- << " " << (int)bits << ")";
+ o << "fragment_notify(" << base_dirfrag << " " << (int)bits << ")";
}
void encode_payload(uint64_t features) override {
- ::encode(ino, payload);
- ::encode(basefrag, payload);
+ ::encode(base_dirfrag, payload);
::encode(bits, payload);
::encode(basebl, payload);
+ ::encode(ack_wanted, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
- ::decode(ino, p);
- ::decode(basefrag, p);
+ ::decode(base_dirfrag, p);
::decode(bits, p);
::decode(basebl, p);
+ if (header.version >= 2)
+ ::decode(ack_wanted, p);
}
};