// the changes of settings at runtime.
template<class ConfigObs>
class ObserverMgr : public ConfigTracker {
- // Maps configuration options to the observer listening for them.
- using obs_map_t = std::multimap<std::string, ConfigObs*>;
- obs_map_t observers;
-
public:
- typedef std::map<ConfigObs*, std::set<std::string>> rev_obs_map;
- typedef std::function<void(ConfigObs*, const std::string&)> config_gather_cb;
+ using config_obs_ptr = std::shared_ptr<ConfigObs*>;
+ using config_obs_wptr = std::weak_ptr<ConfigObs*>;
+ typedef std::map<config_obs_ptr, std::set<std::string>> rev_obs_map;
+ typedef std::function<void(config_obs_ptr, const std::string&)> config_gather_cb;
// Adds a new observer to this configuration. You can do this at any time,
// but it will only receive notifications for the changes that happen after
// you need to delete it yourself.
// This function will assert if you try to delete an observer that isn't
// there.
- void remove_observer(ConfigObs* observer);
+ config_obs_wptr remove_observer(ConfigObs* observer);
// invoke callback for every observers tracking keys
void for_each_observer(config_gather_cb callback);
// invoke callback for observers keys tracking the provided change set
- template<class ConfigProxyT>
- void for_each_change(const std::set<std::string>& changes,
- ConfigProxyT& proxy,
+ void for_each_change(const std::map<std::string,bool>& changes,
config_gather_cb callback, std::ostream *oss);
bool is_tracking(const std::string& name) const override;
+
+private:
+ // Maps configuration options to the observer listening for them.
+ using obs_map_t = std::multimap<std::string, config_obs_ptr>;
+ obs_map_t observers;
};
// we could put the implementations in a .cc file, and only instantiate the
void ObserverMgr<ConfigObs>::add_observer(ConfigObs* observer)
{
const char **keys = observer->get_tracked_conf_keys();
+ auto ptr = std::make_shared<ConfigObs*>(observer);
for (const char ** k = keys; *k; ++k) {
- observers.emplace(*k, observer);
+ observers.emplace(*k, ptr);
}
}
template<class ConfigObs>
-void ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer)
+typename ObserverMgr<ConfigObs>::config_obs_wptr ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer)
{
[[maybe_unused]] bool found_obs = false;
+ config_obs_ptr ptr;
for (auto o = observers.begin(); o != observers.end(); ) {
- if (o->second == observer) {
+ if (*o->second == observer) {
+ ptr = std::move(o->second);
observers.erase(o++);
found_obs = true;
} else {
}
}
ceph_assert(found_obs);
+ return config_obs_wptr(ptr);
}
template<class ConfigObs>
}
template<class ConfigObs>
-template<class ConfigProxyT>
-void ObserverMgr<ConfigObs>::for_each_change(const std::set<std::string>& changes,
- ConfigProxyT& proxy,
+void ObserverMgr<ConfigObs>::for_each_change(const std::map<std::string,bool>& changes,
config_gather_cb callback, std::ostream *oss)
{
// create the reverse observer mapping, mapping observers to the set of
// changed keys that they'll get.
std::string val;
- for (auto& key : changes) {
+ for (auto& [key, present] : changes) {
auto [first, last] = observers.equal_range(key);
- if ((oss) && !proxy.get_val(key, &val)) {
+ if ((oss) && present) {
(*oss) << key << " = '" << val << "' ";
if (first == last) {
(*oss) << "(not observed, change may require restart) ";
*/
ConfigValues values;
using md_config_obs_t = ceph::md_config_obs_impl<ConfigProxy>;
- ObserverMgr<md_config_obs_t> obs_mgr;
+ using ObsMgr = ObserverMgr<md_config_obs_t>;
+ ObsMgr obs_mgr;
md_config_t config;
/** A lock that protects the md_config_t internals. It is
* recursive, for simplicity.
* It is best if this lock comes first in the lock hierarchy. We will
* hold this lock when calling configuration observers. */
- mutable ceph::recursive_mutex lock =
- ceph::make_recursive_mutex("ConfigProxy::lock");
+ mutable ceph::mutex lock = ceph::make_mutex("ConfigProxy::lock");
+ ceph::condition_variable cond;
- class CallGate {
- private:
- uint32_t call_count = 0;
- ceph::mutex lock;
- ceph::condition_variable cond;
- public:
- CallGate()
- : lock(ceph::make_mutex("call::gate::lock")) {
- }
+ using rev_obs_map_t = ObsMgr::rev_obs_map;
- void enter() {
- std::lock_guard<ceph::mutex> locker(lock);
- ++call_count;
- }
- void leave() {
- std::lock_guard<ceph::mutex> locker(lock);
- ceph_assert(call_count > 0);
- if (--call_count == 0) {
- cond.notify_all();
- }
+ void _call_observers(rev_obs_map_t& rev_obs) {
+ ceph_assert(!ceph::mutex_debugging || !ceph_mutex_is_locked_by_me(lock));
+ for (auto& [obs, keys] : rev_obs) {
+ (*obs)->handle_conf_change(*this, keys);
}
- void close() {
- std::unique_lock<ceph::mutex> locker(lock);
- while (call_count != 0) {
- cond.wait(locker);
- }
+ rev_obs.clear(); // drop shared_ptrs
+ {
+ std::lock_guard l{lock};
+ cond.notify_all();
}
- };
-
- void call_gate_enter(md_config_obs_t *obs) {
- auto p = obs_call_gate.find(obs);
- ceph_assert(p != obs_call_gate.end());
- p->second->enter();
- }
- void call_gate_leave(md_config_obs_t *obs) {
- auto p = obs_call_gate.find(obs);
- ceph_assert(p != obs_call_gate.end());
- p->second->leave();
- }
- void call_gate_close(md_config_obs_t *obs) {
- auto p = obs_call_gate.find(obs);
- ceph_assert(p != obs_call_gate.end());
- p->second->close();
}
-
- using rev_obs_map_t = ObserverMgr<md_config_obs_t>::rev_obs_map;
- typedef std::unique_ptr<CallGate> CallGateRef;
-
- std::map<md_config_obs_t*, CallGateRef> obs_call_gate;
-
- void call_observers(std::unique_lock<ceph::recursive_mutex>& locker,
- rev_obs_map_t& rev_obs) {
- // observers are notified outside of lock
- locker.unlock();
- for (auto& [obs, keys] : rev_obs) {
- obs->handle_conf_change(*this, keys);
- }
- locker.lock();
-
- for (auto& rev_ob : rev_obs) {
- call_gate_leave(rev_ob.first);
+ void _gather_changes(std::set<std::string> &changes,
+ rev_obs_map_t *rev_obs, std::ostream* oss) {
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
+ std::map<std::string,bool> changes_present;
+ for (auto& change : changes) {
+ std::string dummy;
+ changes_present[change] = (0 == config.get_val(values, change, &dummy));
}
+ obs_mgr.for_each_change(
+ changes_present,
+ [this, rev_obs](auto obs, const std::string &key) {
+ _map_observer_changes(obs, key, rev_obs);
+ }, oss);
+ changes.clear();
}
- void map_observer_changes(md_config_obs_t *obs, const std::string &key,
+ void _map_observer_changes(ObsMgr::config_obs_ptr obs, const std::string& key,
rev_obs_map_t *rev_obs) {
- ceph_assert(ceph_mutex_is_locked(lock));
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
auto [it, new_entry] = rev_obs->emplace(obs, std::set<std::string>{});
it->second.emplace(key);
- if (new_entry) {
- // this needs to be done under lock as once this lock is
- // dropped (before calling observers) a remove_observer()
- // can sneak in and cause havoc.
- call_gate_enter(obs);
- }
}
public:
}
// for those want to reexpand special meta, e.g, $pid
void finalize_reexpand_meta() {
- std::unique_lock locker(lock);
rev_obs_map_t rev_obs;
- if (config.finalize_reexpand_meta(values, obs_mgr)) {
- _gather_changes(values.changed, &rev_obs, nullptr);
+ {
+ std::lock_guard locker(lock);
+ if (config.finalize_reexpand_meta(values, obs_mgr)) {
+ _gather_changes(values.changed, &rev_obs, nullptr);
+ }
}
- call_observers(locker, rev_obs);
+ _call_observers(rev_obs);
}
void add_observer(md_config_obs_t* obs) {
std::lock_guard l(lock);
obs_mgr.add_observer(obs);
- obs_call_gate.emplace(obs, std::make_unique<CallGate>());
+ cond.notify_all();
}
void remove_observer(md_config_obs_t* obs) {
- std::lock_guard l(lock);
- call_gate_close(obs);
- obs_call_gate.erase(obs);
- obs_mgr.remove_observer(obs);
+ std::unique_lock l(lock);
+ auto wptr = obs_mgr.remove_observer(obs);
+ while (!wptr.expired()) {
+ cond.wait(l);
+ }
}
void call_all_observers() {
- std::unique_lock locker(lock);
rev_obs_map_t rev_obs;
- obs_mgr.for_each_observer(
- [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
- map_observer_changes(obs, key, &rev_obs);
- });
+ {
+ std::lock_guard locker(lock);
+ obs_mgr.for_each_observer(
+ [this, &rev_obs](auto obs, const std::string& key) {
+ _map_observer_changes(obs, key, &rev_obs);
+ });
+ }
- call_observers(locker, rev_obs);
+ _call_observers(rev_obs);
}
void set_safe_to_start_threads() {
std::lock_guard l(lock);
}
// Expand all metavariables. Make any pending observer callbacks.
void apply_changes(std::ostream* oss) {
- std::unique_lock locker(lock);
rev_obs_map_t rev_obs;
- // apply changes until the cluster name is assigned
- if (!values.cluster.empty()) {
- // meta expands could have modified anything. Copy it all out again.
- _gather_changes(values.changed, &rev_obs, oss);
+ {
+ std::lock_guard locker(lock);
+ // apply changes until the cluster name is assigned
+ if (!values.cluster.empty()) {
+ // meta expands could have modified anything. Copy it all out again.
+ _gather_changes(values.changed, &rev_obs, oss);
+ }
}
- call_observers(locker, rev_obs);
- }
- void _gather_changes(std::set<std::string> &changes,
- rev_obs_map_t *rev_obs, std::ostream* oss) {
- obs_mgr.for_each_change(
- changes, *this,
- [this, rev_obs](md_config_obs_t *obs, const std::string &key) {
- map_observer_changes(obs, key, rev_obs);
- }, oss);
- changes.clear();
+ _call_observers(rev_obs);
}
int set_val(const std::string_view key, const std::string& s,
std::stringstream* err_ss=nullptr) {
int set_mon_vals(CephContext *cct,
const std::map<std::string,std::string,std::less<>>& kv,
md_config_t::config_callback config_cb) {
- std::unique_lock locker(lock);
- int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
-
+ int ret;
rev_obs_map_t rev_obs;
- _gather_changes(values.changed, &rev_obs, nullptr);
- call_observers(locker, rev_obs);
+ {
+ std::lock_guard locker(lock);
+ ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
+ _gather_changes(values.changed, &rev_obs, nullptr);
+ }
+
+ _call_observers(rev_obs);
return ret;
}
int injectargs(const std::string &s, std::ostream *oss) {
- std::unique_lock locker(lock);
- int ret = config.injectargs(values, obs_mgr, s, oss);
-
+ int ret;
rev_obs_map_t rev_obs;
- _gather_changes(values.changed, &rev_obs, oss);
-
- call_observers(locker, rev_obs);
+ {
+ std::lock_guard locker(lock);
+ ret = config.injectargs(values, obs_mgr, s, oss);
+ _gather_changes(values.changed, &rev_obs, oss);
+ }
+ _call_observers(rev_obs);
return ret;
}
void parse_env(unsigned entity_type,
// avoid racings with other do_change() calls in parallel.
ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
owner.values.reset(new_values);
- owner.obs_mgr.for_each_change(owner.values->changed, owner,
- [&rev_obs](ConfigObserver *obs,
+ std::map<std::string, bool> changes_present;
+ for (const auto& change : owner.values->changed) {
+ std::string dummy;
+ changes_present[change] = owner.get_val(change, &dummy);
+ }
+ owner.obs_mgr.for_each_change(changes_present,
+ [&rev_obs](auto obs,
const std::string &key) {
rev_obs[obs].insert(key);
}, nullptr);
for (auto& [obs, keys] : rev_obs) {
- obs->handle_conf_change(owner, keys);
+ (*obs)->handle_conf_change(owner, keys);
}
return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count),
proxy.values.reset();
proxy.values = std::move(foreign_values);
+ std::map<std::string, bool> changes_present;
+ for (const auto& change : proxy.values->changed) {
+ std::string dummy;
+ changes_present[change] = proxy.get_val(change, &dummy);
+ }
+
ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
- proxy.obs_mgr.for_each_change(proxy.values->changed, proxy,
- [&rev_obs](ConfigObserver *obs, const std::string& key) {
+ proxy.obs_mgr.for_each_change(changes_present,
+ [&rev_obs](auto obs, const std::string& key) {
rev_obs[obs].insert(key);
}, nullptr);
- for (auto& obs_keys : rev_obs) {
- obs_keys.first->handle_conf_change(proxy, obs_keys.second);
+ for (auto& [obs, keys] : rev_obs) {
+ (*obs)->handle_conf_change(proxy, keys);
}
});
}).finally([new_values] {