StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
- : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
+ : Client(m, mc, new Objecter(m->cct, m, mc, nullptr))
{
monclient->set_messenger(m);
objecter->set_client_incarnation(0);
OPTION(fio_dir, OPT_STR) // fio data directory for fio-objectstore
-OPTION(rados_mon_op_timeout, OPT_DOUBLE) // how many seconds to wait for a response from the monitor before returning an error from a rados operation. 0 means no limit.
-OPTION(rados_osd_op_timeout, OPT_DOUBLE) // how many seconds to wait for a response from osds before returning an error from a rados operation. 0 means no limit.
OPTION(rados_tracing, OPT_BOOL) // true if LTTng-UST tracepoints should be enabled
.set_default("/tmp/fio")
.set_description(""),
- Option("rados_mon_op_timeout", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
+ Option("rados_mon_op_timeout", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
.set_default(0)
- .set_description(""),
+ .set_description("timeout for operations handled by monitors such as statfs (0 is unlimited)")
+ .set_flag(Option::FLAG_RUNTIME)
+ .set_min(0),
- Option("rados_osd_op_timeout", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
+ Option("rados_osd_op_timeout", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
.set_default(0)
- .set_description(""),
+ .set_description("timeout for operations handled by osds such as write (0 is unlimited)")
+ .set_flag(Option::FLAG_RUNTIME)
+ .set_min(0),
Option("rados_tracing", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(false)
log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
finisher(cct, "radosclient", "fn-radosclient")
{
+ conf.add_observer(this);
}
int64_t librados::RadosClient::lookup_pool(const char *name)
ldout(cct, 1) << "starting objecter" << dendl;
- objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
- &finisher,
- cct->_conf->rados_mon_op_timeout,
- cct->_conf->rados_osd_op_timeout);
+ objecter = new (std::nothrow) Objecter(cct, messenger, &monclient, &finisher);
if (!objecter)
goto out;
objecter->set_balanced_budget();
librados::RadosClient::~RadosClient()
{
+ conf.remove_observer(this);
if (messenger)
delete messenger;
if (objecter)
if (need_map) {
std::unique_lock l(lock);
- ceph::timespan timeout{0};
- if (cct->_conf->rados_mon_op_timeout > 0) {
- timeout = ceph::make_timespan(cct->_conf->rados_mon_op_timeout);
- }
-
+ ceph::timespan timeout = rados_mon_op_timeout;
if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
ldout(cct, 10) << __func__ << " waiting" << dendl;
while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
return r;
lock.unlock();
- if (conf->rados_mon_op_timeout) {
- r = cond.wait_for(conf->rados_mon_op_timeout);
+ if (rados_mon_op_timeout.count() > 0) {
+ r = cond.wait_for(rados_mon_op_timeout);
} else {
r = cond.wait();
}
return r;
lock.unlock();
- if (conf->rados_mon_op_timeout) {
- r = cond.wait_for(conf->rados_mon_op_timeout);
+ if (rados_mon_op_timeout.count() > 0) {
+ r = cond.wait_for(rados_mon_op_timeout);
} else {
r = cond.wait();
}
}
return 0;
}
+
+const char** librados::RadosClient::get_tracked_conf_keys() const
+{
+ static const char *config_keys[] = {
+ "rados_mon_op_timeout",
+ nullptr
+ };
+ return config_keys;
+}
+
+void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ if (changed.count("rados_mon_op_timeout")) {
+ rados_mon_op_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ }
+}
#include "common/Timer.h"
#include "common/ceph_mutex.h"
#include "common/ceph_time.h"
+#include "common/config_obs.h"
#include "include/common_fwd.h"
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
class Messenger;
class AioCompletionImpl;
-class librados::RadosClient : public Dispatcher
+class librados::RadosClient : public Dispatcher, public md_config_obs_t
{
std::unique_ptr<CephContext,
std::function<void(CephContext*)> > cct_deleter;
public:
using Dispatcher::cct;
- const ConfigProxy& conf;
+ ConfigProxy& conf{cct->_conf};
private:
enum {
DISCONNECTED,
bool service_daemon = false;
string daemon_name, service_name;
map<string,string> daemon_metadata;
+ ceph::timespan rados_mon_op_timeout{};
int wait_for_osdmap();
mon_feature_t get_required_monitor_features() const;
int get_inconsistent_pgs(int64_t pool_id, std::vector<std::string>* pgs);
+
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy&, const std::set<std::string>&) override;
};
#endif
Context *suicide_hook_) :
cct(msgr->cct), mds_lock(mds_lock_), clog(clog_),
timer(timer_), mdsmap(mdsmap_),
- objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
+ objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr)),
damage_table(whoami_), sessionmap(this),
op_tracker(g_ceph_context, g_conf()->mds_enable_op_tracker,
g_conf()->osd_num_op_tracker_shard),
"mgr",
Messenger::get_pid_nonce(),
0)),
- objecter{g_ceph_context, client_messenger.get(), &monc, NULL, 0, 0},
+ objecter{g_ceph_context, client_messenger.get(), &monc, NULL},
client{client_messenger.get(), &monc, &objecter},
mgrc(g_ceph_context, client_messenger.get(), &monc.monmap),
log_client(g_ceph_context, client_messenger.get(), &monc.monmap, LogClient::NO_FLAGS),
r->poutbl = outbl;
r->prs = outs;
r->onfinish = onfinish;
- if (cct->_conf->rados_mon_op_timeout > 0) {
+ auto timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ if (timeout.count() > 0) {
class C_CancelMonCommand : public Context
{
uint64_t tid;
}
};
r->ontimeout = new C_CancelMonCommand(r->tid, this);
- timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
+ timer.add_event_after(static_cast<double>(timeout.count()), r->ontimeout);
}
mon_commands[r->tid] = r;
_send_command(r);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <boost/system/system_error.hpp>
+
+#include "common/common_init.h"
+
+#include "global/global_init.h"
+
+#include "RADOSImpl.h"
+
+namespace neorados {
+namespace detail {
+
+RADOS::RADOS(boost::asio::io_context& ioctx,
+ boost::intrusive_ptr<CephContext> cct)
+ : Dispatcher(cct.get()),
+ ioctx(ioctx),
+ cct(cct),
+ monclient(cct.get(), ioctx),
+ mgrclient(cct.get(), nullptr, &monclient.monmap) {
+ auto err = monclient.build_initial_monmap();
+ if (err < 0)
+ throw std::system_error(ceph::to_error_code(err));
+
+ messenger.reset(Messenger::create_client_messenger(cct.get(), "radosclient"));
+ if (!messenger)
+ throw std::bad_alloc();
+
+ // Require OSDREPLYMUX feature. This means we will fail to talk to
+ // old servers. This is necessary because otherwise we won't know
+ // how to decompose the reply data into its constituent pieces.
+ messenger->set_default_policy(
+ Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
+
+ objecter = std::make_unique<Objecter>(cct.get(), messenger.get(), &monclient, ioctx);
+
+ objecter->set_balanced_budget();
+ monclient.set_messenger(messenger.get());
+ mgrclient.set_messenger(messenger.get());
+ objecter->init();
+ messenger->add_dispatcher_head(&mgrclient);
+ messenger->add_dispatcher_tail(objecter.get());
+ messenger->start();
+ monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
+ err = monclient.init();
+ if (err) {
+ throw boost::system::system_error(ceph::to_error_code(err));
+ }
+ err = monclient.authenticate(cct->_conf->client_mount_timeout);
+ if (err) {
+ throw boost::system::system_error(ceph::to_error_code(err));
+ }
+ messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
+ // Detect older cluster, put mgrclient into compatible mode
+ mgrclient.set_mgr_optional(
+ !get_required_monitor_features().contains_all(
+ ceph::features::mon::FEATURE_LUMINOUS));
+
+ // MgrClient needs this (it doesn't have MonClient reference itself)
+ monclient.sub_want("mgrmap", 0, 0);
+ monclient.renew_subs();
+
+ mgrclient.init();
+ objecter->set_client_incarnation(0);
+ objecter->start();
+
+ messenger->add_dispatcher_tail(this);
+
+ std::unique_lock l(lock);
+ instance_id = monclient.get_global_id();
+}
+
+RADOS::~RADOS() {
+ if (objecter && objecter->initialized) {
+ objecter->shutdown();
+ }
+
+ mgrclient.shutdown();
+ monclient.shutdown();
+
+ if (messenger) {
+ messenger->shutdown();
+ messenger->wait();
+ }
+}
+
+bool RADOS::ms_dispatch(Message *m)
+{
+ switch (m->get_type()) {
+ // OSD
+ case CEPH_MSG_OSD_MAP:
+ m->put();
+ return true;
+ }
+ return false;
+}
+
+void RADOS::ms_handle_connect(Connection *con) {}
+bool RADOS::ms_handle_reset(Connection *con) {
+ return false;
+}
+void RADOS::ms_handle_remote_reset(Connection *con) {}
+bool RADOS::ms_handle_refused(Connection *con) {
+ return false;
+}
+
+} // namespace detail
+} // namespace neorados
promote_max_bytes(0),
objecter(make_unique<Objecter>(osd->client_messenger->cct,
osd->objecter_messenger,
- osd->monc, nullptr, 0, 0)),
+ osd->monc, nullptr)),
m_objecter_finishers(cct->_conf->osd_objecter_finishers),
watch_timer(osd->client_messenger->cct, watch_lock),
next_notif_id(0),
// config obs ----------------------------
-static const char *config_keys[] = {
- "crush_location",
- NULL
-};
-
class Objecter::RequestStateHook : public AdminSocketHook {
Objecter *m_objecter;
public:
const char** Objecter::get_tracked_conf_keys() const
{
+ static const char *config_keys[] = {
+ "crush_location",
+ "rados_mon_op_timeout",
+ "rados_osd_op_timeout",
+ NULL
+ };
return config_keys;
}
if (changed.count("crush_location")) {
update_crush_location();
}
+ if (changed.count("rados_mon_op_timeout")) {
+ mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ }
+ if (changed.count("rados_osd_op_timeout")) {
+ osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
+ }
}
void Objecter::update_crush_location()
}
Objecter::Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
- Finisher *fin,
- double mon_timeout,
- double osd_timeout) :
+ Finisher *fin) :
Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
trace_endpoint("0.0.0.0", 0, "Objecter"),
osdmap{std::make_unique<OSDMap>()},
homeless_session(new OSDSession(cct, -1)),
- mon_timeout(ceph::make_timespan(mon_timeout)),
- osd_timeout(ceph::make_timespan(osd_timeout)),
op_throttle_bytes(cct, "objecter_bytes",
cct->_conf->objecter_inflight_op_bytes),
op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
-{}
+{
+ mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
+}
Objecter::~Objecter()
{
public:
Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
- Finisher *fin,
- double mon_timeout,
- double osd_timeout);
+ Finisher *fin);
~Objecter() override;
void init();
ASSERT_EQ(0, rados_create(&cluster, "admin"));
ASSERT_EQ(0, rados_conf_read_file(cluster, NULL));
ASSERT_EQ(0, rados_conf_parse_env(cluster, NULL));
- ASSERT_EQ(0, rados_conf_set(cluster, "rados_osd_op_timeout", "0.00001")); // use any small value that will result in a timeout
+ ASSERT_EQ(0, rados_conf_set(cluster, "rados_osd_op_timeout", "1")); // use any small value that will result in a timeout
+ ASSERT_EQ(0, rados_conf_set(cluster, "ms_inject_internal_delays", "2")); // create a 2 second delay
ASSERT_EQ(0, rados_connect(cluster));
ASSERT_EQ(0, rados_ioctx_create(cluster, pool_name.c_str(), &ioctx));
rados_ioctx_set_namespace(ioctx, nspace.c_str());
dout(10) << "ClientStub::" << __func__ << " starting messenger at "
<< messenger->get_myaddrs() << dendl;
- objecter.reset(new Objecter(cct, messenger.get(), &monc, NULL, 0, 0));
+ objecter.reset(new Objecter(cct, messenger.get(), &monc, NULL));
ceph_assert(objecter.get() != NULL);
objecter->set_balanced_budget();
monc = new MonClient(g_ceph_context);
messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
fsmap = new FSMap();
- objecter = new Objecter(g_ceph_context, messenger, monc, NULL, 0, 0);
+ objecter = new Objecter(g_ceph_context, messenger, monc, NULL);
}