Of course now everyone has to feed an io_context into the MonClient.
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
#include <sys/utsname.h>
#include <iostream>
#include <string>
+#include <optional>
+#include "common/async/context_pool.h"
#include "common/config.h"
#include "common/errno.h"
#define dout_context g_ceph_context
+ceph::async::io_context_pool icp;
+
static void fuse_usage()
{
const char* argv[] = {
int tester_r = 0;
void *tester_rp = nullptr;
- MonClient *mc = new MonClient(g_ceph_context);
+ icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
+ MonClient *mc = new MonClient(g_ceph_context, icp);
int r = mc->build_initial_monmap();
if (r == -EINVAL) {
cerr << "failed to generate initial mon list" << std::endl;
client->unmount();
cfuse->finalize();
out_shutdown:
+ icp.stop();
client->shutdown();
out_init_failed:
unregister_async_signal_handler(SIGHUP, sighup_handler);
#include <iostream>
#include <string>
+#include "common/async/context_pool.h"
#include "include/ceph_features.h"
#include "include/compat.h"
#include "include/random.h"
register_async_signal_handler(SIGHUP, sighup_handler);
// get monmap
- MonClient mc(g_ceph_context);
+ ceph::async::io_context_pool ctxpool(2);
+ MonClient mc(g_ceph_context, ctxpool);
if (mc.build_initial_monmap() < 0)
forker.exit(1);
global_init_chdir(g_ceph_context);
msgr->start();
// start mds
- mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc);
+ mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc, ctxpool);
// in case we have to respawn...
mds->orig_argc = argc;
shutdown_async_signal_handler();
shutdown:
+ ctxpool.stop();
// yuck: grab the mds lock, so we can be sure that whoever in *mds
// called shutdown finishes what they were doing.
mds->mds_lock.lock();
return 0;
}
-
srand(time(NULL) + getpid());
- MonClient mc(g_ceph_context);
+ ceph::async::io_context_pool poolctx(
+ cct->_conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+
+ MonClient mc(g_ceph_context, poolctx);
if (mc.build_initial_monmap() < 0)
return -1;
global_init_chdir(g_ceph_context);
ms_objecter,
&mc,
data_path,
- journal_path);
+ journal_path,
+ poolctx);
int err = osdptr->pre_init();
if (err < 0) {
shutdown_async_signal_handler();
// done
+ poolctx.stop();
delete osdptr;
delete ms_public;
delete ms_hb_front_client;
#include "common/config.h"
+#include "common/async/context_pool.h"
#include "client/SyntheticClient.h"
#include "client/Client.h"
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
// get monmap
- MonClient mc(g_ceph_context);
+ ceph::async::io_context_pool poolctx(1);
+ MonClient mc(g_ceph_context, poolctx);
if (mc.build_initial_monmap() < 0)
return -1;
messengers[i] = Messenger::create_client_messenger(g_ceph_context,
"synclient");
messengers[i]->bind(g_conf()->public_addr);
- mclients[i] = new MonClient(g_ceph_context);
+ mclients[i] = new MonClient(g_ceph_context, poolctx);
mclients[i]->build_initial_monmap();
auto client = new StandaloneClient(messengers[i], mclients[i]);
client->set_filer_flags(syn_filer_flags);
++p)
(*p)->start_thread();
+ poolctx.stop();
+
//cout << "waiting for client(s) to finish" << std::endl;
while (!clients.empty()) {
Client *client = clients.front();
}
return 0;
}
-
#include "common/config.h"
#include "common/version.h"
+#include "common/async/blocked_completion.h"
#include "mon/MonClient.h"
#define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED)
+namespace bs = boost::system;
+namespace ca = ceph::async;
+
void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
{
Client *client = static_cast<Client*>(p);
int Client::fetch_fsmap(bool user)
{
- int r;
// Retrieve FSMap to enable looking up daemon addresses. We need FSMap
// rather than MDSMap because no one MDSMap contains all the daemons, and
// a `tell` can address any daemon.
version_t fsmap_latest;
+ bs::error_code ec;
do {
- C_SaferCond cond;
- monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
client_lock.unlock();
- r = cond.wait();
+ std::tie(fsmap_latest, std::ignore) =
+ monclient->get_version("fsmap", ca::use_blocked[ec]);
client_lock.lock();
- } while (r == -EAGAIN);
+ } while (ec == bs::errc::resource_unavailable_try_again);
- if (r < 0) {
- lderr(cct) << "Failed to learn FSMap version: " << cpp_strerror(r) << dendl;
- return r;
+ if (ec) {
+ lderr(cct) << "Failed to learn FSMap version: " << ec << dendl;
+ return ceph::from_error_code(ec);
}
ldout(cct, 10) << __func__ << " learned FSMap version " << fsmap_latest << dendl;
StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
- : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
+ : Client(m, mc, new Objecter(m->cct, m, mc, nullptr, 0, 0))
{
monclient->set_messenger(m);
objecter->set_client_incarnation(0);
#include <typeinfo>
#include <typeindex>
-#include "include/common_fwd.h"
+#include <boost/intrusive_ptr.hpp>
+
#include "include/any.h"
+#include "include/common_fwd.h"
#include "common/cmdparse.h"
#include "common/code_environment.h"
#endif
#endif // WITH_SEASTAR
+#if !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus)
+namespace ceph::common {
+inline void intrusive_ptr_add_ref(CephContext* cct)
+{
+ cct->get();
+}
+
+inline void intrusive_ptr_release(CephContext* cct)
+{
+ cct->put();
+}
+}
+#endif // !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus)
#endif
.set_default(0)
.set_description("Override 60 second periods for testing only"),
+ Option("librados_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(2)
+ .set_min(1)
+ .set_description("Size of thread pool for Objecter")
+ .add_tag("client"),
+
+ Option("osd_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(2)
+ .set_min(1)
+ .set_description("Size of thread pool for ASIO completions")
+ .add_tag("osd"),
+
// ----------------------------
// Crimson specific options
.set_flag(Option::FLAG_RUNTIME)
.set_description("max snapshots per directory")
.set_long_description("maximum number of snapshots that can be created per directory"),
+
+ Option("mds_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(2)
+ .set_min(1)
+ .set_description("Size of thread pool for ASIO completions")
+ .add_tag("mds")
});
}
Option("debug_allow_any_pool_priority", Option::TYPE_BOOL, Option::LEVEL_DEV)
.set_default(false)
.set_description("Allow any pool priority to be set to test conversion to new range"),
+
+ Option("client_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(2)
+ .set_min(1)
+ .set_description("Size of thread pool for ASIO completions")
+ .add_tag("client")
});
}
*
*/
+#include "common/async/context_pool.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/config.h"
// make sure our mini-session gets legacy values
g_conf().apply_changes(nullptr);
- MonClient mc_bootstrap(g_ceph_context);
+ ceph::async::io_context_pool cp(1);
+ MonClient mc_bootstrap(g_ceph_context, cp);
if (mc_bootstrap.get_monmap_and_config() < 0) {
+ cp.stop();
g_ceph_context->_log->flush();
cerr << "failed to fetch mon config (--no-mon-config to skip)"
<< std::endl;
_exit(1);
}
+ cp.stop();
}
// Expand metavariables. Invoke configuration observers. Open log file.
return boost::intrusive_ptr<CephContext>{g_ceph_context, false};
}
-namespace TOPNSPC::common {
-void intrusive_ptr_add_ref(CephContext* cct)
-{
- cct->get();
-}
-void intrusive_ptr_release(CephContext* cct)
-{
- cct->put();
-}
-}
void global_print_banner(void)
{
output_ceph_version();
#include <map>
#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"
+#include "common/ceph_context.h"
#include "common/code_environment.h"
#include "common/common_init.h"
const char *data_dir_option = 0,
bool run_pre_init = true);
-namespace TOPNSPC::common {
- void intrusive_ptr_add_ref(CephContext* cct);
- void intrusive_ptr_release(CephContext* cct);
-}
-
// just the first half; enough to get config parsed but doesn't start up the
// cct or log.
void global_pre_init(const std::map<std::string,std::string> *defaults,
#include "auth/Crypto.h"
#include "client/Client.h"
#include "librados/RadosClient.h"
+#include "common/async/context_pool.h"
#include "common/ceph_argparse.h"
#include "common/common_init.h"
#include "common/config.h"
#define DEFAULT_UMASK 002
static mode_t umask_cb(void *);
+ceph::async::io_context_pool icp;
struct ceph_mount_info
{
cct->_log->start();
}
+ icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
{
- MonClient mc_bootstrap(cct);
+ MonClient mc_bootstrap(cct, icp);
ret = mc_bootstrap.get_monmap_and_config();
if (ret < 0)
return ret;
common_init_finish(cct);
//monmap
- monclient = new MonClient(cct);
+ monclient = new MonClient(cct, icp);
ret = -CEPHFS_ERROR_MON_MAP_BUILD; //defined in libcephfs.h;
if (monclient->build_initial_monmap() < 0)
goto fail;
delete messenger;
messenger = nullptr;
}
+ icp.stop();
if (monclient) {
delete monclient;
monclient = nullptr;
#include "common/ceph_json.h"
#include "common/errno.h"
#include "common/ceph_json.h"
+#include "common/async/blocked_completion.h"
#include "include/buffer.h"
#include "include/stringify.h"
#include "include/util.h"
#undef dout_prefix
#define dout_prefix *_dout << "librados: "
+namespace bs = boost::system;
+namespace ca = ceph::async;
+
librados::RadosClient::RadosClient(CephContext *cct_)
: Dispatcher(cct_->get()),
cct_deleter{cct_, [](CephContext *p) {p->put();}},
conf(cct_->_conf),
state(DISCONNECTED),
- monclient(cct_),
+ monclient(cct_, poolctx),
mgrclient(cct_, nullptr, &monclient.monmap),
messenger(NULL),
instance_id(0),
}
{
- MonClient mc_bootstrap(cct);
+ MonClient mc_bootstrap(cct, poolctx);
err = mc_bootstrap.get_monmap_and_config();
if (err < 0)
return err;
common_init_finish(cct);
+ poolctx.start(cct->_conf.get_val<std::uint64_t>("librados_thread_count"));
+
// get monmap
err = monclient.build_initial_monmap();
if (err < 0)
messenger->wait();
}
ldout(cct, 1) << "shutdown" << dendl;
+ poolctx.finish();
}
int librados::RadosClient::watch_flush()
Context *on_finish)
{
std::lock_guard l{lock};
- monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
+ monclient.start_mon_command(cmd, inbl,
+ [outs, outbl,
+ on_finish = std::unique_ptr<Context>(on_finish)]
+ (bs::error_code e,
+ std::string&& s,
+ ceph::bufferlist&& b) mutable {
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(b);
+ if (on_finish)
+ on_finish.release()->complete(
+ ceph::from_error_code(e));
+ });
}
int librados::RadosClient::mgr_command(const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
- ceph::condition_variable cond;
- bool done;
- int rval;
- {
- std::lock_guard l{mylock};
- monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
- new C_SafeCond(mylock, cond, &done, &rval));
- }
- std::unique_lock l{mylock};
- cond.wait(l, [&done] { return done;});
- return rval;
+ bs::error_code ec;
+ auto&& [s, bl] = monclient.start_mon_command(rank, cmd, inbl,
+ ca::use_blocked[ec]);
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+
+ return ceph::from_error_code(ec);
}
int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
- ceph::condition_variable cond;
- bool done;
- int rval;
- {
- std::lock_guard l{mylock};
- monclient.start_mon_command(name, cmd, inbl, outbl, outs,
- new C_SafeCond(mylock, cond, &done, &rval));
- }
- std::unique_lock l{mylock};
- cond.wait(l, [&done] { return done;});
- return rval;
+ bs::error_code ec;
+ auto&& [s, bl] = monclient.start_mon_command(name, cmd, inbl,
+ ca::use_blocked[ec]);
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+
+ return ceph::from_error_code(ec);
}
int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
}
return 0;
}
+
+namespace {
+const char *config_keys[] = {
+ "librados_thread_count",
+ NULL
+};
+}
+
+const char** librados::RadosClient::get_tracked_conf_keys() const
+{
+ return config_keys;
+}
+
+void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ if (changed.count("librados_thread_count")) {
+ poolctx.stop();
+ poolctx.start(conf.get_val<std::uint64_t>("librados_thread_count"));
+ }
+}
#ifndef CEPH_LIBRADOS_RADOSCLIENT_H
#define CEPH_LIBRADOS_RADOSCLIENT_H
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "msg/Dispatcher.h"
+
+#include "common/async/context_pool.h"
#include "common/config_fwd.h"
#include "common/Cond.h"
#include "common/Timer.h"
#include "common/ceph_mutex.h"
#include "common/ceph_time.h"
+#include "common/config_obs.h"
#include "include/common_fwd.h"
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
#include "mon/MonClient.h"
#include "mgr/MgrClient.h"
-#include "msg/Dispatcher.h"
#include "IoCtxImpl.h"
-struct AuthAuthorizer;
struct Context;
-struct Connection;
class Message;
class MLog;
class Messenger;
class AioCompletionImpl;
-class librados::RadosClient : public Dispatcher
+class librados::RadosClient : public Dispatcher,
+ public md_config_obs_t
{
std::unique_ptr<CephContext,
std::function<void(CephContext*)> > cct_deleter;
public:
using Dispatcher::cct;
- const ConfigProxy& conf;
+ const ConfigProxy& conf{cct->_conf};
+ ceph::async::io_context_pool poolctx;
private:
enum {
DISCONNECTED,
explicit RadosClient(CephContext *cct_);
~RadosClient() override;
- int ping_monitor(string mon_id, string *result);
+ int ping_monitor(std::string mon_id, std::string *result);
int connect();
void shutdown();
mon_feature_t get_required_monitor_features() const;
int get_inconsistent_pgs(int64_t pool_id, std::vector<std::string>* pgs);
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed) override;
};
#endif
if (!pending_safe)
safe();
}
-
-void intrusive_ptr_add_ref(libradosstriper::MultiAioCompletionImpl* ptr)
-{
- ptr->get();
-}
-
-void intrusive_ptr_release(libradosstriper::MultiAioCompletionImpl* ptr)
-{
- ptr->put();
-}
#include "common/ceph_mutex.h"
#include "include/radosstriper/libradosstriper.hpp"
-struct libradosstriper::MultiAioCompletionImpl {
+namespace libradosstriper {
+
+struct MultiAioCompletionImpl {
ceph::mutex lock = ceph::make_mutex("MultiAioCompletionImpl lock", false);
ceph::condition_variable cond;
void complete_request(ssize_t r);
void safe_request(ssize_t r);
void finish_adding_requests();
-
};
-void intrusive_ptr_add_ref(libradosstriper::MultiAioCompletionImpl*);
-void intrusive_ptr_release(libradosstriper::MultiAioCompletionImpl*);
+inline void intrusive_ptr_add_ref(MultiAioCompletionImpl* ptr)
+{
+ ptr->get();
+}
+
+inline void intrusive_ptr_release(MultiAioCompletionImpl* ptr)
+{
+ ptr->put();
+}
+}
#endif // CEPH_LIBRADOSSTRIPERSTRIPER_MULTIAIOCOMPLETIONIMPL_H
#include <string>
+#include <boost/intrusive_ptr.hpp>
+
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
#include "include/radosstriper/libradosstriper.h"
#include "librados/IoCtxImpl.h"
#include "librados/AioCompletionImpl.h"
#include "common/RefCountedObj.h"
+#include "common/ceph_context.h"
namespace libradosstriper {
#define dout_prefix *_dout << "mds." << name << ' '
using TOPNSPC::common::cmd_getval;
// cons/des
-MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc) :
+MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc,
+ boost::asio::io_context& ioctx) :
Dispatcher(m->cct),
timer(m->cct, mds_lock),
gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
name(n),
messenger(m),
monc(mc),
+ ioctx(ioctx),
mgrc(m->cct, m, &mc->monmap),
log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
starttime(mono_clock::now())
// Did I previously not hold a rank? Initialize!
if (mds_rank == NULL) {
- mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
- timer, beacon, mdsmap, messenger, monc, &mgrc,
- new LambdaContext([this](int r){respawn();}),
- new LambdaContext([this](int r){suicide();}));
+ mds_rank = new MDSRankDispatcher(
+ whoami, mds_lock, clog,
+ timer, beacon, mdsmap, messenger, monc, &mgrc,
+ new LambdaContext([this](int r){respawn();}),
+ new LambdaContext([this](int r){suicide();}),
+ ioctx);
dout(10) << __func__ << ": initializing MDS rank "
<< mds_rank->get_nodeid() << dendl;
mds_rank->init();
class MDSDaemon : public Dispatcher {
public:
- MDSDaemon(std::string_view n, Messenger *m, MonClient *mc);
+ MDSDaemon(std::string_view n, Messenger *m, MonClient *mc,
+ boost::asio::io_context& ioctx);
+
~MDSDaemon() override;
mono_time get_starttime() const {
Messenger *messenger;
MonClient *monc;
+ boost::asio::io_context& ioctx;
MgrClient mgrc;
std::unique_ptr<MDSMap> mdsmap;
LogClient log_client;
MonClient *monc_,
MgrClient *mgrc,
Context *respawn_hook_,
- Context *suicide_hook_) :
+ Context *suicide_hook_,
+ boost::asio::io_context& ioc) :
cct(msgr->cct), mds_lock(mds_lock_), clog(clog_),
timer(timer_), mdsmap(mdsmap_),
objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
messenger(msgr), monc(monc_), mgrc(mgrc),
respawn_hook(respawn_hook_),
suicide_hook(suicide_hook_),
- starttime(mono_clock::now())
+ starttime(mono_clock::now()),
+ ioc(ioc)
{
hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
MonClient *monc_,
MgrClient *mgrc,
Context *respawn_hook_,
- Context *suicide_hook_)
+ Context *suicide_hook_,
+ boost::asio::io_context& ioc)
: MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
- msgr, monc_, mgrc, respawn_hook_, suicide_hook_)
+ msgr, monc_, mgrc, respawn_hook_, suicide_hook_, ioc)
{
g_conf().add_observer(this);
}
#include <string_view>
+#include <boost/asio/io_context.hpp>
+
#include "common/DecayCounter.h"
#include "common/LogClient.h"
#include "common/Timer.h"
MonClient *monc_,
MgrClient *mgrc,
Context *respawn_hook_,
- Context *suicide_hook_);
+ Context *suicide_hook_,
+ boost::asio::io_context& ioc);
mds_rank_t get_nodeid() const { return whoami; }
int64_t get_metadata_pool();
void send_task_status();
mono_time starttime = mono_clock::zero();
+ boost::asio::io_context& ioc;
};
/* This expects to be given a reference which it is responsible for.
MonClient *monc_,
MgrClient *mgrc,
Context *respawn_hook_,
- Context *suicide_hook_);
+ Context *suicide_hook_,
+ boost::asio::io_context& ioc);
void init();
void tick();
#include <memory>
#include "common/ceph_json.h"
+#include "common/Cond.h"
#include "mon/MonClient.h"
class Command
MgrStandby::MgrStandby(int argc, const char **argv) :
Dispatcher(g_ceph_context),
- monc{g_ceph_context},
+ monc{g_ceph_context, poolctx},
client_messenger(Messenger::create(
g_ceph_context,
cct->_conf.get_val<std::string>("ms_type"),
client_messenger->add_dispatcher_tail(&client);
client_messenger->start();
+ poolctx.start(2);
+
// Initialize MonClient
if (monc.build_initial_monmap() < 0) {
client_messenger->shutdown();
dout(4) << "Shutting down" << dendl;
- // stop sending beacon first, i use monc to talk with monitors
+ py_module_registry.shutdown();
+ // stop sending beacon first, I use monc to talk with monitors
timer.shutdown();
// client uses monc and objecter
client.shutdown();
mgrc.shutdown();
+ // Stop asio threads, so leftover events won't call into shut down
+ // monclient/objecter.
+ poolctx.finish();
// stop monc, so mon won't be able to instruct me to shutdown/activate after
// the active_mgr is stopped
monc.shutdown();
if (active_mgr) {
active_mgr->shutdown();
}
-
- py_module_registry.shutdown();
-
// objecter is used by monc and active_mgr
objecter.shutdown();
// client_messenger is used by all of them, so stop it in the end
#define MGR_STANDBY_H_
#include "auth/Auth.h"
+#include "common/async/context_pool.h"
#include "common/Finisher.h"
#include "common/Timer.h"
#include "common/LogClient.h"
const std::set <std::string> &changed) override;
protected:
+ ceph::async::io_context_pool poolctx;
MonClient monc;
std::unique_ptr<Messenger> client_messenger;
Objecter objecter;
// set daemon profiles
if ((p->first.is_osd() || p->first.is_mds()) &&
mon_caps == "allow rwx") {
- new_caps = string("allow profile ") + p->first.get_type_name();
+ new_caps = string("allow profile ") + std::string(p->first.get_type_name());
}
// update bootstrap keys
#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersion.h"
+#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersionReply.h"
#include "messages/MMonMap.h"
#include "messages/MConfig.h"
#include "common/LogClient.h"
#include "MonClient.h"
+#include "error_code.h"
#include "MonMap.h"
#include "auth/Auth.h"
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
+namespace bs = boost::system;
using std::string;
+using namespace std::literals;
-MonClient::MonClient(CephContext *cct_) :
+MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
Dispatcher(cct_),
AuthServer(cct_),
messenger(NULL),
timer(cct_, monc_lock),
- finisher(cct_),
+ service(service),
initialized(false),
log_client(NULL),
more_log_pending(false),
{
ldout(cct, 10) << __func__ << dendl;
std::unique_lock l(monc_lock);
-
+
sub.want("monmap", 0, 0);
if (!_opened())
_reopen_session();
void MonClient::handle_config(MConfig *m)
{
ldout(cct,10) << __func__ << " " << *m << dendl;
- finisher.queue(new LambdaContext([this, m](int r) {
- cct->_conf.set_mon_vals(cct, m->config, config_cb);
- if (config_notify_cb) {
- config_notify_cb();
- }
- m->put();
- }));
+ // Take the sledgehammer approach to ensuring we don't depend on
+ // anything in MonClient.
+ boost::asio::defer(finish_strand,
+ [m, cct = boost::intrusive_ptr<CephContext>(cct),
+ config_notify_cb = config_notify_cb,
+ config_cb = config_cb]() {
+ cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
+ if (config_notify_cb) {
+ config_notify_cb();
+ }
+ m->put();
+ });
got_config = true;
map_cond.notify_all();
}
messenger->add_dispatcher_head(this);
timer.init();
- finisher.start();
schedule_tick();
return 0;
monc_lock.lock();
stopping = true;
while (!version_requests.empty()) {
- version_requests.begin()->second->context->complete(-ECANCELED);
+ ceph::async::defer(std::move(version_requests.begin()->second),
+ monc_errc::shutting_down, 0, 0);
ldout(cct, 20) << __func__ << " canceling and discarding version request "
- << version_requests.begin()->second << dendl;
- delete version_requests.begin()->second;
+ << version_requests.begin()->first << dendl;
version_requests.erase(version_requests.begin());
}
while (!mon_commands.empty()) {
monc_lock.unlock();
if (initialized) {
- finisher.wait_for_empty();
- finisher.stop();
initialized = false;
}
monc_lock.lock();
// throw out version check requests
while (!version_requests.empty()) {
- finisher.queue(version_requests.begin()->second->context, -EAGAIN);
- delete version_requests.begin()->second;
+ ceph::async::defer(std::move(version_requests.begin()->second),
+ monc_errc::session_reset, 0, 0);
version_requests.erase(version_requests.begin());
}
if (r->is_tell()) {
++r->send_attempts;
if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
- _finish_command(r, -ENXIO, "mon unavailable");
+ _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
return;
}
-
// tell-style command
if (monmap.min_mon_release >= ceph_release_t::octopus) {
if (r->target_con) {
if (r->target_rank >= (int)monmap.size()) {
ldout(cct, 10) << " target " << r->target_rank
<< " >= max mon " << monmap.size() << dendl;
- _finish_command(r, -ENOENT, "mon rank dne");
+ _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
return;
}
r->target_con = messenger->connect_to_mon(
if (!monmap.contains(r->target_name)) {
ldout(cct, 10) << " target " << r->target_name
<< " not present in monmap" << dendl;
- _finish_command(r, -ENOENT, "mon dne");
+ _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
return;
}
r->target_con = messenger->connect_to_mon(
if (r->target_rank >= (int)monmap.size()) {
ldout(cct, 10) << " target " << r->target_rank
<< " >= max mon " << monmap.size() << dendl;
- _finish_command(r, -ENOENT, "mon rank dne");
+ _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
return;
}
_reopen_session(r->target_rank);
if (!monmap.contains(r->target_name)) {
ldout(cct, 10) << " target " << r->target_name
<< " not present in monmap" << dendl;
- _finish_command(r, -ENOENT, "mon dne");
+ _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
return;
}
_reopen_session(monmap.get_rank(r->target_name));
}
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
- if (r->poutbl)
- r->poutbl->claim(ack->get_data());
- _finish_command(r, ack->r, ack->rs);
+ auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
+ : bs::error_code();
+ _finish_command(r, ec, ack->rs,
+ std::move(ack->get_data()));
ack->put();
}
}
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
- if (r->poutbl)
- r->poutbl->claim(reply->get_data());
- _finish_command(r, reply->r, reply->rs);
+ auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
+ : bs::error_code();
+ _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
reply->put();
}
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
MonCommand *cmd = it->second;
- _finish_command(cmd, -ETIMEDOUT, "");
+ _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
return 0;
}
-void MonClient::_finish_command(MonCommand *r, int ret, string rs)
+void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
+ std::string_view rs, ceph::buffer::list&& bl)
{
- ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
- if (r->prval)
- *(r->prval) = ret;
- if (r->prs)
- *(r->prs) = rs;
- if (r->onfinish)
- finisher.queue(r->onfinish, ret);
+ ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
+ << dendl;
+ ceph::async::defer(std::move(r->onfinish), ret, std::string(rs),
+ std::move(bl));
if (r->target_con) {
r->target_con->mark_down();
}
delete r;
}
-void MonClient::start_mon_command(const std::vector<string>& cmd,
- const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, string *outs,
- Context *onfinish)
-{
- ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
- std::lock_guard l(monc_lock);
- if (!initialized || stopping) {
- if (onfinish) {
- onfinish->complete(-ECANCELED);
- }
- return;
- }
- MonCommand *r = new MonCommand(++last_mon_command_tid);
- r->cmd = cmd;
- r->inbl = inbl;
- r->poutbl = outbl;
- r->prs = outs;
- r->onfinish = onfinish;
- if (cct->_conf->rados_mon_op_timeout > 0) {
- class C_CancelMonCommand : public Context
- {
- uint64_t tid;
- MonClient *monc;
- public:
- C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
- void finish(int r) override {
- monc->_cancel_mon_command(tid);
- }
- };
- r->ontimeout = new C_CancelMonCommand(r->tid, this);
- timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
- }
- mon_commands[r->tid] = r;
- _send_command(r);
-}
-
-void MonClient::start_mon_command(const string &mon_name,
- const std::vector<string>& cmd,
- const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, string *outs,
- Context *onfinish)
-{
- ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
- std::lock_guard l(monc_lock);
- if (!initialized || stopping) {
- if (onfinish) {
- onfinish->complete(-ECANCELED);
- }
- return;
- }
- MonCommand *r = new MonCommand(++last_mon_command_tid);
-
- // detect/tolerate mon *rank* passed as a string
- string err;
- int rank = strict_strtoll(mon_name.c_str(), 10, &err);
- if (err.size() == 0 && rank >= 0) {
- ldout(cct,10) << __func__ << " interpreting name '" << mon_name
- << "' as rank " << rank << dendl;
- r->target_rank = rank;
- } else {
- r->target_name = mon_name;
- }
- r->cmd = cmd;
- r->inbl = inbl;
- r->poutbl = outbl;
- r->prs = outs;
- r->onfinish = onfinish;
- mon_commands[r->tid] = r;
- _send_command(r);
-}
-
-void MonClient::start_mon_command(int rank,
- const std::vector<string>& cmd,
- const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, string *outs,
- Context *onfinish)
-{
- ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
- std::lock_guard l(monc_lock);
- if (!initialized || stopping) {
- if (onfinish) {
- onfinish->complete(-ECANCELED);
- }
- return;
- }
- MonCommand *r = new MonCommand(++last_mon_command_tid);
- r->target_rank = rank;
- r->cmd = cmd;
- r->inbl = inbl;
- r->poutbl = outbl;
- r->prs = outs;
- r->onfinish = onfinish;
- mon_commands[r->tid] = r;
- _send_command(r);
-}
-
// ---------
-void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
-{
- version_req_d *req = new version_req_d(onfinish, newest, oldest);
- ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
- std::lock_guard l(monc_lock);
- auto m = ceph::make_message<MMonGetVersion>();
- m->what = map;
- m->handle = ++version_req_id;
- version_requests[m->handle] = req;
- _send_mon_message(std::move(m));
-}
-
void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
ldout(cct, 0) << __func__ << " version request with handle " << m->handle
<< " not found" << dendl;
} else {
- version_req_d *req = iter->second;
- ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
+ auto req = std::move(iter->second);
+ ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
+ << m->version << dendl;
version_requests.erase(iter);
- if (req->newest)
- *req->newest = m->version;
- if (req->oldest)
- *req->oldest = m->oldest_version;
- finisher.queue(req->context, 0);
- delete req;
+ ceph::async::defer(std::move(req), bs::error_code(),
+ m->version, m->oldest_version);
}
m->put();
}
allowed_methods,
allowed_modes);
if (r < 0) {
- _finish_command(i.second, r, "auth failed");
+ auto ec = bs::error_code(-r, mon_category());
+ _finish_command(i.second, ec, "auth failed"sv, {});
}
return r;
}
md_config_t::config_callback MonClient::get_config_callback() {
return config_cb;
}
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
+class monc_error_category : public ceph::converting_category {
+public:
+ monc_error_category(){}
+ const char* name() const noexcept override;
+ const char* message(int ev, char*, std::size_t) const noexcept override;
+ std::string message(int ev) const override;
+ bs::error_condition default_error_condition(int ev) const noexcept
+ override;
+ bool equivalent(int ev, const bs::error_condition& c) const
+ noexcept override;
+ using ceph::converting_category::equivalent;
+ int from_code(int ev) const noexcept override;
+};
+#pragma GCC diagnostic pop
+#pragma clang diagnostic pop
+
+const char* monc_error_category::name() const noexcept {
+ return "monc";
+}
+
+const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
+ if (ev == 0)
+ return "No error";
+
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::shutting_down: // Command failed due to MonClient shutting down
+ return "Command failed due to MonClient shutting down";
+ case monc_errc::session_reset:
+ return "Monitor session was reset";
+ case monc_errc::rank_dne:
+ return "Requested monitor rank does not exist";
+ case monc_errc::mon_dne:
+ return "Requested monitor does not exist";
+ case monc_errc::timed_out:
+ return "Monitor operation timed out";
+ case monc_errc::mon_unavailable:
+ return "Monitor unavailable";
+ }
+
+ return "Unknown error";
+}
+
+std::string monc_error_category::message(int ev) const {
+ return message(ev, nullptr, 0);
+}
+
+bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::shutting_down:
+ return bs::errc::operation_canceled;
+ case monc_errc::session_reset:
+ return bs::errc::resource_unavailable_try_again;
+ case monc_errc::rank_dne:
+ [[fallthrough]];
+ case monc_errc::mon_dne:
+ return ceph::errc::not_in_map;
+ case monc_errc::timed_out:
+ return bs::errc::timed_out;
+ case monc_errc::mon_unavailable:
+ return bs::errc::no_such_device;
+ }
+ return { ev, *this };
+}
+
+bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::rank_dne:
+ [[fallthrough]];
+ case monc_errc::mon_dne:
+ return c == bs::errc::no_such_file_or_directory;
+ default:
+ return default_error_condition(ev) == c;
+ }
+}
+
+int monc_error_category::from_code(int ev) const noexcept {
+ if (ev == 0)
+ return 0;
+
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::shutting_down:
+ return -ECANCELED;
+ case monc_errc::session_reset:
+ return -EAGAIN;
+ case monc_errc::rank_dne:
+ [[fallthrough]];
+ case monc_errc::mon_dne:
+ return -ENOENT;
+ case monc_errc::timed_out:
+ return -ETIMEDOUT;
+ case monc_errc::mon_unavailable:
+ return -ENXIO;
+ }
+ return -EDOM;
+}
+
+const bs::error_category& monc_category() noexcept {
+ static const monc_error_category c;
+ return c;
+}
#include "MonMap.h"
#include "MonSub.h"
+#include "common/async/completion.h"
#include "common/Timer.h"
-#include "common/Finisher.h"
#include "common/config.h"
+#include "messages/MMonGetVersion.h"
#include "auth/AuthClient.h"
#include "auth/AuthServer.h"
class MMonMap;
class MConfig;
class MMonGetVersionReply;
-struct MMonSubscribeAck;
class MMonCommandAck;
-struct MAuthReply;
class LogClient;
-class AuthAuthorizer;
class AuthClientHandler;
class AuthRegistry;
class KeyRing;
struct MonClientPinger : public Dispatcher,
public AuthClient {
-
ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
ceph::condition_variable ping_recvd_cond;
std::string *result;
}
};
+const boost::system::error_category& monc_category() noexcept;
+
+enum class monc_errc {
+ shutting_down = 1, // Command failed due to MonClient shutting down
+ session_reset, // Monitor session was reset
+ rank_dne, // Requested monitor rank does not exist
+ mon_dne, // Requested monitor does not exist
+ timed_out, // Monitor operation timed out
+ mon_unavailable // Monitor unavailable
+};
+
+namespace boost::system {
+template<>
+struct is_error_code_enum<::monc_errc> {
+ static const bool value = true;
+};
+}
+
+// implicit conversion:
+inline boost::system::error_code make_error_code(monc_errc e) noexcept {
+ return { static_cast<int>(e), monc_category() };
+}
+
+// explicit conversion:
+inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
+ return { static_cast<int>(e), monc_category() };
+}
+
+const boost::system::error_category& monc_category() noexcept;
class MonClient : public Dispatcher,
public AuthClient,
public AuthServer /* for mgr, osd, mds */ {
+ static constexpr auto dout_subsys = ceph_subsys_monc;
public:
+ // Error, Newest, Oldest
+ using VersionSig = void(boost::system::error_code, version_t, version_t);
+ using VersionCompletion = ceph::async::Completion<VersionSig>;
+
+ using CommandSig = void(boost::system::error_code, std::string,
+ ceph::buffer::list);
+ using CommandCompletion = ceph::async::Completion<CommandSig>;
+
MonMap monmap;
std::map<std::string,std::string> config_mgr;
mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
SafeTimer timer;
- Finisher finisher;
+ boost::asio::io_context& service;
+ boost::asio::io_context::strand finish_strand{service};
bool initialized;
bool stopping = false;
double reopen_interval_multiplier;
Dispatcher *handle_authentication_dispatcher = nullptr;
-
bool _opened() const;
bool _hunting() const;
void _start_hunting();
bool more,
uint32_t auth_method,
const ceph::buffer::list& bl,
- ceph::buffer::list *reply);
+ ceph::buffer::list *reply) override;
void set_entity_name(EntityName name) { entity_name = name; }
void set_handle_authentication_dispatcher(Dispatcher *d) {
std::lock_guard l(monc_lock);
return sub.inc_want(what, start, flags);
}
-
+
std::unique_ptr<KeyRing> keyring;
std::unique_ptr<RotatingKeyRing> rotating_secrets;
public:
- explicit MonClient(CephContext *cct_);
+ MonClient(CephContext *cct_, boost::asio::io_context& service);
MonClient(const MonClient &) = delete;
MonClient& operator=(const MonClient &) = delete;
~MonClient() override;
struct MonCommand {
// for tell only
std::string target_name;
- int target_rank;
+ int target_rank = -1;
ConnectionRef target_con;
std::unique_ptr<MonConnection> target_session;
unsigned send_attempts = 0; ///< attempt count for legacy mons
utime_t last_send_attempt;
-
uint64_t tid;
std::vector<std::string> cmd;
ceph::buffer::list inbl;
- ceph::buffer::list *poutbl;
- std::string *prs;
- int *prval;
- Context *onfinish, *ontimeout;
-
- explicit MonCommand(uint64_t t)
- : target_rank(-1),
- tid(t),
- poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
- {}
+ std::unique_ptr<CommandCompletion> onfinish;
+ std::optional<boost::asio::steady_timer> cancel_timer;
+
+ MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
+ : tid(t), onfinish(std::move(onfinish)) {
+ auto timeout = ceph::maybe_timespan(monc.cct->_conf->rados_mon_op_timeout);
+ if (timeout) {
+ cancel_timer.emplace(monc.service, *timeout);
+ cancel_timer->async_wait(
+ [this, &monc](boost::system::error_code ec) {
+ if (ec)
+ return;
+ std::scoped_lock l(monc.monc_lock);
+ monc._cancel_mon_command(tid);
+ });
+ }
+ }
bool is_tell() const {
return target_name.size() || target_rank >= 0;
}
};
+ friend MonCommand;
std::map<uint64_t,MonCommand*> mon_commands;
void _send_command(MonCommand *r);
void _check_tell_commands();
void _resend_mon_commands();
int _cancel_mon_command(uint64_t tid);
- void _finish_command(MonCommand *r, int ret, std::string rs);
+ void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
+ bufferlist&& bl);
void _finish_auth();
void handle_mon_command_ack(MMonCommandAck *ack);
void handle_command_reply(MCommandReply *reply);
public:
- void start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, std::string *outs,
- Context *onfinish);
+ template<typename CompletionToken>
+ auto start_mon_command(const std::vector<std::string>& cmd,
+ const ceph::buffer::list& inbl,
+ CompletionToken&& token) {
+ ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+ boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto h = CommandCompletion::create(service.get_executor(),
+ std::move(init.completion_handler));
+ if (!initialized || stopping) {
+ ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+ bufferlist{});
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+ r->cmd = cmd;
+ r->inbl = inbl;
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }
+ return init.result.get();
+ }
+
+ template<typename CompletionToken>
+ auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
+ const ceph::buffer::list& inbl, CompletionToken&& token) {
+ ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+ boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto h = CommandCompletion::create(service.get_executor(),
+ std::move(init.completion_handler));
+ if (!initialized || stopping) {
+ ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+ bufferlist{});
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+ r->target_rank = mon_rank;
+ r->cmd = cmd;
+ r->inbl = inbl;
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }
+ return init.result.get();
+ }
+
+ template<typename CompletionToken>
+ auto start_mon_command(const std::string& mon_name,
+ const std::vector<std::string>& cmd,
+ const ceph::buffer::list& inbl,
+ CompletionToken&& token) {
+ ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+ boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto h = CommandCompletion::create(service.get_executor(),
+ std::move(init.completion_handler));
+ if (!initialized || stopping) {
+ ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+ bufferlist{});
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+ // detect/tolerate mon *rank* passed as a string
+ std::string err;
+ int rank = strict_strtoll(mon_name.c_str(), 10, &err);
+ if (err.size() == 0 && rank >= 0) {
+ ldout(cct,10) << __func__ << " interpreting name '" << mon_name
+ << "' as rank " << rank << dendl;
+ r->target_rank = rank;
+ } else {
+ r->target_name = mon_name;
+ }
+ r->cmd = cmd;
+ r->inbl = inbl;
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }
+ return init.result.get();
+ }
+
+ class ContextVerter {
+ std::string* outs;
+ ceph::bufferlist* outbl;
+ Context* onfinish;
+
+ public:
+ ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
+ : outs(outs), outbl(outbl), onfinish(onfinish) {}
+ ~ContextVerter() = default;
+ ContextVerter(const ContextVerter&) = default;
+ ContextVerter& operator =(const ContextVerter&) = default;
+ ContextVerter(ContextVerter&&) = default;
+ ContextVerter& operator =(ContextVerter&&) = default;
+
+ void operator()(boost::system::error_code e,
+ std::string s,
+ ceph::bufferlist bl) {
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+ if (onfinish)
+ onfinish->complete(ceph::from_error_code(e));
+ }
+ };
+
+ void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish) {
+ start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ }
void start_mon_command(int mon_rank,
- const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, std::string *outs,
- Context *onfinish);
- void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
- const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, std::string *outs,
- Context *onfinish);
+ const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish) {
+ start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ }
+ void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
+ const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish) {
+ start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ }
+
// version requests
public:
/**
* get latest known version(s) of cluster map
*
- * @param map std::string name of map (e.g., 'osdmap')
- * @param newest pointer where newest map version will be stored
- * @param oldest pointer where oldest map version will be stored
- * @param onfinish context that will be triggered on completion
- * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
+ * @param map string name of map (e.g., 'osdmap')
+ * @param token context that will be triggered on completion
+ * @return (via Completion) {} on success,
+ * boost::system::errc::resource_unavailable_try_again if we need to
+ * resubmit our request
*/
- void get_version(std::string map, version_t *newest, version_t *oldest, Context *onfinish);
+ template<typename CompletionToken>
+ auto get_version(std::string&& map, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, VersionSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto m = ceph::make_message<MMonGetVersion>();
+ m->what = std::move(map);
+ m->handle = ++version_req_id;
+ version_requests.emplace(m->handle,
+ VersionCompletion::create(
+ service.get_executor(),
+ std::move(init.completion_handler)));
+ _send_mon_message(m);
+ }
+ return init.result.get();
+ }
+
/**
* Run a callback within our lock, with a reference
* to the MonMap
md_config_t::config_callback get_config_callback();
private:
- struct version_req_d {
- Context *context;
- version_t *newest, *oldest;
- version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
- };
- std::map<ceph_tid_t, version_req_d*> version_requests;
+ std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
ceph_tid_t version_req_id;
void handle_get_version_reply(MMonGetVersionReply* m);
-
md_config_t::config_callback config_cb;
std::function<void(void)> config_notify_cb;
};
#include <cstring>
#include <map>
+#include "common/async/context_pool.h"
#include "common/ceph_context.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
conf.parse_env(cct->get_module_type()); // environment variables override
conf.apply_changes(nullptr);
- MonClient monc = MonClient(cct.get());
+ ceph::async::io_context_pool ioc(1);
+ MonClient monc = MonClient(cct.get(), ioc);
err = monc.build_initial_monmap();
if (err)
goto scrape_keyring;
return compat;
}
-OSDService::OSDService(OSD *osd) :
+OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
osd(osd),
cct(osd->cct),
whoami(osd->whoami), store(osd->store),
last_recalibrate(ceph_clock_now()),
promote_max_objects(0),
promote_max_bytes(0),
+ poolctx(poolctx),
objecter(make_unique<Objecter>(osd->client_messenger->cct,
osd->objecter_messenger,
osd->monc, nullptr, 0, 0)),
Messenger *hb_back_serverm,
Messenger *osdc_messenger,
MonClient *mc,
- const std::string &dev, const std::string &jdev) :
+ const std::string &dev, const std::string &jdev,
+ ceph::async::io_context_pool& poolctx) :
Dispatcher(cct_),
tick_timer(cct, osd_lock),
tick_timer_without_osd_lock(cct, tick_timer_lock),
up_thru_wanted(0),
requested_full_first(0),
requested_full_last(0),
- service(this)
+ service(this, poolctx)
{
if (!gss_ktfile_client.empty()) {
return true;
}
-struct C_OSD_GetVersion : public Context {
+struct CB_OSD_GetVersion {
OSD *osd;
- uint64_t oldest, newest;
- explicit C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
- void finish(int r) override {
- if (r >= 0)
+ explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
+ void operator ()(boost::system::error_code ec, version_t newest,
+ version_t oldest) {
+ if (!ec)
osd->_got_mon_epochs(oldest, newest);
}
};
set_state(STATE_PREBOOT);
dout(10) << "start_boot - have maps " << superblock.oldest_map
<< ".." << superblock.newest_map << dendl;
- C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
- monc->get_version("osdmap", &c->newest, &c->oldest, c);
+ monc->get_version("osdmap", CB_OSD_GetVersion(this));
}
void OSD::_got_mon_epochs(epoch_t oldest, epoch_t newest)
dout(0) << __func__ << ": scrub interval change" << dendl;
}
check_config();
+ if (changed.count("osd_asio_thread_count")) {
+ service.poolctx.stop();
+ service.poolctx.start(conf.get_val<std::uint64_t>("osd_asio_thread_count"));
+ }
}
void OSD::update_log_config()
#include "msg/Dispatcher.h"
+#include "common/async/context_pool.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
}
// -- Objecter, for tiering reads/writes from/to other OSDs --
+ ceph::async::io_context_pool& poolctx;
std::unique_ptr<Objecter> objecter;
int m_objecter_finishers;
std::vector<std::unique_ptr<Finisher>> objecter_finishers;
void dump_live_pgids();
#endif
- explicit OSDService(OSD *osd);
+ explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
~OSDService() = default;
};
void send_full_update();
- friend struct C_OSD_GetVersion;
+ friend struct CB_OSD_GetVersion;
// -- alive --
epoch_t up_thru_wanted;
Messenger *hb_front_server,
Messenger *hb_back_server,
Messenger *osdc_messenger,
- MonClient *mc, const std::string &dev, const std::string &jdev);
+ MonClient *mc, const std::string &dev, const std::string &jdev,
+ ceph::async::io_context_pool& poolctx);
~OSD() override;
// static bits
#include "messages/MWatchNotify.h"
+#include "common/Cond.h"
#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
// op pool check
-void Objecter::C_Op_Map_Latest::finish(int r)
+void Objecter::CB_Op_Map_Latest::operator()(boost::system::error_code e,
+ version_t latest, version_t)
{
- if (r == -EAGAIN || r == -ECANCELED)
+ if (e == boost::system::errc::resource_unavailable_try_again ||
+ e == boost::system::errc::operation_canceled)
return;
lgeneric_subdout(objecter->cct, objecter, 10)
- << "op_map_latest r=" << r << " tid=" << tid
+ << "op_map_latest r=" << e << " tid=" << tid
<< " latest " << latest << dendl;
Objecter::unique_lock wl(objecter->rwlock);
if (check_latest_map_ops.count(op->tid) == 0) {
op->get();
check_latest_map_ops[op->tid] = op;
- C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid));
}
}
// linger pool check
-void Objecter::C_Linger_Map_Latest::finish(int r)
+void Objecter::CB_Linger_Map_Latest::operator()(boost::system::error_code e,
+ version_t latest,
+ version_t)
{
- if (r == -EAGAIN || r == -ECANCELED) {
+ if (e == boost::system::errc::resource_unavailable_try_again ||
+ e == boost::system::errc::operation_canceled) {
// ignore callback; we will retry in resend_mon_ops()
return;
}
if (check_latest_map_lingers.count(op->linger_id) == 0) {
op->get();
check_latest_map_lingers[op->linger_id] = op;
- C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id));
}
}
// command pool check
-void Objecter::C_Command_Map_Latest::finish(int r)
+void Objecter::CB_Command_Map_Latest::operator()(boost::system::error_code e,
+ version_t latest, version_t)
{
- if (r == -EAGAIN || r == -ECANCELED) {
+ if (e == boost::system::errc::resource_unavailable_try_again ||
+ e == boost::system::errc::operation_canceled) {
// ignore callback; we will retry in resend_mon_ops()
return;
}
if (check_latest_map_commands.count(c->tid) == 0) {
c->get();
check_latest_map_commands[c->tid] = c;
- C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
- monc->get_version("osdmap", &f->latest, NULL, f);
+ monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid));
}
}
cond.wait(mlock, [&done] { return done; });
}
-struct C_Objecter_GetVersion : public Context {
+struct CB_Objecter_GetVersion {
Objecter *objecter;
- uint64_t oldest, newest;
Context *fin;
- C_Objecter_GetVersion(Objecter *o, Context *c)
- : objecter(o), oldest(0), newest(0), fin(c) {}
- void finish(int r) override {
- if (r >= 0) {
+ CB_Objecter_GetVersion(Objecter *o, Context *c) : objecter(o), fin(c) {}
+ void operator()(boost::system::error_code e, version_t newest, version_t oldest) {
+ if (!e) {
objecter->get_latest_version(oldest, newest, fin);
- } else if (r == -EAGAIN) { // try again as instructed
+ } else if (e == boost::system::errc::resource_unavailable_try_again) {
+ // try again as instructed
objecter->wait_for_latest_osdmap(fin);
} else {
// it doesn't return any other error codes!
void Objecter::wait_for_latest_osdmap(Context *fin)
{
ldout(cct, 10) << __func__ << dendl;
- C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
- monc->get_version("osdmap", &c->newest, &c->oldest, c);
+ monc->get_version("osdmap", CB_Objecter_GetVersion(this, fin));
}
void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
p != check_latest_map_ops.end();
++p) {
- C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid));
}
for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
p != check_latest_map_lingers.end();
++p) {
- C_Linger_Map_Latest *c
- = new C_Linger_Map_Latest(this, p->second->linger_id);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id));
}
for (map<uint64_t, CommandOp*>::iterator p
= check_latest_map_commands.begin();
p != check_latest_map_commands.end();
++p) {
- C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
- monc->get_version("osdmap", &c->latest, NULL, c);
+ monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid));
}
}
}
};
- struct C_Op_Map_Latest : public Context {
+ struct CB_Op_Map_Latest {
Objecter *objecter;
ceph_tid_t tid;
- version_t latest;
- C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
- latest(0) {}
- void finish(int r) override;
+ CB_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
+ void operator()(boost::system::error_code err, version_t latest, version_t);
};
- struct C_Command_Map_Latest : public Context {
+ struct CB_Command_Map_Latest {
Objecter *objecter;
uint64_t tid;
- version_t latest;
- C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
- latest(0) {}
- void finish(int r) override;
+ CB_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
+ void operator()(boost::system::error_code err, version_t latest, version_t);
};
struct C_Stat : public Context {
}
};
- struct C_Linger_Map_Latest : public Context {
+ struct CB_Linger_Map_Latest {
Objecter *objecter;
uint64_t linger_id;
- version_t latest;
- C_Linger_Map_Latest(Objecter *o, uint64_t id) :
- objecter(o), linger_id(id), latest(0) {}
- void finish(int r) override;
+ CB_Linger_Map_Latest(Objecter *o, uint64_t id) : objecter(o), linger_id(id) {}
+ void operator()(boost::system::error_code err, version_t latest, version_t);
};
// -- osd sessions --
#include "global/global_init.h"
#include "global/global_context.h"
+#include "common/async/context_pool.h"
#include "common/ceph_argparse.h"
#include "common/version.h"
#include "common/dout.h"
{
protected:
CephContext *cct;
+ ceph::async::io_context_pool poolctx;
Messenger *msg;
MonClient monc;
explicit MonClientHelper(CephContext *cct_)
: Dispatcher(cct_),
cct(cct_),
+ poolctx(1),
msg(NULL),
- monc(cct_)
+ monc(cct_, poolctx)
{ }
#include "mon/MonClient.h"
#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
+#include "common/async/context_pool.h"
#include "common/Timer.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
{
protected:
MessengerRef messenger;
+ ceph::async::io_context_pool poolctx;
MonClient monc;
ceph::mutex lock;
monc.shutdown();
timer.shutdown();
messenger->shutdown();
+ poolctx.finish();
return 0;
}
TestStub(CephContext *cct, string who)
: Dispatcher(cct),
- monc(cct),
+ monc(cct, poolctx),
lock(ceph::make_mutex(who.append("::lock"))),
timer(cct, lock),
do_shutdown(false),
int init() override {
int err;
+ poolctx.start(1);
err = monc.build_initial_monmap();
if (err < 0) {
derr << "ClientStub::" << __func__ << " ERROR: build initial monmap: "
#include <stdio.h>
#include <signal.h>
#include <gtest/gtest.h>
+#include "common/async/context_pool.h"
#include "osd/OSD.h"
#include "os/ObjectStore.h"
#include "mon/MonClient.h"
Messenger *hb_front_server,
Messenger *hb_back_server,
Messenger *osdc_messenger,
- MonClient *mc, const std::string &dev, const std::string &jdev) :
- OSD(cct_, store_, id, internal, external, hb_front_client, hb_back_client, hb_front_server, hb_back_server, osdc_messenger, mc, dev, jdev)
+ MonClient *mc, const std::string &dev, const std::string &jdev,
+ ceph::async::io_context_pool& ictx) :
+ OSD(cct_, store_, id, internal, external, hb_front_client, hb_back_client, hb_front_server, hb_back_server, osdc_messenger, mc, dev, jdev, ictx)
{
}
};
TEST(TestOSDScrub, scrub_time_permit) {
+ ceph::async::io_context_pool icp(1);
ObjectStore *store = ObjectStore::create(g_ceph_context,
g_conf()->osd_objectstore,
g_conf()->osd_data,
ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms->set_default_policy(Messenger::Policy::stateless_server(0));
ms->bind(g_conf()->public_addr);
- MonClient mc(g_ceph_context);
+ MonClient mc(g_ceph_context, icp);
mc.build_initial_monmap();
- TestOSDScrub* osd = new TestOSDScrub(g_ceph_context, store, 0, ms, ms, ms, ms, ms, ms, ms, &mc, "", "");
+ TestOSDScrub* osd = new TestOSDScrub(g_ceph_context, store, 0, ms, ms, ms, ms, ms, ms, ms, &mc, "", "", icp);
g_ceph_context->_conf.set_val("osd_scrub_begin_hour", "0");
g_ceph_context->_conf.set_val("osd_scrub_end_hour", "24");
waiting_for_mds_map(NULL),
inited(false)
{
- monc = new MonClient(g_ceph_context);
+ monc = new MonClient(g_ceph_context, poolctx);
messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
fsmap = new FSMap();
objecter = new Objecter(g_ceph_context, messenger, monc, NULL, 0, 0);
if (r < 0)
return r;
+ poolctx.start(1);
messenger->start();
objecter->set_client_incarnation(0);
monc->shutdown();
messenger->shutdown();
messenger->wait();
+ poolctx.finish();
}
#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
#include "auth/Auth.h"
+#include "common/async/context_pool.h"
#include "common/Finisher.h"
#include "common/Timer.h"
ceph::mutex lock = ceph::make_mutex("MDSUtility::lock");
Finisher finisher;
+ ceph::async::io_context_pool poolctx;
Context *waiting_for_mds_map;