From 306eebe05bec1220b2477a66b6501da222bacdfc Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 28 Apr 2020 17:32:56 -0400 Subject: [PATCH] monc: Asifoact MonClient Of course now everyone has to feed an io_context into the MonClient. Signed-off-by: Adam C. Emerson --- src/ceph_fuse.cc | 8 +- src/ceph_mds.cc | 8 +- src/ceph_osd.cc | 9 +- src/ceph_syn.cc | 9 +- src/client/Client.cc | 21 +- src/common/ceph_context.h | 17 +- src/common/options.cc | 24 ++ src/global/global_init.cc | 16 +- src/global/global_init.h | 6 +- src/libcephfs.cc | 8 +- src/librados/RadosClient.cc | 89 +++-- src/librados/RadosClient.h | 22 +- src/libradosstriper/MultiAioCompletionImpl.cc | 10 - src/libradosstriper/MultiAioCompletionImpl.h | 17 +- src/libradosstriper/RadosStriperImpl.h | 3 + src/mds/MDSDaemon.cc | 14 +- src/mds/MDSDaemon.h | 5 +- src/mds/MDSRank.cc | 11 +- src/mds/MDSRank.h | 9 +- src/mgr/MgrContext.h | 1 + src/mgr/MgrStandby.cc | 13 +- src/mgr/MgrStandby.h | 2 + src/mon/AuthMonitor.cc | 2 +- src/mon/MonClient.cc | 312 +++++++++--------- src/mon/MonClient.h | 261 ++++++++++++--- src/mount/conf.cc | 4 +- src/osd/OSD.cc | 25 +- src/osd/OSD.h | 9 +- src/osdc/Objecter.cc | 57 ++-- src/osdc/Objecter.h | 24 +- src/test/mon/test-mon-msg.cc | 5 +- src/test/mon/test_mon_workloadgen.cc | 6 +- src/test/osd/TestOSDScrub.cc | 11 +- src/tools/cephfs/MDSUtility.cc | 4 +- src/tools/cephfs/MDSUtility.h | 2 + 35 files changed, 674 insertions(+), 370 deletions(-) diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index 9d6141c1eaa..ab4db60920c 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -16,7 +16,9 @@ #include #include #include +#include +#include "common/async/context_pool.h" #include "common/config.h" #include "common/errno.h" @@ -45,6 +47,8 @@ #define dout_context g_ceph_context +ceph::async::io_context_pool icp; + static void fuse_usage() { const char* argv[] = { @@ -234,7 +238,8 @@ int main(int argc, const char **argv, const char *envp[]) { int tester_r = 0; void *tester_rp = nullptr; - MonClient *mc = new MonClient(g_ceph_context); + icp.start(cct->_conf.get_val("client_asio_thread_count")); + MonClient *mc = new MonClient(g_ceph_context, icp); int r = mc->build_initial_monmap(); if (r == -EINVAL) { cerr << "failed to generate initial mon list" << std::endl; @@ -316,6 +321,7 @@ int main(int argc, const char **argv, const char *envp[]) { client->unmount(); cfuse->finalize(); out_shutdown: + icp.stop(); client->shutdown(); out_init_failed: unregister_async_signal_handler(SIGHUP, sighup_handler); diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 4d960dc2e7f..82c89ae9d52 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -20,6 +20,7 @@ #include #include +#include "common/async/context_pool.h" #include "include/ceph_features.h" #include "include/compat.h" #include "include/random.h" @@ -195,7 +196,8 @@ int main(int argc, const char **argv) register_async_signal_handler(SIGHUP, sighup_handler); // get monmap - MonClient mc(g_ceph_context); + ceph::async::io_context_pool ctxpool(2); + MonClient mc(g_ceph_context, ctxpool); if (mc.build_initial_monmap() < 0) forker.exit(1); global_init_chdir(g_ceph_context); @@ -203,7 +205,7 @@ int main(int argc, const char **argv) msgr->start(); // start mds - mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc); + mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc, ctxpool); // in case we have to respawn... mds->orig_argc = argc; @@ -234,6 +236,7 @@ int main(int argc, const char **argv) shutdown_async_signal_handler(); shutdown: + ctxpool.stop(); // yuck: grab the mds lock, so we can be sure that whoever in *mds // called shutdown finishes what they were doing. mds->mds_lock.lock(); @@ -257,4 +260,3 @@ int main(int argc, const char **argv) return 0; } - diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 9fde4400028..63ae56643fa 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -670,7 +670,10 @@ flushjournal_out: srand(time(NULL) + getpid()); - MonClient mc(g_ceph_context); + ceph::async::io_context_pool poolctx( + cct->_conf.get_val("osd_asio_thread_count")); + + MonClient mc(g_ceph_context, poolctx); if (mc.build_initial_monmap() < 0) return -1; global_init_chdir(g_ceph_context); @@ -691,7 +694,8 @@ flushjournal_out: ms_objecter, &mc, data_path, - journal_path); + journal_path, + poolctx); int err = osdptr->pre_init(); if (err < 0) { @@ -746,6 +750,7 @@ flushjournal_out: shutdown_async_signal_handler(); // done + poolctx.stop(); delete osdptr; delete ms_public; delete ms_hb_front_client; diff --git a/src/ceph_syn.cc b/src/ceph_syn.cc index 50e26f2815a..e3ca1328702 100644 --- a/src/ceph_syn.cc +++ b/src/ceph_syn.cc @@ -18,6 +18,7 @@ #include "common/config.h" +#include "common/async/context_pool.h" #include "client/SyntheticClient.h" #include "client/Client.h" @@ -50,7 +51,8 @@ int main(int argc, const char **argv, char *envp[]) pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC); // get monmap - MonClient mc(g_ceph_context); + ceph::async::io_context_pool poolctx(1); + MonClient mc(g_ceph_context, poolctx); if (mc.build_initial_monmap() < 0) return -1; @@ -64,7 +66,7 @@ int main(int argc, const char **argv, char *envp[]) messengers[i] = Messenger::create_client_messenger(g_ceph_context, "synclient"); messengers[i]->bind(g_conf()->public_addr); - mclients[i] = new MonClient(g_ceph_context); + mclients[i] = new MonClient(g_ceph_context, poolctx); mclients[i]->build_initial_monmap(); auto client = new StandaloneClient(messengers[i], mclients[i]); client->set_filer_flags(syn_filer_flags); @@ -79,6 +81,8 @@ int main(int argc, const char **argv, char *envp[]) ++p) (*p)->start_thread(); + poolctx.stop(); + //cout << "waiting for client(s) to finish" << std::endl; while (!clients.empty()) { Client *client = clients.front(); @@ -99,4 +103,3 @@ int main(int argc, const char **argv, char *envp[]) } return 0; } - diff --git a/src/client/Client.cc b/src/client/Client.cc index 0a2a4da4255..8bd78b4046b 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -44,6 +44,7 @@ #include "common/config.h" #include "common/version.h" +#include "common/async/blocked_completion.h" #include "mon/MonClient.h" @@ -124,6 +125,9 @@ #define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED) +namespace bs = boost::system; +namespace ca = ceph::async; + void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset) { Client *client = static_cast(p); @@ -5703,22 +5707,21 @@ int Client::authenticate() int Client::fetch_fsmap(bool user) { - int r; // Retrieve FSMap to enable looking up daemon addresses. We need FSMap // rather than MDSMap because no one MDSMap contains all the daemons, and // a `tell` can address any daemon. version_t fsmap_latest; + bs::error_code ec; do { - C_SaferCond cond; - monclient->get_version("fsmap", &fsmap_latest, NULL, &cond); client_lock.unlock(); - r = cond.wait(); + std::tie(fsmap_latest, std::ignore) = + monclient->get_version("fsmap", ca::use_blocked[ec]); client_lock.lock(); - } while (r == -EAGAIN); + } while (ec == bs::errc::resource_unavailable_try_again); - if (r < 0) { - lderr(cct) << "Failed to learn FSMap version: " << cpp_strerror(r) << dendl; - return r; + if (ec) { + lderr(cct) << "Failed to learn FSMap version: " << ec << dendl; + return ceph::from_error_code(ec); } ldout(cct, 10) << __func__ << " learned FSMap version " << fsmap_latest << dendl; @@ -14669,7 +14672,7 @@ mds_rank_t Client::_get_random_up_mds() const StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc) - : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0)) + : Client(m, mc, new Objecter(m->cct, m, mc, nullptr, 0, 0)) { monclient->set_messenger(m); objecter->set_client_incarnation(0); diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h index adacecebfc3..187cd3d165a 100644 --- a/src/common/ceph_context.h +++ b/src/common/ceph_context.h @@ -25,8 +25,10 @@ #include #include -#include "include/common_fwd.h" +#include + #include "include/any.h" +#include "include/common_fwd.h" #include "common/cmdparse.h" #include "common/code_environment.h" @@ -375,4 +377,17 @@ private: #endif #endif // WITH_SEASTAR +#if !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus) +namespace ceph::common { +inline void intrusive_ptr_add_ref(CephContext* cct) +{ + cct->get(); +} + +inline void intrusive_ptr_release(CephContext* cct) +{ + cct->put(); +} +} +#endif // !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus) #endif diff --git a/src/common/options.cc b/src/common/options.cc index 9d7c97af532..53cc9df101c 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -5357,6 +5357,18 @@ std::vector