]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: implement variant offset readdir processing
authorMatt Benjamin <mbenjamin@redhat.com>
Mon, 16 Oct 2017 01:48:19 +0000 (21:48 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Wed, 18 Oct 2017 20:23:51 +0000 (16:23 -0400)
Introduce new rgw_readdir2(...), which in which continues
from an arbitrary dirent name, which presumably has been
seen in a prior partial enumeration.

Add single-file unit test for READDIR cases, librgw_file_marker.cc.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/include/rados/rgw_file.h
src/rgw/rgw_file.cc
src/rgw/rgw_file.h
src/test/CMakeLists.txt
src/test/librgw_file_marker.cc [new file with mode: 0644]

index e27337f13e63f655c34044253902b4f16d2d7c8a..5c76c471c64278b45ce2a66def9428a7e0ba35b2 100644 (file)
@@ -27,7 +27,7 @@ extern "C" {
 
 #define LIBRGW_FILE_VER_MAJOR 1
 #define LIBRGW_FILE_VER_MINOR 1
-#define LIBRGW_FILE_VER_EXTRA 5
+#define LIBRGW_FILE_VER_EXTRA 6
 
 #define LIBRGW_FILE_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
 #define LIBRGW_FILE_VERSION_CODE LIBRGW_FILE_VERSION(LIBRGW_FILE_VER_MAJOR, LIBRGW_FILE_VER_MINOR, LIBRGW_FILE_VER_EXTRA)
@@ -221,6 +221,12 @@ int rgw_readdir(struct rgw_fs *rgw_fs,
                rgw_readdir_cb rcb, void *cb_arg, bool *eof,
                uint32_t flags);
 
+/* enumeration continuing from name */
+int rgw_readdir2(struct rgw_fs *rgw_fs,
+                struct rgw_file_handle *parent_fh, const char *name,
+                rgw_readdir_cb rcb, void *cb_arg, bool *eof,
+                uint32_t flags);
+
 /* project offset of dirent name */
 #define RGW_DIRENT_OFFSET_FLAG_NONE 0x0000
 
index 3a3edf382bcf04f0e6f952c70c5d79c4842fc05d..7feb9439d05517386ecdf6a9d9f2abf20f54aa4f 100644 (file)
@@ -1023,26 +1023,29 @@ namespace rgw {
     return false;
   }
 
-  int RGWFileHandle::readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset,
+  std::ostream& operator<<(std::ostream &os,
+                          RGWFileHandle::readdir_offset const &offset)
+  {
+    using boost::get;
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      uint64_t* ioff = get<uint64_t*>(offset);
+      os << *ioff;
+    }
+    else
+      os << get<const char*>(offset);
+    return os;
+  }
+
+  int RGWFileHandle::readdir(rgw_readdir_cb rcb, void *cb_arg,
+                            readdir_offset offset,
                             bool *eof, uint32_t flags)
   {
     using event = RGWLibFS::event;
+    using boost::get;
     int rc = 0;
     struct timespec now;
     CephContext* cct = fs->get_context();
 
-    if ((*offset == 0) &&
-       (flags & RGW_READDIR_FLAG_DOTDOT)) {
-      /* send '.' and '..' with their NFS-defined offsets */
-      rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
-      rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
-    }
-
-    lsubdout(fs->get_context(), rgw, 15)
-      << __func__
-      << " offset=" << *offset
-      << dendl;
-
     directory* d = get<directory>(&variant_type);
     if (d) {
       (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
@@ -1050,6 +1053,13 @@ namespace rgw {
       d->last_readdir = now;
     }
 
+    bool initial_off;
+    if (likely(!! get<const char*>(&offset))) {
+      initial_off = ! get<const char*>(offset);
+    } else {
+      initial_off = (*get<uint64_t*>(offset) == 0);
+    }
+
     if (is_root()) {
       RGWListBucketsRequest req(cct, fs->get_user(), this, rcb, cb_arg,
                                offset);
@@ -1058,7 +1068,7 @@ namespace rgw {
        (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
        lock_guard guard(mtx);
        state.atime = now;
-       if (*offset == 0)
+       if (initial_off)
          set_nlink(2);
        inc_nlink(req.d_count);
        *eof = req.eof();
@@ -1073,7 +1083,7 @@ namespace rgw {
        (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
        lock_guard guard(mtx);
        state.atime = now;
-       if (*offset == 0)
+       if (initial_off)
          set_nlink(2);
        inc_nlink(req.d_count);
        *eof = req.eof();
@@ -1893,9 +1903,50 @@ int rgw_readdir(struct rgw_fs *rgw_fs,
     /* bad parent */
     return -EINVAL;
   }
+
+  lsubdout(parent->get_fs()->get_context(), rgw, 15)
+    << __func__
+    << " offset=" << *offset
+    << dendl;
+
+  if ((*offset == 0) &&
+      (flags & RGW_READDIR_FLAG_DOTDOT)) {
+    /* send '.' and '..' with their NFS-defined offsets */
+    rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
+    rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
+  }
+
   int rc = parent->readdir(rcb, cb_arg, offset, eof, flags);
   return rc;
-}
+} /* rgw_readdir */
+
+/* enumeration continuing from name */
+int rgw_readdir2(struct rgw_fs *rgw_fs,
+                struct rgw_file_handle *parent_fh, const char *name,
+                rgw_readdir_cb rcb, void *cb_arg, bool *eof,
+                uint32_t flags)
+{
+  RGWFileHandle* parent = get_rgwfh(parent_fh);
+  if (! parent) {
+    /* bad parent */
+    return -EINVAL;
+  }
+
+  lsubdout(parent->get_fs()->get_context(), rgw, 15)
+    << __func__
+    << " offset=" << name
+    << dendl;
+
+  if ((! name) &&
+      (flags & RGW_READDIR_FLAG_DOTDOT)) {
+    /* send '.' and '..' with their NFS-defined offsets */
+    rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
+    rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
+  }
+
+  int rc = parent->readdir(rcb, cb_arg, name, eof, flags);
+  return rc;
+} /* rgw_readdir2 */
 
 /* project offset of dirent name */
 int rgw_dirent_offset(struct rgw_fs *rgw_fs,
index 008b40efe30958dc031270b98df9bbb837168095..6eb2d51150f6cd50e3960f7a79cad048f2eb6717 100644 (file)
@@ -555,8 +555,11 @@ namespace rgw {
       return -EPERM;
     }
 
-    int readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof,
-               uint32_t flags);
+    typedef boost::variant<uint64_t*, const char*> readdir_offset;
+
+    int readdir(rgw_readdir_cb rcb, void *cb_arg, readdir_offset offset,
+               bool *eof, uint32_t flags);
+
     int write(uint64_t off, size_t len, size_t *nbytes, void *buffer);
 
     int commit(uint64_t offset, uint64_t length, uint32_t flags) {
@@ -1215,20 +1218,32 @@ class RGWListBucketsRequest : public RGWLibRequest,
 {
 public:
   RGWFileHandle* rgw_fh;
-  uint64_t* offset;
+  RGWFileHandle::readdir_offset offset;
   void* cb_arg;
   rgw_readdir_cb rcb;
+  uint64_t* ioff;
   size_t ix;
   uint32_t d_count;
 
   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
                        RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
-                       void* _cb_arg, uint64_t* _offset)
+                       void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
-    const auto& mk = rgw_fh->find_marker(*offset);
-    if (mk) {
-      marker = mk->name;
+      cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
+
+    using boost::get;
+
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      ioff = get<uint64_t*>(offset);
+      const auto& mk = rgw_fh->find_marker(*ioff);
+      if (mk) {
+       marker = mk->name;
+      }
+    } else {
+      const char* mk = get<const char*>(offset);
+      if (mk) {
+       marker = mk;
+      }
     }
     op = this;
   }
@@ -1299,7 +1314,9 @@ public:
   int operator()(const boost::string_ref& name,
                 const boost::string_ref& marker) {
     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
-    *offset = off;
+    if (!! ioff) {
+      *ioff = off;
+    }
     /* update traversal cache */
     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
                       RGW_FS_TYPE_DIRECTORY);
@@ -1308,7 +1325,7 @@ public:
   }
 
   bool eof() {
-    lsubdout(cct, rgw, 15) << "READDIR offset: " << *offset
+    lsubdout(cct, rgw, 15) << "READDIR offset: " << offset
                           << " is_truncated: " << is_truncated
                           << dendl;
     return !is_truncated;
@@ -1325,21 +1342,37 @@ class RGWReaddirRequest : public RGWLibRequest,
 {
 public:
   RGWFileHandle* rgw_fh;
-  uint64_t* offset;
+  RGWFileHandle::readdir_offset offset;
   void* cb_arg;
   rgw_readdir_cb rcb;
+  uint64_t* ioff;
   size_t ix;
   uint32_t d_count;
 
   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
                    RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
-                   void* _cb_arg, uint64_t* _offset)
+                   void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
-    const auto& mk = rgw_fh->find_marker(*offset);
-    if (mk) {
-      marker = *mk;
+      cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
+
+    using boost::get;
+
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      ioff = get<uint64_t*>(offset);
+      const auto& mk = rgw_fh->find_marker(*ioff);
+      if (mk) {
+       marker = *mk;
+      }
+    } else {
+      const char* mk = get<const char*>(offset);
+      if (mk) {
+       std::string tmark{rgw_fh->relative_object_name()};
+       tmark += "/";
+       tmark += mk;    
+       marker = rgw_obj_key{std::move(tmark), "", ""};
+      }
     }
+
     default_max = 1000; // XXX was being omitted
     op = this;
   }
@@ -1388,7 +1421,9 @@ public:
 
     /* hash offset of name in parent (short name) for NFS readdir cookie */
     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
-    *offset = off;
+    if (unlikely(!! ioff)) {
+      *ioff = off;
+    }
     /* update traversal cache */
     rgw_fh->add_marker(off, marker, type);
     ++d_count;
@@ -1478,7 +1513,7 @@ public:
   }
 
   bool eof() {
-    lsubdout(cct, rgw, 15) << "READDIR offset: " << *offset
+    lsubdout(cct, rgw, 15) << "READDIR offset: " << offset
                           << " next marker: " << next_marker
                           << " is_truncated: " << is_truncated
                           << dendl;
index 6c5414627a3f5b2ff962b66171bf996016fa6552..13ef4f8184e4abc3374e1eafe12e9089966ad681 100644 (file)
@@ -348,6 +348,20 @@ target_link_libraries(ceph_test_librgw_file_aw
   ${EXTRALIBS}
   )
 
+# ceph_test_librgw_file_marker (READDIR with string and uint64 offsets)
+add_executable(ceph_test_librgw_file_marker
+  librgw_file_marker.cc
+  )
+set_target_properties(ceph_test_librgw_file_marker PROPERTIES COMPILE_FLAGS
+  ${UNITTEST_CXX_FLAGS})
+target_link_libraries(ceph_test_librgw_file_marker
+  rgw
+  librados
+  ceph-common
+  ${UNITTEST_LIBS}
+  ${EXTRALIBS}
+  )
+
 # ceph_test_rgw_token
 add_executable(ceph_test_rgw_token
   test_rgw_token.cc
diff --git a/src/test/librgw_file_marker.cc b/src/test/librgw_file_marker.cc
new file mode 100644 (file)
index 0000000..74199df
--- /dev/null
@@ -0,0 +1,488 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <stdint.h>
+#include <tuple>
+#include <iostream>
+#include <fstream>
+#include <stack>
+
+#include "include/rados/librgw.h"
+#include "include/rados/rgw_file.h"
+#include "rgw/rgw_file.h"
+#include "rgw/rgw_lib_frontend.h" // direct requests
+
+#include "gtest/gtest.h"
+#include "common/backport14.h"
+#include "common/ceph_argparse.h"
+#include "common/debug.h"
+#include "global/global_init.h"
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace {
+
+  using namespace rgw;
+  using std::get;
+  using std::string;
+
+  librgw_t rgw_h = nullptr;
+  string userid("testuser");
+  string access_key("");
+  string secret_key("");
+  struct rgw_fs *fs = nullptr;
+  CephContext* cct = nullptr;
+
+  uint32_t owner_uid = 867;
+  uint32_t owner_gid = 5309;
+
+  uint32_t create_mask = RGW_SETATTR_UID | RGW_SETATTR_GID | RGW_SETATTR_MODE;
+
+  string bucket_name("nfsroot");
+
+  class obj_rec
+  {
+  public:
+    string name;
+    struct rgw_file_handle* fh;
+    struct rgw_file_handle* parent_fh;
+    RGWFileHandle* rgw_fh; // alias into fh
+
+    struct state {
+      bool readdir;
+      state() : readdir(false) {}
+    } state;
+
+    obj_rec(string _name, struct rgw_file_handle* _fh,
+           struct rgw_file_handle* _parent_fh, RGWFileHandle* _rgw_fh)
+      : name(std::move(_name)), fh(_fh), parent_fh(_parent_fh),
+       rgw_fh(_rgw_fh) {}
+
+    void clear() {
+      fh = nullptr;
+      rgw_fh = nullptr;
+    }
+
+    void sync() {
+      if (fh)
+       rgw_fh = get_rgwfh(fh);
+    }
+
+    friend ostream& operator<<(ostream& os, const obj_rec& rec);
+  };
+
+  ostream& operator<<(ostream& os, const obj_rec& rec)
+  {
+    RGWFileHandle* rgw_fh = rec.rgw_fh;
+    if (rgw_fh) {
+      const char* type = rgw_fh->is_dir() ? "DIR " : "FILE ";
+      os << rec.rgw_fh->full_object_name()
+        << " (" << rec.rgw_fh->object_name() << "): "
+        << type;
+    }
+    return os;
+  }
+  
+  std::stack<obj_rec> obj_stack;
+  std::deque<obj_rec> cleanup_queue;
+
+  typedef std::vector<obj_rec> obj_vec;
+  typedef std::tuple<obj_rec, obj_vec> dirs1_rec;
+  typedef std::vector<dirs1_rec> dirs1_vec;
+
+  dirs1_vec dirs_vec;
+
+  struct obj_rec_st
+  {
+    const obj_rec& obj;
+    const struct stat& st;
+
+    obj_rec_st(const obj_rec& _obj, const struct stat& _st)
+      : obj(_obj), st(_st) {}
+  };
+
+  ostream& operator<<(ostream& os, const obj_rec_st& rec)
+  {
+    RGWFileHandle* rgw_fh = rec.obj.rgw_fh;
+    if (rgw_fh) {
+      const char* type = rgw_fh->is_dir() ? "DIR " : "FILE ";
+      os << rgw_fh->full_object_name()
+        << " (" << rgw_fh->object_name() << "): "
+        << type;
+      const struct stat& st = rec.st;
+      switch(uint8_t(rgw_fh->is_dir())) {
+      case 1:
+       os << " mode: " << st.st_mode;
+       os << " nlinks: " << st.st_nlink;
+       break;
+      case 0:
+      default:
+       os << " mode: " << st.st_mode;
+       os << " size: " << st.st_size;
+       // xxx
+       break;
+      }
+    }
+    return os;
+  }
+
+  bool do_marker1 = false;
+  bool do_marker2 = true;
+  bool do_create = false;
+  bool do_delete = false;
+  bool verbose = false;
+
+  string marker_dir("nfs_marker");
+  struct rgw_file_handle *bucket_fh = nullptr;
+  struct rgw_file_handle *marker_fh;
+  static constexpr int marker_nobjs = 2*1024;
+  std::deque<obj_rec> marker_objs;
+
+  using dirent_t = std::tuple<std::string, uint64_t>;
+  struct dirent_vec
+  {
+    std::vector<dirent_t> obj_names;
+    uint32_t count;
+    dirent_vec() : count(0) {}
+  };
+
+  struct {
+    int argc;
+    char **argv;
+  } saved_args;
+}
+
+TEST(LibRGW, TVAR) {
+  typedef boost::variant<uint64_t*, const char*> readdir_offset;
+
+  uint64_t i1{64001};
+  std::string s1{"blunderbuss"};
+
+  readdir_offset v1{&i1};
+  readdir_offset v2{s1.c_str()};
+  readdir_offset v3{static_cast<const char*>(nullptr)};
+
+  uint64_t* pi1 = get<uint64_t*>(v1);
+  ASSERT_NE(pi1, nullptr);
+  std::cout << "read i1: " << *pi1 << std::endl;
+
+  const char* ps1 = get<const char*>(v2);
+  ASSERT_NE(ps1, nullptr);
+  std::cout << "read s1: " << ps1 << std::endl;
+
+  const char* ps3 = get<const char*>(v3);
+  ASSERT_EQ(ps3, nullptr);
+  std::cout << "read s3: " << ps3 << std::endl;
+}
+
+TEST(LibRGW, INIT) {
+  int ret = librgw_create(&rgw_h, saved_args.argc, saved_args.argv);
+  ASSERT_EQ(ret, 0);
+  ASSERT_NE(rgw_h, nullptr);
+}
+
+TEST(LibRGW, MOUNT) {
+  int ret = rgw_mount2(rgw_h, userid.c_str(), access_key.c_str(),
+                       secret_key.c_str(), "/", &fs, RGW_MOUNT_FLAG_NONE);
+  ASSERT_EQ(ret, 0);
+  ASSERT_NE(fs, nullptr);
+
+  cct = static_cast<RGWLibFS*>(fs->fs_private)->get_context();
+}
+
+TEST(LibRGW, MARKER1_SETUP_BUCKET) {
+  /* "large" directory enumeration test.  this one deals only with
+   * file objects */
+  struct stat st;
+  int ret;
+
+  st.st_uid = owner_uid;
+  st.st_gid = owner_gid;
+  st.st_mode = 755;
+
+  (void) rgw_lookup(fs, fs->root_fh, bucket_name.c_str(), &bucket_fh,
+                   RGW_LOOKUP_FLAG_NONE);
+  if (! bucket_fh) {
+    if (do_create) {
+      struct stat st;
+
+      st.st_uid = owner_uid;
+      st.st_gid = owner_gid;
+      st.st_mode = 755;
+
+      ret = rgw_mkdir(fs, fs->root_fh, bucket_name.c_str(), &st, create_mask,
+                     &bucket_fh, RGW_MKDIR_FLAG_NONE);
+      ASSERT_EQ(ret, 0);
+    }
+  }
+
+  ASSERT_NE(bucket_fh, nullptr);
+
+  (void) rgw_lookup(fs, bucket_fh, marker_dir.c_str(), &marker_fh,
+                   RGW_LOOKUP_FLAG_NONE);
+  if (! marker_fh) {
+    if (do_create) {
+      ret = rgw_mkdir(fs, bucket_fh, marker_dir.c_str(), &st, create_mask,
+                     &marker_fh, RGW_MKDIR_FLAG_NONE);
+      ASSERT_EQ(ret, 0);
+    }
+  }
+
+  ASSERT_NE(marker_fh, nullptr);
+}
+
+TEST(LibRGW, MARKER1_SETUP_OBJECTS)
+{
+  /* "large" directory enumeration test.  this one deals only with
+   * file objects */
+  if (do_create) {
+    int ret;
+
+    for (int ix = 0; ix < marker_nobjs; ++ix) {
+      std::string object_name("f_");
+      object_name += to_string(ix);
+      obj_rec obj{object_name, nullptr, marker_fh, nullptr};
+      // lookup object--all operations are by handle
+      ret = rgw_lookup(fs, marker_fh, obj.name.c_str(), &obj.fh,
+                      RGW_LOOKUP_FLAG_CREATE);
+      ASSERT_EQ(ret, 0);
+      obj.rgw_fh = get_rgwfh(obj.fh);
+      // open object--open transaction
+      ret = rgw_open(fs, obj.fh, 0 /* posix flags */, RGW_OPEN_FLAG_NONE);
+      ASSERT_EQ(ret, 0);
+      ASSERT_TRUE(obj.rgw_fh->is_open());
+      // unstable write data
+      size_t nbytes;
+      string data("data for ");
+      data += object_name;
+      int ret = rgw_write(fs, obj.fh, 0, data.length(), &nbytes,
+                         (void*) data.c_str(), RGW_WRITE_FLAG_NONE);
+      ASSERT_EQ(ret, 0);
+      ASSERT_EQ(nbytes, data.length());
+      // commit transaction (write on close)
+      ret = rgw_close(fs, obj.fh, 0 /* flags */);
+      ASSERT_EQ(ret, 0);
+      // save for cleanup
+      marker_objs.push_back(obj);
+    }
+  }
+}
+
+extern "C" {
+  static bool r2_cb(const char* name, void *arg, uint64_t offset,
+                   uint32_t flags) {
+    dirent_vec& dvec =
+      *(static_cast<dirent_vec*>(arg));
+    lsubdout(cct, rgw, 10) << __func__
+                          << " bucket=" << bucket_name
+                          << " dir=" << marker_dir
+                          << " iv count=" << dvec.count
+                          << " called back name=" << name
+                          << " flags=" << flags
+                          << dendl;
+
+  std::cout << __func__
+                          << " bucket=" << bucket_name
+                          << " dir=" << marker_dir
+                          << " iv count=" << dvec.count
+                          << " called back name=" << name
+                          << " flags=" << flags
+                          << std::endl;
+
+    string name_str{name};
+    if (! ((name_str == ".") ||
+          (name_str == ".."))) {
+      dvec.obj_names.push_back(dirent_t{std::move(name_str), offset});
+    }
+    return true; /* XXX */
+  }
+}
+
+TEST(LibRGW, MARKER1_READDIR)
+{
+  if (do_marker1) {
+    using std::get;
+
+    dirent_vec dvec;
+    uint64_t offset = 0;
+    bool eof = false;
+
+    /* because RGWReaddirRequest::default_max is 1000 (XXX make
+     * configurable?) and marker_nobjs is 5*1024, the number
+     * of required rgw_readdir operations N should be
+     * marker_nobjs/1000 < N < marker_nobjs/1000+1, i.e., 6 when
+     * marker_nobjs==5*1024 */
+    uint32_t max_iterations = marker_nobjs/1000+1;
+
+    do {
+      ASSERT_TRUE(dvec.count <= max_iterations);
+      int ret = rgw_readdir(fs, marker_fh, &offset, r2_cb, &dvec, &eof,
+                           RGW_READDIR_FLAG_DOTDOT);
+      ASSERT_EQ(ret, 0);
+      ASSERT_EQ(offset, get<1>(dvec.obj_names.back())); // cookie check
+      ++dvec.count;
+    } while(!eof);
+    std::cout << "Read " << dvec.obj_names.size() << " objects in "
+             << marker_dir.c_str() << std::endl;
+  }
+}
+
+TEST(LibRGW, MARKER2_READDIR)
+{
+  if (do_marker2) {
+    using std::get;
+
+    dirent_vec dvec;
+    std::string marker{""};
+    bool eof = false;
+
+    /* because RGWReaddirRequest::default_max is 1000 (XXX make
+     * configurable?) and marker_nobjs is 5*1024, the number
+     * of required rgw_readdir operations N should be
+     * marker_nobjs/1000 < N < marker_nobjs/1000+1, i.e., 6 when
+     * marker_nobjs==5*1024 */
+    uint32_t max_iterations = marker_nobjs/1000+1;
+
+    do {
+      ASSERT_TRUE(dvec.count <= max_iterations);
+      int ret = rgw_readdir2(fs, marker_fh,
+                            (marker.length() > 0) ? marker.c_str() : nullptr,
+                            r2_cb, &dvec, &eof,
+                            RGW_READDIR_FLAG_DOTDOT);
+      ASSERT_EQ(ret, 0);
+      marker = get<0>(dvec.obj_names.back());
+      ++dvec.count;
+    } while((!eof) && dvec.count < 4);
+    std::cout << "Read " << dvec.obj_names.size() << " objects in "
+             << marker_dir.c_str() << std::endl;
+  }
+}
+
+TEST(LibRGW, MARKER1_OBJ_CLEANUP)
+{
+  int rc;
+  for (auto& obj : marker_objs) {
+    if (obj.fh) {
+      if (do_delete) {
+       if (verbose) {
+         std::cout << "unlinking: " << bucket_name << ":" << obj.name
+                   << std::endl;
+       }
+       rc = rgw_unlink(fs, marker_fh, obj.name.c_str(), RGW_UNLINK_FLAG_NONE);
+      }
+      rc = rgw_fh_rele(fs, obj.fh, 0 /* flags */);
+      ASSERT_EQ(rc, 0);
+    }
+  }
+  marker_objs.clear();
+}
+
+TEST(LibRGW, CLEANUP) {
+  int rc;
+
+  if (do_marker1) {
+    cleanup_queue.push_back(
+      obj_rec{bucket_name, bucket_fh, fs->root_fh, get_rgwfh(fs->root_fh)});
+  }
+
+  for (auto& elt : cleanup_queue) {
+    if (elt.fh) {
+      rc = rgw_fh_rele(fs, elt.fh, 0 /* flags */);
+      ASSERT_EQ(rc, 0);
+    }
+  }
+  cleanup_queue.clear();
+}
+
+TEST(LibRGW, UMOUNT) {
+  if (! fs)
+    return;
+
+  int ret = rgw_umount(fs, RGW_UMOUNT_FLAG_NONE);
+  ASSERT_EQ(ret, 0);
+}
+
+TEST(LibRGW, SHUTDOWN) {
+  librgw_shutdown(rgw_h);
+}
+
+int main(int argc, char *argv[])
+{
+  char *v{nullptr};
+  string val;
+  vector<const char*> args;
+
+  argv_to_vec(argc, const_cast<const char**>(argv), args);
+  env_to_vec(args);
+
+  v = getenv("AWS_ACCESS_KEY_ID");
+  if (v) {
+    access_key = v;
+  }
+
+  v = getenv("AWS_SECRET_ACCESS_KEY");
+  if (v) {
+    secret_key = v;
+  }
+
+  for (auto arg_iter = args.begin(); arg_iter != args.end();) {
+    if (ceph_argparse_witharg(args, arg_iter, &val, "--access",
+                             (char*) nullptr)) {
+      access_key = val;
+    } else if (ceph_argparse_witharg(args, arg_iter, &val, "--secret",
+                                    (char*) nullptr)) {
+      secret_key = val;
+    } else if (ceph_argparse_witharg(args, arg_iter, &val, "--userid",
+                                    (char*) nullptr)) {
+      userid = val;
+    } else if (ceph_argparse_witharg(args, arg_iter, &val, "--bn",
+                                    (char*) nullptr)) {
+      bucket_name = val;
+    } else if (ceph_argparse_witharg(args, arg_iter, &val, "--uid",
+                                    (char*) nullptr)) {
+      owner_uid = std::stoi(val);
+    } else if (ceph_argparse_witharg(args, arg_iter, &val, "--gid",
+                                    (char*) nullptr)) {
+      owner_gid = std::stoi(val);
+    } else if (ceph_argparse_flag(args, arg_iter, "--marker1",
+                                           (char*) nullptr)) {
+      do_marker1 = true;
+    } else if (ceph_argparse_flag(args, arg_iter, "--create",
+                                           (char*) nullptr)) {
+      do_create = true;
+    } else if (ceph_argparse_flag(args, arg_iter, "--delete",
+                                           (char*) nullptr)) {
+      do_delete = true;
+    } else if (ceph_argparse_flag(args, arg_iter, "--verbose",
+                                           (char*) nullptr)) {
+      verbose = true;
+    } else {
+      ++arg_iter;
+    }
+  }
+
+  /* dont accidentally run as anonymous */
+  if ((access_key == "") ||
+      (secret_key == "")) {
+    std::cout << argv[0] << " no AWS credentials, exiting" << std::endl;
+    return EPERM;
+  }
+
+  saved_args.argc = argc;
+  saved_args.argv = argv;
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}