]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
monc: Asifoact MonClient
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 28 Apr 2020 21:32:56 +0000 (17:32 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Fri, 15 May 2020 14:55:10 +0000 (10:55 -0400)
Of course now everyone has to feed an io_context into the MonClient.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
35 files changed:
src/ceph_fuse.cc
src/ceph_mds.cc
src/ceph_osd.cc
src/ceph_syn.cc
src/client/Client.cc
src/common/ceph_context.h
src/common/options.cc
src/global/global_init.cc
src/global/global_init.h
src/libcephfs.cc
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/libradosstriper/MultiAioCompletionImpl.cc
src/libradosstriper/MultiAioCompletionImpl.h
src/libradosstriper/RadosStriperImpl.h
src/mds/MDSDaemon.cc
src/mds/MDSDaemon.h
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mgr/MgrContext.h
src/mgr/MgrStandby.cc
src/mgr/MgrStandby.h
src/mon/AuthMonitor.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/mount/conf.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/mon/test-mon-msg.cc
src/test/mon/test_mon_workloadgen.cc
src/test/osd/TestOSDScrub.cc
src/tools/cephfs/MDSUtility.cc
src/tools/cephfs/MDSUtility.h

index 9d6141c1eaa4c5434b4d0174bbaffc53c100a6e0..ab4db60920c0263a931145757bfc1226af198704 100644 (file)
@@ -16,7 +16,9 @@
 #include <sys/utsname.h>
 #include <iostream>
 #include <string>
+#include <optional>
 
+#include "common/async/context_pool.h"
 #include "common/config.h"
 #include "common/errno.h"
 
@@ -45,6 +47,8 @@
 
 #define dout_context g_ceph_context
 
+ceph::async::io_context_pool icp;
+
 static void fuse_usage()
 {
   const char* argv[] = {
@@ -234,7 +238,8 @@ int main(int argc, const char **argv, const char *envp[]) {
     int tester_r = 0;
     void *tester_rp = nullptr;
 
-    MonClient *mc = new MonClient(g_ceph_context);
+    icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
+    MonClient *mc = new MonClient(g_ceph_context, icp);
     int r = mc->build_initial_monmap();
     if (r == -EINVAL) {
       cerr << "failed to generate initial mon list" << std::endl;
@@ -316,6 +321,7 @@ int main(int argc, const char **argv, const char *envp[]) {
     client->unmount();
     cfuse->finalize();
   out_shutdown:
+    icp.stop();
     client->shutdown();
   out_init_failed:
     unregister_async_signal_handler(SIGHUP, sighup_handler);
index 4d960dc2e7f2733fceb54ade343cea5c5a533fcc..82c89ae9d5280df5213504b5490e7403bf4cf45e 100644 (file)
@@ -20,6 +20,7 @@
 #include <iostream>
 #include <string>
 
+#include "common/async/context_pool.h"
 #include "include/ceph_features.h"
 #include "include/compat.h"
 #include "include/random.h"
@@ -195,7 +196,8 @@ int main(int argc, const char **argv)
   register_async_signal_handler(SIGHUP, sighup_handler);
   
   // get monmap
-  MonClient mc(g_ceph_context);
+  ceph::async::io_context_pool ctxpool(2);
+  MonClient mc(g_ceph_context, ctxpool);
   if (mc.build_initial_monmap() < 0)
     forker.exit(1);
   global_init_chdir(g_ceph_context);
@@ -203,7 +205,7 @@ int main(int argc, const char **argv)
   msgr->start();
 
   // start mds
-  mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc);
+  mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc, ctxpool);
 
   // in case we have to respawn...
   mds->orig_argc = argc;
@@ -234,6 +236,7 @@ int main(int argc, const char **argv)
   shutdown_async_signal_handler();
 
  shutdown:
+  ctxpool.stop();
   // yuck: grab the mds lock, so we can be sure that whoever in *mds
   // called shutdown finishes what they were doing.
   mds->mds_lock.lock();
@@ -257,4 +260,3 @@ int main(int argc, const char **argv)
 
   return 0;
 }
-
index 9fde44000288ff98f6964f7f8dfbfca9b3e92670..63ae56643fa6efc7a74782878078fa4b99b6758b 100644 (file)
@@ -670,7 +670,10 @@ flushjournal_out:
 
   srand(time(NULL) + getpid());
 
-  MonClient mc(g_ceph_context);
+  ceph::async::io_context_pool poolctx(
+    cct->_conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+
+  MonClient mc(g_ceph_context, poolctx);
   if (mc.build_initial_monmap() < 0)
     return -1;
   global_init_chdir(g_ceph_context);
@@ -691,7 +694,8 @@ flushjournal_out:
                   ms_objecter,
                   &mc,
                   data_path,
-                  journal_path);
+                  journal_path,
+                  poolctx);
 
   int err = osdptr->pre_init();
   if (err < 0) {
@@ -746,6 +750,7 @@ flushjournal_out:
   shutdown_async_signal_handler();
 
   // done
+  poolctx.stop();
   delete osdptr;
   delete ms_public;
   delete ms_hb_front_client;
index 50e26f2815a00b0f812015f342e56642e7cdbb10..e3ca1328702060ca905acf12c494fb288bf69240 100644 (file)
@@ -18,6 +18,7 @@
 
 #include "common/config.h"
 
+#include "common/async/context_pool.h"
 #include "client/SyntheticClient.h"
 #include "client/Client.h"
 
@@ -50,7 +51,8 @@ int main(int argc, const char **argv, char *envp[])
   pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
 
   // get monmap
-  MonClient mc(g_ceph_context);
+  ceph::async::io_context_pool  poolctx(1);
+  MonClient mc(g_ceph_context, poolctx);
   if (mc.build_initial_monmap() < 0)
     return -1;
 
@@ -64,7 +66,7 @@ int main(int argc, const char **argv, char *envp[])
     messengers[i] = Messenger::create_client_messenger(g_ceph_context,
                                                       "synclient");
     messengers[i]->bind(g_conf()->public_addr);
-    mclients[i] = new MonClient(g_ceph_context);
+    mclients[i] = new MonClient(g_ceph_context, poolctx);
     mclients[i]->build_initial_monmap();
     auto client = new StandaloneClient(messengers[i], mclients[i]);
     client->set_filer_flags(syn_filer_flags);
@@ -79,6 +81,8 @@ int main(int argc, const char **argv, char *envp[])
        ++p)
     (*p)->start_thread();
 
+  poolctx.stop();
+
   //cout << "waiting for client(s) to finish" << std::endl;
   while (!clients.empty()) {
     Client *client = clients.front();
@@ -99,4 +103,3 @@ int main(int argc, const char **argv, char *envp[])
   }
   return 0;
 }
-
index 0a2a4da425557816f1c3111599dcb66ee1915fe8..8bd78b4046bed492c355d8a27868b501901931f0 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "common/config.h"
 #include "common/version.h"
+#include "common/async/blocked_completion.h"
 
 #include "mon/MonClient.h"
 
 
 #define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED)
 
+namespace bs = boost::system;
+namespace ca = ceph::async;
+
 void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
 {
   Client *client = static_cast<Client*>(p);
@@ -5703,22 +5707,21 @@ int Client::authenticate()
 
 int Client::fetch_fsmap(bool user)
 {
-  int r;
   // Retrieve FSMap to enable looking up daemon addresses.  We need FSMap
   // rather than MDSMap because no one MDSMap contains all the daemons, and
   // a `tell` can address any daemon.
   version_t fsmap_latest;
+  bs::error_code ec;
   do {
-    C_SaferCond cond;
-    monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
     client_lock.unlock();
-    r = cond.wait();
+    std::tie(fsmap_latest, std::ignore) =
+      monclient->get_version("fsmap", ca::use_blocked[ec]);
     client_lock.lock();
-  } while (r == -EAGAIN);
+  } while (ec == bs::errc::resource_unavailable_try_again);
 
-  if (r < 0) {
-    lderr(cct) << "Failed to learn FSMap version: " << cpp_strerror(r) << dendl;
-    return r;
+  if (ec) {
+    lderr(cct) << "Failed to learn FSMap version: " << ec << dendl;
+    return ceph::from_error_code(ec);
   }
 
   ldout(cct, 10) << __func__ << " learned FSMap version " << fsmap_latest << dendl;
@@ -14669,7 +14672,7 @@ mds_rank_t Client::_get_random_up_mds() const
 
 
 StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
-    : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
+  : Client(m, mc, new Objecter(m->cct, m, mc, nullptr, 0, 0))
 {
   monclient->set_messenger(m);
   objecter->set_client_incarnation(0);
index adacecebfc3991d4008ec4c661fdd09d890ed3c9..187cd3d165a7ab0e8406d129645f1d397d664e0a 100644 (file)
 #include <typeinfo>
 #include <typeindex>
 
-#include "include/common_fwd.h"
+#include <boost/intrusive_ptr.hpp>
+
 #include "include/any.h"
+#include "include/common_fwd.h"
 
 #include "common/cmdparse.h"
 #include "common/code_environment.h"
@@ -375,4 +377,17 @@ private:
 #endif
 #endif // WITH_SEASTAR
 
+#if !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus)
+namespace ceph::common {
+inline void intrusive_ptr_add_ref(CephContext* cct)
+{
+  cct->get();
+}
+
+inline void intrusive_ptr_release(CephContext* cct)
+{
+  cct->put();
+}
+}
+#endif // !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus)
 #endif
index 9d7c97af532e60fee84af9660f271520a51ded77..53cc9df101c04a8aa4eb0a6309fd3bd488c252ea 100644 (file)
@@ -5357,6 +5357,18 @@ std::vector<Option> get_global_options() {
     .set_default(0)
     .set_description("Override 60 second periods for testing only"),
 
+    Option("librados_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(2)
+    .set_min(1)
+    .set_description("Size of thread pool for Objecter")
+    .add_tag("client"),
+
+    Option("osd_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(2)
+    .set_min(1)
+    .set_description("Size of thread pool for ASIO completions")
+    .add_tag("osd"),
+
     // ----------------------------
     // Crimson specific options
 
@@ -8215,6 +8227,12 @@ std::vector<Option> get_mds_options() {
      .set_flag(Option::FLAG_RUNTIME)
      .set_description("max snapshots per directory")
      .set_long_description("maximum number of snapshots that can be created per directory"),
+
+    Option("mds_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(2)
+    .set_min(1)
+    .set_description("Size of thread pool for ASIO completions")
+    .add_tag("mds")
   });
 }
 
@@ -8457,6 +8475,12 @@ std::vector<Option> get_mds_client_options() {
     Option("debug_allow_any_pool_priority", Option::TYPE_BOOL, Option::LEVEL_DEV)
     .set_default(false)
     .set_description("Allow any pool priority to be set to test conversion to new range"),
+
+    Option("client_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(2)
+    .set_min(1)
+    .set_description("Size of thread pool for ASIO completions")
+    .add_tag("client")
   });
 }
 
index f396da9ed58034b08c05e9acedc2c105d3096171..c59d0228b24f8c2ac74e659e7d4382dbee8050f1 100644 (file)
@@ -12,6 +12,7 @@
  *
  */
 
+#include "common/async/context_pool.h"
 #include "common/ceph_argparse.h"
 #include "common/code_environment.h"
 #include "common/config.h"
@@ -343,13 +344,16 @@ global_init(const std::map<std::string,std::string> *defaults,
     // make sure our mini-session gets legacy values
     g_conf().apply_changes(nullptr);
 
-    MonClient mc_bootstrap(g_ceph_context);
+    ceph::async::io_context_pool cp(1);
+    MonClient mc_bootstrap(g_ceph_context, cp);
     if (mc_bootstrap.get_monmap_and_config() < 0) {
+      cp.stop();
       g_ceph_context->_log->flush();
       cerr << "failed to fetch mon config (--no-mon-config to skip)"
           << std::endl;
       _exit(1);
     }
+    cp.stop();
   }
 
   // Expand metavariables. Invoke configuration observers. Open log file.
@@ -408,17 +412,7 @@ global_init(const std::map<std::string,std::string> *defaults,
 
   return boost::intrusive_ptr<CephContext>{g_ceph_context, false};
 }
-namespace TOPNSPC::common {
-void intrusive_ptr_add_ref(CephContext* cct)
-{
-  cct->get();
-}
 
-void intrusive_ptr_release(CephContext* cct)
-{
-  cct->put();
-}
-}
 void global_print_banner(void)
 {
   output_ceph_version();
index d1d6dbbddbfb1272a507bf4c6f42d6384b1fef82..b14ddb57c4c20f9ff110b45734af2ae2c9f10250 100644 (file)
@@ -20,6 +20,7 @@
 #include <map>
 #include <boost/intrusive_ptr.hpp>
 #include "include/ceph_assert.h"
+#include "common/ceph_context.h"
 #include "common/code_environment.h"
 #include "common/common_init.h"
 
@@ -38,11 +39,6 @@ global_init(
   const char *data_dir_option = 0,
   bool run_pre_init = true);
 
-namespace TOPNSPC::common {
-  void intrusive_ptr_add_ref(CephContext* cct);
-  void intrusive_ptr_release(CephContext* cct);
-}
-
 // just the first half; enough to get config parsed but doesn't start up the
 // cct or log.
 void global_pre_init(const std::map<std::string,std::string> *defaults,
index ebc8e932825b7d136d270007c513b034b7ad2a63..623b6bb51ab7353a7366c3c512e32cff8bb99934 100644 (file)
@@ -20,6 +20,7 @@
 #include "auth/Crypto.h"
 #include "client/Client.h"
 #include "librados/RadosClient.h"
+#include "common/async/context_pool.h"
 #include "common/ceph_argparse.h"
 #include "common/common_init.h"
 #include "common/config.h"
@@ -36,6 +37,7 @@
 #define DEFAULT_UMASK 002
 
 static mode_t umask_cb(void *);
+ceph::async::io_context_pool icp;
 
 struct ceph_mount_info
 {
@@ -83,8 +85,9 @@ public:
       cct->_log->start();
     }
 
+    icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
     {
-      MonClient mc_bootstrap(cct);
+      MonClient mc_bootstrap(cct, icp);
       ret = mc_bootstrap.get_monmap_and_config();
       if (ret < 0)
        return ret;
@@ -93,7 +96,7 @@ public:
     common_init_finish(cct);
 
     //monmap
-    monclient = new MonClient(cct);
+    monclient = new MonClient(cct, icp);
     ret = -CEPHFS_ERROR_MON_MAP_BUILD; //defined in libcephfs.h;
     if (monclient->build_initial_monmap() < 0)
       goto fail;
@@ -202,6 +205,7 @@ public:
       delete messenger;
       messenger = nullptr;
     }
+    icp.stop();
     if (monclient) {
       delete monclient;
       monclient = nullptr;
index 3fc4bbec659024c973a185266b086756422783b7..3a96cc07cee4aec085db7213261e5b0dfd54927c 100644 (file)
@@ -28,6 +28,7 @@
 #include "common/ceph_json.h"
 #include "common/errno.h"
 #include "common/ceph_json.h"
+#include "common/async/blocked_completion.h"
 #include "include/buffer.h"
 #include "include/stringify.h"
 #include "include/util.h"
 #undef dout_prefix
 #define dout_prefix *_dout << "librados: "
 
+namespace bs = boost::system;
+namespace ca = ceph::async;
+
 librados::RadosClient::RadosClient(CephContext *cct_)
   : Dispatcher(cct_->get()),
     cct_deleter{cct_, [](CephContext *p) {p->put();}},
     conf(cct_->_conf),
     state(DISCONNECTED),
-    monclient(cct_),
+    monclient(cct_, poolctx),
     mgrclient(cct_, nullptr, &monclient.monmap),
     messenger(NULL),
     instance_id(0),
@@ -233,7 +237,7 @@ int librados::RadosClient::connect()
   }
 
   {
-    MonClient mc_bootstrap(cct);
+    MonClient mc_bootstrap(cct, poolctx);
     err = mc_bootstrap.get_monmap_and_config();
     if (err < 0)
       return err;
@@ -241,6 +245,8 @@ int librados::RadosClient::connect()
 
   common_init_finish(cct);
 
+  poolctx.start(cct->_conf.get_val<std::uint64_t>("librados_thread_count"));
+
   // get monmap
   err = monclient.build_initial_monmap();
   if (err < 0)
@@ -382,6 +388,7 @@ void librados::RadosClient::shutdown()
     messenger->wait();
   }
   ldout(cct, 1) << "shutdown" << dendl;
+  poolctx.finish();
 }
 
 int librados::RadosClient::watch_flush()
@@ -848,7 +855,20 @@ void librados::RadosClient::mon_command_async(const vector<string>& cmd,
                                               Context *on_finish)
 {
   std::lock_guard l{lock};
-  monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
+  monclient.start_mon_command(cmd, inbl,
+                             [outs, outbl,
+                              on_finish = std::unique_ptr<Context>(on_finish)]
+                             (bs::error_code e,
+                              std::string&& s,
+                              ceph::bufferlist&& b) mutable {
+                               if (outs)
+                                 *outs = std::move(s);
+                               if (outbl)
+                                 *outbl = std::move(b);
+                               if (on_finish)
+                                 on_finish.release()->complete(
+                                   ceph::from_error_code(e));
+                             });
 }
 
 int librados::RadosClient::mgr_command(const vector<string>& cmd,
@@ -902,36 +922,30 @@ int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
                                       const bufferlist &inbl,
                                       bufferlist *outbl, string *outs)
 {
-  ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
-  ceph::condition_variable cond;
-  bool done;
-  int rval;
-  {
-    std::lock_guard l{mylock};
-    monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
-                               new C_SafeCond(mylock, cond, &done, &rval));
-  }
-  std::unique_lock l{mylock};
-  cond.wait(l, [&done] { return done;});
-  return rval;
+  bs::error_code ec;
+  auto&& [s, bl] = monclient.start_mon_command(rank, cmd, inbl,
+                                              ca::use_blocked[ec]);
+  if (outs)
+    *outs = std::move(s);
+  if (outbl)
+    *outbl = std::move(bl);
+
+  return ceph::from_error_code(ec);
 }
 
 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
                                       const bufferlist &inbl,
                                       bufferlist *outbl, string *outs)
 {
-  ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
-  ceph::condition_variable cond;
-  bool done;
-  int rval;
-  {
-    std::lock_guard l{mylock};
-    monclient.start_mon_command(name, cmd, inbl, outbl, outs,
-                               new C_SafeCond(mylock, cond, &done, &rval));
-  }
-  std::unique_lock l{mylock};
-  cond.wait(l, [&done] { return done;});
-  return rval;
+  bs::error_code ec;
+  auto&& [s, bl] = monclient.start_mon_command(name, cmd, inbl,
+                                              ca::use_blocked[ec]);
+  if (outs)
+    *outs = std::move(s);
+  if (outbl)
+    *outbl = std::move(bl);
+
+  return ceph::from_error_code(ec);
 }
 
 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
@@ -1174,3 +1188,24 @@ int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
   }
   return 0;
 }
+
+namespace {
+const char *config_keys[] = {
+  "librados_thread_count",
+  NULL
+};
+}
+
+const char** librados::RadosClient::get_tracked_conf_keys() const
+{
+  return config_keys;
+}
+
+void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
+                                              const std::set<std::string> &changed)
+{
+  if (changed.count("librados_thread_count")) {
+    poolctx.stop();
+    poolctx.start(conf.get_val<std::uint64_t>("librados_thread_count"));
+  }
+}
index a3b8d20dfb48bc160cdfd281df3d6c994cd69590..5ad083ead05eb864c776c9378bfd5eb516d343ec 100644 (file)
 #ifndef CEPH_LIBRADOS_RADOSCLIENT_H
 #define CEPH_LIBRADOS_RADOSCLIENT_H
 
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "msg/Dispatcher.h"
+
+#include "common/async/context_pool.h"
 #include "common/config_fwd.h"
 #include "common/Cond.h"
 #include "common/Timer.h"
 #include "common/ceph_mutex.h"
 #include "common/ceph_time.h"
+#include "common/config_obs.h"
 #include "include/common_fwd.h"
 #include "include/rados/librados.h"
 #include "include/rados/librados.hpp"
 #include "mon/MonClient.h"
 #include "mgr/MgrClient.h"
-#include "msg/Dispatcher.h"
 
 #include "IoCtxImpl.h"
 
-struct AuthAuthorizer;
 struct Context;
-struct Connection;
 class Message;
 class MLog;
 class Messenger;
 class AioCompletionImpl;
 
-class librados::RadosClient : public Dispatcher
+class librados::RadosClient : public Dispatcher,
+                             public md_config_obs_t
 {
   std::unique_ptr<CephContext,
                  std::function<void(CephContext*)> > cct_deleter;
 
 public:
   using Dispatcher::cct;
-  const ConfigProxy& conf;
+  const ConfigProxy& conf{cct->_conf};
+  ceph::async::io_context_pool poolctx;
 private:
   enum {
     DISCONNECTED,
@@ -89,7 +96,7 @@ public:
 
   explicit RadosClient(CephContext *cct_);
   ~RadosClient() override;
-  int ping_monitor(string mon_id, string *result);
+  int ping_monitor(std::string mon_id, std::string *result);
   int connect();
   void shutdown();
 
@@ -179,6 +186,9 @@ public:
   mon_feature_t get_required_monitor_features() const;
 
   int get_inconsistent_pgs(int64_t pool_id, std::vector<std::string>* pgs);
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set <std::string> &changed) override;
 };
 
 #endif
index 901bb1366640ce1aa3e67c4177eadf0285e084af..acf9e0b6b8b1b3ec071801ca1ccc694d8919d012 100644 (file)
@@ -58,13 +58,3 @@ void libradosstriper::MultiAioCompletionImpl::finish_adding_requests()
   if (!pending_safe)
     safe();
 }
-
-void intrusive_ptr_add_ref(libradosstriper::MultiAioCompletionImpl* ptr)
-{
-  ptr->get();
-}
-
-void intrusive_ptr_release(libradosstriper::MultiAioCompletionImpl* ptr)
-{
-  ptr->put();
-}
index 32f7b9a84cf92274f2b420de7edd40893a8988f0..3ac3aae44920fcef0ea3526da0d06c6d8e78d6ea 100644 (file)
@@ -20,7 +20,9 @@
 #include "common/ceph_mutex.h"
 #include "include/radosstriper/libradosstriper.hpp"
 
-struct libradosstriper::MultiAioCompletionImpl {
+namespace libradosstriper {
+
+struct MultiAioCompletionImpl {
 
   ceph::mutex lock = ceph::make_mutex("MultiAioCompletionImpl lock", false);
   ceph::condition_variable cond;
@@ -151,10 +153,17 @@ struct libradosstriper::MultiAioCompletionImpl {
   void complete_request(ssize_t r);
   void safe_request(ssize_t r);
   void finish_adding_requests();
-
 };
 
-void intrusive_ptr_add_ref(libradosstriper::MultiAioCompletionImpl*);
-void intrusive_ptr_release(libradosstriper::MultiAioCompletionImpl*);
+inline void intrusive_ptr_add_ref(MultiAioCompletionImpl* ptr)
+{
+  ptr->get();
+}
+
+inline void intrusive_ptr_release(MultiAioCompletionImpl* ptr)
+{
+  ptr->put();
+}
+}
 
 #endif // CEPH_LIBRADOSSTRIPERSTRIPER_MULTIAIOCOMPLETIONIMPL_H
index 160db7b6f8d36e917c8f001a803e0fb0270c2a38..8226a9ba2a23436d539975a815c65c2119726e6a 100644 (file)
@@ -17,6 +17,8 @@
 
 #include <string>
 
+#include <boost/intrusive_ptr.hpp>
+
 #include "include/rados/librados.h"
 #include "include/rados/librados.hpp"
 #include "include/radosstriper/libradosstriper.h"
@@ -26,6 +28,7 @@
 #include "librados/IoCtxImpl.h"
 #include "librados/AioCompletionImpl.h"
 #include "common/RefCountedObj.h"
+#include "common/ceph_context.h"
 
 namespace libradosstriper {
 
index d0ff38a3ef068644f4de6b86696dce6d0770e25b..2091ad2016dbf88f3018fe36d78747f99506ffd6 100644 (file)
@@ -61,7 +61,8 @@
 #define dout_prefix *_dout << "mds." << name << ' '
 using TOPNSPC::common::cmd_getval;
 // cons/des
-MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) :
+MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc,
+                    boost::asio::io_context& ioctx) :
   Dispatcher(m->cct),
   timer(m->cct, mds_lock),
   gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
@@ -69,6 +70,7 @@ MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) :
   name(n),
   messenger(m),
   monc(mc),
+  ioctx(ioctx),
   mgrc(m->cct, m, &mc->monmap),
   log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
   starttime(mono_clock::now())
@@ -741,10 +743,12 @@ void MDSDaemon::handle_mds_map(const cref_t<MMDSMap> &m)
 
     // Did I previously not hold a rank?  Initialize!
     if (mds_rank == NULL) {
-      mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
-          timer, beacon, mdsmap, messenger, monc, &mgrc,
-          new LambdaContext([this](int r){respawn();}),
-          new LambdaContext([this](int r){suicide();}));
+      mds_rank = new MDSRankDispatcher(
+       whoami, mds_lock, clog,
+       timer, beacon, mdsmap, messenger, monc, &mgrc,
+       new LambdaContext([this](int r){respawn();}),
+       new LambdaContext([this](int r){suicide();}),
+       ioctx);
       dout(10) <<  __func__ << ": initializing MDS rank "
                << mds_rank->get_nodeid() << dendl;
       mds_rank->init();
index 7024d94c580a50000ed19474a90da597c2aa5a23..2f97de4b6f36159da7efe56ac17063f4b57dd8c6 100644 (file)
@@ -42,7 +42,9 @@ class MonClient;
 
 class MDSDaemon : public Dispatcher {
  public:
-  MDSDaemon(std::string_view n, Messenger *m, MonClient *mc);
+  MDSDaemon(std::string_view n, Messenger *m, MonClient *mc,
+           boost::asio::io_context& ioctx);
+
   ~MDSDaemon() override;
 
   mono_time get_starttime() const {
@@ -129,6 +131,7 @@ class MDSDaemon : public Dispatcher {
 
   Messenger    *messenger;
   MonClient    *monc;
+  boost::asio::io_context& ioctx;
   MgrClient     mgrc;
   std::unique_ptr<MDSMap> mdsmap;
   LogClient    log_client;
index ce64fa0a5a895f15da5159de5dd87ea0c5f4d9a2..cc858b0028ba135a07e0c972d42de4508398cfd7 100644 (file)
@@ -484,7 +484,8 @@ MDSRank::MDSRank(
     MonClient *monc_,
     MgrClient *mgrc,
     Context *respawn_hook_,
-    Context *suicide_hook_) :
+    Context *suicide_hook_,
+    boost::asio::io_context& ioc) :
     cct(msgr->cct), mds_lock(mds_lock_), clog(clog_),
     timer(timer_), mdsmap(mdsmap_),
     objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
@@ -504,7 +505,8 @@ MDSRank::MDSRank(
     messenger(msgr), monc(monc_), mgrc(mgrc),
     respawn_hook(respawn_hook_),
     suicide_hook(suicide_hook_),
-    starttime(mono_clock::now())
+    starttime(mono_clock::now()),
+    ioc(ioc)
 {
   hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
 
@@ -3503,9 +3505,10 @@ MDSRankDispatcher::MDSRankDispatcher(
     MonClient *monc_,
     MgrClient *mgrc,
     Context *respawn_hook_,
-    Context *suicide_hook_)
+    Context *suicide_hook_,
+    boost::asio::io_context& ioc)
   : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
-            msgr, monc_, mgrc, respawn_hook_, suicide_hook_)
+            msgr, monc_, mgrc, respawn_hook_, suicide_hook_, ioc)
 {
     g_conf().add_observer(this);
 }
index c0605be626a0b5c15ac345ca89f01c9a702ec309..a657b1731be7e4ee4080b6e36764cdbb25a81b47 100644 (file)
@@ -17,6 +17,8 @@
 
 #include <string_view>
 
+#include <boost/asio/io_context.hpp>
+
 #include "common/DecayCounter.h"
 #include "common/LogClient.h"
 #include "common/Timer.h"
@@ -151,7 +153,8 @@ class MDSRank {
         MonClient *monc_,
         MgrClient *mgrc,
         Context *respawn_hook_,
-        Context *suicide_hook_);
+        Context *suicide_hook_,
+       boost::asio::io_context& ioc);
 
     mds_rank_t get_nodeid() const { return whoami; }
     int64_t get_metadata_pool();
@@ -580,6 +583,7 @@ private:
     void send_task_status();
 
     mono_time starttime = mono_clock::zero();
+    boost::asio::io_context& ioc;
 };
 
 /* This expects to be given a reference which it is responsible for.
@@ -628,7 +632,8 @@ public:
       MonClient *monc_,
       MgrClient *mgrc,
       Context *respawn_hook_,
-      Context *suicide_hook_);
+      Context *suicide_hook_,
+      boost::asio::io_context& ioc);
 
   void init();
   void tick();
index bec51a3687188e3c86b61b05dc02abe93e3f9423..c6e647365b505317b75de76c9107b53e63a6b103 100644 (file)
@@ -17,6 +17,7 @@
 #include <memory>
 
 #include "common/ceph_json.h"
+#include "common/Cond.h"
 #include "mon/MonClient.h"
 
 class Command
index 296494ece14cad86084fcc888bb30c2ab27c681a..7e30eac618168cbacbad54a124665d4acc5fc251 100644 (file)
@@ -38,7 +38,7 @@
 
 MgrStandby::MgrStandby(int argc, const char **argv) :
   Dispatcher(g_ceph_context),
-  monc{g_ceph_context},
+  monc{g_ceph_context, poolctx},
   client_messenger(Messenger::create(
                     g_ceph_context,
                     cct->_conf.get_val<std::string>("ms_type"),
@@ -115,6 +115,8 @@ int MgrStandby::init()
   client_messenger->add_dispatcher_tail(&client);
   client_messenger->start();
 
+  poolctx.start(2);
+
   // Initialize MonClient
   if (monc.build_initial_monmap() < 0) {
     client_messenger->shutdown();
@@ -272,20 +274,21 @@ void MgrStandby::shutdown()
 
     dout(4) << "Shutting down" << dendl;
 
-    // stop sending beacon first, i use monc to talk with monitors
+    py_module_registry.shutdown();
+    // stop sending beacon first, I use monc to talk with monitors
     timer.shutdown();
     // client uses monc and objecter
     client.shutdown();
     mgrc.shutdown();
+    // Stop asio threads, so leftover events won't call into shut down
+    // monclient/objecter.
+    poolctx.finish();
     // stop monc, so mon won't be able to instruct me to shutdown/activate after
     // the active_mgr is stopped
     monc.shutdown();
     if (active_mgr) {
       active_mgr->shutdown();
     }
-
-    py_module_registry.shutdown();
-
     // objecter is used by monc and active_mgr
     objecter.shutdown();
     // client_messenger is used by all of them, so stop it in the end
index d79fef2d5e577cab03c295f00a3e2f7426253a30..cac31a5764017ce6e566e8e7e7117f8f95a7a526 100644 (file)
@@ -16,6 +16,7 @@
 #define MGR_STANDBY_H_
 
 #include "auth/Auth.h"
+#include "common/async/context_pool.h"
 #include "common/Finisher.h"
 #include "common/Timer.h"
 #include "common/LogClient.h"
@@ -39,6 +40,7 @@ public:
                          const std::set <std::string> &changed) override;
 
 protected:
+  ceph::async::io_context_pool poolctx;
   MonClient monc;
   std::unique_ptr<Messenger> client_messenger;
   Objecter objecter;
index 0609cd82c4ae4117166b483217955f2dfc314661..508aebc46e8074d12c612105ab6a013753d1cdb4 100644 (file)
@@ -1802,7 +1802,7 @@ bool AuthMonitor::_upgrade_format_to_dumpling()
     // set daemon profiles
     if ((p->first.is_osd() || p->first.is_mds()) &&
         mon_caps == "allow rwx") {
-      new_caps = string("allow profile ") + p->first.get_type_name();
+      new_caps = string("allow profile ") + std::string(p->first.get_type_name());
     }
 
     // update bootstrap keys
index 75f59231c5aa1d26de07f5263195c5d868838118..e2e46bc24fa316ccbc502c2eb30c58ce1b38ad8e 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "messages/MMonGetMap.h"
 #include "messages/MMonGetVersion.h"
+#include "messages/MMonGetMap.h"
 #include "messages/MMonGetVersionReply.h"
 #include "messages/MMonMap.h"
 #include "messages/MConfig.h"
@@ -46,6 +47,7 @@
 #include "common/LogClient.h"
 
 #include "MonClient.h"
+#include "error_code.h"
 #include "MonMap.h"
 
 #include "auth/Auth.h"
 #undef dout_prefix
 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
 
+namespace bs = boost::system;
 using std::string;
+using namespace std::literals;
 
-MonClient::MonClient(CephContext *cct_) :
+MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
   Dispatcher(cct_),
   AuthServer(cct_),
   messenger(NULL),
   timer(cct_, monc_lock),
-  finisher(cct_),
+  service(service),
   initialized(false),
   log_client(NULL),
   more_log_pending(false),
@@ -95,7 +99,7 @@ int MonClient::get_monmap()
 {
   ldout(cct, 10) << __func__ << dendl;
   std::unique_lock l(monc_lock);
-  
+
   sub.want("monmap", 0, 0);
   if (!_opened())
     _reopen_session();
@@ -432,13 +436,18 @@ void MonClient::handle_monmap(MMonMap *m)
 void MonClient::handle_config(MConfig *m)
 {
   ldout(cct,10) << __func__ << " " << *m << dendl;
-  finisher.queue(new LambdaContext([this, m](int r) {
-       cct->_conf.set_mon_vals(cct, m->config, config_cb);
-       if (config_notify_cb) {
-         config_notify_cb();
-       }
-       m->put();
-      }));
+  // Take the sledgehammer approach to ensuring we don't depend on
+  // anything in MonClient.
+  boost::asio::defer(finish_strand,
+                   [m, cct = boost::intrusive_ptr<CephContext>(cct),
+                    config_notify_cb = config_notify_cb,
+                    config_cb = config_cb]() {
+                     cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
+                     if (config_notify_cb) {
+                       config_notify_cb();
+                     }
+                     m->put();
+                   });
   got_config = true;
   map_cond.notify_all();
 }
@@ -478,7 +487,6 @@ int MonClient::init()
   messenger->add_dispatcher_head(this);
 
   timer.init();
-  finisher.start();
   schedule_tick();
 
   return 0;
@@ -490,10 +498,10 @@ void MonClient::shutdown()
   monc_lock.lock();
   stopping = true;
   while (!version_requests.empty()) {
-    version_requests.begin()->second->context->complete(-ECANCELED);
+    ceph::async::defer(std::move(version_requests.begin()->second),
+                      monc_errc::shutting_down, 0, 0);
     ldout(cct, 20) << __func__ << " canceling and discarding version request "
-                  << version_requests.begin()->second << dendl;
-    delete version_requests.begin()->second;
+                  << version_requests.begin()->first << dendl;
     version_requests.erase(version_requests.begin());
   }
   while (!mon_commands.empty()) {
@@ -511,8 +519,6 @@ void MonClient::shutdown()
   monc_lock.unlock();
 
   if (initialized) {
-    finisher.wait_for_empty();
-    finisher.stop();
     initialized = false;
   }
   monc_lock.lock();
@@ -696,8 +702,8 @@ void MonClient::_reopen_session(int rank)
 
   // throw out version check requests
   while (!version_requests.empty()) {
-    finisher.queue(version_requests.begin()->second->context, -EAGAIN);
-    delete version_requests.begin()->second;
+    ceph::async::defer(std::move(version_requests.begin()->second),
+                      monc_errc::session_reset, 0, 0);
     version_requests.erase(version_requests.begin());
   }
 
@@ -1091,10 +1097,9 @@ void MonClient::_send_command(MonCommand *r)
   if (r->is_tell()) {
     ++r->send_attempts;
     if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
-      _finish_command(r, -ENXIO, "mon unavailable");
+      _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
       return;
     }
-
     // tell-style command
     if (monmap.min_mon_release >= ceph_release_t::octopus) {
       if (r->target_con) {
@@ -1104,7 +1109,7 @@ void MonClient::_send_command(MonCommand *r)
        if (r->target_rank >= (int)monmap.size()) {
          ldout(cct, 10) << " target " << r->target_rank
                         << " >= max mon " << monmap.size() << dendl;
-         _finish_command(r, -ENOENT, "mon rank dne");
+         _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
          return;
        }
        r->target_con = messenger->connect_to_mon(
@@ -1113,7 +1118,7 @@ void MonClient::_send_command(MonCommand *r)
        if (!monmap.contains(r->target_name)) {
          ldout(cct, 10) << " target " << r->target_name
                         << " not present in monmap" << dendl;
-         _finish_command(r, -ENOENT, "mon dne");
+         _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
          return;
        }
        r->target_con = messenger->connect_to_mon(
@@ -1148,7 +1153,7 @@ void MonClient::_send_command(MonCommand *r)
       if (r->target_rank >= (int)monmap.size()) {
        ldout(cct, 10) << " target " << r->target_rank
                       << " >= max mon " << monmap.size() << dendl;
-       _finish_command(r, -ENOENT, "mon rank dne");
+       _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
        return;
       }
       _reopen_session(r->target_rank);
@@ -1163,7 +1168,7 @@ void MonClient::_send_command(MonCommand *r)
       if (!monmap.contains(r->target_name)) {
        ldout(cct, 10) << " target " << r->target_name
                       << " not present in monmap" << dendl;
-       _finish_command(r, -ENOENT, "mon dne");
+       _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
        return;
       }
       _reopen_session(monmap.get_rank(r->target_name));
@@ -1234,9 +1239,10 @@ void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
   }
 
   ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
-  if (r->poutbl)
-    r->poutbl->claim(ack->get_data());
-  _finish_command(r, ack->r, ack->rs);
+  auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
+    : bs::error_code();
+  _finish_command(r, ec, ack->rs,
+                 std::move(ack->get_data()));
   ack->put();
 }
 
@@ -1261,9 +1267,9 @@ void MonClient::handle_command_reply(MCommandReply *reply)
   }
 
   ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
-  if (r->poutbl)
-    r->poutbl->claim(reply->get_data());
-  _finish_command(r, reply->r, reply->rs);
+  auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
+    : bs::error_code();
+  _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
   reply->put();
 }
 
@@ -1280,19 +1286,17 @@ int MonClient::_cancel_mon_command(uint64_t tid)
   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
 
   MonCommand *cmd = it->second;
-  _finish_command(cmd, -ETIMEDOUT, "");
+  _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
   return 0;
 }
 
-void MonClient::_finish_command(MonCommand *r, int ret, string rs)
+void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
+                               std::string_view rs, ceph::buffer::list&& bl)
 {
-  ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
-  if (r->prval)
-    *(r->prval) = ret;
-  if (r->prs)
-    *(r->prs) = rs;
-  if (r->onfinish)
-    finisher.queue(r->onfinish, ret);
+  ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
+                << dendl;
+  ceph::async::defer(std::move(r->onfinish), ret, std::string(rs),
+                    std::move(bl));
   if (r->target_con) {
     r->target_con->mark_down();
   }
@@ -1300,117 +1304,8 @@ void MonClient::_finish_command(MonCommand *r, int ret, string rs)
   delete r;
 }
 
-void MonClient::start_mon_command(const std::vector<string>& cmd,
-                                  const ceph::buffer::list& inbl,
-                                  ceph::buffer::list *outbl, string *outs,
-                                  Context *onfinish)
-{
-  ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
-  std::lock_guard l(monc_lock);
-  if (!initialized || stopping) {
-    if (onfinish) {
-      onfinish->complete(-ECANCELED);
-    }
-    return;
-  }
-  MonCommand *r = new MonCommand(++last_mon_command_tid);
-  r->cmd = cmd;
-  r->inbl = inbl;
-  r->poutbl = outbl;
-  r->prs = outs;
-  r->onfinish = onfinish;
-  if (cct->_conf->rados_mon_op_timeout > 0) {
-    class C_CancelMonCommand : public Context
-    {
-      uint64_t tid;
-      MonClient *monc;
-      public:
-      C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
-      void finish(int r) override {
-       monc->_cancel_mon_command(tid);
-      }
-    };
-    r->ontimeout = new C_CancelMonCommand(r->tid, this);
-    timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
-  }
-  mon_commands[r->tid] = r;
-  _send_command(r);
-}
-
-void MonClient::start_mon_command(const string &mon_name,
-                                  const std::vector<string>& cmd,
-                                  const ceph::buffer::list& inbl,
-                                  ceph::buffer::list *outbl, string *outs,
-                                  Context *onfinish)
-{
-  ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
-  std::lock_guard l(monc_lock);
-  if (!initialized || stopping) {
-    if (onfinish) {
-      onfinish->complete(-ECANCELED);
-    }
-    return;
-  }
-  MonCommand *r = new MonCommand(++last_mon_command_tid);
-
-  // detect/tolerate mon *rank* passed as a string
-  string err;
-  int rank = strict_strtoll(mon_name.c_str(), 10, &err);
-  if (err.size() == 0 && rank >= 0) {
-    ldout(cct,10) << __func__ << " interpreting name '" << mon_name
-                 << "' as rank " << rank << dendl;
-    r->target_rank = rank;
-  } else {
-    r->target_name = mon_name;
-  }
-  r->cmd = cmd;
-  r->inbl = inbl;
-  r->poutbl = outbl;
-  r->prs = outs;
-  r->onfinish = onfinish;
-  mon_commands[r->tid] = r;
-  _send_command(r);
-}
-
-void MonClient::start_mon_command(int rank,
-                                  const std::vector<string>& cmd,
-                                  const ceph::buffer::list& inbl,
-                                  ceph::buffer::list *outbl, string *outs,
-                                  Context *onfinish)
-{
-  ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
-  std::lock_guard l(monc_lock);
-  if (!initialized || stopping) {
-    if (onfinish) {
-      onfinish->complete(-ECANCELED);
-    }
-    return;
-  }
-  MonCommand *r = new MonCommand(++last_mon_command_tid);
-  r->target_rank = rank;
-  r->cmd = cmd;
-  r->inbl = inbl;
-  r->poutbl = outbl;
-  r->prs = outs;
-  r->onfinish = onfinish;
-  mon_commands[r->tid] = r;
-  _send_command(r);
-}
-
 // ---------
 
-void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
-{
-  version_req_d *req = new version_req_d(onfinish, newest, oldest);
-  ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
-  std::lock_guard l(monc_lock);
-  auto m = ceph::make_message<MMonGetVersion>();
-  m->what = map;
-  m->handle = ++version_req_id;
-  version_requests[m->handle] = req;
-  _send_mon_message(std::move(m));
-}
-
 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
 {
   ceph_assert(ceph_mutex_is_locked(monc_lock));
@@ -1419,15 +1314,12 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
     ldout(cct, 0) << __func__ << " version request with handle " << m->handle
                  << " not found" << dendl;
   } else {
-    version_req_d *req = iter->second;
-    ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
+    auto req = std::move(iter->second);
+    ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
+                  << m->version << dendl;
     version_requests.erase(iter);
-    if (req->newest)
-      *req->newest = m->version;
-    if (req->oldest)
-      *req->oldest = m->oldest_version;
-    finisher.queue(req->context, 0);
-    delete req;
+    ceph::async::defer(std::move(req), bs::error_code(),
+                      m->version, m->oldest_version);
   }
   m->put();
 }
@@ -1597,7 +1489,8 @@ int MonClient::handle_auth_bad_method(
            allowed_methods,
            allowed_modes);
          if (r < 0) {
-           _finish_command(i.second, r, "auth failed");
+           auto ec = bs::error_code(-r, mon_category());
+           _finish_command(i.second, ec, "auth failed"sv, {});
          }
          return r;
        }
@@ -2006,3 +1899,108 @@ void MonClient::register_config_callback(md_config_t::config_callback fn) {
 md_config_t::config_callback MonClient::get_config_callback() {
   return config_cb;
 }
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
+class monc_error_category : public ceph::converting_category {
+public:
+  monc_error_category(){}
+  const char* name() const noexcept override;
+  const char* message(int ev, char*, std::size_t) const noexcept override;
+  std::string message(int ev) const override;
+  bs::error_condition default_error_condition(int ev) const noexcept
+    override;
+  bool equivalent(int ev, const bs::error_condition& c) const
+    noexcept override;
+  using ceph::converting_category::equivalent;
+  int from_code(int ev) const noexcept override;
+};
+#pragma GCC diagnostic pop
+#pragma clang diagnostic pop
+
+const char* monc_error_category::name() const noexcept {
+  return "monc";
+}
+
+const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
+  if (ev == 0)
+    return "No error";
+
+  switch (static_cast<monc_errc>(ev)) {
+  case monc_errc::shutting_down: // Command failed due to MonClient shutting down
+    return "Command failed due to MonClient shutting down";
+  case monc_errc::session_reset:
+    return "Monitor session was reset";
+  case monc_errc::rank_dne:
+    return "Requested monitor rank does not exist";
+  case monc_errc::mon_dne:
+    return "Requested monitor does not exist";
+  case monc_errc::timed_out:
+    return "Monitor operation timed out";
+  case monc_errc::mon_unavailable:
+    return "Monitor unavailable";
+  }
+
+  return "Unknown error";
+}
+
+std::string monc_error_category::message(int ev) const {
+  return message(ev, nullptr, 0);
+}
+
+bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
+  switch (static_cast<monc_errc>(ev)) {
+  case monc_errc::shutting_down:
+    return bs::errc::operation_canceled;
+  case monc_errc::session_reset:
+    return bs::errc::resource_unavailable_try_again;
+  case monc_errc::rank_dne:
+    [[fallthrough]];
+  case monc_errc::mon_dne:
+    return ceph::errc::not_in_map;
+  case monc_errc::timed_out:
+    return bs::errc::timed_out;
+  case monc_errc::mon_unavailable:
+    return bs::errc::no_such_device;
+  }
+  return { ev, *this };
+}
+
+bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
+  switch (static_cast<monc_errc>(ev)) {
+  case monc_errc::rank_dne:
+    [[fallthrough]];
+  case monc_errc::mon_dne:
+      return c == bs::errc::no_such_file_or_directory;
+  default:
+    return default_error_condition(ev) == c;
+  }
+}
+
+int monc_error_category::from_code(int ev) const noexcept {
+  if (ev == 0)
+    return 0;
+
+  switch (static_cast<monc_errc>(ev)) {
+  case monc_errc::shutting_down:
+    return -ECANCELED;
+  case monc_errc::session_reset:
+    return -EAGAIN;
+  case monc_errc::rank_dne:
+    [[fallthrough]];
+  case monc_errc::mon_dne:
+    return -ENOENT;
+  case monc_errc::timed_out:
+    return -ETIMEDOUT;
+  case monc_errc::mon_unavailable:
+    return -ENXIO;
+  }
+  return -EDOM;
+}
+
+const bs::error_category& monc_category() noexcept {
+  static const monc_error_category c;
+  return c;
+}
index b04db11f1350b2b90ed7512c8a60458450106241..4683b0005d8023f86c1ef8a8d0cdad2040b32b6d 100644 (file)
 #include "MonMap.h"
 #include "MonSub.h"
 
+#include "common/async/completion.h"
 #include "common/Timer.h"
-#include "common/Finisher.h"
 #include "common/config.h"
+#include "messages/MMonGetVersion.h"
 
 #include "auth/AuthClient.h"
 #include "auth/AuthServer.h"
 class MMonMap;
 class MConfig;
 class MMonGetVersionReply;
-struct MMonSubscribeAck;
 class MMonCommandAck;
-struct MAuthReply;
 class LogClient;
-class AuthAuthorizer;
 class AuthClientHandler;
 class AuthRegistry;
 class KeyRing;
@@ -141,7 +139,6 @@ private:
 
 struct MonClientPinger : public Dispatcher,
                         public AuthClient {
-
   ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
   ceph::condition_variable ping_recvd_cond;
   std::string *result;
@@ -240,11 +237,49 @@ struct MonClientPinger : public Dispatcher,
   }
 };
 
+const boost::system::error_category& monc_category() noexcept;
+
+enum class monc_errc {
+  shutting_down = 1, // Command failed due to MonClient shutting down
+  session_reset, // Monitor session was reset
+  rank_dne, // Requested monitor rank does not exist
+  mon_dne, // Requested monitor does not exist
+  timed_out, // Monitor operation timed out
+  mon_unavailable // Monitor unavailable
+};
+
+namespace boost::system {
+template<>
+struct is_error_code_enum<::monc_errc> {
+  static const bool value = true;
+};
+}
+
+//  implicit conversion:
+inline boost::system::error_code make_error_code(monc_errc e) noexcept {
+  return { static_cast<int>(e), monc_category() };
+}
+
+// explicit conversion:
+inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
+  return { static_cast<int>(e), monc_category() };
+}
+
+const boost::system::error_category& monc_category() noexcept;
 
 class MonClient : public Dispatcher,
                  public AuthClient,
                  public AuthServer /* for mgr, osd, mds */ {
+  static constexpr auto dout_subsys = ceph_subsys_monc;
 public:
+  // Error, Newest, Oldest
+  using VersionSig = void(boost::system::error_code, version_t, version_t);
+  using VersionCompletion = ceph::async::Completion<VersionSig>;
+
+  using CommandSig = void(boost::system::error_code, std::string,
+                         ceph::buffer::list);
+  using CommandCompletion = ceph::async::Completion<CommandSig>;
+
   MonMap monmap;
   std::map<std::string,std::string> config_mgr;
 
@@ -259,7 +294,8 @@ private:
 
   mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
   SafeTimer timer;
-  Finisher finisher;
+  boost::asio::io_context& service;
+  boost::asio::io_context::strand finish_strand{service};
 
   bool initialized;
   bool stopping = false;
@@ -307,7 +343,6 @@ private:
   double reopen_interval_multiplier;
 
   Dispatcher *handle_authentication_dispatcher = nullptr;
-  
   bool _opened() const;
   bool _hunting() const;
   void _start_hunting();
@@ -364,7 +399,7 @@ public:
     bool more,
     uint32_t auth_method,
     const ceph::buffer::list& bl,
-    ceph::buffer::list *reply);
+    ceph::buffer::list *reply) override;
 
   void set_entity_name(EntityName name) { entity_name = name; }
   void set_handle_authentication_dispatcher(Dispatcher *d) {
@@ -413,12 +448,12 @@ public:
     std::lock_guard l(monc_lock);
     return sub.inc_want(what, start, flags);
   }
-  
+
   std::unique_ptr<KeyRing> keyring;
   std::unique_ptr<RotatingKeyRing> rotating_secrets;
 
  public:
-  explicit MonClient(CephContext *cct_);
+  MonClient(CephContext *cct_, boost::asio::io_context& service);
   MonClient(const MonClient &) = delete;
   MonClient& operator=(const MonClient &) = delete;
   ~MonClient() override;
@@ -517,66 +552,206 @@ private:
   struct MonCommand {
     // for tell only
     std::string target_name;
-    int target_rank;
+    int target_rank = -1;
     ConnectionRef target_con;
     std::unique_ptr<MonConnection> target_session;
     unsigned send_attempts = 0;  ///< attempt count for legacy mons
     utime_t last_send_attempt;
-
     uint64_t tid;
     std::vector<std::string> cmd;
     ceph::buffer::list inbl;
-    ceph::buffer::list *poutbl;
-    std::string *prs;
-    int *prval;
-    Context *onfinish, *ontimeout;
-
-    explicit MonCommand(uint64_t t)
-      : target_rank(-1),
-       tid(t),
-       poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
-    {}
+    std::unique_ptr<CommandCompletion> onfinish;
+    std::optional<boost::asio::steady_timer> cancel_timer;
+
+    MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
+      : tid(t), onfinish(std::move(onfinish)) {
+      auto timeout = ceph::maybe_timespan(monc.cct->_conf->rados_mon_op_timeout);
+      if (timeout) {
+       cancel_timer.emplace(monc.service, *timeout);
+       cancel_timer->async_wait(
+          [this, &monc](boost::system::error_code ec) {
+           if (ec)
+             return;
+           std::scoped_lock l(monc.monc_lock);
+           monc._cancel_mon_command(tid);
+         });
+      }
+    }
 
     bool is_tell() const {
       return target_name.size() || target_rank >= 0;
     }
   };
+  friend MonCommand;
   std::map<uint64_t,MonCommand*> mon_commands;
 
   void _send_command(MonCommand *r);
   void _check_tell_commands();
   void _resend_mon_commands();
   int _cancel_mon_command(uint64_t tid);
-  void _finish_command(MonCommand *r, int ret, std::string rs);
+  void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
+                      bufferlist&& bl);
   void _finish_auth();
   void handle_mon_command_ack(MMonCommandAck *ack);
   void handle_command_reply(MCommandReply *reply);
 
 public:
-  void start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
-                       ceph::buffer::list *outbl, std::string *outs,
-                       Context *onfinish);
+  template<typename CompletionToken>
+  auto start_mon_command(const std::vector<std::string>& cmd,
+                         const ceph::buffer::list& inbl,
+                        CompletionToken&& token) {
+    ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto h = CommandCompletion::create(service.get_executor(),
+                                        std::move(init.completion_handler));
+      if (!initialized || stopping) {
+       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+                         bufferlist{});
+      } else {
+       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+       r->cmd = cmd;
+       r->inbl = inbl;
+       mon_commands.emplace(r->tid, r);
+       _send_command(r);
+      }
+    }
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
+                        const ceph::buffer::list& inbl, CompletionToken&& token) {
+    ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto h = CommandCompletion::create(service.get_executor(),
+                                        std::move(init.completion_handler));
+      if (!initialized || stopping) {
+       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+                         bufferlist{});
+      } else {
+       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+       r->target_rank = mon_rank;
+       r->cmd = cmd;
+       r->inbl = inbl;
+       mon_commands.emplace(r->tid, r);
+       _send_command(r);
+      }
+    }
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto start_mon_command(const std::string& mon_name,
+                         const std::vector<std::string>& cmd,
+                        const ceph::buffer::list& inbl,
+                        CompletionToken&& token) {
+    ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto h = CommandCompletion::create(service.get_executor(),
+                                        std::move(init.completion_handler));
+      if (!initialized || stopping) {
+       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+                         bufferlist{});
+      } else {
+       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+       // detect/tolerate mon *rank* passed as a string
+       std::string err;
+       int rank = strict_strtoll(mon_name.c_str(), 10, &err);
+       if (err.size() == 0 && rank >= 0) {
+         ldout(cct,10) << __func__ << " interpreting name '" << mon_name
+                       << "' as rank " << rank << dendl;
+         r->target_rank = rank;
+       } else {
+         r->target_name = mon_name;
+       }
+       r->cmd = cmd;
+       r->inbl = inbl;
+       mon_commands.emplace(r->tid, r);
+       _send_command(r);
+      }
+    }
+    return init.result.get();
+  }
+
+  class ContextVerter {
+    std::string* outs;
+    ceph::bufferlist* outbl;
+    Context* onfinish;
+
+  public:
+    ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
+      : outs(outs), outbl(outbl), onfinish(onfinish) {}
+    ~ContextVerter() = default;
+    ContextVerter(const ContextVerter&) = default;
+    ContextVerter& operator =(const ContextVerter&) = default;
+    ContextVerter(ContextVerter&&) = default;
+    ContextVerter& operator =(ContextVerter&&) = default;
+
+    void operator()(boost::system::error_code e,
+                   std::string s,
+                   ceph::bufferlist bl) {
+      if (outs)
+       *outs = std::move(s);
+      if (outbl)
+       *outbl = std::move(bl);
+      if (onfinish)
+       onfinish->complete(ceph::from_error_code(e));
+    }
+  };
+
+  void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
+                        bufferlist *outbl, string *outs,
+                        Context *onfinish) {
+    start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
+  }
   void start_mon_command(int mon_rank,
-                         const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
-                         ceph::buffer::list *outbl, std::string *outs,
-                         Context *onfinish);
-  void start_mon_command(const std::string &mon_name,  ///< mon name, with mon. prefix
-                         const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
-                         ceph::buffer::list *outbl, std::string *outs,
-                         Context *onfinish);
+                        const vector<string>& cmd, const bufferlist& inbl,
+                        bufferlist *outbl, string *outs,
+                        Context *onfinish) {
+    start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+  }
+  void start_mon_command(const string &mon_name,  ///< mon name, with mon. prefix
+                        const vector<string>& cmd, const bufferlist& inbl,
+                        bufferlist *outbl, string *outs,
+                        Context *onfinish) {
+    start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+  }
+
 
   // version requests
 public:
   /**
    * get latest known version(s) of cluster map
    *
-   * @param map std::string name of map (e.g., 'osdmap')
-   * @param newest pointer where newest map version will be stored
-   * @param oldest pointer where oldest map version will be stored
-   * @param onfinish context that will be triggered on completion
-   * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
+   * @param map string name of map (e.g., 'osdmap')
+   * @param token context that will be triggered on completion
+   * @return (via Completion) {} on success,
+   *         boost::system::errc::resource_unavailable_try_again if we need to
+   *         resubmit our request
    */
-  void get_version(std::string map, version_t *newest, version_t *oldest, Context *onfinish);
+  template<typename CompletionToken>
+  auto get_version(std::string&& map, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, VersionSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto m = ceph::make_message<MMonGetVersion>();
+      m->what = std::move(map);
+      m->handle = ++version_req_id;
+      version_requests.emplace(m->handle,
+                              VersionCompletion::create(
+                                service.get_executor(),
+                                std::move(init.completion_handler)));
+      _send_mon_message(m);
+    }
+    return init.result.get();
+  }
+
   /**
    * Run a callback within our lock, with a reference
    * to the MonMap
@@ -595,16 +770,10 @@ public:
   md_config_t::config_callback get_config_callback();
 
 private:
-  struct version_req_d {
-    Context *context;
-    version_t *newest, *oldest;
-    version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
-  };
 
-  std::map<ceph_tid_t, version_req_d*> version_requests;
+  std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
   ceph_tid_t version_req_id;
   void handle_get_version_reply(MMonGetVersionReply* m);
-
   md_config_t::config_callback config_cb;
   std::function<void(void)> config_notify_cb;
 };
index 014cc821c00d3991a9ab1ea52a2857353b619457..ea7103f26096a71a321c83e5a7db3770f5ab6478 100644 (file)
@@ -6,6 +6,7 @@
 #include <cstring>
 #include <map>
 
+#include "common/async/context_pool.h"
 #include "common/ceph_context.h"
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
@@ -40,7 +41,8 @@ extern "C" void mount_ceph_get_config_info(const char *config_file,
   conf.parse_env(cct->get_module_type()); // environment variables override
   conf.apply_changes(nullptr);
 
-  MonClient monc = MonClient(cct.get());
+  ceph::async::io_context_pool ioc(1);
+  MonClient monc = MonClient(cct.get(), ioc);
   err = monc.build_initial_monmap();
   if (err)
     goto scrape_keyring;
index 16e400cbd39b80eb590be8097f83d86309e71832..39cf8d19beafd59f2737b8fba437731c01422aee 100644 (file)
@@ -246,7 +246,7 @@ CompatSet OSD::get_osd_compat_set() {
   return compat;
 }
 
-OSDService::OSDService(OSD *osd) :
+OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
   osd(osd),
   cct(osd->cct),
   whoami(osd->whoami), store(osd->store),
@@ -274,6 +274,7 @@ OSDService::OSDService(OSD *osd) :
   last_recalibrate(ceph_clock_now()),
   promote_max_objects(0),
   promote_max_bytes(0),
+  poolctx(poolctx),
   objecter(make_unique<Objecter>(osd->client_messenger->cct,
                                 osd->objecter_messenger,
                                 osd->monc, nullptr, 0, 0)),
@@ -2142,7 +2143,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
         Messenger *hb_back_serverm,
         Messenger *osdc_messenger,
         MonClient *mc,
-        const std::string &dev, const std::string &jdev) :
+        const std::string &dev, const std::string &jdev,
+        ceph::async::io_context_pool& poolctx) :
   Dispatcher(cct_),
   tick_timer(cct, osd_lock),
   tick_timer_without_osd_lock(cct, tick_timer_lock),
@@ -2189,7 +2191,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   up_thru_wanted(0),
   requested_full_first(0),
   requested_full_last(0),
-  service(this)
+  service(this, poolctx)
 {
 
   if (!gss_ktfile_client.empty()) {
@@ -6240,12 +6242,12 @@ bool OSD::ms_handle_refused(Connection *con)
   return true;
 }
 
-struct C_OSD_GetVersion : public Context {
+struct CB_OSD_GetVersion {
   OSD *osd;
-  uint64_t oldest, newest;
-  explicit C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
-  void finish(int r) override {
-    if (r >= 0)
+  explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
+  void operator ()(boost::system::error_code ec, version_t newest,
+                  version_t oldest) {
+    if (!ec)
       osd->_got_mon_epochs(oldest, newest);
   }
 };
@@ -6265,8 +6267,7 @@ void OSD::start_boot()
   set_state(STATE_PREBOOT);
   dout(10) << "start_boot - have maps " << superblock.oldest_map
           << ".." << superblock.newest_map << dendl;
-  C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
-  monc->get_version("osdmap", &c->newest, &c->oldest, c);
+  monc->get_version("osdmap", CB_OSD_GetVersion(this));
 }
 
 void OSD::_got_mon_epochs(epoch_t oldest, epoch_t newest)
@@ -9863,6 +9864,10 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
     dout(0) << __func__ << ": scrub interval change" << dendl;
   }
   check_config();
+  if (changed.count("osd_asio_thread_count")) {
+    service.poolctx.stop();
+    service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+  }
 }
 
 void OSD::update_log_config()
index 44dcae442e45cda1dd57cd76788b4e080180599d..522de0cc846d0a2cd180a427e385383579b18a9d 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "msg/Dispatcher.h"
 
+#include "common/async/context_pool.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "common/AsyncReserver.h"
@@ -523,6 +524,7 @@ public:
   }
 
   // -- Objecter, for tiering reads/writes from/to other OSDs --
+  ceph::async::io_context_pool& poolctx;
   std::unique_ptr<Objecter> objecter;
   int m_objecter_finishers;
   std::vector<std::unique_ptr<Finisher>> objecter_finishers;
@@ -894,7 +896,7 @@ public:
   void dump_live_pgids();
 #endif
 
-  explicit OSDService(OSD *osd);
+  explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
   ~OSDService() = default;
 };
 
@@ -1819,7 +1821,7 @@ protected:
 
   void send_full_update();
   
-  friend struct C_OSD_GetVersion;
+  friend struct CB_OSD_GetVersion;
 
   // -- alive --
   epoch_t up_thru_wanted;
@@ -1993,7 +1995,8 @@ private:
       Messenger *hb_front_server,
       Messenger *hb_back_server,
       Messenger *osdc_messenger,
-      MonClient *mc, const std::string &dev, const std::string &jdev);
+      MonClient *mc, const std::string &dev, const std::string &jdev,
+      ceph::async::io_context_pool& poolctx);
   ~OSD() override;
 
   // static bits
index d56a76dbd8d59d268f2d2b907a07268c5a846980..2a8577f05bde2022065a932383bb38ed405adebb 100644 (file)
@@ -45,6 +45,7 @@
 #include "messages/MWatchNotify.h"
 
 
+#include "common/Cond.h"
 #include "common/config.h"
 #include "common/perf_counters.h"
 #include "common/scrub_types.h"
@@ -1437,13 +1438,15 @@ void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
 
 // op pool check
 
-void Objecter::C_Op_Map_Latest::finish(int r)
+void Objecter::CB_Op_Map_Latest::operator()(boost::system::error_code e,
+                                           version_t latest, version_t)
 {
-  if (r == -EAGAIN || r == -ECANCELED)
+  if (e == boost::system::errc::resource_unavailable_try_again ||
+      e == boost::system::errc::operation_canceled)
     return;
 
   lgeneric_subdout(objecter->cct, objecter, 10)
-    << "op_map_latest r=" << r << " tid=" << tid
+    << "op_map_latest r=" << e << " tid=" << tid
     << " latest " << latest << dendl;
 
   Objecter::unique_lock wl(objecter->rwlock);
@@ -1584,8 +1587,7 @@ void Objecter::_send_op_map_check(Op *op)
   if (check_latest_map_ops.count(op->tid) == 0) {
     op->get();
     check_latest_map_ops[op->tid] = op;
-    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
   }
 }
 
@@ -1603,9 +1605,12 @@ void Objecter::_op_cancel_map_check(Op *op)
 
 // linger pool check
 
-void Objecter::C_Linger_Map_Latest::finish(int r)
+void Objecter::CB_Linger_Map_Latest::operator()(boost::system::error_code e,
+                                               version_t latest,
+                                               version_t)
 {
-  if (r == -EAGAIN || r == -ECANCELED) {
+  if (e == boost::system::errc::resource_unavailable_try_again ||
+      e == boost::system::errc::operation_canceled) {
     // ignore callback; we will retry in resend_mon_ops()
     return;
   }
@@ -1675,8 +1680,7 @@ void Objecter::_send_linger_map_check(LingerOp *op)
   if (check_latest_map_lingers.count(op->linger_id) == 0) {
     op->get();
     check_latest_map_lingers[op->linger_id] = op;
-    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
   }
 }
 
@@ -1695,9 +1699,11 @@ void Objecter::_linger_cancel_map_check(LingerOp *op)
 
 // command pool check
 
-void Objecter::C_Command_Map_Latest::finish(int r)
+void Objecter::CB_Command_Map_Latest::operator()(boost::system::error_code e,
+                                                version_t latest, version_t)
 {
-  if (r == -EAGAIN || r == -ECANCELED) {
+  if (e == boost::system::errc::resource_unavailable_try_again ||
+      e == boost::system::errc::operation_canceled) {
     // ignore callback; we will retry in resend_mon_ops()
     return;
   }
@@ -1750,8 +1756,7 @@ void Objecter::_send_command_map_check(CommandOp *c)
   if (check_latest_map_commands.count(c->tid) == 0) {
     c->get();
     check_latest_map_commands[c->tid] = c;
-    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
-    monc->get_version("osdmap", &f->latest, NULL, f);
+    monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
   }
 }
 
@@ -1930,16 +1935,15 @@ void Objecter::wait_for_osd_map(epoch_t e)
   cond.wait(mlock, [&done] { return done; });
 }
 
-struct C_Objecter_GetVersion : public Context {
+struct CB_Objecter_GetVersion {
   Objecter *objecter;
-  uint64_t oldest, newest;
   Context *fin;
-  C_Objecter_GetVersion(Objecter *o, Context *c)
-    : objecter(o), oldest(0), newest(0), fin(c) {}
-  void finish(int r) override {
-    if (r >= 0) {
+  CB_Objecter_GetVersion(Objecter *o, Context *c) : objecter(o), fin(c) {}
+  void operator()(boost::system::error_code e, version_t newest, version_t oldest) {
+    if (!e) {
       objecter->get_latest_version(oldest, newest, fin);
-    } else if (r == -EAGAIN) { // try again as instructed
+    } else if (e == boost::system::errc::resource_unavailable_try_again) {
+      // try again as instructed
       objecter->wait_for_latest_osdmap(fin);
     } else {
       // it doesn't return any other error codes!
@@ -1951,8 +1955,7 @@ struct C_Objecter_GetVersion : public Context {
 void Objecter::wait_for_latest_osdmap(Context *fin)
 {
   ldout(cct, 10) << __func__ << dendl;
-  C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
-  monc->get_version("osdmap", &c->newest, &c->oldest, c);
+  monc->get_version("osdmap", CB_Objecter_GetVersion(this, fin));
 }
 
 void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
@@ -2230,24 +2233,20 @@ void Objecter::resend_mon_ops()
   for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
        p != check_latest_map_ops.end();
        ++p) {
-    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
   }
 
   for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
        p != check_latest_map_lingers.end();
        ++p) {
-    C_Linger_Map_Latest *c
-      = new C_Linger_Map_Latest(this, p->second->linger_id);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
   }
 
   for (map<uint64_t, CommandOp*>::iterator p
         = check_latest_map_commands.begin();
        p != check_latest_map_commands.end();
        ++p) {
-    C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
-    monc->get_version("osdmap", &c->latest, NULL, c);
+    monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
   }
 }
 
index 91ebc0a14db2cd8a8905a10dae67fc48bd269fad..5adc4a77b470195dd9b855189e6fa0a344575c89 100644 (file)
@@ -1553,22 +1553,18 @@ public:
     }
   };
 
-  struct C_Op_Map_Latest : public Context {
+  struct CB_Op_Map_Latest {
     Objecter *objecter;
     ceph_tid_t tid;
-    version_t latest;
-    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
-                                                latest(0) {}
-    void finish(int r) override;
+    CB_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
+    void operator()(boost::system::error_code err, version_t latest, version_t);
   };
 
-  struct C_Command_Map_Latest : public Context {
+  struct CB_Command_Map_Latest {
     Objecter *objecter;
     uint64_t tid;
-    version_t latest;
-    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) :  objecter(o), tid(t),
-                                                      latest(0) {}
-    void finish(int r) override;
+    CB_Command_Map_Latest(Objecter *o, ceph_tid_t t) :  objecter(o), tid(t) {}
+    void operator()(boost::system::error_code err, version_t latest, version_t);
   };
 
   struct C_Stat : public Context {
@@ -1912,13 +1908,11 @@ public:
     }
   };
 
-  struct C_Linger_Map_Latest : public Context {
+  struct CB_Linger_Map_Latest {
     Objecter *objecter;
     uint64_t linger_id;
-    version_t latest;
-    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
-      objecter(o), linger_id(id), latest(0) {}
-    void finish(int r) override;
+    CB_Linger_Map_Latest(Objecter *o, uint64_t id) : objecter(o), linger_id(id) {}
+    void operator()(boost::system::error_code err, version_t latest, version_t);
   };
 
   // -- osd sessions --
index ad7ab6c8e23a1505f4704b829726aa60f2fa31f1..4c551cab8109e179acbd86e7895d4f697ba2aa4f 100644 (file)
@@ -20,6 +20,7 @@
 
 #include "global/global_init.h"
 #include "global/global_context.h"
+#include "common/async/context_pool.h"
 #include "common/ceph_argparse.h"
 #include "common/version.h"
 #include "common/dout.h"
@@ -51,6 +52,7 @@ class MonClientHelper : public Dispatcher
 {
 protected:
   CephContext *cct;
+  ceph::async::io_context_pool poolctx;
   Messenger *msg;
   MonClient monc;
 
@@ -63,8 +65,9 @@ public:
   explicit MonClientHelper(CephContext *cct_)
     : Dispatcher(cct_),
       cct(cct_),
+      poolctx(1),
       msg(NULL),
-      monc(cct_)
+      monc(cct_, poolctx)
   { }
 
 
index 15792a63af5816f76ad338eda2c19126411f5f68..d62b0d7104a736f40a3d9707e1b271d3e4af5a98 100644 (file)
@@ -39,6 +39,7 @@
 #include "mon/MonClient.h"
 #include "msg/Dispatcher.h"
 #include "msg/Messenger.h"
+#include "common/async/context_pool.h"
 #include "common/Timer.h"
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
@@ -82,6 +83,7 @@ class TestStub : public Dispatcher
 {
  protected:
   MessengerRef messenger;
+  ceph::async::io_context_pool poolctx;
   MonClient monc;
 
   ceph::mutex lock;
@@ -163,6 +165,7 @@ class TestStub : public Dispatcher
     monc.shutdown();
     timer.shutdown();
     messenger->shutdown();
+    poolctx.finish();
     return 0;
   }
 
@@ -177,7 +180,7 @@ class TestStub : public Dispatcher
 
   TestStub(CephContext *cct, string who)
     : Dispatcher(cct),
-      monc(cct),
+      monc(cct, poolctx),
       lock(ceph::make_mutex(who.append("::lock"))),
       timer(cct, lock),
       do_shutdown(false),
@@ -244,6 +247,7 @@ class ClientStub : public TestStub
 
   int init() override {
     int err;
+    poolctx.start(1);
     err = monc.build_initial_monmap();
     if (err < 0) {
       derr << "ClientStub::" << __func__ << " ERROR: build initial monmap: "
index 55175726af9479ce8180eb0a34c855587e34b20c..b61a83dde08bba1cbb83f2c07ff08ac5f7817d61 100644 (file)
@@ -22,6 +22,7 @@
 #include <stdio.h>
 #include <signal.h>
 #include <gtest/gtest.h>
+#include "common/async/context_pool.h"
 #include "osd/OSD.h"
 #include "os/ObjectStore.h"
 #include "mon/MonClient.h"
@@ -41,8 +42,9 @@ public:
       Messenger *hb_front_server,
       Messenger *hb_back_server,
       Messenger *osdc_messenger,
-      MonClient *mc, const std::string &dev, const std::string &jdev) :
-      OSD(cct_, store_, id, internal, external, hb_front_client, hb_back_client, hb_front_server, hb_back_server, osdc_messenger, mc, dev, jdev)
+      MonClient *mc, const std::string &dev, const std::string &jdev,
+      ceph::async::io_context_pool& ictx) :
+      OSD(cct_, store_, id, internal, external, hb_front_client, hb_back_client, hb_front_server, hb_back_server, osdc_messenger, mc, dev, jdev, ictx)
   {
   }
 
@@ -52,6 +54,7 @@ public:
 };
 
 TEST(TestOSDScrub, scrub_time_permit) {
+  ceph::async::io_context_pool icp(1);
   ObjectStore *store = ObjectStore::create(g_ceph_context,
              g_conf()->osd_objectstore,
              g_conf()->osd_data,
@@ -63,9 +66,9 @@ TEST(TestOSDScrub, scrub_time_permit) {
   ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
   ms->set_default_policy(Messenger::Policy::stateless_server(0));
   ms->bind(g_conf()->public_addr);
-  MonClient mc(g_ceph_context);
+  MonClient mc(g_ceph_context, icp);
   mc.build_initial_monmap();
-  TestOSDScrub* osd = new TestOSDScrub(g_ceph_context, store, 0, ms, ms, ms, ms, ms, ms, ms, &mc, "", "");
+  TestOSDScrub* osd = new TestOSDScrub(g_ceph_context, store, 0, ms, ms, ms, ms, ms, ms, ms, &mc, "", "", icp);
 
   g_ceph_context->_conf.set_val("osd_scrub_begin_hour", "0");
   g_ceph_context->_conf.set_val("osd_scrub_end_hour", "24");
index 839b4aea0502d25bea0d20be00670367d58e7049..3e867aa735f277b777eae0a9c6ab6f0422f41f8c 100644 (file)
@@ -25,7 +25,7 @@ MDSUtility::MDSUtility() :
   waiting_for_mds_map(NULL),
   inited(false)
 {
-  monc = new MonClient(g_ceph_context);
+  monc = new MonClient(g_ceph_context, poolctx);
   messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
   fsmap = new FSMap();
   objecter = new Objecter(g_ceph_context, messenger, monc, NULL, 0, 0);
@@ -52,6 +52,7 @@ int MDSUtility::init()
   if (r < 0)
     return r;
 
+  poolctx.start(1);
   messenger->start();
 
   objecter->set_client_incarnation(0);
@@ -125,6 +126,7 @@ void MDSUtility::shutdown()
   monc->shutdown();
   messenger->shutdown();
   messenger->wait();
+  poolctx.finish();
 }
 
 
index e5097ec48fe25dd7d90a550885b259e09c3c88c3..09f1918ba4457c5a0a6de195a7aa546ec4eb22e6 100644 (file)
@@ -20,6 +20,7 @@
 #include "msg/Dispatcher.h"
 #include "msg/Messenger.h"
 #include "auth/Auth.h"
+#include "common/async/context_pool.h"
 #include "common/Finisher.h"
 #include "common/Timer.h"
 
@@ -38,6 +39,7 @@ protected:
 
   ceph::mutex lock = ceph::make_mutex("MDSUtility::lock");
   Finisher finisher;
+  ceph::async::io_context_pool poolctx;
 
   Context *waiting_for_mds_map;