From: Adam C. Emerson Date: Tue, 7 Aug 2018 20:40:57 +0000 (-0400) Subject: monc: Asifoact MonClient X-Git-Tag: v15.1.0~843^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=73f5249f3f14886831a9da5fb7731b0c227b8d69;p=ceph.git monc: Asifoact MonClient Of course now everyone has to feed an io_context into the MonClient. Signed-off-by: Adam C. Emerson --- diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index 5ac18116f12c..4f3cb7837532 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -16,7 +16,9 @@ #include #include #include +#include +#include "common/async/context_pool.h" #include "common/config.h" #include "common/errno.h" @@ -44,6 +46,8 @@ #define dout_context g_ceph_context +ceph::async::io_context_pool icp; + static void fuse_usage() { const char* argv[] = { @@ -223,7 +227,8 @@ int main(int argc, const char **argv, const char *envp[]) { int tester_r = 0; void *tester_rp = nullptr; - MonClient *mc = new MonClient(g_ceph_context); + icp.start(cct->_conf.get_val("client_asio_thread_count")); + MonClient *mc = new MonClient(g_ceph_context, icp); int r = mc->build_initial_monmap(); if (r == -EINVAL) { cerr << "failed to generate initial mon list" << std::endl; @@ -305,6 +310,7 @@ int main(int argc, const char **argv, const char *envp[]) { client->unmount(); cfuse->finalize(); out_shutdown: + icp.stop(); client->shutdown(); out_init_failed: unregister_async_signal_handler(SIGHUP, sighup_handler); diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 38e673755a95..4e6714ac44e4 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -20,6 +20,7 @@ #include #include +#include "common/async/context_pool.h" #include "include/ceph_features.h" #include "include/compat.h" #include "include/random.h" @@ -176,7 +177,8 @@ int main(int argc, const char **argv) register_async_signal_handler(SIGHUP, sighup_handler); // get monmap - MonClient mc(g_ceph_context); + ceph::async::io_context_pool ctxpool(2); + MonClient mc(g_ceph_context, ctxpool); if (mc.build_initial_monmap() < 0) forker.exit(1); global_init_chdir(g_ceph_context); @@ -184,7 +186,7 @@ int main(int argc, const char **argv) msgr->start(); // start mds - mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc); + mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc, ctxpool); // in case we have to respawn... mds->orig_argc = argc; @@ -215,6 +217,7 @@ int main(int argc, const char **argv) shutdown_async_signal_handler(); shutdown: + ctxpool.stop(); // yuck: grab the mds lock, so we can be sure that whoever in *mds // called shutdown finishes what they were doing. mds->mds_lock.lock(); @@ -238,4 +241,3 @@ int main(int argc, const char **argv) return 0; } - diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 3e03e8632396..18101dee1221 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -664,7 +664,10 @@ flushjournal_out: srand(time(NULL) + getpid()); - MonClient mc(g_ceph_context); + ceph::async::io_context_pool poolctx( + cct->_conf.get_val("osd_asio_thread_count")); + + MonClient mc(g_ceph_context, poolctx); if (mc.build_initial_monmap() < 0) return -1; global_init_chdir(g_ceph_context); @@ -685,7 +688,8 @@ flushjournal_out: ms_objecter, &mc, data_path, - journal_path); + journal_path, + poolctx); int err = osdptr->pre_init(); if (err < 0) { @@ -740,6 +744,7 @@ flushjournal_out: shutdown_async_signal_handler(); // done + poolctx.stop(); delete osdptr; delete ms_public; delete ms_hb_front_client; diff --git a/src/ceph_syn.cc b/src/ceph_syn.cc index 50e26f2815a0..e3ca13287020 100644 --- a/src/ceph_syn.cc +++ b/src/ceph_syn.cc @@ -18,6 +18,7 @@ #include "common/config.h" +#include "common/async/context_pool.h" #include "client/SyntheticClient.h" #include "client/Client.h" @@ -50,7 +51,8 @@ int main(int argc, const char **argv, char *envp[]) pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC); // get monmap - MonClient mc(g_ceph_context); + ceph::async::io_context_pool poolctx(1); + MonClient mc(g_ceph_context, poolctx); if (mc.build_initial_monmap() < 0) return -1; @@ -64,7 +66,7 @@ int main(int argc, const char **argv, char *envp[]) messengers[i] = Messenger::create_client_messenger(g_ceph_context, "synclient"); messengers[i]->bind(g_conf()->public_addr); - mclients[i] = new MonClient(g_ceph_context); + mclients[i] = new MonClient(g_ceph_context, poolctx); mclients[i]->build_initial_monmap(); auto client = new StandaloneClient(messengers[i], mclients[i]); client->set_filer_flags(syn_filer_flags); @@ -79,6 +81,8 @@ int main(int argc, const char **argv, char *envp[]) ++p) (*p)->start_thread(); + poolctx.stop(); + //cout << "waiting for client(s) to finish" << std::endl; while (!clients.empty()) { Client *client = clients.front(); @@ -99,4 +103,3 @@ int main(int argc, const char **argv, char *envp[]) } return 0; } - diff --git a/src/client/Client.cc b/src/client/Client.cc index f3f6cacc9eba..ec0fb160b047 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -44,6 +44,7 @@ #include "common/config.h" #include "common/version.h" +#include "common/async/waiter.h" #include "mon/MonClient.h" @@ -124,6 +125,8 @@ #define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED) +namespace bs = boost::system; + void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset) { Client *client = static_cast(p); @@ -5643,22 +5646,22 @@ int Client::authenticate() int Client::fetch_fsmap(bool user) { - int r; // Retrieve FSMap to enable looking up daemon addresses. We need FSMap // rather than MDSMap because no one MDSMap contains all the daemons, and // a `tell` can address any daemon. version_t fsmap_latest; + boost::system::error_code ec; do { - C_SaferCond cond; - monclient->get_version("fsmap", &fsmap_latest, NULL, &cond); + ceph::async::waiter w; + monclient->get_version("fsmap", w); client_lock.unlock(); - r = cond.wait(); + std::tie(ec, fsmap_latest, std::ignore) = w.wait(); client_lock.lock(); - } while (r == -EAGAIN); + } while (ec == boost::system::errc::resource_unavailable_try_again); - if (r < 0) { - lderr(cct) << "Failed to learn FSMap version: " << cpp_strerror(r) << dendl; - return r; + if (ec) { + lderr(cct) << "Failed to learn FSMap version: " << ec << dendl; + return ceph::from_error_code(ec); } ldout(cct, 10) << __func__ << " learned FSMap version " << fsmap_latest << dendl; @@ -14595,7 +14598,7 @@ mds_rank_t Client::_get_random_up_mds() const StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc) - : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0)) + : Client(m, mc, new Objecter(m->cct, m, mc, nullptr, 0, 0)) { monclient->set_messenger(m); objecter->set_client_incarnation(0); diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h index 6b6e9482b997..ae590865945c 100644 --- a/src/common/ceph_context.h +++ b/src/common/ceph_context.h @@ -25,6 +25,8 @@ #include #include +#include + #include "include/any.h" #include "common/cmdparse.h" @@ -363,4 +365,15 @@ private: }; #endif // WITH_SEASTAR +inline void intrusive_ptr_add_ref(CephContext* cct) +{ + cct->get(); +} + +inline void intrusive_ptr_release(CephContext* cct) +{ + cct->put(); +} + + #endif diff --git a/src/common/options.cc b/src/common/options.cc index 1f76931f8054..19b172830fd5 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -5224,6 +5224,18 @@ std::vector