]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: reimplement readdir
authorSage Weil <sage@newdream.net>
Wed, 8 Sep 2010 21:02:43 +0000 (14:02 -0700)
committerSage Weil <sage@newdream.net>
Wed, 8 Sep 2010 21:02:43 +0000 (14:02 -0700)
Reimplement core readdir (readdir_r_cb), using kclient as a template.
Reimplement all other readdir variants in terms of readdir_r_cb.

Main change is support for frag chunking, and hopefully lots of subtle bugs
that have been fixed in the kclient code we're based on.

src/client/Client.cc
src/client/Client.h

index 144717cb69d28fd4283ea544966147cdc7ba36db..9f94c82e36033f4e3c2311ab779f348452c1fd30 100644 (file)
@@ -3800,10 +3800,9 @@ void Client::_readdir_add_dirent(DirResult *dirp, const string& name, Inode *in)
 {
   struct stat st;
   int stmask = fill_stat(in, &st);  
-  frag_t fg = dirp->frag();
-  dirp->buffer[fg].push_back(DirEntry(name, st, stmask));
+  dirp->buffer->push_back(DirEntry(name, st, stmask));
   dout(10) << "_readdir_add_dirent " << dirp << " added '" << name << "' -> " << in->ino
-          << ", size now " << dirp->buffer[fg].size() << dendl;
+          << ", size now " << dirp->buffer->size() << dendl;
 }
 
 //struct dirent {
@@ -3813,18 +3812,18 @@ void Client::_readdir_add_dirent(DirResult *dirp, const string& name, Inode *in)
 //  unsigned char  d_type;      /* type of file */
 //  char           d_name[256]; /* filename */
 //};
-void Client::_readdir_fill_dirent(struct dirent *de, DirEntry *entry, loff_t off)
+void Client::fill_dirent(struct dirent *de, const char *name, int type, uint64_t ino, loff_t next_off)
 {
-  strncpy(de->d_name, entry->d_name.c_str(), 256);
+  strncpy(de->d_name, name, 256);
 #ifndef __CYGWIN__
-  de->d_ino = entry->st.st_ino;
+  de->d_ino = ino;
 #ifndef DARWIN
-  de->d_off = off + 1;
+  de->d_off = next_off;
 #endif
   de->d_reclen = 1;
-  de->d_type = MODE_TO_DT(entry->st.st_mode);
-  dout(10) << "_readdir_fill_dirent '" << de->d_name << "' -> " << de->d_ino
-          << " type " << (int)de->d_type << " at off " << off << dendl;
+  de->d_type = MODE_TO_DT(type);
+  dout(10) << "fill_dirent '" << de->d_name << "' -> " << de->d_ino
+          << " type " << (int)de->d_type << " w/ next_off " << next_off << dendl;
 #endif
 }
 
@@ -3832,10 +3831,6 @@ void Client::_readdir_next_frag(DirResult *dirp)
 {
   frag_t fg = dirp->frag();
 
-  // hose old data
-  assert(dirp->buffer.count(fg));
-  dirp->buffer.erase(fg);
-
   // advance
   dirp->next_frag();
   if (dirp->at_end()) {
@@ -3861,7 +3856,6 @@ int Client::_readdir_get_frag(DirResult *dirp)
 {
   // get the current frag.
   frag_t fg = dirp->frag();
-  assert(dirp->buffer.count(fg) == 0);
   
   dout(10) << "_readdir_get_frag " << dirp << " on " << dirp->inode->ino << " fg " << fg << dendl;
 
@@ -3877,6 +3871,8 @@ int Client::_readdir_get_frag(DirResult *dirp)
   req->set_filepath(path); 
   req->inode = diri;
   req->head.args.readdir.frag = fg;
+  if (dirp->last_name.length())
+    req->path2.set_path(dirp->last_name.c_str());
   
   bufferlist dirbl;
   int res = make_request(req, -1, -1, 0, -1, &dirbl);
@@ -3891,49 +3887,45 @@ int Client::_readdir_get_frag(DirResult *dirp)
     // stuff dir contents to cache, DirResult
     assert(diri);
 
-    // create empty result vector
-    dirp->buffer[fg].clear();
-
-    if (fg.is_leftmost()) {
-      // add . and ..?
-      string dot(".");
-      _readdir_add_dirent(dirp, dot, diri);
-      string dotdot("..");
-      if (diri->dn)
-       _readdir_add_dirent(dirp, dotdot, diri->dn->dir->parent_inode);
-      //else
-      //_readdir_add_dirent(dirp, dotdot, DT_DIR);
-    }
-    
+    delete dirp->buffer;
+    dirp->buffer = new vector<DirEntry>;
+    dirp->buffer_frag = fg;
+
     // the rest?
     bufferlist::iterator p = dirbl.begin();
-    if (!p.end()) {
-      // dirstat
-      DirStat dst(p);
-      __u32 numdn;
-      __u8 complete, end;
-      ::decode(numdn, p);
-      ::decode(end, p);
-      ::decode(complete, p);
-
-      string dname;
-      LeaseStat dlease;
-      while (numdn) {
-       ::decode(dname, p);
-       ::decode(dlease, p);
-       InodeStat ist(p);
-
-       Inode *in = _ll_get_inode(ist.vino);
-       dout(15) << "_readdir_get_frag got " << dname << " to " << in->ino << dendl;
-       _readdir_add_dirent(dirp, dname, in);
-
-       numdn--;
-      }
+    assert(!p.end());
+
+    // dirstat
+    DirStat dst(p);
+    __u32 numdn;
+    __u8 complete, end;
+    ::decode(numdn, p);
+    ::decode(end, p);
+    ::decode(complete, p);
+
+    string dname;
+    LeaseStat dlease;
+    for (unsigned i=0; i<numdn; i++) {
+      ::decode(dname, p);
+      ::decode(dlease, p);
+      InodeStat ist(p);
+      
+      Inode *in = _ll_get_inode(ist.vino);
+      dout(15) << "_readdir_get_frag " << dirp << "    " << dname << " to " << in->ino << dendl;
+      _readdir_add_dirent(dirp, dname, in);
     }
 
-    if (diri->dir && diri->dir->release_count == dirp->release_count) {
-      dout(10) << " marking I_COMPLETE on " << *diri << dendl;
-      diri->flags |= I_COMPLETE;
+    dirp->this_offset = dirp->next_offset;
+    dout(10) << "_readdir_get_frag " << dirp << " got frag " << dirp->buffer_frag
+            << " this_offset " << dirp->this_offset
+            << " size " << dirp->buffer->size() << dendl;
+
+    if (end) {
+      dirp->last_name.clear();
+      dirp->next_offset = 2;
+    } else {
+      dirp->last_name = dname;
+      dirp->next_offset += numdn;
     }
   } else {
     dout(10) << "_readdir_get_frag got error " << res << ", setting end flag" << dendl;
@@ -3954,53 +3946,42 @@ int Client::readdir_r(DIR *d, struct dirent *de)
  *  0 for end of directory
  * <0 on error
  */
-int Client::readdirplus_r(DIR *d, struct dirent *de, struct stat *st, int *stmask)
-{  
-  DirResult *dirp = (DirResult*)d;
-  
-  dout(10) << "readdirplus_r " << *dirp->inode << " offset " << dirp->offset
-          << " frag " << dirp->frag() << " fragpos " << dirp->fragpos()
-          << " at_end=" << dirp->at_end()
-          << dendl;
-
-  while (1) {
-    if (dirp->at_end())
-      return 0;
 
-    if (dirp->buffer.count(dirp->frag()) == 0) {
-      Mutex::Locker lock(client_lock);
-      _readdir_get_frag(dirp);
-      if (dirp->at_end())
-       return 0;
-    }
+struct single_readdir {
+  struct dirent *de;
+  struct stat *st;
+  int *stmask;
+  bool full;
+};
 
-    frag_t fg = dirp->frag();
-    uint32_t pos = dirp->fragpos();
-    assert(dirp->buffer.count(fg));   
-    vector<DirEntry> &ent = dirp->buffer[fg];
+static int _readdir_single_dirent_cb(void *p, struct dirent *de, struct stat *st, int stmask, off_t off)
+{
+  single_readdir *c = (single_readdir *)p;
 
-    if (ent.empty() ||
-       pos >= ent.size()) {
-      dout(10) << "empty frag " << fg << ", moving on to next" << dendl;
-      _readdir_next_frag(dirp);
-      continue;
-    }
+  if (c->full)
+    return -1;
 
-    assert(pos < ent.size());
-    _readdir_fill_dirent(de, &ent[pos], dirp->offset);
-    if (st)
-      *st = ent[pos].st;
-    if (stmask)
-      *stmask = ent[pos].stmask;
-    pos++;
-    dirp->offset++;
+  *c->de = *de;
+  *c->st = *st;
+  *c->stmask = stmask;
+  c->full = true;
+  return 0;  
+}
 
-    if (pos == ent.size()) 
-      _readdir_next_frag(dirp);
+int Client::readdirplus_r(DIR *d, struct dirent *de, struct stat *st, int *stmask)
+{  
+  single_readdir sr;
+  sr.de = de;
+  sr.st = st;
+  sr.stmask = stmask;
+  sr.full = false;
 
+  int r = readdir_r_cb(d, _readdir_single_dirent_cb, (void *)&sr);
+  if (r < 0)
+    return r;
+  if (sr.full)
     return 1;
-  }
-  assert(0);
+  return 0;
 }
 
 int Client::readdir_r_cb(DIR *d, add_dirent_cb_t cb, void *p)
@@ -4017,111 +3998,151 @@ int Client::readdir_r_cb(DIR *d, add_dirent_cb_t cb, void *p)
   memset(&de, 0, sizeof(de));
   memset(&st, 0, sizeof(st));
 
-  while (1) {
-    if (dirp->at_end())
-      return 0;
+  frag_t fg = dirp->frag();
+  uint32_t off = dirp->fragpos();
 
-    if (dirp->buffer.count(dirp->frag()) == 0) {
-      Mutex::Locker lock(client_lock);
-      _readdir_get_frag(dirp);
-      if (dirp->at_end())
-       return 0;
-    }
+  Inode *diri = dirp->inode;
 
-    frag_t fg = dirp->frag();
-    uint32_t pos = dirp->fragpos();
-    assert(dirp->buffer.count(fg));   
-    vector<DirEntry> &ent = dirp->buffer[fg];
+  if (dirp->offset == 0) {
+    dout(15) << " including ." << dendl;
+    uint64_t next_off = diri->dn ? 1 : 2;
 
-    if (ent.empty() ||
-       pos >= ent.size()) {
-      dout(10) << "empty frag " << fg << ", moving on to next" << dendl;
-      _readdir_next_frag(dirp);
-      continue;
-    }
+    fill_dirent(&de, ".", S_IFDIR, diri->ino, next_off);
 
-    assert(pos < ent.size());
-    _readdir_fill_dirent(&de, &ent[pos], dirp->offset);
-    st = ent[pos].st;
-    int stmask = ent[pos].stmask;
+    struct stat st;
+    fill_stat(diri, &st);
 
-    int r = cb(p, &de, &st, stmask, dirp->offset + 1);  // _next_ offset
+    int r = cb(p, &de, &st, -1, next_off);
     if (r < 0)
-      break;
+      return r;
 
-    dout(15) << " de " << de.d_name << " off " << dirp->offset
-            << " = " << r
-            << dendl;
+    dirp->offset = next_off;
+    off = next_off;
+  }
+  if (dirp->offset == 1) {
+    dout(15) << " including .." << dendl;
+    assert(diri->dn);
+    Inode *in = diri->dn->inode;
+    fill_dirent(&de, "..", S_IFDIR, in->ino, 2);
 
-    pos++;
-    dirp->offset++;
+    struct stat st;
+    fill_stat(in, &st);
 
-    if (pos == ent.size()) 
-      _readdir_next_frag(dirp);
+    int r = cb(p, &de, &st, -1, 2);
+    if (r < 0)
+      return r;
 
-    return 1;
+    dirp->offset = 2;
+    off = 2;
   }
-
-  return 0;
-}
-
-int Client::_getdents(DIR *dir, char *buf, int buflen, bool fullent)
-{
-  DirResult *dirp = (DirResult *)dir;
-  int ret = 0;
-  
+    
   while (1) {
     if (dirp->at_end())
-      return ret;
+      return 0;
 
-    if (dirp->buffer.count(dirp->frag()) == 0) {
+    if (dirp->buffer_frag != dirp->frag() || dirp->buffer == NULL) {
       Mutex::Locker lock(client_lock);
-      _readdir_get_frag(dirp);
-      if (dirp->at_end())
-       return ret;
+      int r = _readdir_get_frag(dirp);
+      if (r)
+       return r;
     }
 
-    frag_t fg = dirp->frag();
-    uint32_t pos = dirp->fragpos();
-    assert(dirp->buffer.count(fg));   
-    vector<DirEntry> &ent = dirp->buffer[fg];
+    dout(10) << "off " << off << " this_offset " << dirp->this_offset << " size " << dirp->buffer->size() << dendl;
+    while (off - dirp->this_offset >= 0 &&
+          off - dirp->this_offset < dirp->buffer->size()) {
+      uint64_t pos = DirResult::make_fpos(fg, off);
+      DirEntry& ent = (*dirp->buffer)[off - dirp->this_offset];
 
-    if (ent.empty()) {
-      dout(10) << "empty frag " << fg << ", moving on to next" << dendl;
-      _readdir_next_frag(dirp);
-      continue;
+      fill_dirent(&de, ent.d_name.c_str(), ent.st.st_mode, ent.st.st_ino, dirp->offset + 1);
+      
+      int r = cb(p, &de, &ent.st, ent.stmask, dirp->offset + 1);  // _next_ offset
+      dout(15) << " de " << de.d_name << " off " << dirp->offset
+              << " = " << r
+              << dendl;
+      if (r < 0)
+       return r;
+      
+      off++;
+      dirp->offset = pos + 1;
     }
 
-    assert(pos < ent.size());
+    if (dirp->last_name.length()) {
+      dout(10) << " fetching next chunk of this frag" << dendl;
+      delete dirp->buffer;
+      dirp->buffer = NULL;
+      continue;  // more!
+    }
 
-    // is there room?
-    int dlen = ent[pos].d_name.length();
-    if (fullent)
-      dlen += sizeof(struct dirent);
-    else
-      dlen += 1; // null terminator
-    if (ret + dlen > buflen) {
-      if (!ret)
-       return -ERANGE;  // the buffer is too small for the first name!
-      return ret;
+    if (!fg.is_rightmost()) {
+      // next frag!
+      dirp->next_frag();
+      fg = dirp->frag();
+      off = 0;
+      dout(10) << " advancing to next frag: " << fg << " -> " << dirp->frag() << dendl;
+      continue;
     }
-    if (fullent)
-      _readdir_fill_dirent((struct dirent *)(buf + ret), &ent[pos], dirp->offset);
-    else
-      memcpy(buf + ret, ent[pos].d_name.c_str(), dlen);
-    ret += dlen;
+    dirp->set_end();
 
-    pos++;
-    dirp->offset++;
+    if (diri->dir && diri->dir->release_count == dirp->release_count) {
+      dout(10) << " marking I_COMPLETE on " << *diri << dendl;
+      diri->flags |= I_COMPLETE;
+    }
 
-    if (pos == ent.size()) 
-      _readdir_next_frag(dirp);
+    return 1;
   }
   assert(0);
+  return 0;
 }
 
 
 
+struct getdents_result {
+  char *buf;
+  int buflen;
+  int pos;
+  bool fullent;
+};
+
+static int _readdir_getdent_cb(void *p, struct dirent *de, struct stat *st, int stmask, off_t off)
+{
+  struct getdents_result *c = (getdents_result *)p;
+
+  int dlen;
+  if (c->fullent)
+    dlen = sizeof(*de);
+  else
+    dlen = strlen(de->d_name) + 1;
+
+  if (c->pos + dlen > c->buflen)
+    return -1;  // doesn't fit
+
+  if (c->fullent) {
+    memcpy(c->buf + c->pos, de, sizeof(*de));
+  } else {
+    memcpy(c->buf + c->pos, de->d_name, dlen);
+  }
+  c->pos += dlen;
+  return 0;
+}
+
+int Client::_getdents(DIR *dir, char *buf, int buflen, bool fullent)
+{
+  getdents_result gr;
+  gr.buf = buf;
+  gr.buflen = buflen;
+  gr.fullent = fullent;
+  gr.pos = 0;
+
+  int r = readdir_r_cb(dir, _readdir_getdent_cb, (void *)&gr);
+  if (r < 0) 
+    return r;
+
+  if (gr.pos == 0)
+    return -ERANGE;
+
+  return gr.pos;
+}
+
 int Client::closedir(DIR *dir) 
 {
   Mutex::Locker lock(client_lock);
@@ -4148,8 +4169,7 @@ void Client::rewinddir(DIR *dirp)
 {
   dout(3) << "rewinddir(" << dirp << ")" << dendl;
   DirResult *d = (DirResult*)dirp;
-  d->offset = 0;
-  d->buffer.clear();
+  d->reset();
 }
  
 loff_t Client::telldir(DIR *dirp)
@@ -4163,6 +4183,16 @@ void Client::seekdir(DIR *dirp, loff_t offset)
 {
   dout(3) << "seekdir(" << dirp << ", " << offset << ")" << dendl;
   DirResult *d = (DirResult*)dirp;
+
+  if (offset == 0 ||
+      DirResult::fpos_frag(offset) != d->frag() ||
+      DirResult::fpos_off(offset) < d->fragpos()) {
+    d->reset();
+  }
+
+  if (offset > d->offset)
+    d->release_count--;   // bump if we do a forward seek
+
   d->offset = offset;
 }
 
index c123a780fe0d7524c9b088a62df5ce96f294df12..9375bf5bc19b6285ce12ee104603b39ff251f0fa 100644 (file)
@@ -686,12 +686,33 @@ class Client : public Dispatcher {
     static const int64_t MASK = (1 << SHIFT) - 1;
     static const loff_t END = 1ULL << (SHIFT + 32);
 
+    static uint64_t make_fpos(unsigned frag, unsigned off) {
+      return ((uint64_t)frag << SHIFT) | (uint64_t)off;
+    }
+    static unsigned fpos_frag(uint64_t p) {
+      return p >> SHIFT;
+    }
+    static unsigned fpos_off(uint64_t p) {
+      return p & MASK;
+    }
+
+
     Inode *inode;
-    int64_t offset;   // high bits: frag_t, low bits: an offset
+
+    int64_t offset;        // high bits: frag_t, low bits: an offset
+
+    uint64_t this_offset;  // offset of last chunk, adjusted for . and ..
+    uint64_t next_offset;  // offset of next chunk (last_name's + 1)
+    string last_name;      // last entry in previous chunk
+
     uint64_t release_count;
-    map<frag_t, vector<DirEntry> > buffer;
 
-    DirResult(Inode *in) : inode(in), offset(0), release_count(0) { 
+    frag_t buffer_frag;
+    vector<DirEntry> *buffer;
+
+    DirResult(Inode *in) : inode(in), offset(0), next_offset(2),
+                          release_count(0),
+                          buffer(0) { 
       inode->get();
     }
 
@@ -711,6 +732,15 @@ class Client : public Dispatcher {
     }
     void set_end() { offset = END; }
     bool at_end() { return (offset == END); }
+
+    void reset() {
+      last_name.clear();
+      next_offset = 2;
+      this_offset = 0;
+      offset = 0;
+      delete buffer;
+      buffer = 0;
+    }
   };
 
   // **** WARNING: be sure to update the struct in libceph.h too! ****
@@ -1105,10 +1135,11 @@ protected:
   // fs ops.
 private:
 
+  void fill_dirent(struct dirent *de, const char *name, int type, uint64_t ino, loff_t next_off);
+
   // some helpers
   int _opendir(Inode *in, DirResult **dirpp, int uid=-1, int gid=-1);
   void _readdir_add_dirent(DirResult *dirp, const string& name, Inode *in);
-  void _readdir_fill_dirent(struct dirent *de, DirEntry *entry, loff_t);
   bool _readdir_have_frag(DirResult *dirp);
   void _readdir_next_frag(DirResult *dirp);
   void _readdir_rechoose_frag(DirResult *dirp);