]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: chunked readdir
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 11 Apr 2017 09:56:13 +0000 (05:56 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Thu, 13 Apr 2017 14:58:22 +0000 (10:58 -0400)
Adjust readdir callback path for new nfs-ganesha chunked readdir,
including changes to respect the result of callback to not
continue.

Pending introduction of offset name hint, our caller will just be
completely enumerating, so it is possible to remove the offset map
and just keep a last offset.

Fixes: http://tracker.ceph.com/issues/19624
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index 3133eb0e9d28e94b6779653dd0e62a4b80d4306f..3ba45aee00b8e223768d60887cdb037cc61ede6a 100644 (file)
@@ -801,7 +801,7 @@ namespace rgw {
            d = get<directory>(&rgw_fh->variant_type);
            if (d) {
              lock_guard guard(rgw_fh->mtx);
-             d->clear_state();
+             rgw_fh->clear_state();
              rgw_fh->invalidate();
            }
          rele:
@@ -892,14 +892,6 @@ namespace rgw {
     if (unlikely(! is_dir()))
       return false;
 
-#if 0
-    /* XXX pretty, but unsafe w/o consistent caching */
-    const directory* d = get<directory>(&variant_type);
-    if ((d->flags & RGWFileHandle::directory::FLAG_CACHED) &&
-       (state.nlink > 2 /* . and .. */))
-      return true;
-#endif /* 0 */
-
     RGWRMdirCheck req(fs->get_context(), fs->get_user(), this);
     int rc = rgwlib.get_fe()->execute_req(&req);
     if (! rc) {
@@ -926,6 +918,11 @@ namespace rgw {
       rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
     }
 
+    lsubdout(fs->get_context(), rgw, 15)
+      << __func__
+      << " offset=" << *offset
+      << dendl;
+
     if (is_root()) {
       RGWListBucketsRequest req(cct, fs->get_user(), this, rcb, cb_arg,
                                offset);
@@ -933,7 +930,7 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1); // XXXX
+       inc_nlink(req.d_count);
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
@@ -944,12 +941,18 @@ namespace rgw {
       if (! rc) {
        lock_guard guard(mtx);
        state.atime = now;
-       set_nlink(2 + 1); // XXXX
+       inc_nlink(req.d_count);
        *eof = req.eof();
        event ev(event::type::READDIR, get_key(), state.atime);
        fs->state.push_event(ev);
       }
     }
+
+    lsubdout(fs->get_context(), rgw, 15)
+      << __func__
+      << " final link count=" << state.nlink
+      << dendl;
+
     return rc;
   } /* RGWFileHandle::readdir */
 
@@ -1101,10 +1104,13 @@ namespace rgw {
     delete write_req;
   }
 
-  void RGWFileHandle::directory::clear_state()
+  void RGWFileHandle::clear_state()
   {
-    flags &= ~RGWFileHandle::directory::FLAG_CACHED;
-    marker_cache.clear();
+    directory* d = get<directory>(&variant_type);
+    if (d) {
+      state.nlink = 2;
+      d->last_marker = rgw_obj_key{};
+    }
   }
 
   void RGWFileHandle::invalidate() {
index 46ae0f8870187f3221127baa0d9fb17fe08e0697..4f4873b8ca3a656e9f8723d42609c9d7a7cd675f 100644 (file)
@@ -164,7 +164,12 @@ namespace rgw {
     using lock_guard = std::lock_guard<std::mutex>;
     using unique_lock = std::unique_lock<std::mutex>;
 
-    using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
+    /* TODO: keeping just the last marker is sufficient for
+     * nfs-ganesha 2.4.5; in the near future, nfs-ganesha will
+     * be able to hint the name of the next dirent required,
+     * from which we can directly synthesize a RADOS marker.
+     * using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
+     */
 
     struct State {
       uint64_t dev;
@@ -189,17 +194,15 @@ namespace rgw {
     struct directory {
 
       static constexpr uint32_t FLAG_NONE =     0x0000;
-      static constexpr uint32_t FLAG_CACHED =   0x0001;
-      static constexpr uint32_t FLAG_OVERFLOW = 0x0002;
 
       uint32_t flags;
-      marker_cache_t marker_cache;
+      rgw_obj_key last_marker;
 
       directory() : flags(FLAG_NONE) {}
-
-      void clear_state();
     };
 
+    void clear_state();
+
     boost::variant<file, directory> variant_type;
 
     uint16_t depth;
@@ -374,7 +377,7 @@ namespace rgw {
 
       switch (fh.fh_type) {
       case RGW_FS_TYPE_DIRECTORY:
-       st->st_nlink = 3;
+       st->st_nlink = 2;
        break;
       case RGW_FS_TYPE_FILE:
        st->st_nlink = 1;
@@ -459,8 +462,7 @@ namespace rgw {
       directory* d = get<directory>(&variant_type);
       if (d) {
        unique_lock guard(mtx);
-       d->marker_cache.insert(
-         marker_cache_t::value_type(off, marker));
+       d->last_marker = marker;
       }
     }
 
@@ -468,9 +470,7 @@ namespace rgw {
       using std::get;
       const directory* d = get<directory>(&variant_type);
       if (d) {
-       const auto& iter = d->marker_cache.find(off);
-       if (iter != d->marker_cache.end())
-         return &(iter->second);
+       return &d->last_marker;
       }
       return nullptr;
     }
@@ -525,6 +525,10 @@ namespace rgw {
       flags &= ~FLAG_CREATING;
     }
 
+    void inc_nlink(const uint64_t n) {
+      state.nlink += n;
+    }
+
     void set_nlink(const uint64_t n) {
       state.nlink = n;
     }
@@ -1139,12 +1143,13 @@ public:
   void* cb_arg;
   rgw_readdir_cb rcb;
   size_t ix;
+  uint32_t d_count;
 
   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
                        RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
                        void* _cb_arg, uint64_t* _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0) {
+      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
     const auto& mk = rgw_fh->find_marker(*offset);
     if (mk) {
       marker = mk->name;
@@ -1199,8 +1204,15 @@ public:
     for (const auto& iter : m) {
       boost::string_ref marker{iter.first};
       const RGWBucketEnt& ent = iter.second;
-      /* call me maybe */
-      this->operator()(ent.bucket.name, marker);
+      if (! this->operator()(ent.bucket.name, marker)) {
+       /* caller cannot accept more */
+       lsubdout(cct, rgw, 5) << "ListBuckets rcb failed"
+                             << " dirent=" << ent.bucket.name
+                             << " call count=" << ix
+                             << dendl;
+       return;
+      }
+      ++d_count;
       ++ix;
     }
   } /* send_response_data */
@@ -1216,8 +1228,7 @@ public:
     /* update traversal cache */
     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
                       RGW_FS_TYPE_DIRECTORY);
-    rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
-    return 0;
+    return rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
   }
 
   bool eof() {
@@ -1242,12 +1253,13 @@ public:
   void* cb_arg;
   rgw_readdir_cb rcb;
   size_t ix;
+  uint32_t d_count;
 
   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
                    RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
                    void* _cb_arg, uint64_t* _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0) {
+      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
     const auto& mk = rgw_fh->find_marker(*offset);
     if (mk) {
       marker = *mk;
@@ -1303,11 +1315,10 @@ public:
     *offset = off;
     /* update traversal cache */
     rgw_fh->add_marker(off, marker, type);
-    rcb(name.data(), cb_arg, off,
-       (type == RGW_FS_TYPE_DIRECTORY) ?
-       RGW_LOOKUP_FLAG_DIR :
-       RGW_LOOKUP_FLAG_FILE);
-    return 0;
+    return rcb(name.data(), cb_arg, off,
+              (type == RGW_FS_TYPE_DIRECTORY) ?
+              RGW_LOOKUP_FLAG_DIR :
+              RGW_LOOKUP_FLAG_FILE);
   }
 
   int get_params() override {
@@ -1340,8 +1351,15 @@ public:
                             << " (" << sref << ")" << ""
                             << dendl;
 
-      /* call me maybe */
-      this->operator()(sref, next_marker, RGW_FS_TYPE_FILE);
+      if(! this->operator()(sref, next_marker, RGW_FS_TYPE_FILE)) {
+       /* caller cannot accept more */
+       lsubdout(cct, rgw, 5) << "readdir rcb failed"
+                             << " dirent=" << sref.data()
+                             << " call count=" << ix
+                             << dendl;
+       return;
+      }
+      ++d_count;
       ++ix;
     }
     for (auto& iter : common_prefixes) {