char *client_address,
uint32_t expire_seconds);
+/**
+ * Gets addresses of the RADOS session, suitable for blacklisting.
+ *
+ * @param cluster cluster handle
+ * @param addrs the output string.
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_getaddrs(rados_t cluster, char** addrs);
+
CEPH_RADOS_API void rados_set_osdmap_full_try(rados_ioctx_t io)
__attribute__((deprecated));
return umask;
}
+  /**
+   * Render this client's messenger address vector as a string,
+   * using entity_addrvec_t's operator<< formatting.
+   */
+  std::string getaddrs()
+  {
+    // CachedStackStringStream: stack-backed stream, avoids a heap-allocating
+    // ostringstream for this short string.
+    CachedStackStringStream cos;
+    *cos << messenger->get_myaddrs();
+    return std::string(cos->strv());
+  }
+
int conf_read_file(const char *path_list)
{
int ret = cct->_conf.parse_config_files(path_list, nullptr, 0);
return 0;
}
+/*
+ * Get the addresses of this mount's messenger, rendered as a string.
+ * On success *addrs is set to a malloc'd string the caller must free().
+ * Returns 0 on success, -ENOTCONN if not mounted, -ENOMEM on allocation
+ * failure.
+ */
+extern "C" int ceph_getaddrs(struct ceph_mount_info *cmount, char** addrs)
+{
+  if (!cmount->is_initialized())
+    return -ENOTCONN;
+  auto s = cmount->getaddrs();
+  char *a = strdup(s.c_str());
+  if (!a)
+    return -ENOMEM;  // don't report success with *addrs left NULL
+  *addrs = a;
+  return 0;
+}
+
extern "C" int ceph_conf_read_file(struct ceph_mount_info *cmount, const char *path)
{
return cmount->conf_read_file(path);
objecter->blacklist_self(set);
}
+/**
+ * Return this RADOS client's messenger addresses rendered as a string
+ * (entity_addrvec_t operator<< formatting), suitable for blacklisting.
+ */
+std::string librados::RadosClient::get_addrs() const {
+  // CachedStackStringStream avoids a heap-allocating ostringstream.
+  CachedStackStringStream cos;
+  *cos << messenger->get_myaddrs();
+  return std::string(cos->strv());
+}
+
int librados::RadosClient::blacklist_add(const string& client_address,
uint32_t expire_seconds)
{
bool put();
void blacklist_self(bool set);
+ std::string get_addrs() const;
+
int service_daemon_register(
const std::string& service, ///< service name (e.g., 'rgw')
const std::string& name, ///< daemon name (e.g., 'gwfoo')
}
LIBRADOS_C_API_BASE_DEFAULT(rados_blacklist_add);
+/*
+ * C binding: render the cluster handle's address vector into a malloc'd
+ * string the caller must free().  Returns 0 on success, -ENOMEM on
+ * allocation failure.
+ */
+extern "C" int _rados_getaddrs(rados_t cluster, char** addrs)
+{
+  librados::RadosClient *radosp = (librados::RadosClient *)cluster;
+  auto s = radosp->get_addrs();
+  char *a = strdup(s.c_str());
+  if (!a)
+    return -ENOMEM;  // previously returned 0 with *addrs == NULL on OOM
+  *addrs = a;
+  return 0;
+}
+LIBRADOS_C_API_BASE_DEFAULT(rados_getaddrs);
+
extern "C" void _rados_set_osdmap_full_try(rados_ioctx_t io)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
class MMgrBeacon : public PaxosServiceMessage {
private:
- static constexpr int HEAD_VERSION = 9;
+ static constexpr int HEAD_VERSION = 10;
static constexpr int COMPAT_VERSION = 8;
protected:
map<string,string> metadata; ///< misc metadata about this osd
+ std::vector<entity_addrvec_t> clients;
+
uint64_t mgr_features = 0; ///< reporting mgr's features
public:
entity_addrvec_t server_addrs_, bool available_,
std::vector<MgrMap::ModuleInfo>&& modules_,
map<string,string>&& metadata_,
+ std::vector<entity_addrvec_t> clients,
uint64_t feat)
: PaxosServiceMessage{MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
gid(gid_), server_addrs(server_addrs_), available(available_), name(name_),
fsid(fsid_), modules(std::move(modules_)), metadata(std::move(metadata_)),
+ clients(std::move(clients)),
mgr_features(feat)
{
}
return modules;
}
+  /**
+   * Addresses of the RADOS client instances the mgr's modules have
+   * registered, carried in the beacon so the monitor can blacklist
+   * them on failover.
+   */
+  const auto& get_clients() const
+  {
+    return clients;
+  }
+
private:
~MMgrBeacon() override {}
encode(modules, payload);
encode(mgr_features, payload);
+ encode(clients, payload, features);
}
void decode_payload() override {
auto p = payload.cbegin();
if (header.version >= 9) {
decode(mgr_features, p);
}
+ if (header.version >= 10) {
+ decode(clients, p);
+ }
}
private:
template<class T, typename... Args>
clog->do_log(prio, message);
}
}
+
+/*
+ * Record a module's RADOS/CephFS client address vector so it can be
+ * reported in the mgr beacon (and blacklisted on failover).
+ *
+ * @param name  owning module name
+ * @param addrs address vector rendered as a string (entity_addrvec_t format)
+ */
+void ActivePyModules::register_client(std::string_view name, std::string addrs)
+{
+  std::lock_guard l(lock);
+
+  entity_addrvec_t addrv;
+  if (!addrv.parse(addrs.data())) {
+    // Don't register an empty/garbage addrvec: it is useless for
+    // blacklisting and would pollute the beacon.
+    derr << "failed to parse client addrs '" << addrs
+         << "' for module " << name << dendl;
+    return;
+  }
+
+  dout(7) << "registering msgr client handle " << addrv << dendl;
+  py_module_registry.register_client(name, std::move(addrv));
+}
+
+/*
+ * Remove a previously registered client address vector for a module.
+ *
+ * @param name  owning module name
+ * @param addrs address vector rendered as a string (entity_addrvec_t format)
+ */
+void ActivePyModules::unregister_client(std::string_view name, std::string addrs)
+{
+  std::lock_guard l(lock);
+
+  entity_addrvec_t addrv;
+  if (!addrv.parse(addrs.data())) {
+    // A string that doesn't parse can never match a registered entry.
+    derr << "failed to parse client addrs '" << addrs
+         << "' for module " << name << dendl;
+    return;
+  }
+
+  dout(7) << "unregistering msgr client handle " << addrv << dendl;
+  py_module_registry.unregister_client(name, addrv);
+}
void clear_all_progress_events();
void get_progress_events(std::map<std::string,ProgressEvent>* events);
+ void register_client(std::string_view name, std::string addrs);
+ void unregister_client(std::string_view name, std::string addrs);
+
void config_notify();
void set_uri(const std::string& module_name, const std::string &uri);
Py_RETURN_FALSE;
}
+/*
+ * Python binding: _ceph_register_client(addrs) -> None.
+ * Expects one string argument: the client address vector as returned by
+ * the rados/cephfs get_addrs() bindings.
+ */
+static PyObject*
+ceph_register_client(BaseMgrModule *self, PyObject *args)
+{
+  char *addrs = nullptr;
+  if (!PyArg_ParseTuple(args, "s:ceph_register_client", &addrs)) {
+    return nullptr;  // TypeError already set by PyArg_ParseTuple
+  }
+
+  self->py_modules->register_client(self->this_module->get_name(), addrs);
+
+  Py_RETURN_NONE;
+}
+
+/*
+ * Python binding: _ceph_unregister_client(addrs) -> None.
+ * Mirror of ceph_register_client(); removes the module's registered
+ * client address vector.
+ */
+static PyObject*
+ceph_unregister_client(BaseMgrModule *self, PyObject *args)
+{
+  char *addrs = nullptr;
+  if (!PyArg_ParseTuple(args, "s:ceph_unregister_client", &addrs)) {
+    return nullptr;  // TypeError already set by PyArg_ParseTuple
+  }
+
+  self->py_modules->unregister_client(self->this_module->get_name(), addrs);
+
+  Py_RETURN_NONE;
+}
+
PyMethodDef BaseMgrModule_methods[] = {
{"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
"Get a cluster object"},
{"_ceph_is_authorized", (PyCFunction)ceph_is_authorized,
METH_VARARGS, "Verify the current session caps are valid"},
+ {"_ceph_register_client", (PyCFunction)ceph_register_client,
+ METH_VARARGS, "Register RADOS instance for potential blacklisting"},
+
+ {"_ceph_unregister_client", (PyCFunction)ceph_unregister_client,
+ METH_VARARGS, "Unregister RADOS instance for potential blacklisting"},
+
{NULL, NULL, 0, NULL}
};
module_info.push_back(std::move(info));
}
+ auto clients = py_module_registry.get_clients();
+ for (const auto& client : clients) {
+ dout(15) << "noting RADOS client for blacklist: " << client << dendl;
+ }
+
// Whether I think I am available (request MgrMonitor to set me
// as available in the map)
bool available = active_mgr != nullptr && active_mgr->is_initialized();
available,
std::move(module_info),
std::move(metadata),
+ std::move(clients),
CEPH_FEATURES_ALL);
if (available) {
LogChannelRef clog;
std::map<std::string, PyModuleRef> modules;
+ std::multimap<std::string, entity_addrvec_t> clients;
std::unique_ptr<ActivePyModules> active_modules;
std::unique_ptr<StandbyPyModules> standby_modules;
ceph_assert(active_modules);
return active_modules->get_services();
}
+
+  /// Record a module's client addrvec (beacon reports it for blacklisting).
+  void register_client(std::string_view name, entity_addrvec_t addrs)
+  {
+    // Must hold the registry lock: get_clients() reads `clients` from the
+    // beacon-sending thread.
+    std::scoped_lock l(lock);
+    clients.emplace(std::string(name), std::move(addrs));
+  }
+  /// Remove matching client addrvec entries registered under `name`.
+  void unregister_client(std::string_view name, const entity_addrvec_t& addrs)
+  {
+    // Same lock as register_client()/get_clients() — `clients` is shared
+    // with the beacon-sending thread.
+    std::scoped_lock l(lock);
+    auto itp = clients.equal_range(std::string(name));
+    for (auto it = itp.first; it != itp.second; ) {
+      if (it->second == addrs) {
+        // erase() already yields the next iterator; letting the for-loop
+        // increment again (as before) skipped an element and could step
+        // past the end of the range.
+        it = clients.erase(it);
+      } else {
+        ++it;
+      }
+    }
+  }
+
+  /// Snapshot the registered client address vectors under the registry lock.
+  auto get_clients() const
+  {
+    std::scoped_lock l(lock);
+    std::vector<entity_addrvec_t> addrs;
+    addrs.reserve(clients.size());
+    for (const auto& [name, addrv] : clients) {
+      addrs.push_back(addrv);
+    }
+    return addrs;
+  }
+
// <<< (end of ActivePyModules cheeky call-throughs)
};
/// features
uint64_t active_mgr_features = 0;
+ std::vector<entity_addrvec_t> clients; // for blacklist
+
std::map<uint64_t, StandbyInfo> standbys;
// Modules which are enabled
ENCODE_FINISH(bl);
return;
}
- ENCODE_START(10, 6, bl);
+ ENCODE_START(11, 6, bl);
encode(epoch, bl);
encode(active_addrs, bl, features);
encode(active_gid, bl);
encode(always_on_modules, bl);
encode(active_mgr_features, bl);
encode(last_failure_osd_epoch, bl);
+ encode(clients, bl, features);
ENCODE_FINISH(bl);
return;
}
void decode(ceph::buffer::list::const_iterator& p)
{
- DECODE_START(8, p);
+ DECODE_START(11, p);
decode(epoch, p);
decode(active_addrs, p);
decode(active_gid, p);
if (struct_v >= 10) {
decode(last_failure_osd_epoch, p);
}
+ if (struct_v >= 11) {
+ decode(clients, p);
+ }
DECODE_FINISH(p);
}
f->close_section();
}
f->dump_int("last_failure_osd_epoch", last_failure_osd_epoch);
+ f->open_array_section("active_clients");
+ for (const auto &c : clients) {
+ f->dump_object("client", c);
+ }
+ f->close_section();
f->close_section();
}
pending_map.available_modules = m->get_available_modules();
updated = true;
}
+ const auto& clients = m->get_clients();
+ if (pending_map.clients != clients) {
+ dout(4) << "active's RADOS clients " << clients
+ << " (was " << pending_map.clients << ")" << dendl;
+ pending_map.clients = clients;
+ updated = true;
+ }
} else if (pending_map.active_gid == 0) {
// There is no currently active daemon, select this one.
if (pending_map.standbys.count(m->get_gid())) {
<< pending_map.active_gid << " ("
<< pending_map.active_addrs << ")" << dendl;
auto blacklist_epoch = mon->osdmon()->blacklist(pending_map.active_addrs, until);
+
+ /* blacklist RADOS clients in use by the mgr */
+ for (const auto& a : pending_map.clients) {
+ mon->osdmon()->blacklist(a, until);
+ }
request_proposal(mon->osdmon());
pending_metadata_rm.insert(pending_map.active_name);
pending_map.available = false;
pending_map.active_addrs = entity_addrvec_t();
pending_map.services.clear();
+ pending_map.clients.clear();
pending_map.last_failure_osd_epoch = blacklist_epoch;
// So that when new active mgr subscribes to mgrdigest, it will
int ceph_init(ceph_mount_info *cmount)
void ceph_shutdown(ceph_mount_info *cmount)
+ int ceph_getaddrs(ceph_mount_info* cmount, char** addrs)
int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
for key, value in conf.iteritems():
self.conf_set(key, value)
+    def get_addrs(self):
+        """
+        Get the client addresses associated with this CephFS session.
+
+        :returns: str - the session's address vector, rendered by libcephfs
+        :raises: :class:`Error` if the underlying call fails
+        """
+        self.require_state("mounted")
+
+        cdef:
+            char* addrs = NULL
+
+        try:
+
+            with nogil:
+                ret = ceph_getaddrs(self.cluster, &addrs)
+            if ret:
+                raise make_ex(ret, "error calling getaddrs")
+
+            return decode_cstr(addrs)
+        finally:
+            # libcephfs strdup'd the string; free() is safe on NULL too.
+            free(addrs)
+
+
def conf_read_file(self, conffile=None):
"""
Load the ceph configuration from the specified config file.
:return: None
"""
if self._rados:
+ addrs = self._rados.get_addrs()
self._rados.shutdown()
+ self._ceph_unregister_client(addrs)
def get(self, data_name):
"""
ctx_capsule = self.get_context()
self._rados = rados.Rados(context=ctx_capsule)
self._rados.connect()
-
+ self._ceph_register_client(self._rados.get_addrs())
return self._rados
@staticmethod
log.debug("CephFS mounting...")
self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+ self.mgr._ceph_register_client(self.fs.get_addrs())
def disconnect(self):
assert self.ops_in_progress == 0
log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+ addrs = self.fs.get_addrs()
self.fs.shutdown()
+ self.mgr._ceph_unregister_client(addrs)
self.fs = None
def abort(self):
int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
+ int rados_getaddrs(rados_t cluster, char** addrs)
int rados_application_enable(rados_ioctx_t io, const char *app_name,
int force)
void rados_set_osdmap_full_try(rados_ioctx_t io)
for key, value in conf.items():
self.conf_set(key, value)
+    def get_addrs(self):
+        """
+        Get the client addresses associated with this RADOS session.
+
+        :returns: str - the session's address vector, rendered by librados
+        :raises: :class:`Error` if the underlying call fails
+        """
+        self.require_state("configuring", "connected")
+
+        cdef:
+            char* addrs = NULL
+
+        try:
+
+            with nogil:
+                ret = rados_getaddrs(self.cluster, &addrs)
+            if ret:
+                raise make_ex(ret, "error calling getaddrs")
+
+            return decode_cstr(addrs)
+        finally:
+            # librados strdup'd the string; free() is safe on NULL too.
+            free(addrs)
+
def require_state(self, *args):
"""
Checks if the Rados object is in a special state