]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: add an optional Inode ** parm to ceph_readdirplus_r
authorJeff Layton <jlayton@redhat.com>
Mon, 24 Oct 2016 14:03:01 +0000 (10:03 -0400)
committerJeff Layton <jlayton@redhat.com>
Tue, 25 Oct 2016 17:06:29 +0000 (13:06 -0400)
Ganesha needs an inode reference in addition to the attributes when it
calls readdirplus. Other callers however don't need an inode reference.

We could just take one universally and pass it to the callback, but most
callers don't need that reference and would need to put it in the
callback. That's cumbersome and mutex-thrashy.

So, we need to fix the readdir engine to only conditionally take this
extra reference, when the callback will actually use it. Add a bool to
readdir_r_cb that defaults to false and indicates that the caller wants
an inode reference for each dentry returned. When that bool is true
we'll pass a pointer to the inode to the callback after taking a
reference. Otherwise, NULL is passed to the callback.

Next, add a return double pointer arg to ceph_readdirplus_r that
indicates whether the caller wants an inode reference and where to put
the pointer to the inode. Almost all callers will set that to NULL, but
ganesha can set it to a non-NULL value to get the inode reference that
it wants on each call.

Signed-off-by: Jeff Layton <jlayton@redhat.com>
src/client/Client.cc
src/client/Client.h
src/client/fuse_ll.cc
src/client/hypertable/CephBroker.cc
src/include/cephfs/libcephfs.h
src/libcephfs.cc
src/test/libcephfs/test.cc

index 44085c201b553a03cd57decadc6b6c5e2e2746ca..c4c2053522506c9853bf8dd50e537da14f9bce59 100644 (file)
@@ -7324,7 +7324,8 @@ struct dentry_off_lt {
   }
 };
 
-int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, int caps)
+int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
+                             int caps, bool getref)
 {
   assert(client_lock.is_locked());
   ldout(cct, 10) << "_readdir_cache_cb " << dirp << " on " << dirp->inode->ino
@@ -7373,12 +7374,17 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, i
     if (pd == dir->readdir_cache.end())
       next_off = dir_result_t::END;
 
+    Inode *in = NULL;
     fill_dirent(&de, dn->name.c_str(), stx.stx_mode, stx.stx_ino, next_off);
+    if (getref) {
+      in = dn->inode.get();
+      _ll_get(in);
+    }
 
     dn_name = dn->name; // fill in name while we have lock
 
     client_lock.Unlock();
-    r = cb(p, &de, &stx, next_off);  // _next_ offset
+    r = cb(p, &de, &stx, next_off, in);  // _next_ offset
     client_lock.Lock();
     ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
                   << " = " << r << dendl;
@@ -7402,7 +7408,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, i
 }
 
 int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
-                        unsigned want, unsigned flags)
+                        unsigned want, unsigned flags, bool getref)
 {
   int caps = statx_to_mask(flags, want);
 
@@ -7437,8 +7443,14 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
     fill_statx(diri, caps, &stx);
     fill_dirent(&de, ".", S_IFDIR, stx.stx_ino, next_off);
 
+    Inode *inode = NULL;
+    if (getref) {
+      inode = diri.get();
+      _ll_get(inode);
+    }
+
     client_lock.Unlock();
-    r = cb(p, &de, &stx, next_off);
+    r = cb(p, &de, &stx, next_off, inode);
     client_lock.Lock();
     if (r < 0)
       return r;
@@ -7464,8 +7476,14 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
     fill_statx(in, caps, &stx);
     fill_dirent(&de, "..", S_IFDIR, stx.stx_ino, next_off);
 
+    Inode *inode = NULL;
+    if (getref) {
+      inode = in.get();
+      _ll_get(inode);
+    }
+
     client_lock.Unlock();
-    r = cb(p, &de, &stx, next_off);
+    r = cb(p, &de, &stx, next_off, inode);
     client_lock.Lock();
     if (r < 0)
       return r;
@@ -7484,7 +7502,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
   if (dirp->inode->snapid != CEPH_SNAPDIR &&
       dirp->inode->is_complete_and_ordered() &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
-    int err = _readdir_cache_cb(dirp, cb, p, caps);
+    int err = _readdir_cache_cb(dirp, cb, p, caps, getref);
     if (err != -EAGAIN)
       return err;
   }
@@ -7525,8 +7543,14 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       fill_statx(entry.inode, caps, &stx);
       fill_dirent(&de, entry.name.c_str(), stx.stx_mode, stx.stx_ino, next_off);
 
+      Inode *inode = NULL;
+      if (getref) {
+       inode = entry.inode.get();
+       _ll_get(inode);
+      }
+
       client_lock.Unlock();
-      r = cb(p, &de, &stx, next_off);  // _next_ offset
+      r = cb(p, &de, &stx, next_off, inode);  // _next_ offset
       client_lock.Lock();
 
       ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
@@ -7576,7 +7600,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
 
 int Client::readdir_r(dir_result_t *d, struct dirent *de)
 {  
-  return readdirplus_r(d, de, 0, 0, 0);
+  return readdirplus_r(d, de, 0, 0, 0, NULL);
 }
 
 /*
@@ -7591,11 +7615,13 @@ int Client::readdir_r(dir_result_t *d, struct dirent *de)
 struct single_readdir {
   struct dirent *de;
   struct ceph_statx *stx;
+  Inode *inode;
   bool full;
 };
 
 static int _readdir_single_dirent_cb(void *p, struct dirent *de,
-                                    struct ceph_statx *stx, off_t off)
+                                    struct ceph_statx *stx, off_t off,
+                                    Inode *in)
 {
   single_readdir *c = static_cast<single_readdir *>(p);
 
@@ -7605,6 +7631,7 @@ static int _readdir_single_dirent_cb(void *p, struct dirent *de,
   *c->de = *de;
   if (c->stx)
     *c->stx = *stx;
+  c->inode = in;
   c->full = true;
   return 1;
 }
@@ -7616,6 +7643,7 @@ struct dirent *Client::readdir(dir_result_t *d)
   single_readdir sr;
   sr.de = &de;
   sr.stx = NULL;
+  sr.inode = NULL;
   sr.full = false;
 
   // our callback fills the dirent and sets sr.full=true on first
@@ -7633,18 +7661,21 @@ struct dirent *Client::readdir(dir_result_t *d)
 
 int Client::readdirplus_r(dir_result_t *d, struct dirent *de,
                          struct ceph_statx *stx, unsigned want,
-                         unsigned flags)
+                         unsigned flags, Inode **out)
 {  
   single_readdir sr;
   sr.de = de;
   sr.stx = stx;
+  sr.inode = NULL;
   sr.full = false;
 
   // our callback fills the dirent and sets sr.full=true on first
   // call, and returns -1 the second time around.
-  int r = readdir_r_cb(d, _readdir_single_dirent_cb, (void *)&sr, want, flags);
+  int r = readdir_r_cb(d, _readdir_single_dirent_cb, (void *)&sr, want, flags, out);
   if (r < -1)
     return r;
+  if (out)
+    *out = sr.inode;
   if (sr.full)
     return 1;
   return 0;
@@ -7660,7 +7691,7 @@ struct getdents_result {
 };
 
 static int _readdir_getdent_cb(void *p, struct dirent *de,
-                              struct ceph_statx *stx, off_t off)
+                              struct ceph_statx *stx, off_t off, Inode *in)
 {
   struct getdents_result *c = static_cast<getdents_result *>(p);
 
@@ -7712,7 +7743,7 @@ struct getdir_result {
   int num;
 };
 
-static int _getdir_cb(void *p, struct dirent *de, struct ceph_statx *stx, off_t off)
+static int _getdir_cb(void *p, struct dirent *de, struct ceph_statx *stx, off_t off, Inode *in)
 {
   getdir_result *r = static_cast<getdir_result *>(p);
 
index 907fd4864c98d890524602dcabc036c814362cfd..70f764251bfa485ded85b08d50984da1f2f8841e 100644 (file)
@@ -720,7 +720,7 @@ private:
   void fill_dirent(struct dirent *de, const char *name, int type, uint64_t ino, loff_t next_off);
 
   // some readdir helpers
-  typedef int (*add_dirent_cb_t)(void *p, struct dirent *de, struct ceph_statx *stx, off_t off);
+  typedef int (*add_dirent_cb_t)(void *p, struct dirent *de, struct ceph_statx *stx, off_t off, Inode *in);
 
   int _opendir(Inode *in, dir_result_t **dirpp, const UserPerm& perms);
   void _readdir_drop_dirp_buffer(dir_result_t *dirp);
@@ -728,7 +728,7 @@ private:
   void _readdir_next_frag(dir_result_t *dirp);
   void _readdir_rechoose_frag(dir_result_t *dirp);
   int _readdir_get_frag(dir_result_t *dirp);
-  int _readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, int caps);
+  int _readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, int caps, bool getref);
   void _closedir(dir_result_t *dirp);
 
   // other helpers
@@ -954,11 +954,12 @@ public:
    * If @a cb returns a negative error code, stop and return that.
    */
   int readdir_r_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
-                  unsigned want=0, unsigned flags=AT_NO_ATTR_SYNC);
+                  unsigned want=0, unsigned flags=AT_NO_ATTR_SYNC,
+                  bool getref=false);
 
   struct dirent * readdir(dir_result_t *d);
   int readdir_r(dir_result_t *dirp, struct dirent *de);
-  int readdirplus_r(dir_result_t *dirp, struct dirent *de, struct ceph_statx *stx, unsigned want, unsigned flags);
+  int readdirplus_r(dir_result_t *dirp, struct dirent *de, struct ceph_statx *stx, unsigned want, unsigned flags, Inode **out);
 
   int getdir(const char *relpath, list<string>& names,
             const UserPerm& perms);  // get the whole dir at once.
index 9d6890041d893f70a190d306a5e15319755c4003..8b3695417d9df612ade9d547ad9d0e4dc83baf63 100644 (file)
@@ -680,7 +680,8 @@ struct readdir_context {
  * return 0 on success, -1 if out of space
  */
 static int fuse_ll_add_dirent(void *p, struct dirent *de,
-                             struct ceph_statx *stx, off_t next_off)
+                             struct ceph_statx *stx, off_t next_off,
+                             Inode *in)
 {
   struct readdir_context *c = (struct readdir_context *)p;
   CephFuse::Handle *cfuse = (CephFuse::Handle *)fuse_req_userdata(c->req);
index 6c8ce64de7c07484bb2586c20713e3c38b5044ac..77f6e083419ffab568ac6a137931b35cbab058c1 100644 (file)
@@ -388,7 +388,7 @@ int CephBroker::rmdir_recursive(const char *directory) {
   int r;
   if ((r = ceph_opendir(cmount, directory, &dirp)) < 0)
     return r; //failed to open
-  while ((r = ceph_readdirplus_r(cmount, dirp, &de, &stx, CEPH_STATX_INO, AT_NO_ATTR_SYNC)) > 0) {
+  while ((r = ceph_readdirplus_r(cmount, dirp, &de, &stx, CEPH_STATX_INO, AT_NO_ATTR_SYNC, NULL)) > 0) {
     String new_dir = de.d_name;
     if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
       new_dir = directory;
index 13078412032a174717a8c475ce0eefe88f3f8a2b..cacdab98d451a76280d94ed2f63de2cc15ee9500 100644 (file)
@@ -483,11 +483,13 @@ int ceph_readdir_r(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp,
  * @param stx the stats of the file/directory of the entry returned
  * @param want mask showing desired inode attrs for returned entry
  * @param flags bitmask of flags to use when filling out attributes
+ * @param out optional returned Inode argument. If non-NULL, then a reference will be taken on
+ *            the inode and the pointer set on success.
  * @returns 1 if the next entry was filled in, 0 if the end of the directory stream was reached,
  *          and a negative error code on failure.
  */
 int ceph_readdirplus_r(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp, struct dirent *de,
-                      struct ceph_statx *stx, unsigned want, unsigned flags);
+                      struct ceph_statx *stx, unsigned want, unsigned flags, struct Inode **out);
 
 /**
  * Gets multiple directory entries.
index 7b7e7c517dc043993894d44ce39043445119c1d1..288f6a7c80e696f81fec905110c891411186cddc 100644 (file)
@@ -518,11 +518,11 @@ extern "C" int ceph_readdir_r(struct ceph_mount_info *cmount, struct ceph_dir_re
 
 extern "C" int ceph_readdirplus_r(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp,
                                  struct dirent *de, struct ceph_statx *stx, unsigned want,
-                                 unsigned flags)
+                                 unsigned flags, struct Inode **out)
 {
   if (!cmount->is_mounted())
     return -ENOTCONN;
-  return cmount->get_client()->readdirplus_r(reinterpret_cast<dir_result_t*>(dirp), de, stx, want, flags);
+  return cmount->get_client()->readdirplus_r(reinterpret_cast<dir_result_t*>(dirp), de, stx, want, flags, out);
 }
 
 extern "C" int ceph_getdents(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp,
index 5a38cf8c7dc1e43518da804717878b15e9a32a84..e8eaef4ba5279785b28088e70cc31f2644cba8bb 100644 (file)
@@ -426,7 +426,7 @@ TEST(LibCephFS, DirLs) {
     struct dirent rdent;
     struct ceph_statx stx;
     int len = ceph_readdirplus_r(cmount, ls_dir, &rdent, &stx,
-                                CEPH_STATX_SIZE, AT_NO_ATTR_SYNC);
+                                CEPH_STATX_SIZE, AT_NO_ATTR_SYNC, NULL);
     if (len == 0)
       break;
     ASSERT_EQ(len, 1);
@@ -1138,7 +1138,7 @@ TEST(LibCephFS, UseUnmounted) {
   EXPECT_EQ(-ENOTCONN, ceph_readdir_r(cmount, dirp, &rdent));
 
   struct ceph_statx stx;
-  EXPECT_EQ(-ENOTCONN, ceph_readdirplus_r(cmount, dirp, &rdent, &stx, 0, 0));
+  EXPECT_EQ(-ENOTCONN, ceph_readdirplus_r(cmount, dirp, &rdent, &stx, 0, 0, NULL));
   EXPECT_EQ(-ENOTCONN, ceph_getdents(cmount, dirp, NULL, 0));
   EXPECT_EQ(-ENOTCONN, ceph_getdnames(cmount, dirp, NULL, 0));
   EXPECT_EQ(-ENOTCONN, ceph_telldir(cmount, dirp));