]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: maintain dn->offset
authorSage Weil <sage@newdream.net>
Mon, 13 Sep 2010 18:23:54 +0000 (11:23 -0700)
committerSage Weil <sage@newdream.net>
Mon, 13 Sep 2010 18:23:54 +0000 (11:23 -0700)
In preparation for readdir from cache.

src/client/Client.cc
src/client/Client.h

index 573069a623408a6f4f1f385593cfe57f919c9e93..048ee4b1a365677fd9b495e01f707a3fc36086d6 100644 (file)
@@ -506,6 +506,13 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, int mds)
       in->dirstat.nsubdirs == 0) {
     dout(10) << " marking I_COMPLETE on empty dir " << *in << dendl;
     in->flags |= I_COMPLETE;
+    if (in->dir) {
+      dout(0) << "WARNING: dir is open on empty dir " << in->ino << " with "
+             << in->dir->dentry_map.size() << " entries" << dendl;
+      in->dir->max_offset = 2;
+
+      // FIXME: tear down dir?
+    }
   }
 
   return in;
@@ -515,8 +522,8 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, int mds)
 /*
  * insert_dentry_inode - insert + link a single dentry + inode into the metadata cache.
  */
-void Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
-                                Inode *in, utime_t from, int mds)
+Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
+                                   Inode *in, utime_t from, int mds, bool set_offset)
 {
   utime_t dttl = from;
   dttl += (float)dlease->duration_ms / 1000.0;
@@ -557,6 +564,10 @@ void Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dleas
               << " unlinked, linking" << dendl;
       dn = link(dir, dname, in, dn);
     }
+    if (set_offset) {
+      dout(15) << " setting dn offset to " << dir->max_offset << dendl;
+      dn->offset = dir->max_offset++;
+    }
   }
 
   assert(dn && dn->inode);
@@ -572,6 +583,7 @@ void Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dleas
     }
   }
   dn->cap_shared_gen = dir->parent_inode->shared_gen;
+  return dn;
 }
 
 
@@ -661,7 +673,7 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds)
 
     if (in) {
       Dir *dir = diri->open_dir();
-      insert_dentry_inode(dir, dname, &dlease, in, from, mds);
+      insert_dentry_inode(dir, dname, &dlease, in, from, mds, true);
     } else {
       Dentry *dn = NULL;
       if (diri->dir && diri->dir->dentries.count(dname)) {
@@ -686,7 +698,7 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds)
 
     if (in) {
       Dir *dir = diri->open_dir();
-      insert_dentry_inode(dir, dname, &dlease, in, from, mds);
+      insert_dentry_inode(dir, dname, &dlease, in, from, mds, true);
     } else {
       Dentry *dn = NULL;
       if (diri->dir && diri->dir->dentries.count(dname)) {
@@ -724,13 +736,14 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds)
 
     string dname;
     LeaseStat dlease;
-    while (numdn) {
+    for (unsigned i=0; i<numdn; i++) {
       ::decode(dname, p);
       ::decode(dlease, p);
       InodeStat ist(p);
       
       Inode *in = add_update_inode(&ist, from, mds);
-      insert_dentry_inode(dir, dname, &dlease, in, from, mds);
+      Dentry *dn = insert_dentry_inode(dir, dname, &dlease, in, from, mds, false);
+      dn->offset = DirResult::make_fpos(request->readdir_frag, i + request->readdir_offset);
 
       // remove any extra names
       while (pd != dir->dentry_map.end() && pd->first <= dname) {
@@ -747,9 +760,7 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds)
       in->get();
       request->readdir_result.push_back(pair<string,Inode*>(dname, in));
 
-      dout(15) << "insert_trace  '" << dname << "' -> " << in->ino << dendl;
-
-      numdn--;
+      dout(15) << "insert_trace  " << dn->offset << ": '" << dname << "' -> " << in->ino << dendl;
     }
     request->readdir_last_name = dname;
     
@@ -3905,7 +3916,7 @@ void Client::fill_dirent(struct dirent *de, const char *name, int type, uint64_t
 #endif
   de->d_reclen = 1;
   de->d_type = MODE_TO_DT(type);
-  dout(10) << "fill_dirent '" << de->d_name << "' -> " << de->d_ino
+  dout(10) << "fill_dirent '" << de->d_name << "' -> " << inodeno_t(de->d_ino)
           << " type " << (int)de->d_type << " w/ next_off " << next_off << dendl;
 #endif
 }
@@ -3968,6 +3979,8 @@ int Client::_readdir_get_frag(DirResult *dirp)
     req->path2.set_path(dirp->last_name.c_str());
     req->readdir_start = dirp->last_name;
   }
+  req->readdir_offset = dirp->next_offset;
+  req->readdir_frag = fg;
   
   
   bufferlist dirbl;
@@ -4028,6 +4041,9 @@ int Client::readdir_r_cb(DIR *d, add_dirent_cb_t cb, void *p)
 
   Inode *diri = dirp->inode;
 
+  if (dirp->at_end())
+    return 0;
+
   if (dirp->offset == 0) {
     dout(15) << " including ." << dendl;
     uint64_t next_off = diri->dn ? 1 : 2;
@@ -4104,13 +4120,15 @@ int Client::readdir_r_cb(DIR *d, add_dirent_cb_t cb, void *p)
       dout(10) << " advancing to next frag: " << fg << " -> " << dirp->frag() << dendl;
       continue;
     }
-    dirp->set_end();
 
     if (diri->dir && diri->dir->release_count == dirp->release_count) {
       dout(10) << " marking I_COMPLETE on " << *diri << dendl;
       diri->flags |= I_COMPLETE;
+      if (diri->dir)
+       diri->dir->max_offset = dirp->offset;
     }
 
+    dirp->set_end();
     return 1;
   }
   assert(0);
index 09db35dafd91a97f61ef4858ed639419e67c7136..d44609a3c1d980b6265afa2b3b988609b954d226 100644 (file)
@@ -132,7 +132,10 @@ struct MetaRequest {
   bool kick;
   
   // readdir result
+  frag_t readdir_frag;
   string readdir_start;  // starting _after_ this name
+  uint64_t readdir_offset;
+
   vector<pair<string,Inode*> > readdir_result;
   bool readdir_end;
   int readdir_num;
@@ -249,6 +252,7 @@ class Dentry : public LRUObject {
   Dir     *dir;
   Inode   *inode;
   int     ref;                       // 1 if there's a dir beneath me.
+  uint64_t offset;
   int lease_mds;
   utime_t lease_ttl;
   uint64_t lease_gen;
@@ -264,7 +268,7 @@ class Dentry : public LRUObject {
     //cout << "dentry.put on " << this << " " << name << " now " << ref << std::endl;
   }
   
-  Dentry() : dir(0), inode(0), ref(0), lease_mds(-1), lease_gen(0), lease_seq(0), cap_shared_gen(0) { }
+  Dentry() : dir(0), inode(0), ref(0), offset(0), lease_mds(-1), lease_gen(0), lease_seq(0), cap_shared_gen(0) { }
 };
 
 class Dir {
@@ -273,8 +277,9 @@ class Dir {
   hash_map<string, Dentry*> dentries;
   map<string, Dentry*> dentry_map;
   uint64_t release_count;
+  uint64_t max_offset;
 
-  Dir(Inode* in) : release_count(0) { parent_inode = in; }
+  Dir(Inode* in) : release_count(0), max_offset(2) { parent_inode = in; }
 
   bool is_empty() {  return dentries.empty(); }
 };
@@ -1139,8 +1144,8 @@ protected:
                              uint64_t time_warp_seq, utime_t ctime, utime_t mtime, utime_t atime,
                              int issued);
   Inode *add_update_inode(InodeStat *st, utime_t ttl, int mds);
-  void insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
-                          Inode *in, utime_t from, int mds);
+  Dentry *insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
+                             Inode *in, utime_t from, int mds, bool set_offset);
 
 
   // ----------------------
@@ -1149,7 +1154,9 @@ private:
 
   void fill_dirent(struct dirent *de, const char *name, int type, uint64_t ino, loff_t next_off);
 
-  // some helpers
+  // some readdir helpers
+  typedef int (*add_dirent_cb_t)(void *p, struct dirent *de, struct stat *st, int stmask, off_t off);
+
   int _opendir(Inode *in, DirResult **dirpp, int uid=-1, int gid=-1);
   void _readdir_drop_dirp_buffer(DirResult *dirp);
   bool _readdir_have_frag(DirResult *dirp);
@@ -1157,6 +1164,8 @@ private:
   void _readdir_rechoose_frag(DirResult *dirp);
   int _readdir_get_frag(DirResult *dirp);
   void _closedir(DirResult *dirp);
+
+  // other helpers
   void _ll_get(Inode *in);
   int _ll_put(Inode *in, int num);
   void _ll_drop_pins();
@@ -1212,7 +1221,6 @@ public:
   int opendir(const char *name, DIR **dirpp);
   int closedir(DIR *dirp);
 
-  typedef int (*add_dirent_cb_t)(void *p, struct dirent *de, struct stat *st, int stmask, off_t off);
   int readdir_r_cb(DIR *dirp, add_dirent_cb_t cb, void *p);
 
   int readdir_r(DIR *dirp, struct dirent *de);