]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: chunked readdir
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 11 Apr 2017 09:56:13 +0000 (05:56 -0400)
committerNathan Cutler <ncutler@suse.com>
Tue, 4 Jul 2017 08:00:29 +0000 (10:00 +0200)
Adjust readdir callback path for new nfs-ganesha chunked readdir,
including changes to respect the result of callback to not
continue.

Pending introduction of offset name hint, our caller will just be
completely enumerating, so it is possible to remove the offset map
and just keep a last offset.

Fixes: http://tracker.ceph.com/issues/19624
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit e0191d74e3aef06bf300df045a53a3952a71f651)

src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index 624bd16b7f1ef44f9d0e1cd5a4f6affebe5de783..861374374e020f15c2107ec987ae20f65a667547 100644 (file)
@@ -777,7 +777,7 @@ namespace rgw {
            d = get<directory>(&rgw_fh->variant_type);
            if (d) {
              lock_guard guard(rgw_fh->mtx);
-             d->clear_state();
+             rgw_fh->clear_state();
              rgw_fh->invalidate();
            }
          rele:
@@ -868,14 +868,6 @@ namespace rgw {
     if (unlikely(! is_dir()))
       return false;
 
-#if 0
-    /* XXX pretty, but unsafe w/o consistent caching */
-    const directory* d = get<directory>(&variant_type);
-    if ((d->flags & RGWFileHandle::directory::FLAG_CACHED) &&
-       (state.nlink > 2 /* . and .. */))
-      return true;
-#endif /* 0 */
-
     RGWRMdirCheck req(fs->get_context(), fs->get_user(), this);
     int rc = rgwlib.get_fe()->execute_req(&req);
     if (! rc) {
@@ -902,6 +894,11 @@ namespace rgw {
       rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
     }
 
+    lsubdout(fs->get_context(), rgw, 15)
+      << __func__
+      << " offset=" << *offset
+      << dendl;
+
     if (is_root()) {
       RGWListBucketsRequest req(cct, fs->get_user(), this, rcb, cb_arg,
                                offset);
@@ -909,7 +906,7 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1); // XXXX
+       inc_nlink(req.d_count);
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
@@ -920,12 +917,18 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1); // XXXX
+       inc_nlink(req.d_count);
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
       }
     }
+
+    lsubdout(fs->get_context(), rgw, 15)
+      << __func__
+      << " final link count=" << state.nlink
+      << dendl;
+
     return rc;
   } /* RGWFileHandle::readdir */
 
@@ -1077,10 +1080,13 @@ namespace rgw {
     delete write_req;
   }
 
-  void RGWFileHandle::directory::clear_state()
+  void RGWFileHandle::clear_state()
   {
-    flags &= ~RGWFileHandle::directory::FLAG_CACHED;
-    marker_cache.clear();
+    directory* d = get<directory>(&variant_type);
+    if (d) {
+      state.nlink = 2;
+      d->last_marker = rgw_obj_key{};
+    }
   }
 
   void RGWFileHandle::invalidate() {
index d95c3b98718cf8fc5d1be931b1f0fdad010499b1..c11fdde377715bec44f958bfd2cb14efa1bbf271 100644 (file)
@@ -164,7 +164,12 @@ namespace rgw {
     using lock_guard = std::lock_guard<std::mutex>;
     using unique_lock = std::unique_lock<std::mutex>;
 
-    using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
+    /* TODO: keeping just the last marker is sufficient for
+     * nfs-ganesha 2.4.5; in the near future, nfs-ganesha will
+     * be able to hint the name of the next dirent required,
+     * from which we can directly synthesize a RADOS marker.
+     * using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
+     */
 
     struct State {
       uint64_t dev;
@@ -189,17 +194,15 @@ namespace rgw {
     struct directory {
 
       static constexpr uint32_t FLAG_NONE =     0x0000;
-      static constexpr uint32_t FLAG_CACHED =   0x0001;
-      static constexpr uint32_t FLAG_OVERFLOW = 0x0002;
 
       uint32_t flags;
-      marker_cache_t marker_cache;
+      rgw_obj_key last_marker;
 
       directory() : flags(FLAG_NONE) {}
-
-      void clear_state();
     };
 
+    void clear_state();
+
     boost::variant<file, directory> variant_type;
 
     uint16_t depth;
@@ -374,7 +377,7 @@ namespace rgw {
 
       switch (fh.fh_type) {
       case RGW_FS_TYPE_DIRECTORY:
-       st->st_nlink = 3;
+       st->st_nlink = 2;
        break;
       case RGW_FS_TYPE_FILE:
        st->st_nlink = 1;
@@ -459,8 +462,7 @@ namespace rgw {
       directory* d = get<directory>(&variant_type);
       if (d) {
        unique_lock guard(mtx);
-       d->marker_cache.insert(
-         marker_cache_t::value_type(off, marker));
+       d->last_marker = marker;
       }
     }
 
@@ -468,9 +470,7 @@ namespace rgw {
       using std::get;
       const directory* d = get<directory>(&variant_type);
       if (d) {
-       const auto& iter = d->marker_cache.find(off);
-       if (iter != d->marker_cache.end())
-         return &(iter->second);
+       return &d->last_marker;
       }
       return nullptr;
     }
@@ -525,6 +525,10 @@ namespace rgw {
       flags &= ~FLAG_CREATING;
     }
 
+    void inc_nlink(const uint64_t n) {
+      state.nlink += n;
+    }
+
     void set_nlink(const uint64_t n) {
       state.nlink = n;
     }
@@ -1137,12 +1141,13 @@ public:
   void* cb_arg;
   rgw_readdir_cb rcb;
   size_t ix;
+  uint32_t d_count;
 
   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
                        RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
                        void* _cb_arg, uint64_t* _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0) {
+      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
     const auto& mk = rgw_fh->find_marker(*offset);
     if (mk) {
       marker = mk->name;
@@ -1197,8 +1202,15 @@ public:
     for (const auto& iter : m) {
       boost::string_ref marker{iter.first};
       const RGWBucketEnt& ent = iter.second;
-      /* call me maybe */
-      this->operator()(ent.bucket.name, marker);
+      if (! this->operator()(ent.bucket.name, marker)) {
+       /* caller cannot accept more */
+       lsubdout(cct, rgw, 5) << "ListBuckets rcb failed"
+                             << " dirent=" << ent.bucket.name
+                             << " call count=" << ix
+                             << dendl;
+       return;
+      }
+      ++d_count;
       ++ix;
     }
   } /* send_response_data */
@@ -1214,8 +1226,7 @@ public:
     /* update traversal cache */
     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
                       RGW_FS_TYPE_DIRECTORY);
-    rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
-    return 0;
+    return rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
   }
 
   bool eof() {
@@ -1240,12 +1251,13 @@ public:
   void* cb_arg;
   rgw_readdir_cb rcb;
   size_t ix;
+  uint32_t d_count;
 
   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
                    RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
                    void* _cb_arg, uint64_t* _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0) {
+      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
     const auto& mk = rgw_fh->find_marker(*offset);
     if (mk) {
       marker = *mk;
@@ -1301,11 +1313,10 @@ public:
     *offset = off;
     /* update traversal cache */
     rgw_fh->add_marker(off, marker, type);
-    rcb(name.data(), cb_arg, off,
-       (type == RGW_FS_TYPE_DIRECTORY) ?
-       RGW_LOOKUP_FLAG_DIR :
-       RGW_LOOKUP_FLAG_FILE);
-    return 0;
+    return rcb(name.data(), cb_arg, off,
+              (type == RGW_FS_TYPE_DIRECTORY) ?
+              RGW_LOOKUP_FLAG_DIR :
+              RGW_LOOKUP_FLAG_FILE);
   }
 
   virtual int get_params() {
@@ -1338,8 +1349,15 @@ public:
                             << " (" << sref << ")" << ""
                             << dendl;
 
-      /* call me maybe */
-      this->operator()(sref, next_marker, RGW_FS_TYPE_FILE);
+      if(! this->operator()(sref, next_marker, RGW_FS_TYPE_FILE)) {
+       /* caller cannot accept more */
+       lsubdout(cct, rgw, 5) << "readdir rcb failed"
+                             << " dirent=" << sref.data()
+                             << " call count=" << ix
+                             << dendl;
+       return;
+      }
+      ++d_count;
       ++ix;
     }
     for (auto& iter : common_prefixes) {