]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: link/unlink behaving
authorSage Weil <sage@newdream.net>
Sat, 24 May 2008 20:01:57 +0000 (13:01 -0700)
committerSage Weil <sage@newdream.net>
Sat, 24 May 2008 20:01:57 +0000 (13:01 -0700)
src/client/Client.cc
src/include/types.h
src/mds/Locker.cc
src/mds/Locker.h
src/mds/Server.cc
src/mds/Server.h

index 6b2a782c8fb42ad5d45364b39e7b7156ede8cb2f..3511c341665257d3768bf5c8b641125f7e58c7b3 100644 (file)
@@ -1849,6 +1849,10 @@ int Client::unmount()
          p != inode_map.end();
          p++) {
       Inode *in = p->second;
+      if (!in) {
+       dout(0) << "null inode_map entry ino " << p->first << dendl;
+       assert(in);
+      }      
       if (!in->caps.empty()) {
        _release(in);
        _flush(in);
index 807e8fefd0e49b08076262aaf9373be6a03bca95..131e0642728b28d59891879b2a8e3987b602bc18 100644 (file)
@@ -205,6 +205,10 @@ struct nested_info_t {
   __u64 rfiles;
   __u64 rsubdirs;
   
+  void zero() {
+    rbytes = rfiles = rsubdirs = 0;
+  }
+
   void encode(bufferlist &bl) const {
     ::encode(rbytes, bl);
     ::encode(rfiles, bl);
index e1b69d23ece4a064261890d5eb52971e19458bf6..b20590230ff247be71b0fd05f6c3480e0094a705 100644 (file)
@@ -850,7 +850,7 @@ bool Locker::check_inode_max_size(CInode *in, bool forcewrlock)
   pi->version = in->pre_dirty();
   pi->max_size = new_max;
   EOpen *le = new EOpen(mds->mdlog);
-  predirty_nested(mut, &le->metablob, in, false);
+  predirty_nested(mut, &le->metablob, in, true, 0, false);
   le->metablob.add_dir_context(in->get_parent_dir());
   le->metablob.add_primary_dentry(in->parent, true, 0, pi);
   le->add_ino(in->ino());
@@ -1033,6 +1033,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m)
       dout(7) << "  size " << pi->size << " -> " << size
              << " for " << *in << dendl;
       pi->size = size;
+      pi->nested.rbytes = size;
     }
     if (dirty_atime) {
       dout(7) << "  atime " << pi->atime << " -> " << atime
@@ -1047,7 +1048,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m)
     Mutation *mut = new Mutation;
     mut->ls = mds->mdlog->get_current_segment();
     file_wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
-    predirty_nested(mut, &le->metablob, in, false);
+    predirty_nested(mut, &le->metablob, in, true, 0, false);
     le->metablob.add_dir_context(in->get_parent_dir());
     le->metablob.add_primary_dentry(in->parent, true, 0, pi);
     mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, change_max));
@@ -1221,19 +1222,23 @@ void Locker::revoke_client_leases(SimpleLock *lock)
 
 // nested ---------------------------------------------------------------
 
-void Locker::predirty_nested(Mutation *mut, EMetaBlob *blob, CInode *in, 
-                            bool do_parent, int dfiles, int dsubdirs)
+void Locker::predirty_nested(Mutation *mut, EMetaBlob *blob,
+                            CInode *in, bool primary_dn, CDir *parent,
+                            bool do_parent, int linkunlink)
 {
   dout(10) << "predirty_nested "
-          << do_parent << "/" << dfiles << "/" << dsubdirs
+          << (primary_dn ? "primary_dn ":"remote_dn ")
+          << (do_parent ? "do_parent_mtime ":"")
+          << "linkunlink=" <<  linkunlink
           << " " << *in << dendl;
 
-  CDir *parent = in->get_projected_parent_dn()->get_dir();
+  if (!parent)
+    parent = in->get_projected_parent_dn()->get_dir();
 
   inode_t *curi = in->get_projected_inode();
-  if (curi->is_file()) {
-    curi->nested.rbytes = curi->size;
-  }
+
+  __s64 drbytes, drfiles, drsubdirs;
+  utime_t rctime;
 
   // build list of inodes to wrlock, dirty, and update
   list<CInode*> lsi;
@@ -1257,35 +1262,44 @@ void Locker::predirty_nested(Mutation *mut, EMetaBlob *blob, CInode *in,
     }
 
     // inode -> dirfrag
-    __u64 drbytes = curi->nested.rbytes - curi->accounted_nested.rbytes;
-    __u64 drfiles = curi->nested.rfiles - curi->accounted_nested.rfiles;
-    __u64 drsubdirs = curi->nested.rsubdirs - curi->accounted_nested.rsubdirs;
-    utime_t rctime = MAX(curi->ctime, curi->nested.rctime);
-
     mut->add_projected_fnode(parent);
 
     fnode_t *pf = parent->project_fnode();
     pf->version = parent->pre_dirty();
+
     if (do_parent) {
       dout(10) << "predirty_nested updating mtime/size on " << *parent << dendl;
       pf->fraginfo.mtime = mut->now;
-      pf->fraginfo.nfiles += dfiles;
-      pf->fraginfo.nsubdirs += dsubdirs;
-      //pf->nested.rfiles += dfiles;
-      //pf->nested.rsubdirs += dsubdirs;
+      if (linkunlink) {
+       if (in->is_dir())
+         pf->fraginfo.nsubdirs += linkunlink;
+       else
+         pf->fraginfo.nfiles += linkunlink;
+      }
     }
-    dout(10) << "predirty_nested delta "
-            << drbytes << " bytes / " << drfiles << " files / " << drsubdirs << " subdirs for " 
-            << *parent << dendl;
-    pf->nested.rbytes += drbytes;
-    pf->nested.rfiles += drfiles;
-    pf->nested.rsubdirs += drsubdirs;
-    pf->nested.rctime = rctime;
+    if (primary_dn) {
+      drbytes = curi->nested.rbytes - curi->accounted_nested.rbytes;
+      drfiles = curi->nested.rfiles - curi->accounted_nested.rfiles;
+      drsubdirs = curi->nested.rsubdirs - curi->accounted_nested.rsubdirs;
+      rctime = MAX(curi->ctime, curi->nested.rctime);
+
+      dout(10) << "predirty_nested delta "
+              << drbytes << " bytes / " << drfiles << " files / " << drsubdirs << " subdirs for " 
+              << *parent << dendl;
+      pf->nested.rbytes += drbytes;
+      pf->nested.rfiles += drfiles;
+      pf->nested.rsubdirs += drsubdirs;
+      pf->nested.rctime = rctime;
     
-    curi->accounted_nested.rbytes += drbytes;
-    curi->accounted_nested.rfiles += drfiles;
-    curi->accounted_nested.rsubdirs += drsubdirs;
-    curi->accounted_nested.rctime = rctime;
+      curi->accounted_nested.rbytes += drbytes;
+      curi->accounted_nested.rfiles += drfiles;
+      curi->accounted_nested.rsubdirs += drsubdirs;
+      curi->accounted_nested.rctime = rctime;
+    } else {
+      dout(10) << "predirty_nested no delta (remote dentry) in " << *parent << dendl;
+      assert(!in->is_dir());
+      pf->nested.rfiles += linkunlink;
+    }
 
     if (pin->is_base())
       break;
@@ -1300,9 +1314,9 @@ void Locker::predirty_nested(Mutation *mut, EMetaBlob *blob, CInode *in,
     mut->add_projected_inode(pin);
     lsi.push_back(pin);
 
-    version_t ppv = pin->pre_dirty();
     inode_t *pi = pin->project_inode();
-    pi->version = ppv;
+    pi->version = pin->pre_dirty();
+
     if (do_parent) {
       dout(10) << "predirty_nested updating size/mtime on " << *pin << dendl;
       if (pf->fraginfo.mtime > pi->mtime)
@@ -1326,6 +1340,7 @@ void Locker::predirty_nested(Mutation *mut, EMetaBlob *blob, CInode *in,
     cur = pin;
     curi = pi;
     parent = cur->get_projected_parent_dn()->get_dir();
+    primary_dn = true;
     do_parent = false;
   }
 
index 876f41b8e00c4d9b99018f7993f5952caa3cd127..6a79f5193f902480f9b918d6e270169a76efdf02 100644 (file)
@@ -156,7 +156,8 @@ protected:
   void scatter_writebehind_finish(ScatterLock *lock, LogSegment *ls);
 
 public:
-  void predirty_nested(Mutation *mut, EMetaBlob *blob, CInode *in, bool do_parent, int dfiles=0, int dsubdirs=0);
+  void predirty_nested(Mutation *mut, EMetaBlob *blob, CInode *in, bool primary_dn, CDir *dir,
+                      bool do_parent, int linkunlink=0);
 
   // local
 protected:
index 607c1019149e110d3c066e8e707a2e19f2820633..38f732a98408b8702c1989448d9009e597d4b57e 100644 (file)
@@ -2053,7 +2053,7 @@ void Server::handle_client_mknod(MDRequest *mdr)
   le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_allocated_ino(newi->ino(), mds->idalloc->get_version());
 
-  mds->locker->predirty_nested(mdr, &le->metablob, newi, true, 1, 0);
+  mds->locker->predirty_nested(mdr, &le->metablob, newi, true, dn->dir, true, 1);
 
   le->metablob.add_primary_dentry(dn, true, newi, &newi->inode);
   
@@ -2098,7 +2098,7 @@ void Server::handle_client_mkdir(MDRequest *mdr)
   EUpdate *le = new EUpdate(mdlog, "mkdir");
   le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_allocated_ino(newi->ino(), mds->idalloc->get_version());
-  mds->locker->predirty_nested(mdr, &le->metablob, newi, true, 0, 1);
+  mds->locker->predirty_nested(mdr, &le->metablob, newi, true, dn->dir, true, 1);
   le->metablob.add_primary_dentry(dn, true, newi, &newi->inode);
   le->metablob.add_dir(newdir, true, true); // dirty AND complete
   
@@ -2150,7 +2150,7 @@ void Server::handle_client_symlink(MDRequest *mdr)
   EUpdate *le = new EUpdate(mdlog, "symlink");
   le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_allocated_ino(newi->ino(), mds->idalloc->get_version());
-  mds->locker->predirty_nested(mdr, &le->metablob, newi, true, 1, 0);
+  mds->locker->predirty_nested(mdr, &le->metablob, newi, true, dn->dir, true, 1);
   le->metablob.add_primary_dentry(dn, true, newi, &newi->inode);
 
   // log + wait
@@ -2216,6 +2216,7 @@ void Server::handle_client_link(MDRequest *mdr)
     rdlocks.insert(&linktrace[i]->lock);
   xlocks.insert(&dn->lock);
   wrlocks.insert(&dn->dir->inode->dirlock);
+  wrlocks.insert(&dn->dir->inode->nestedlock);
   for (int i=0; i<(int)targettrace.size(); i++)
     rdlocks.insert(&targettrace[i]->lock);
   xlocks.insert(&targeti->linklock);
@@ -2265,15 +2266,14 @@ class C_MDS_link_local_finish : public Context {
   CInode *targeti;
   version_t dnpv;
   version_t tipv;
-  version_t dirpv;
 public:
   C_MDS_link_local_finish(MDS *m, MDRequest *r, CDentry *d, CInode *ti, 
-                         version_t dnpv_, version_t tipv_, version_t dirpv_) :
+                         version_t dnpv_, version_t tipv_) :
     mds(m), mdr(r), dn(d), targeti(ti),
-    dnpv(dnpv_), tipv(tipv_), dirpv(dirpv_) { }
+    dnpv(dnpv_), tipv(tipv_) { }
   void finish(int r) {
     assert(r == 0);
-    mds->server->_link_local_finish(mdr, dn, targeti, dnpv, tipv, dirpv);
+    mds->server->_link_local_finish(mdr, dn, targeti, dnpv, tipv);
   }
 };
 
@@ -2297,18 +2297,18 @@ void Server::_link_local(MDRequest *mdr, CDentry *dn, CInode *targeti)
   // log + wait
   EUpdate *le = new EUpdate(mdlog, "link_local");
   le->metablob.add_client_req(mdr->reqid);
-  version_t dirpv = predirty_dn_diri(mdr, dn, &le->metablob);   // dir inode's mtime
-  le->metablob.add_dir_context(dn->get_dir());
+  mds->locker->predirty_nested(mdr, &le->metablob, targeti, false, dn->dir,
+                              true, 1);
   le->metablob.add_remote_dentry(dn, true, targeti->ino(), 
                                 MODE_TO_DT(targeti->inode.mode));  // new remote
   le->metablob.add_dir_context(targeti->get_parent_dir());
   le->metablob.add_primary_dentry(targeti->parent, true, targeti, pi);  // update old primary
 
-  mdlog->submit_entry(le, new C_MDS_link_local_finish(mds, mdr, dn, targeti, dnpv, tipv, dirpv));
+  mdlog->submit_entry(le, new C_MDS_link_local_finish(mds, mdr, dn, targeti, dnpv, tipv));
 }
 
 void Server::_link_local_finish(MDRequest *mdr, CDentry *dn, CInode *targeti,
-                               version_t dnpv, version_t tipv, version_t dirpv)
+                               version_t dnpv, version_t tipv)
 {
   dout(10) << "_link_local_finish " << *dn << " to " << *targeti << dendl;
 
@@ -2319,8 +2319,7 @@ void Server::_link_local_finish(MDRequest *mdr, CDentry *dn, CInode *targeti,
   // target inode
   targeti->pop_and_dirty_projected_inode(mdr->ls);
 
-  // new dentry dir mtime
-  dirty_dn_diri(mdr, dn, dirpv);
+  mdr->apply();
   
   // bump target popularity
   mds->balancer->hit_inode(mdr->now, targeti, META_POP_IWR);
@@ -2671,6 +2670,7 @@ void Server::handle_client_unlink(MDRequest *mdr)
     rdlocks.insert(&trace[i]->lock);
   xlocks.insert(&dn->lock);
   wrlocks.insert(&dn->dir->inode->dirlock);
+  wrlocks.insert(&dn->dir->inode->nestedlock);
   xlocks.insert(&in->linklock);
   
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
@@ -2715,15 +2715,13 @@ class C_MDS_unlink_local_finish : public Context {
   CDentry *dn;
   CDentry *straydn;
   version_t dnpv;  // deleted dentry
-  version_t dirpv;
 public:
-  C_MDS_unlink_local_finish(MDS *m, MDRequest *r, CDentry *d, CDentry *sd,
-                           version_t dirpv_) :
+  C_MDS_unlink_local_finish(MDS *m, MDRequest *r, CDentry *d, CDentry *sd) :
     mds(m), mdr(r), dn(d), straydn(sd),
-    dnpv(d->get_projected_version()), dirpv(dirpv_) { }
+    dnpv(d->get_projected_version()) {}
   void finish(int r) {
     assert(r == 0);
-    mds->server->_unlink_local_finish(mdr, dn, straydn, dnpv, dirpv);
+    mds->server->_unlink_local_finish(mdr, dn, straydn, dnpv);
   }
 };
 
@@ -2760,25 +2758,25 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn)
   pi->nlink--;
   pi->ctime = mdr->now;
   pi->version = ipv;
+  pi->nested.zero();
   *ji = *pi;  // copy into journal
 
   // the unlinked dentry
   dn->pre_dirty();
-  version_t dirpv = predirty_dn_diri(mdr, dn, &le->metablob);
-  le->metablob.add_dir_context(dn->get_dir());
+  mds->locker->predirty_nested(mdr, &le->metablob, dn->inode, dn->is_primary(), dn->dir,
+                              true, -1);
   le->metablob.add_null_dentry(dn, true);
 
   if (mdr->more()->dst_reanchor_atid)
     le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
 
   // log + wait
-  mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, 
-                                                       dirpv));
+  mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn));
 }
 
 void Server::_unlink_local_finish(MDRequest *mdr, 
                                  CDentry *dn, CDentry *straydn,
-                                 version_t dnpv, version_t dirpv
+                                 version_t dnpv) 
 {
   dout(10) << "_unlink_local_finish " << *dn << dendl;
 
@@ -2796,8 +2794,7 @@ void Server::_unlink_local_finish(MDRequest *mdr,
   in->pop_and_dirty_projected_inode(mdr->ls);
   dn->mark_dirty(dnpv, mdr->ls);  
 
-  // dir inode's mtime
-  dirty_dn_diri(mdr, dn, dirpv);
+  mdr->apply();
   
   // share unlink news with replicas
   for (map<int,int>::iterator it = dn->replicas_begin();
@@ -3127,6 +3124,7 @@ void Server::handle_client_rename(MDRequest *mdr)
     rdlocks.insert(&srctrace[i]->lock);
   xlocks.insert(&srcdn->lock);
   wrlocks.insert(&srcdn->dir->inode->dirlock);
+  wrlocks.insert(&srcdn->dir->inode->nestedlock);
   /*
    * no, this causes problems if the dftlock is scattered...
    *  and what was i thinking anyway? 
@@ -3138,6 +3136,7 @@ void Server::handle_client_rename(MDRequest *mdr)
     rdlocks.insert(&desttrace[i]->lock);
   xlocks.insert(&destdn->lock);
   wrlocks.insert(&destdn->dir->inode->dirlock);
+  wrlocks.insert(&destdn->dir->inode->nestedlock);
 
   // xlock versionlock on srci if remote?
   //  this ensures it gets safely remotely auth_pinned, avoiding deadlock;
@@ -4338,7 +4337,7 @@ void Server::handle_client_openc(MDRequest *mdr)
   le->metablob.add_client_req(req->get_reqid());
   le->metablob.add_allocated_ino(in->ino(), mds->idalloc->get_version());
 
-  mds->locker->predirty_nested(mdr, &le->metablob, in, true, 1, 0);
+  mds->locker->predirty_nested(mdr, &le->metablob, in, true, dn->dir, true, 1);
   le->metablob.add_primary_dentry(dn, true, in, &in->inode);
   
   // log + wait
index 599a06da0cf722f3db2b7cd4749516f55dc49d1d..266de1b5685961a73f7606cc72b6cd6bd99c639c 100644 (file)
@@ -133,7 +133,7 @@ public:
   void _link_local(MDRequest *mdr, CDentry *dn, CInode *targeti);
   void _link_local_finish(MDRequest *mdr,
                          CDentry *dn, CInode *targeti,
-                         version_t, version_t, version_t);
+                         version_t, version_t);
 
   void _link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti);
   void _link_remote_finish(MDRequest *mdr, CDentry *dn, CInode *targeti,
@@ -151,7 +151,7 @@ public:
   void _unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn);
   void _unlink_local_finish(MDRequest *mdr, 
                            CDentry *dn, CDentry *straydn,
-                           version_t, version_t);    
+                           version_t);    
 
   void _unlink_remote(MDRequest *mdr, CDentry *dn);
   void _unlink_remote_finish(MDRequest *mdr,