]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
libcephsqlite: recover from blocklisting
authorPatrick Donnelly <pdonnell@redhat.com>
Mon, 27 Feb 2023 19:56:00 +0000 (14:56 -0500)
committerPatrick Donnelly <pdonnell@redhat.com>
Tue, 27 Jun 2023 18:18:16 +0000 (14:18 -0400)
Presently the libcephsqlite library becomes unusable (by design, at the
time) when the RADOS instance it uses is blocklisted. Unfortunately,
it's fairly common for this to happen during disruptive events. For the
ceph-mgr, it's unacceptable to require manual intervention via the
`ceph mgr fail` command.

So, this commit reworks libcephsqlite to reconnect to RADOS when its
cluster handle is blocklisted. This is safe as any open "file" by
sqlite3 has its own ptr to the open cluster handle. So those open files
will all continually fail until sqlite3 closes the handle (on database
connection shutdown). The application can then reopen the database if
desired using the fresh RADOS handle.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/libcephsqlite.cc

index f533780c548bd99bda9555b8902c995872a1f3cb..f1cf0c3f366a49b08499ef4bc2469586a0c0a1d9 100644 (file)
@@ -54,9 +54,9 @@ SQLITE_EXTENSION_INIT1
 #define dout_subsys ceph_subsys_cephsqlite
 #undef dout_prefix
 #define dout_prefix *_dout << "cephsqlite: " << __func__ << ": "
-#define d(vfs,lvl) ldout(getcct(vfs), (lvl)) << "(client." << getdata(vfs).cluster.get_instance_id() << ") "
-#define dv(lvl) d(vfs,(lvl))
-#define df(lvl) d(f->vfs,(lvl)) << f->loc << " "
+#define d(cct,cluster,lvl) ldout((cct), (lvl)) << "(client." << cluster->get_instance_id() << ") "
+#define dv(lvl) d(cct,cluster,(lvl))
+#define df(lvl) d(f->io.cct,f->io.cluster,(lvl)) << f->loc << " "
 
 enum {
   P_FIRST = 0xf0000,
@@ -80,15 +80,21 @@ enum {
   P_LAST,
 };
 
+using cctptr = boost::intrusive_ptr<CephContext>;
+using rsptr = std::shared_ptr<librados::Rados>;
+
 struct cephsqlite_appdata {
   ~cephsqlite_appdata() {
+    {
+      std::scoped_lock lock(cluster_mutex);
+      _disconnect();
+    }
     if (logger) {
       cct->get_perfcounters_collection()->remove(logger.get());
     }
     if (striper_logger) {
       cct->get_perfcounters_collection()->remove(striper_logger.get());
     }
-    cluster.shutdown();
   }
   int setup_perf() {
     ceph_assert(cct);
@@ -118,26 +124,96 @@ struct cephsqlite_appdata {
     cct->get_perfcounters_collection()->add(striper_logger.get());
     return 0;
   }
-  int init_cluster() {
+
+  std::pair<cctptr, rsptr> get_cluster() {
+    std::scoped_lock lock(cluster_mutex);
+    if (!cct) {
+      if (int rc = _open(nullptr); rc < 0) {
+        ceph_abort("could not open connection to ceph");
+      }
+    }
+    return {cct, cluster};
+  }
+  int connect() {
+    std::scoped_lock lock(cluster_mutex);
+    return _connect();
+  }
+  int reconnect() {
+    std::scoped_lock lock(cluster_mutex);
+    _disconnect();
+    return _connect();
+  }
+  int maybe_reconnect(rsptr _cluster) {
+    std::scoped_lock lock(cluster_mutex);
+    if (!cluster || cluster == _cluster) {
+      ldout(cct, 10) << "reconnecting to RADOS" << dendl;
+      _disconnect();
+      return _connect();
+    } else {
+      ldout(cct, 10) << "already reconnected" << dendl;
+      return 0;
+    }
+  }
+  int open(CephContext* _cct) {
+    std::scoped_lock lock(cluster_mutex);
+    return _open(_cct);
+  }
+
+  std::unique_ptr<PerfCounters> logger;
+  std::shared_ptr<PerfCounters> striper_logger;
+
+private:
+  int _open(CephContext* _cct) {
+    if (!_cct) {
+      std::vector<const char*> env_args;
+      env_to_vec(env_args, "CEPH_ARGS");
+      std::string cluster, conf_file_list; // unused
+      CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
+      cct = cctptr(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
+      cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
+      cct->_conf.parse_env(cct->get_module_type()); // environment variables override
+      cct->_conf.apply_changes(nullptr);
+      common_init_finish(cct.get());
+    } else {
+      cct = cctptr(_cct);
+    }
+
+    if (int rc = setup_perf(); rc < 0) {
+      return rc;
+    }
+
+    if (int rc = _connect(); rc < 0) {
+      return rc;
+    }
+
+    return 0;
+  }
+  void _disconnect() {
+    if (cluster) {
+      cluster.reset();
+    }
+  }
+  int _connect() {
     ceph_assert(cct);
+    auto _cluster = rsptr(new librados::Rados());
     ldout(cct, 5) << "initializing RADOS handle as " << cct->_conf->name << dendl;
-    if (int rc = cluster.init_with_context(cct.get()); rc < 0) {
+    if (int rc = _cluster->init_with_context(cct.get()); rc < 0) {
       lderr(cct) << "cannot initialize RADOS: " << cpp_strerror(rc) << dendl;
       return rc;
     }
-    if (int rc = cluster.connect(); rc < 0) {
+    if (int rc = _cluster->connect(); rc < 0) {
       lderr(cct) << "cannot connect: " << cpp_strerror(rc) << dendl;
       return rc;
     }
-    auto s = cluster.get_addrs();
+    auto s = _cluster->get_addrs();
     ldout(cct, 5) << "completed connection to RADOS with address " << s << dendl;
+    cluster = std::move(_cluster);
     return 0;
   }
 
-  boost::intrusive_ptr<CephContext> cct;
-  std::unique_ptr<PerfCounters> logger;
-  std::shared_ptr<PerfCounters> striper_logger;
-  librados::Rados cluster;
+  ceph::mutex cluster_mutex = ceph::make_mutex("libcephsqlite");;
+  cctptr cct;
+  rsptr cluster;
 };
 
 struct cephsqlite_fileloc {
@@ -147,6 +223,8 @@ struct cephsqlite_fileloc {
 };
 
 struct cephsqlite_fileio {
+  cctptr cct;
+  rsptr cluster; // anchor for ioctx
   librados::IoCtx ioctx;
   std::unique_ptr<SimpleRADOSStriper> rs;
 };
@@ -176,36 +254,6 @@ struct cephsqlite_file {
 
 #define getdata(vfs) (*((cephsqlite_appdata*)((vfs)->pAppData)))
 
-static CephContext* getcct(sqlite3_vfs* vfs)
-{
-  auto&& appd = getdata(vfs);
-  auto& cct = appd.cct;
-  if (cct) {
-    return cct.get();
-  }
-
-  /* bootstrap cct */
-  std::vector<const char*> env_args;
-  env_to_vec(env_args, "CEPH_ARGS");
-  std::string cluster, conf_file_list; // unused
-  CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
-  cct = boost::intrusive_ptr<CephContext>(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
-  cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
-  cct->_conf.parse_env(cct->get_module_type()); // environment variables override
-  cct->_conf.apply_changes(nullptr);
-  common_init_finish(cct.get());
-
-  if (int rc = appd.setup_perf(); rc < 0) {
-    ceph_abort("cannot setup perf counters");
-  }
-
-  if (int rc = appd.init_cluster(); rc < 0) {
-    ceph_abort("cannot setup RADOS cluster handle");
-  }
-
-  return cct.get();
-}
-
 static int Lock(sqlite3_file *file, int ilock)
 {
   auto f = (cephsqlite_file*)file;
@@ -218,6 +266,9 @@ static int Lock(sqlite3_file *file, int ilock)
   if (!f->io.rs->is_locked() && ilock > SQLITE_LOCK_NONE) {
     if (int rc = f->io.rs->lock(0); rc < 0) {
       df(5) << "failed: " << rc << dendl;
+      if (rc == -EBLOCKLISTED) {
+        getdata(f->vfs).maybe_reconnect(f->io.cluster);
+      }
       return SQLITE_IOERR;
     }
   }
@@ -240,6 +291,9 @@ static int Unlock(sqlite3_file *file, int ilock)
   if (ilock <= SQLITE_LOCK_NONE && SQLITE_LOCK_NONE < lock) {
     if (int rc = f->io.rs->unlock(); rc < 0) {
       df(5) << "failed: " << rc << dendl;
+      if (rc == -EBLOCKLISTED) {
+        getdata(f->vfs).maybe_reconnect(f->io.cluster);
+      }
       return SQLITE_IOERR;
     }
   }
@@ -290,6 +344,9 @@ static int Read(sqlite3_file *file, void *buf, int len, sqlite_int64 off)
 
   if (int rc = f->io.rs->read(buf, len, off); rc < 0) {
     df(5) << "read failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR_READ;
   } else {
     df(5) << "= " << rc << dendl;
@@ -312,6 +369,9 @@ static int Write(sqlite3_file *file, const void *buf, int len, sqlite_int64 off)
 
   if (int rc = f->io.rs->write(buf, len, off); rc < 0) {
     df(5) << "write failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR_WRITE;
   } else {
     df(5) << "= " << rc << dendl;
@@ -330,6 +390,9 @@ static int Truncate(sqlite3_file *file, sqlite_int64 size)
 
   if (int rc = f->io.rs->truncate(size); rc < 0) {
     df(5) << "truncate failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR;
   }
 
@@ -346,6 +409,9 @@ static int Sync(sqlite3_file *file, int flags)
 
   if (int rc = f->io.rs->flush(); rc < 0) {
     df(5) << "failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR;
   }
 
@@ -366,6 +432,9 @@ static int FileSize(sqlite3_file *file, sqlite_int64 *osize)
   uint64_t size = 0;
   if (int rc = f->io.rs->stat(&size); rc < 0) {
     df(5) << "stat failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_NOTFOUND;
   }
 
@@ -397,37 +466,34 @@ static bool parsepath(std::string_view path, struct cephsqlite_fileloc* fileloc)
   return true;
 }
 
-static int makestriper(sqlite3_vfs* vfs, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
+static int makestriper(sqlite3_vfs* vfs, cctptr cct, rsptr cluster, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
 {
-  auto&& appd = getdata(vfs);
-  auto& cct = appd.cct;
-  auto& cluster = appd.cluster;
   bool gotmap = false;
 
-  dv(10) << loc << dendl;
+  d(cct,cluster,10) << loc << dendl;
 
 enoent_retry:
   if (loc.pool[0] == '*') {
     std::string err;
     int64_t id = strict_strtoll(loc.pool.c_str()+1, 10, &err);
     ceph_assert(err.empty());
-    if (int rc = cluster.ioctx_create2(id, io->ioctx); rc < 0) {
+    if (int rc = cluster->ioctx_create2(id, io->ioctx); rc < 0) {
       if (rc == -ENOENT && !gotmap) {
-        cluster.wait_for_latest_osdmap();
+        cluster->wait_for_latest_osdmap();
         gotmap = true;
         goto enoent_retry;
       }
-      dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
+      d(cct,cluster,1) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
       return rc;
     }
   } else {
-    if (int rc = cluster.ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
+    if (int rc = cluster->ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
       if (rc == -ENOENT && !gotmap) {
-        cluster.wait_for_latest_osdmap();
+        cluster->wait_for_latest_osdmap();
         gotmap = true;
         goto enoent_retry;
       }
-      dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
+      d(cct,cluster,1) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
       return rc;
     }
   }
@@ -436,10 +502,12 @@ enoent_retry:
     io->ioctx.set_namespace(loc.radosns);
 
   io->rs = std::make_unique<SimpleRADOSStriper>(io->ioctx, loc.name);
-  io->rs->set_logger(appd.striper_logger);
+  io->rs->set_logger(getdata(vfs).striper_logger);
   io->rs->set_lock_timeout(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_timeout"));
   io->rs->set_lock_interval(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_interval"));
   io->rs->set_blocklist_the_dead(cct->_conf.get_val<bool>("cephsqlite_blocklist_dead_locker"));
+  io->cluster = std::move(cluster);
+  io->cct = cct;
 
   return 0;
 }
@@ -502,7 +570,7 @@ static int Open(sqlite3_vfs *vfs, const char *name, sqlite3_file *file,
 
   auto start = ceph::coarse_mono_clock::now();
   bool gotmap = false;
-  auto& cluster = getdata(vfs).cluster;
+  auto [cct, cluster] = getdata(vfs).get_cluster();
 
   /* we are not going to create temporary files */
   if (name == NULL) {
@@ -525,9 +593,9 @@ static int Open(sqlite3_vfs *vfs, const char *name, sqlite3_file *file,
   f->flags = flags;
 
 enoent_retry:
-  if (int rc = makestriper(vfs, f->loc, &f->io); rc < 0) {
+  if (int rc = makestriper(vfs, cct, cluster, f->loc, &f->io); rc < 0) {
     f->~cephsqlite_file();
-    dv(5) << "cannot open striper" << dendl;
+    dv(-1) << "cannot open striper" << dendl;
     return SQLITE_IOERR;
   }
 
@@ -540,7 +608,7 @@ enoent_retry:
          * in testing when pools are getting created/deleted left and right.
          */
         dv(5) << "retrying create after getting latest OSDMap" << dendl;
-        cluster.wait_for_latest_osdmap();
+        cluster->wait_for_latest_osdmap();
         gotmap = true;
         goto enoent_retry;
       }
@@ -553,7 +621,7 @@ enoent_retry:
     if (rc == -ENOENT && !gotmap) {
       /* See comment above for create case. */
       dv(5) << "retrying open after getting latest OSDMap" << dendl;
-      cluster.wait_for_latest_osdmap();
+      cluster->wait_for_latest_osdmap();
       gotmap = true;
       goto enoent_retry;
     }
@@ -578,6 +646,7 @@ enoent_retry:
 static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
 {
   auto start = ceph::coarse_mono_clock::now();
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << "'" << path << "', " << dsync << dendl;
 
   cephsqlite_fileloc fileloc;
@@ -587,8 +656,8 @@ static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
   }
 
   cephsqlite_fileio io;
-  if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
-    dv(5) << "cannot open striper" << dendl;
+  if (int rc = makestriper(vfs, cct, cluster, fileloc, &io); rc < 0) {
+    dv(-1) << "cannot open striper" << dendl;
     return SQLITE_IOERR;
   }
 
@@ -616,6 +685,7 @@ static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
 static int Access(sqlite3_vfs* vfs, const char* path, int flags, int* result)
 {
   auto start = ceph::coarse_mono_clock::now();
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << path << " " << std::hex << flags << dendl;
 
   cephsqlite_fileloc fileloc;
@@ -625,8 +695,8 @@ static int Access(sqlite3_vfs* vfs, const char* path, int flags, int* result)
   }
 
   cephsqlite_fileio io;
-  if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
-    dv(5) << "cannot open striper" << dendl;
+  if (int rc = makestriper(vfs, cct, cluster, fileloc, &io); rc < 0) {
+    dv(-1) << "cannot open striper" << dendl;
     return SQLITE_IOERR;
   }
 
@@ -662,7 +732,7 @@ static int FullPathname(sqlite3_vfs* vfs, const char* ipath, int opathlen, char*
 {
   auto start = ceph::coarse_mono_clock::now();
   auto path = std::string_view(ipath);
-
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << "1: " <<  path << dendl;
 
   cephsqlite_fileloc fileloc;
@@ -688,6 +758,7 @@ static int FullPathname(sqlite3_vfs* vfs, const char* ipath, int opathlen, char*
 static int CurrentTime(sqlite3_vfs* vfs, sqlite3_int64* time)
 {
   auto start = ceph::coarse_mono_clock::now();
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << time << dendl;
 
   auto t = ceph_clock_now();
@@ -698,33 +769,29 @@ static int CurrentTime(sqlite3_vfs* vfs, sqlite3_int64* time)
   return SQLITE_OK;
 }
 
-LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* cct, char** ident)
+LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* _cct, char** ident)
 {
-  ldout(cct, 1) << "cct: " << cct << dendl;
+  ldout(_cct, 1) << "cct: " << _cct << dendl;
 
   if (sqlite3_api == nullptr) {
-    lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
+    lderr(_cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
     return -EINVAL;
   }
 
   auto vfs = sqlite3_vfs_find("ceph");
   if (!vfs) {
-    lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
+    lderr(_cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
     return -EINVAL;
   }
 
   auto& appd = getdata(vfs);
-  appd.cct = cct;
-  if (int rc = appd.setup_perf(); rc < 0) {
-    appd.cct = nullptr;
-    return rc;
-  }
-  if (int rc = appd.init_cluster(); rc < 0) {
-    appd.cct = nullptr;
+  if (int rc = appd.open(_cct); rc < 0) {
     return rc;
   }
 
-  auto s = appd.cluster.get_addrs();
+  auto [cct, cluster] = appd.get_cluster();
+
+  auto s = cluster->get_addrs();
   if (ident) {
     *ident = strdup(s.c_str());
   }
@@ -737,6 +804,7 @@ LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* cct, char** ident)
 static void f_perf(sqlite3_context* ctx, int argc, sqlite3_value** argv)
 {
   auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(10) << dendl;
   auto&& appd = getdata(vfs);
   JSONFormatter f(false);
@@ -756,12 +824,12 @@ static void f_perf(sqlite3_context* ctx, int argc, sqlite3_value** argv)
 static void f_status(sqlite3_context* ctx, int argc, sqlite3_value** argv)
 {
   auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(10) << dendl;
-  auto&& appd = getdata(vfs);
   JSONFormatter f(false);
   f.open_object_section("ceph_status");
-  f.dump_int("id", appd.cluster.get_instance_id());
-  f.dump_string("addr", appd.cluster.get_addrs());
+  f.dump_int("id", cluster->get_instance_id());
+  f.dump_string("addr", cluster->get_addrs());
   f.close_section();
   {
     CachedStackStringStream css;