From: Yan, Zheng Date: Tue, 8 Apr 2014 08:11:03 +0000 (+0800) Subject: mds: fix shared_ptr MDRequest bugs X-Git-Tag: v0.80-rc1~82^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F1623%2Fhead;p=ceph.git mds: fix shared_ptr MDRequest bugs The main change is use shared_ptr instead of weak_ptr to define active request map. The reason is that slave request needs to be preserved until master explicitly finishes it. Fixes: #8026 Signed-off-by: Yan, Zheng --- diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 22b4875d1b2..362f0f61dbf 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -2494,21 +2494,20 @@ void MDCache::send_slave_resolves() } else { set resolve_set; mds->mdsmap->get_mds_set(resolve_set, MDSMap::STATE_RESOLVE); - for (ceph::unordered_map >::iterator p = active_requests.begin(); + for (ceph::unordered_map::iterator p = active_requests.begin(); p != active_requests.end(); ++p) { - MDRequestRef amdr(p->second.lock()); - assert(amdr); - if (!amdr->is_slave() || !amdr->slave_did_prepare()) + MDRequestRef& mdr = p->second; + if (!mdr->is_slave() || !mdr->slave_did_prepare()) continue; - int master = amdr->slave_to_mds; + int master = mdr->slave_to_mds; if (resolve_set.count(master) || is_ambiguous_slave_update(p->first, master)) { - dout(10) << " including uncommitted " << *amdr << dendl; + dout(10) << " including uncommitted " << *mdr << dendl; if (!resolves.count(master)) resolves[master] = new MMDSResolve; - if (amdr->has_more() && amdr->more()->is_inode_exporter) { + if (mdr->has_more() && mdr->more()->is_inode_exporter) { // re-send cap exports - CInode *in = amdr->more()->rename_inode; + CInode *in = mdr->more()->rename_inode; map cap_map; in->export_client_caps(cap_map); bufferlist bl; @@ -2650,95 +2649,94 @@ void MDCache::handle_mds_failure(int who) // clean up any requests slave to/from this node list finish; - for (ceph::unordered_map >::iterator p = active_requests.begin(); + for (ceph::unordered_map::iterator p = active_requests.begin(); p != active_requests.end(); ++p) { - MDRequestRef amdr(p->second.lock()); - assert(amdr); + MDRequestRef& mdr = p->second;; // slave to the failed node? - if (amdr->slave_to_mds == who) { - if (amdr->slave_did_prepare()) { - dout(10) << " slave request " << *amdr << " uncommitted, will resolve shortly" << dendl; - if (!amdr->more()->waiting_on_slave.empty()) { - assert(amdr->more()->srcdn_auth_mds == mds->get_nodeid()); + if (mdr->slave_to_mds == who) { + if (mdr->slave_did_prepare()) { + dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl; + if (!mdr->more()->waiting_on_slave.empty()) { + assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid()); // will rollback, no need to wait - if (amdr->slave_request) { - amdr->slave_request->put(); - amdr->slave_request = 0; + if (mdr->slave_request) { + mdr->slave_request->put(); + mdr->slave_request = 0; } - amdr->more()->waiting_on_slave.clear(); + mdr->more()->waiting_on_slave.clear(); } } else { - dout(10) << " slave request " << *amdr << " has no prepare, finishing up" << dendl; - if (amdr->slave_request) - amdr->aborted = true; + dout(10) << " slave request " << *mdr << " has no prepare, finishing up" << dendl; + if (mdr->slave_request) + mdr->aborted = true; else - finish.push_back(amdr); + finish.push_back(mdr); } } - if (amdr->is_slave() && amdr->slave_did_prepare()) { - if (amdr->more()->waiting_on_slave.count(who)) { - assert(amdr->more()->srcdn_auth_mds == mds->get_nodeid()); - dout(10) << " slave request " << *amdr << " no longer need rename notity ack from mds." + if (mdr->is_slave() && mdr->slave_did_prepare()) { + if (mdr->more()->waiting_on_slave.count(who)) { + assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid()); + dout(10) << " slave request " << *mdr << " no longer need rename notity ack from mds." << who << dendl; - amdr->more()->waiting_on_slave.erase(who); - if (amdr->more()->waiting_on_slave.empty() && amdr->slave_request) - mds->queue_waiter(new C_MDS_RetryRequest(this, amdr)); + mdr->more()->waiting_on_slave.erase(who); + if (mdr->more()->waiting_on_slave.empty() && mdr->slave_request) + mds->queue_waiter(new C_MDS_RetryRequest(this, mdr)); } - if (amdr->more()->srcdn_auth_mds == who && - mds->mdsmap->is_clientreplay_or_active_or_stopping(amdr->slave_to_mds)) { + if (mdr->more()->srcdn_auth_mds == who && + mds->mdsmap->is_clientreplay_or_active_or_stopping(mdr->slave_to_mds)) { // rename srcdn's auth mds failed, resolve even I'm a survivor. - dout(10) << " slave request " << *amdr << " uncommitted, will resolve shortly" << dendl; - add_ambiguous_slave_update(p->first, amdr->slave_to_mds); + dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl; + add_ambiguous_slave_update(p->first, mdr->slave_to_mds); } } // failed node is slave? - if (amdr->is_master() && !amdr->committing) { - if (amdr->more()->srcdn_auth_mds == who) { - dout(10) << " master request " << *amdr << " waiting for rename srcdn's auth mds." + if (mdr->is_master() && !mdr->committing) { + if (mdr->more()->srcdn_auth_mds == who) { + dout(10) << " master request " << *mdr << " waiting for rename srcdn's auth mds." << who << " to recover" << dendl; - assert(amdr->more()->witnessed.count(who) == 0); - if (amdr->more()->is_ambiguous_auth) - amdr->clear_ambiguous_auth(); + assert(mdr->more()->witnessed.count(who) == 0); + if (mdr->more()->is_ambiguous_auth) + mdr->clear_ambiguous_auth(); // rename srcdn's auth mds failed, all witnesses will rollback - amdr->more()->witnessed.clear(); + mdr->more()->witnessed.clear(); pending_masters.erase(p->first); } - if (amdr->more()->witnessed.count(who)) { - int srcdn_auth = amdr->more()->srcdn_auth_mds; - if (srcdn_auth >= 0 && amdr->more()->waiting_on_slave.count(srcdn_auth)) { - dout(10) << " master request " << *amdr << " waiting for rename srcdn's auth mds." - << amdr->more()->srcdn_auth_mds << " to reply" << dendl; + if (mdr->more()->witnessed.count(who)) { + int srcdn_auth = mdr->more()->srcdn_auth_mds; + if (srcdn_auth >= 0 && mdr->more()->waiting_on_slave.count(srcdn_auth)) { + dout(10) << " master request " << *mdr << " waiting for rename srcdn's auth mds." + << mdr->more()->srcdn_auth_mds << " to reply" << dendl; // waiting for the slave (rename srcdn's auth mds), delay sending resolve ack // until either the request is committing or the slave also fails. - assert(amdr->more()->waiting_on_slave.size() == 1); + assert(mdr->more()->waiting_on_slave.size() == 1); pending_masters.insert(p->first); } else { - dout(10) << " master request " << *amdr << " no longer witnessed by slave mds." + dout(10) << " master request " << *mdr << " no longer witnessed by slave mds." << who << " to recover" << dendl; if (srcdn_auth >= 0) - assert(amdr->more()->witnessed.count(srcdn_auth) == 0); + assert(mdr->more()->witnessed.count(srcdn_auth) == 0); // discard this peer's prepare (if any) - amdr->more()->witnessed.erase(who); + mdr->more()->witnessed.erase(who); } } - if (amdr->more()->waiting_on_slave.count(who)) { - dout(10) << " master request " << *amdr << " waiting for slave mds." << who + if (mdr->more()->waiting_on_slave.count(who)) { + dout(10) << " master request " << *mdr << " waiting for slave mds." << who << " to recover" << dendl; // retry request when peer recovers - amdr->more()->waiting_on_slave.erase(who); - if (amdr->more()->waiting_on_slave.empty()) - mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, amdr)); + mdr->more()->waiting_on_slave.erase(who); + if (mdr->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, mdr)); } - if (amdr->locking && amdr->locking_target_mds == who) - amdr->finish_locking(amdr->locking); + if (mdr->locking && mdr->locking_target_mds == who) + mdr->finish_locking(mdr->locking); } } @@ -3702,71 +3700,70 @@ void MDCache::rejoin_send_rejoins() if (!mds->is_rejoin()) { // i am survivor. send strong rejoin. // note request remote_auth_pins, xlocks - for (ceph::unordered_map >::iterator p = active_requests.begin(); + for (ceph::unordered_map::iterator p = active_requests.begin(); p != active_requests.end(); ++p) { - MDRequestRef amdr(p->second.lock()); - assert(amdr); - if (amdr->is_slave()) + MDRequestRef& mdr = p->second; + if (mdr->is_slave()) continue; // auth pins - for (set::iterator q = amdr->remote_auth_pins.begin(); - q != amdr->remote_auth_pins.end(); + for (set::iterator q = mdr->remote_auth_pins.begin(); + q != mdr->remote_auth_pins.end(); ++q) { if (!(*q)->is_auth()) { int who = (*q)->authority().first; if (rejoins.count(who) == 0) continue; MMDSCacheRejoin *rejoin = rejoins[who]; - dout(15) << " " << *amdr << " authpin on " << **q << dendl; + dout(15) << " " << *mdr << " authpin on " << **q << dendl; MDSCacheObjectInfo i; (*q)->set_object_info(i); if (i.ino) - rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), amdr->reqid, amdr->attempt); + rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), mdr->reqid, mdr->attempt); else - rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, amdr->reqid, amdr->attempt); + rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, mdr->reqid, mdr->attempt); - if (amdr->has_more() && amdr->more()->is_remote_frozen_authpin && - amdr->more()->rename_inode == (*q)) + if (mdr->has_more() && mdr->more()->is_remote_frozen_authpin && + mdr->more()->rename_inode == (*q)) rejoin->add_inode_frozen_authpin(vinodeno_t(i.ino, i.snapid), - amdr->reqid, amdr->attempt); + mdr->reqid, mdr->attempt); } } // xlocks - for (set::iterator q = amdr->xlocks.begin(); - q != amdr->xlocks.end(); + for (set::iterator q = mdr->xlocks.begin(); + q != mdr->xlocks.end(); ++q) { if (!(*q)->get_parent()->is_auth()) { int who = (*q)->get_parent()->authority().first; if (rejoins.count(who) == 0) continue; MMDSCacheRejoin *rejoin = rejoins[who]; - dout(15) << " " << *amdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl; + dout(15) << " " << *mdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl; MDSCacheObjectInfo i; (*q)->get_parent()->set_object_info(i); if (i.ino) rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(), - amdr->reqid, amdr->attempt); + mdr->reqid, mdr->attempt); else rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid, - amdr->reqid, amdr->attempt); + mdr->reqid, mdr->attempt); } } // remote wrlocks - for (map::iterator q = amdr->remote_wrlocks.begin(); - q != amdr->remote_wrlocks.end(); + for (map::iterator q = mdr->remote_wrlocks.begin(); + q != mdr->remote_wrlocks.end(); ++q) { int who = q->second; if (rejoins.count(who) == 0) continue; MMDSCacheRejoin *rejoin = rejoins[who]; - dout(15) << " " << *amdr << " wrlock on " << q->second + dout(15) << " " << *mdr << " wrlock on " << q->second << " " << q->first->get_parent() << dendl; MDSCacheObjectInfo i; q->first->get_parent()->set_object_info(i); assert(i.ino); rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), q->first->get_type(), - amdr->reqid, amdr->attempt); + mdr->reqid, mdr->attempt); } } } @@ -8839,12 +8836,11 @@ void MDCache::kick_find_ino_peers(int who) int MDCache::get_num_client_requests() { int count = 0; - for (ceph::unordered_map >::iterator p = active_requests.begin(); + for (ceph::unordered_map::iterator p = active_requests.begin(); p != active_requests.end(); ++p) { - MDRequestRef amdr(p->second.lock()); - assert(amdr); - if (amdr->reqid.name.is_client() && !amdr->is_slave()) + MDRequestRef& mdr = p->second; + if (mdr->reqid.name.is_client() && !mdr->is_slave()) count++; } return count; @@ -8855,7 +8851,7 @@ MDRequestRef MDCache::request_start(MClientRequest *req) { // did we win a forward race against a slave? if (active_requests.count(req->get_reqid())) { - MDRequestRef mdr = active_requests[req->get_reqid()].lock(); + MDRequestRef& mdr = active_requests[req->get_reqid()]; assert(mdr); if (mdr->is_slave()) { dout(10) << "request_start already had " << *mdr << ", waiting for finish" << dendl; @@ -8870,7 +8866,6 @@ MDRequestRef MDCache::request_start(MClientRequest *req) // register new client request MDRequestRef mdr(new MDRequestImpl(req->get_reqid(), req->get_num_fwd(), req)); - mdr->set_self_ref(mdr); active_requests[req->get_reqid()] = mdr; dout(7) << "request_start " << *mdr << dendl; return mdr; @@ -8879,7 +8874,6 @@ MDRequestRef MDCache::request_start(MClientRequest *req) MDRequestRef MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by) { MDRequestRef mdr(new MDRequestImpl(ri, attempt, by)); - mdr->set_self_ref(mdr); assert(active_requests.count(mdr->reqid) == 0); active_requests[mdr->reqid] = mdr; dout(7) << "request_start_slave " << *mdr << " by mds." << by << dendl; @@ -8889,7 +8883,6 @@ MDRequestRef MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by) MDRequestRef MDCache::request_start_internal(int op) { MDRequestRef mdr(new MDRequestImpl); - mdr->set_self_ref(mdr); mdr->reqid.name = entity_name_t::MDS(mds->get_nodeid()); mdr->reqid.tid = mds->issue_tid(); mdr->internal_op = op; @@ -8900,13 +8893,12 @@ MDRequestRef MDCache::request_start_internal(int op) return mdr; } - MDRequestRef MDCache::request_get(metareqid_t rid) { - assert(active_requests.count(rid)); - MDRequestRef amdr(active_requests[rid]); - dout(7) << "request_get " << rid << " " << *amdr << dendl; - return amdr; + ceph::unordered_map::iterator p = active_requests.find(rid); + assert(p != active_requests.end()); + dout(7) << "request_get " << rid << " " << *p->second << dendl; + return p->second; } void MDCache::request_finish(MDRequestRef& mdr) diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 13c3d5f72f0..c536d32f74b 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -240,7 +240,7 @@ protected: // -- requests -- protected: - ceph::unordered_map > active_requests; + ceph::unordered_map active_requests; public: int get_num_client_requests(); diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index e873bf95047..9f0e3d8bc43 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -2618,7 +2618,7 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last) it->second.peer_exports.swap(peer_exports); // clear import state (we're done!) - MutationRef& mut = it->second.mut; + MutationRef mut = it->second.mut; import_state.erase(it); mds->mdlog->start_submit_entry(new EImportFinish(dir, true)); diff --git a/src/mds/Mutation.h b/src/mds/Mutation.h index 53243df739b..dbfbe7595b0 100644 --- a/src/mds/Mutation.h +++ b/src/mds/Mutation.h @@ -34,7 +34,6 @@ class MClientRequest; class MMDSSlaveRequest; struct MutationImpl { - ceph::weak_ptr self_ref; metareqid_t reqid; __u32 attempt; // which attempt for this request LogSegment *ls; // the log segment i'm committing to @@ -80,16 +79,14 @@ struct MutationImpl { list > dirty_cow_dentries; MutationImpl() - : self_ref(), - attempt(0), + : attempt(0), ls(0), slave_to_mds(-1), locking(NULL), locking_target_mds(-1), done_locking(false), committing(false), aborted(false), killed(false) { } MutationImpl(metareqid_t ri, __u32 att=0, int slave_to=-1) - : self_ref(), - reqid(ri), attempt(att), + : reqid(ri), attempt(att), ls(0), slave_to_mds(slave_to), locking(NULL), @@ -141,10 +138,6 @@ struct MutationImpl { virtual void print(ostream &out) { out << "mutation(" << this << ")"; } - - void set_self_ref(ceph::shared_ptr& ref) { - self_ref = ref; - } }; inline ostream& operator<<(ostream& out, MutationImpl &mut) @@ -314,9 +307,6 @@ struct MDRequestImpl : public MutationImpl { void clear_ambiguous_auth(); void print(ostream &out); - void set_self_ref(ceph::shared_ptr& ref) { - self_ref = ceph::static_pointer_cast(ref); - } }; typedef ceph::shared_ptr MDRequestRef; diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 11172e18679..75f4612ebf5 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -548,8 +548,7 @@ void Server::journal_close_session(Session *session, int state) session->requests.begin(member_offset(MDRequestImpl, item_session_request)); while (!p.end()) { - MDRequestRef mdr = ceph::static_pointer_cast((*p)->self_ref.lock()); - assert(mdr); + MDRequestRef mdr = mdcache->request_get((*p)->reqid); ++p; mdcache->request_kill(mdr); }