]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: pass want and flags to readdir_r_cb
authorJeff Layton <jlayton@redhat.com>
Mon, 24 Oct 2016 14:03:01 +0000 (10:03 -0400)
committerJeff Layton <jlayton@redhat.com>
Tue, 25 Oct 2016 17:06:29 +0000 (13:06 -0400)
...so we can ensure that we have the necessary caps when filling out
the ceph_statx for each dentry's inode. In order to only do this when
completely necessary, we have want default to 0 and the flags default
to AT_NO_ATTR_SYNC. The only codepath where we pass in a non-default
set of args there is ceph_readdirplus_r as it's the only codepath that
cares about fields in the ceph_statx that aren't immutable.

For now, since we have no support for requesting caps during a readdir
call, we simply issue getattrs prior to calling fill_statx. If we
already have the necessary caps, or are doing a lazy statx then this
becomes a no-op.

Note too that I _think_ the MDS will recall caps on the entries when
satisfying a readdir, so we avoid calling getattr when we're populating
the ceph_statx out of a just-acquired readdir response.

Signed-off-by: Jeff Layton <jlayton@redhat.com>
src/client/Client.cc
src/client/Client.h

index 9667ecd0e253728ae27143dc38a43cae2c1459d2..44085c201b553a03cd57decadc6b6c5e2e2746ca 100644 (file)
@@ -7324,7 +7324,7 @@ struct dentry_off_lt {
   }
 };
 
-int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, bool lazy)
+int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, int caps)
 {
   assert(client_lock.is_locked());
   ldout(cct, 10) << "_readdir_cache_cb " << dirp << " on " << dirp->inode->ino
@@ -7360,9 +7360,13 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, b
       continue;
     }
 
+    int r = _getattr(dn->inode, caps, dirp->perms);
+    if (r < 0)
+      return r;
+
     struct ceph_statx stx;
     struct dirent de;
-    fill_statx(dn->inode, lazy ? 0 : dn->inode->caps_issued(), &stx);
+    fill_statx(dn->inode, caps, &stx);
 
     uint64_t next_off = dn->offset + 1;
     ++pd;
@@ -7374,7 +7378,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, b
     dn_name = dn->name; // fill in name while we have lock
 
     client_lock.Unlock();
-    int r = cb(p, &de, &stx, next_off);  // _next_ offset
+    r = cb(p, &de, &stx, next_off);  // _next_ offset
     client_lock.Lock();
     ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
                   << " = " << r << dendl;
@@ -7397,8 +7401,11 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, b
   return 0;
 }
 
-int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p, bool lazy)
+int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
+                        unsigned want, unsigned flags)
 {
+  int caps = statx_to_mask(flags, want);
+
   Mutex::Locker lock(client_lock);
 
   dir_result_t *dirp = static_cast<dir_result_t*>(d);
@@ -7422,11 +7429,16 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p, bool lazy
     assert(diri->dn_set.size() < 2); // can't have multiple hard-links to a dir
     uint64_t next_off = 1;
 
-    fill_statx(diri, lazy ? 0 : diri->caps_issued(), &stx);
+    int r;
+    r = _getattr(diri, caps, dirp->perms);
+    if (r < 0)
+      return r;
+
+    fill_statx(diri, caps, &stx);
     fill_dirent(&de, ".", S_IFDIR, stx.stx_ino, next_off);
 
     client_lock.Unlock();
-    int r = cb(p, &de, &stx, next_off);
+    r = cb(p, &de, &stx, next_off);
     client_lock.Lock();
     if (r < 0)
       return r;
@@ -7444,11 +7456,16 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p, bool lazy
     else
       in = diri->get_first_parent()->inode;
 
-    fill_statx(in, lazy ? 0 : in->caps_issued(), &stx);
+    int r;
+    r = _getattr(diri, caps, dirp->perms);
+    if (r < 0)
+      return r;
+
+    fill_statx(in, caps, &stx);
     fill_dirent(&de, "..", S_IFDIR, stx.stx_ino, next_off);
 
     client_lock.Unlock();
-    int r = cb(p, &de, &stx, next_off);
+    r = cb(p, &de, &stx, next_off);
     client_lock.Lock();
     if (r < 0)
       return r;
@@ -7467,7 +7484,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p, bool lazy
   if (dirp->inode->snapid != CEPH_SNAPDIR &&
       dirp->inode->is_complete_and_ordered() &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
-    int err = _readdir_cache_cb(dirp, cb, p, lazy);
+    int err = _readdir_cache_cb(dirp, cb, p, caps);
     if (err != -EAGAIN)
       return err;
   }
@@ -7476,12 +7493,14 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p, bool lazy
     if (dirp->at_end())
       return 0;
 
+    bool check_caps = true;
     if (!dirp->is_cached()) {
       int r = _readdir_get_frag(dirp);
       if (r)
        return r;
       // _readdir_get_frag () may updates dirp->offset if the replied dirfrag is
       // different than the requested one. (our dirfragtree was outdated)
+      check_caps = false;
     }
     frag_t fg = dirp->buffer_frag;
 
@@ -7495,11 +7514,19 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p, bool lazy
       dir_result_t::dentry &entry = *it;
 
       uint64_t next_off = entry.offset + 1;
-      fill_statx(entry.inode, lazy ? 0 : entry.inode->caps_issued(), &stx);
+
+      int r;
+      if (check_caps) {
+       r = _getattr(entry.inode, caps, dirp->perms);
+       if (r < 0)
+         return r;
+      }
+
+      fill_statx(entry.inode, caps, &stx);
       fill_dirent(&de, entry.name.c_str(), stx.stx_mode, stx.stx_ino, next_off);
 
       client_lock.Unlock();
-      int r = cb(p, &de, &stx, next_off);  // _next_ offset
+      r = cb(p, &de, &stx, next_off);  // _next_ offset
       client_lock.Lock();
 
       ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
@@ -7615,8 +7642,7 @@ int Client::readdirplus_r(dir_result_t *d, struct dirent *de,
 
   // our callback fills the dirent and sets sr.full=true on first
   // call, and returns -1 the second time around.
-  int r = readdir_r_cb(d, _readdir_single_dirent_cb, (void *)&sr,
-                      flags & AT_NO_ATTR_SYNC);
+  int r = readdir_r_cb(d, _readdir_single_dirent_cb, (void *)&sr, want, flags);
   if (r < -1)
     return r;
   if (sr.full)
index 05a9af8029ade67a816c9ff6ea64ca868e42fde3..907fd4864c98d890524602dcabc036c814362cfd 100644 (file)
@@ -48,6 +48,7 @@ using std::fstream;
 
 #include "InodeRef.h"
 #include "UserPerm.h"
+#include "include/cephfs/ceph_statx.h"
 
 class FSMap;
 class FSMapUser;
@@ -727,7 +728,7 @@ private:
   void _readdir_next_frag(dir_result_t *dirp);
   void _readdir_rechoose_frag(dir_result_t *dirp);
   int _readdir_get_frag(dir_result_t *dirp);
-  int _readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, bool lazy);
+  int _readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, int caps);
   void _closedir(dir_result_t *dirp);
 
   // other helpers
@@ -952,7 +953,8 @@ public:
    * Returns 0 if it reached the end of the directory.
    * If @a cb returns a negative error code, stop and return that.
    */
-  int readdir_r_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p, bool lazy=true);
+  int readdir_r_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
+                  unsigned want=0, unsigned flags=AT_NO_ATTR_SYNC);
 
   struct dirent * readdir(dir_result_t *d);
   int readdir_r(dir_result_t *dirp, struct dirent *de);