git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: fix shared_ptr MDRequest bugs 1623/head
author: Yan, Zheng <zheng.z.yan@intel.com>
Tue, 8 Apr 2014 08:11:03 +0000 (16:11 +0800)
committer: Yan, Zheng <zheng.z.yan@intel.com>
Tue, 8 Apr 2014 08:29:43 +0000 (16:29 +0800)
The main change is to use shared_ptr instead of weak_ptr to define the
active request map. The reason is that a slave request needs to be
preserved until the master explicitly finishes it.

Fixes: #8026
Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Migrator.cc
src/mds/Mutation.h
src/mds/Server.cc

index 22b4875d1b227a08bf1a89cb78a25d760554be52..362f0f61dbf26660a08ec8660f04315299b03464 100644 (file)
@@ -2494,21 +2494,20 @@ void MDCache::send_slave_resolves()
   } else {
     set<int> resolve_set;
     mds->mdsmap->get_mds_set(resolve_set, MDSMap::STATE_RESOLVE);
-    for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+    for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
         p != active_requests.end();
         ++p) {
-      MDRequestRef amdr(p->second.lock());
-      assert(amdr);
-      if (!amdr->is_slave() || !amdr->slave_did_prepare())
+      MDRequestRef& mdr = p->second;
+      if (!mdr->is_slave() || !mdr->slave_did_prepare())
        continue;
-      int master = amdr->slave_to_mds;
+      int master = mdr->slave_to_mds;
       if (resolve_set.count(master) || is_ambiguous_slave_update(p->first, master)) {
-       dout(10) << " including uncommitted " << *amdr << dendl;
+       dout(10) << " including uncommitted " << *mdr << dendl;
        if (!resolves.count(master))
          resolves[master] = new MMDSResolve;
-       if (amdr->has_more() && amdr->more()->is_inode_exporter) {
+       if (mdr->has_more() && mdr->more()->is_inode_exporter) {
          // re-send cap exports
-         CInode *in = amdr->more()->rename_inode;
+         CInode *in = mdr->more()->rename_inode;
          map<client_t, Capability::Export> cap_map;
          in->export_client_caps(cap_map);
          bufferlist bl;
@@ -2650,95 +2649,94 @@ void MDCache::handle_mds_failure(int who)
 
   // clean up any requests slave to/from this node
   list<MDRequestRef> finish;
-  for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+  for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
        p != active_requests.end();
        ++p) {
-    MDRequestRef amdr(p->second.lock());
-    assert(amdr);
+    MDRequestRef& mdr = p->second;;
     // slave to the failed node?
-    if (amdr->slave_to_mds == who) {
-      if (amdr->slave_did_prepare()) {
-       dout(10) << " slave request " << *amdr << " uncommitted, will resolve shortly" << dendl;
-       if (!amdr->more()->waiting_on_slave.empty()) {
-         assert(amdr->more()->srcdn_auth_mds == mds->get_nodeid());
+    if (mdr->slave_to_mds == who) {
+      if (mdr->slave_did_prepare()) {
+       dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl;
+       if (!mdr->more()->waiting_on_slave.empty()) {
+         assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid());
          // will rollback, no need to wait
-         if (amdr->slave_request) {
-           amdr->slave_request->put();
-           amdr->slave_request = 0;
+         if (mdr->slave_request) {
+           mdr->slave_request->put();
+           mdr->slave_request = 0;
          }
-         amdr->more()->waiting_on_slave.clear();
+         mdr->more()->waiting_on_slave.clear();
        }
       } else {
-       dout(10) << " slave request " << *amdr << " has no prepare, finishing up" << dendl;
-       if (amdr->slave_request)
-         amdr->aborted = true;
+       dout(10) << " slave request " << *mdr << " has no prepare, finishing up" << dendl;
+       if (mdr->slave_request)
+         mdr->aborted = true;
        else
-         finish.push_back(amdr);
+         finish.push_back(mdr);
       }
     }
 
-    if (amdr->is_slave() && amdr->slave_did_prepare()) {
-      if (amdr->more()->waiting_on_slave.count(who)) {
-       assert(amdr->more()->srcdn_auth_mds == mds->get_nodeid());
-       dout(10) << " slave request " << *amdr << " no longer need rename notity ack from mds."
+    if (mdr->is_slave() && mdr->slave_did_prepare()) {
+      if (mdr->more()->waiting_on_slave.count(who)) {
+       assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid());
+       dout(10) << " slave request " << *mdr << " no longer need rename notity ack from mds."
                 << who << dendl;
-       amdr->more()->waiting_on_slave.erase(who);
-       if (amdr->more()->waiting_on_slave.empty() && amdr->slave_request)
-         mds->queue_waiter(new C_MDS_RetryRequest(this, amdr));
+       mdr->more()->waiting_on_slave.erase(who);
+       if (mdr->more()->waiting_on_slave.empty() && mdr->slave_request)
+         mds->queue_waiter(new C_MDS_RetryRequest(this, mdr));
       }
 
-      if (amdr->more()->srcdn_auth_mds == who &&
-         mds->mdsmap->is_clientreplay_or_active_or_stopping(amdr->slave_to_mds)) {
+      if (mdr->more()->srcdn_auth_mds == who &&
+         mds->mdsmap->is_clientreplay_or_active_or_stopping(mdr->slave_to_mds)) {
        // rename srcdn's auth mds failed, resolve even I'm a survivor.
-       dout(10) << " slave request " << *amdr << " uncommitted, will resolve shortly" << dendl;
-       add_ambiguous_slave_update(p->first, amdr->slave_to_mds);
+       dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl;
+       add_ambiguous_slave_update(p->first, mdr->slave_to_mds);
       }
     }
     
     // failed node is slave?
-    if (amdr->is_master() && !amdr->committing) {
-      if (amdr->more()->srcdn_auth_mds == who) {
-       dout(10) << " master request " << *amdr << " waiting for rename srcdn's auth mds."
+    if (mdr->is_master() && !mdr->committing) {
+      if (mdr->more()->srcdn_auth_mds == who) {
+       dout(10) << " master request " << *mdr << " waiting for rename srcdn's auth mds."
                 << who << " to recover" << dendl;
-       assert(amdr->more()->witnessed.count(who) == 0);
-       if (amdr->more()->is_ambiguous_auth)
-         amdr->clear_ambiguous_auth();
+       assert(mdr->more()->witnessed.count(who) == 0);
+       if (mdr->more()->is_ambiguous_auth)
+         mdr->clear_ambiguous_auth();
        // rename srcdn's auth mds failed, all witnesses will rollback
-       amdr->more()->witnessed.clear();
+       mdr->more()->witnessed.clear();
        pending_masters.erase(p->first);
       }
 
-      if (amdr->more()->witnessed.count(who)) {
-       int srcdn_auth = amdr->more()->srcdn_auth_mds;
-       if (srcdn_auth >= 0 && amdr->more()->waiting_on_slave.count(srcdn_auth)) {
-         dout(10) << " master request " << *amdr << " waiting for rename srcdn's auth mds."
-                  << amdr->more()->srcdn_auth_mds << " to reply" << dendl;
+      if (mdr->more()->witnessed.count(who)) {
+       int srcdn_auth = mdr->more()->srcdn_auth_mds;
+       if (srcdn_auth >= 0 && mdr->more()->waiting_on_slave.count(srcdn_auth)) {
+         dout(10) << " master request " << *mdr << " waiting for rename srcdn's auth mds."
+                  << mdr->more()->srcdn_auth_mds << " to reply" << dendl;
          // waiting for the slave (rename srcdn's auth mds), delay sending resolve ack
          // until either the request is committing or the slave also fails.
-         assert(amdr->more()->waiting_on_slave.size() == 1);
+         assert(mdr->more()->waiting_on_slave.size() == 1);
          pending_masters.insert(p->first);
        } else {
-         dout(10) << " master request " << *amdr << " no longer witnessed by slave mds."
+         dout(10) << " master request " << *mdr << " no longer witnessed by slave mds."
                   << who << " to recover" << dendl;
          if (srcdn_auth >= 0)
-           assert(amdr->more()->witnessed.count(srcdn_auth) == 0);
+           assert(mdr->more()->witnessed.count(srcdn_auth) == 0);
 
          // discard this peer's prepare (if any)
-         amdr->more()->witnessed.erase(who);
+         mdr->more()->witnessed.erase(who);
        }
       }
       
-      if (amdr->more()->waiting_on_slave.count(who)) {
-       dout(10) << " master request " << *amdr << " waiting for slave mds." << who
+      if (mdr->more()->waiting_on_slave.count(who)) {
+       dout(10) << " master request " << *mdr << " waiting for slave mds." << who
                 << " to recover" << dendl;
        // retry request when peer recovers
-       amdr->more()->waiting_on_slave.erase(who);
-       if (amdr->more()->waiting_on_slave.empty())
-         mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, amdr));
+       mdr->more()->waiting_on_slave.erase(who);
+       if (mdr->more()->waiting_on_slave.empty())
+         mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, mdr));
       }
 
-      if (amdr->locking && amdr->locking_target_mds == who)
-       amdr->finish_locking(amdr->locking);
+      if (mdr->locking && mdr->locking_target_mds == who)
+       mdr->finish_locking(mdr->locking);
     }
   }
 
@@ -3702,71 +3700,70 @@ void MDCache::rejoin_send_rejoins()
   if (!mds->is_rejoin()) {
     // i am survivor.  send strong rejoin.
     // note request remote_auth_pins, xlocks
-    for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+    for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
         p != active_requests.end();
         ++p) {
-      MDRequestRef amdr(p->second.lock());
-      assert(amdr);
-      if (amdr->is_slave())
+      MDRequestRef& mdr = p->second;
+      if (mdr->is_slave())
        continue;
       // auth pins
-      for (set<MDSCacheObject*>::iterator q = amdr->remote_auth_pins.begin();
-          q != amdr->remote_auth_pins.end();
+      for (set<MDSCacheObject*>::iterator q = mdr->remote_auth_pins.begin();
+          q != mdr->remote_auth_pins.end();
           ++q) {
        if (!(*q)->is_auth()) {
          int who = (*q)->authority().first;
          if (rejoins.count(who) == 0) continue;
          MMDSCacheRejoin *rejoin = rejoins[who];
          
-         dout(15) << " " << *amdr << " authpin on " << **q << dendl;
+         dout(15) << " " << *mdr << " authpin on " << **q << dendl;
          MDSCacheObjectInfo i;
          (*q)->set_object_info(i);
          if (i.ino)
-           rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), amdr->reqid, amdr->attempt);
+           rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), mdr->reqid, mdr->attempt);
          else
-           rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, amdr->reqid, amdr->attempt);
+           rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, mdr->reqid, mdr->attempt);
 
-         if (amdr->has_more() && amdr->more()->is_remote_frozen_authpin &&
-             amdr->more()->rename_inode == (*q))
+         if (mdr->has_more() && mdr->more()->is_remote_frozen_authpin &&
+             mdr->more()->rename_inode == (*q))
            rejoin->add_inode_frozen_authpin(vinodeno_t(i.ino, i.snapid),
-                                            amdr->reqid, amdr->attempt);
+                                            mdr->reqid, mdr->attempt);
        }
       }
       // xlocks
-      for (set<SimpleLock*>::iterator q = amdr->xlocks.begin();
-          q != amdr->xlocks.end();
+      for (set<SimpleLock*>::iterator q = mdr->xlocks.begin();
+          q != mdr->xlocks.end();
           ++q) {
        if (!(*q)->get_parent()->is_auth()) {
          int who = (*q)->get_parent()->authority().first;
          if (rejoins.count(who) == 0) continue;
          MMDSCacheRejoin *rejoin = rejoins[who];
          
-         dout(15) << " " << *amdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl;
+         dout(15) << " " << *mdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl;
          MDSCacheObjectInfo i;
          (*q)->get_parent()->set_object_info(i);
          if (i.ino)
            rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(),
-                                   amdr->reqid, amdr->attempt);
+                                   mdr->reqid, mdr->attempt);
          else
            rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid,
-                                    amdr->reqid, amdr->attempt);
+                                    mdr->reqid, mdr->attempt);
        }
       }
       // remote wrlocks
-      for (map<SimpleLock*, int>::iterator q = amdr->remote_wrlocks.begin();
-          q != amdr->remote_wrlocks.end();
+      for (map<SimpleLock*, int>::iterator q = mdr->remote_wrlocks.begin();
+          q != mdr->remote_wrlocks.end();
           ++q) {
        int who = q->second;
        if (rejoins.count(who) == 0) continue;
        MMDSCacheRejoin *rejoin = rejoins[who];
 
-       dout(15) << " " << *amdr << " wrlock on " << q->second
+       dout(15) << " " << *mdr << " wrlock on " << q->second
                 << " " << q->first->get_parent() << dendl;
        MDSCacheObjectInfo i;
        q->first->get_parent()->set_object_info(i);
        assert(i.ino);
        rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), q->first->get_type(),
-                                amdr->reqid, amdr->attempt);
+                                mdr->reqid, mdr->attempt);
       }
     }
   }
@@ -8839,12 +8836,11 @@ void MDCache::kick_find_ino_peers(int who)
 int MDCache::get_num_client_requests()
 {
   int count = 0;
-  for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+  for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
       p != active_requests.end();
       ++p) {
-    MDRequestRef amdr(p->second.lock());
-    assert(amdr);
-    if (amdr->reqid.name.is_client() && !amdr->is_slave())
+    MDRequestRef& mdr = p->second;
+    if (mdr->reqid.name.is_client() && !mdr->is_slave())
       count++;
   }
   return count;
@@ -8855,7 +8851,7 @@ MDRequestRef MDCache::request_start(MClientRequest *req)
 {
   // did we win a forward race against a slave?
   if (active_requests.count(req->get_reqid())) {
-    MDRequestRef mdr = active_requests[req->get_reqid()].lock();
+    MDRequestRef& mdr = active_requests[req->get_reqid()];
     assert(mdr);
     if (mdr->is_slave()) {
       dout(10) << "request_start already had " << *mdr << ", waiting for finish" << dendl;
@@ -8870,7 +8866,6 @@ MDRequestRef MDCache::request_start(MClientRequest *req)
   // register new client request
   MDRequestRef mdr(new MDRequestImpl(req->get_reqid(),
                                      req->get_num_fwd(), req));
-  mdr->set_self_ref(mdr);
   active_requests[req->get_reqid()] = mdr;
   dout(7) << "request_start " << *mdr << dendl;
   return mdr;
@@ -8879,7 +8874,6 @@ MDRequestRef MDCache::request_start(MClientRequest *req)
 MDRequestRef MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by)
 {
   MDRequestRef mdr(new MDRequestImpl(ri, attempt, by));
-  mdr->set_self_ref(mdr);
   assert(active_requests.count(mdr->reqid) == 0);
   active_requests[mdr->reqid] = mdr;
   dout(7) << "request_start_slave " << *mdr << " by mds." << by << dendl;
@@ -8889,7 +8883,6 @@ MDRequestRef MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by)
 MDRequestRef MDCache::request_start_internal(int op)
 {
   MDRequestRef mdr(new MDRequestImpl);
-  mdr->set_self_ref(mdr);
   mdr->reqid.name = entity_name_t::MDS(mds->get_nodeid());
   mdr->reqid.tid = mds->issue_tid();
   mdr->internal_op = op;
@@ -8900,13 +8893,12 @@ MDRequestRef MDCache::request_start_internal(int op)
   return mdr;
 }
 
-
 MDRequestRef MDCache::request_get(metareqid_t rid)
 {
-  assert(active_requests.count(rid));
-  MDRequestRef amdr(active_requests[rid]);
-  dout(7) << "request_get " << rid << " " << *amdr << dendl;
-  return amdr;
+  ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.find(rid);
+  assert(p != active_requests.end());
+  dout(7) << "request_get " << rid << " " << *p->second << dendl;
+  return p->second;
 }
 
 void MDCache::request_finish(MDRequestRef& mdr)
index 13c3d5f72f093ac41900582f2ba233b759b3a144..c536d32f74b717d900e77d32301895aa94b1425c 100644 (file)
@@ -240,7 +240,7 @@ protected:
 
   // -- requests --
 protected:
-  ceph::unordered_map<metareqid_t,ceph::weak_ptr<MDRequestImpl> > active_requests;
+  ceph::unordered_map<metareqid_t, MDRequestRef> active_requests;
 
 public:
   int get_num_client_requests();
index e873bf950470dc94e6e0999b9e4a463a89558507..9f0e3d8bc430320b5905ba1e78b858527a80c882 100644 (file)
@@ -2618,7 +2618,7 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
   it->second.peer_exports.swap(peer_exports);
 
   // clear import state (we're done!)
-  MutationRef& mut = it->second.mut;
+  MutationRef mut = it->second.mut;
   import_state.erase(it);
 
   mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
index 53243df739b953d984258b842ec236b231a6a50e..dbfbe7595b0aebe8d668ea34acffac702dd2fc69 100644 (file)
@@ -34,7 +34,6 @@ class MClientRequest;
 class MMDSSlaveRequest;
 
 struct MutationImpl {
-  ceph::weak_ptr<MutationImpl> self_ref;
   metareqid_t reqid;
   __u32 attempt;      // which attempt for this request
   LogSegment *ls;  // the log segment i'm committing to
@@ -80,16 +79,14 @@ struct MutationImpl {
   list<pair<CDentry*,version_t> > dirty_cow_dentries;
 
   MutationImpl()
-    : self_ref(),
-      attempt(0),
+    : attempt(0),
       ls(0),
       slave_to_mds(-1),
       locking(NULL),
       locking_target_mds(-1),
       done_locking(false), committing(false), aborted(false), killed(false) { }
   MutationImpl(metareqid_t ri, __u32 att=0, int slave_to=-1)
-    : self_ref(),
-      reqid(ri), attempt(att),
+    : reqid(ri), attempt(att),
       ls(0),
       slave_to_mds(slave_to), 
       locking(NULL),
@@ -141,10 +138,6 @@ struct MutationImpl {
   virtual void print(ostream &out) {
     out << "mutation(" << this << ")";
   }
-
-  void set_self_ref(ceph::shared_ptr<MutationImpl>& ref) {
-    self_ref = ref;
-  }
 };
 
 inline ostream& operator<<(ostream& out, MutationImpl &mut)
@@ -314,9 +307,6 @@ struct MDRequestImpl : public MutationImpl {
   void clear_ambiguous_auth();
 
   void print(ostream &out);
-  void set_self_ref(ceph::shared_ptr<MDRequestImpl>& ref) {
-    self_ref = ceph::static_pointer_cast<MutationImpl,MDRequestImpl>(ref);
-  }
 };
 
 typedef ceph::shared_ptr<MDRequestImpl> MDRequestRef;
index 11172e186793b3180cfd4dcf1d35772fa5426e19..75f4612ebf5181fd2a667eeb27e69c22f3f686bf 100644 (file)
@@ -548,8 +548,7 @@ void Server::journal_close_session(Session *session, int state)
     session->requests.begin(member_offset(MDRequestImpl,
                                          item_session_request));
   while (!p.end()) {
-    MDRequestRef mdr = ceph::static_pointer_cast<MDRequestImpl,MutationImpl>((*p)->self_ref.lock());
-    assert(mdr);
+    MDRequestRef mdr = mdcache->request_get((*p)->reqid);
     ++p;
     mdcache->request_kill(mdr);
   }