]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: chunked readdir
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 11 Apr 2017 09:56:13 +0000 (05:56 -0400)
committerNathan Cutler <ncutler@suse.com>
Thu, 20 Apr 2017 09:26:29 +0000 (11:26 +0200)
Adjust readdir callback path for new nfs-ganesha chunked readdir,
including changes to respect the result of callback to not
continue.

Pending introduction of offset name hint, our caller will just be
completely enumerating, so it is possible to remove the offset map
and just keep a last offset.

Fixes: http://tracker.ceph.com/issues/19624
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit e0191d74e3aef06bf300df045a53a3952a71f651)

src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index 77a29836e18366370010d201a0b52764f9381e6e..4dd017db3a6a80e1d319e6fedb03ffd4e7ad8de6 100644 (file)
@@ -762,7 +762,7 @@ namespace rgw {
            d = get<directory>(&rgw_fh->variant_type);
            if (d) {
              lock_guard guard(rgw_fh->mtx);
-             d->clear_state();
+             rgw_fh->clear_state();
              rgw_fh->invalidate();
            }
          rele:
@@ -853,14 +853,6 @@ namespace rgw {
     if (unlikely(! is_dir()))
       return false;
 
-#if 0
-    /* XXX pretty, but unsafe w/o consistent caching */
-    const directory* d = get<directory>(&variant_type);
-    if ((d->flags & RGWFileHandle::directory::FLAG_CACHED) &&
-       (state.nlink > 2 /* . and .. */))
-      return true;
-#endif /* 0 */
-
     RGWRMdirCheck req(fs->get_context(), fs->get_user(), this);
     int rc = rgwlib.get_fe()->execute_req(&req);
     if (! rc) {
@@ -886,6 +878,11 @@ namespace rgw {
       rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
     }
 
+    lsubdout(fs->get_context(), rgw, 15)
+      << __func__
+      << " offset=" << *offset
+      << dendl;
+
     if (is_root()) {
       RGWListBucketsRequest req(cct, fs->get_user(), this, rcb, cb_arg,
                                offset);
@@ -893,7 +890,7 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1); // XXXX
+       inc_nlink(req.d_count);
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
@@ -905,12 +902,18 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1); // XXXX
+       inc_nlink(req.d_count);
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
       }
     }
+
+    lsubdout(fs->get_context(), rgw, 15)
+      << __func__
+      << " final link count=" << state.nlink
+      << dendl;
+
     return rc;
   } /* RGWFileHandle::readdir */
 
@@ -1062,10 +1065,13 @@ namespace rgw {
     delete write_req;
   }
 
-  void RGWFileHandle::directory::clear_state()
+  void RGWFileHandle::clear_state()
   {
-    flags &= ~RGWFileHandle::directory::FLAG_CACHED;
-    marker_cache.clear();
+    directory* d = get<directory>(&variant_type);
+    if (d) {
+      state.nlink = 2;
+      d->last_marker = rgw_obj_key{};
+    }
   }
 
   void RGWFileHandle::invalidate() {
index 99b31fd5c7ac0308aace8e0a3da0f62f1489ccec..cb87c7dc0bb670e5bb2c9779813879b8ddad3cd4 100644 (file)
@@ -164,7 +164,12 @@ namespace rgw {
     using lock_guard = std::lock_guard<std::mutex>;
     using unique_lock = std::unique_lock<std::mutex>;
 
-    using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
+    /* TODO: keeping just the last marker is sufficient for
+     * nfs-ganesha 2.4.5; in the near future, nfs-ganesha will
+     * be able to hint the name of the next dirent required,
+     * from which we can directly synthesize a RADOS marker.
+     * using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
+     */
 
     struct State {
       uint64_t dev;
@@ -189,17 +194,15 @@ namespace rgw {
     struct directory {
 
       static constexpr uint32_t FLAG_NONE =     0x0000;
-      static constexpr uint32_t FLAG_CACHED =   0x0001;
-      static constexpr uint32_t FLAG_OVERFLOW = 0x0002;
 
       uint32_t flags;
-      marker_cache_t marker_cache;
+      rgw_obj_key last_marker;
 
       directory() : flags(FLAG_NONE) {}
-
-      void clear_state();
     };
 
+    void clear_state();
+
     boost::variant<file, directory> variant_type;
 
     uint16_t depth;
@@ -368,7 +371,7 @@ namespace rgw {
 
       switch (fh.fh_type) {
       case RGW_FS_TYPE_DIRECTORY:
-       st->st_nlink = 3;
+       st->st_nlink = 2;
        break;
       case RGW_FS_TYPE_FILE:
        st->st_nlink = 1;
@@ -453,8 +456,7 @@ namespace rgw {
       directory* d = get<directory>(&variant_type);
       if (d) {
        unique_lock guard(mtx);
-       d->marker_cache.insert(
-         marker_cache_t::value_type(off, marker));
+       d->last_marker = marker;
       }
     }
 
@@ -462,9 +464,7 @@ namespace rgw {
       using std::get;
       const directory* d = get<directory>(&variant_type);
       if (d) {
-       const auto& iter = d->marker_cache.find(off);
-       if (iter != d->marker_cache.end())
-         return &(iter->second);
+       return &d->last_marker;
       }
       return nullptr;
     }
@@ -519,6 +519,10 @@ namespace rgw {
       flags &= ~FLAG_CREATING;
     }
 
+    void inc_nlink(const uint64_t n) {
+      state.nlink += n;
+    }
+
     void set_nlink(const uint64_t n) {
       state.nlink = n;
     }
@@ -1128,12 +1132,13 @@ public:
   void* cb_arg;
   rgw_readdir_cb rcb;
   size_t ix;
+  uint32_t d_count;
 
   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
                        RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
                        void* _cb_arg, uint64_t* _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0) {
+      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
     const auto& mk = rgw_fh->find_marker(*offset);
     if (mk) {
       marker = mk->name;
@@ -1188,8 +1193,15 @@ public:
     for (const auto& iter : m) {
       boost::string_ref marker{iter.first};
       const RGWBucketEnt& ent = iter.second;
-      /* call me maybe */
-      this->operator()(ent.bucket.name, marker);
+      if (! this->operator()(ent.bucket.name, marker)) {
+       /* caller cannot accept more */
+       lsubdout(cct, rgw, 5) << "ListBuckets rcb failed"
+                             << " dirent=" << ent.bucket.name
+                             << " call count=" << ix
+                             << dendl;
+       return;
+      }
+      ++d_count;
       ++ix;
     }
   } /* send_response_data */
@@ -1205,8 +1217,7 @@ public:
     /* update traversal cache */
     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
                       RGW_FS_TYPE_DIRECTORY);
-    rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
-    return 0;
+    return rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
   }
 
   bool eof() {
@@ -1231,12 +1242,13 @@ public:
   void* cb_arg;
   rgw_readdir_cb rcb;
   size_t ix;
+  uint32_t d_count;
 
   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
                    RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
                    void* _cb_arg, uint64_t* _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0) {
+      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
     const auto& mk = rgw_fh->find_marker(*offset);
     if (mk) {
       marker = *mk;
@@ -1292,11 +1304,10 @@ public:
     *offset = off;
     /* update traversal cache */
     rgw_fh->add_marker(off, marker, type);
-    rcb(name.data(), cb_arg, off,
-       (type == RGW_FS_TYPE_DIRECTORY) ?
-       RGW_LOOKUP_FLAG_DIR :
-       RGW_LOOKUP_FLAG_FILE);
-    return 0;
+    return rcb(name.data(), cb_arg, off,
+              (type == RGW_FS_TYPE_DIRECTORY) ?
+              RGW_LOOKUP_FLAG_DIR :
+              RGW_LOOKUP_FLAG_FILE);
   }
 
   virtual int get_params() {
@@ -1329,8 +1340,15 @@ public:
                             << " (" << sref << ")" << ""
                             << dendl;
 
-      /* call me maybe */
-      this->operator()(sref, next_marker, RGW_FS_TYPE_FILE);
+      if(! this->operator()(sref, next_marker, RGW_FS_TYPE_FILE)) {
+       /* caller cannot accept more */
+       lsubdout(cct, rgw, 5) << "readdir rcb failed"
+                             << " dirent=" << sref.data()
+                             << " call count=" << ix
+                             << dendl;
+       return;
+      }
+      ++d_count;
       ++ix;
     }
     for (auto& iter : common_prefixes) {