// Maps configuration options to the observer listening for them.
using obs_map_t = std::multimap<std::string, ConfigObs*>;
obs_map_t observers;
- /* Maps observers to the configuration options that they care about which
- * have changed. */
- using rev_obs_map_t = std::map<ConfigObs*, std::set<std::string>>;
public:
+ typedef std::map<ConfigObs*, std::set<std::string>> rev_obs_map;
+ typedef std::function<void(ConfigObs*, const std::string&)> config_gather_cb;
+
// Adds a new observer to this configuration. You can do this at any time,
// but it will only receive notifications for the changes that happen after
// you attach it, obviously.
// This function will assert if you try to delete an observer that isn't
// there.
void remove_observer(ConfigObs* observer);
+ // invoke callback for every observers tracking keys
+ void for_each_observer(config_gather_cb callback);
+ // invoke callback for observers keys tracking the provided change set
template<class ConfigProxyT>
- void call_all_observers(const ConfigProxyT& proxy);
- template<class ConfigProxyT>
- void apply_changes(const std::set<std::string>& changes,
- const ConfigProxyT& proxy,
- std::ostream *oss);
+ void for_each_change(const std::set<std::string>& changes,
+ ConfigProxyT& proxy,
+ config_gather_cb callback, std::ostream *oss);
bool is_tracking(const std::string& name) const override;
};
}
template<class ConfigObs>
-template<class ConfigProxyT>
-void ObserverMgr<ConfigObs>::call_all_observers(const ConfigProxyT& proxy)
+void ObserverMgr<ConfigObs>::for_each_observer(config_gather_cb callback)
{
- rev_obs_map_t rev_obs;
for (const auto& [key, obs] : observers) {
- rev_obs[obs].insert(key);
- }
- for (auto& [obs, keys] : rev_obs) {
- obs->handle_conf_change(proxy, keys);
+ callback(obs, key);
}
}
template<class ConfigObs>
-template<class ConfigProxy>
-void ObserverMgr<ConfigObs>::apply_changes(const std::set<std::string>& changes,
- const ConfigProxy& proxy,
- std::ostream *oss)
+template<class ConfigProxyT>
+void ObserverMgr<ConfigObs>::for_each_change(const std::set<std::string>& changes,
+ ConfigProxyT& proxy,
+ config_gather_cb callback, std::ostream *oss)
{
// create the reverse observer mapping, mapping observers to the set of
// changed keys that they'll get.
- rev_obs_map_t robs;
string val;
for (auto& key : changes) {
auto [first, last] = observers.equal_range(key);
}
}
for (auto r = first; r != last; ++r) {
- auto obs = r->second;
- robs[obs].insert(key);
+ callback(r->second, key);
}
}
- // Make any pending observer callbacks
- for (auto& [obs, keys] : robs) {
- obs->handle_conf_change(proxy, keys);
- }
}
template<class ConfigObs>
mutable ceph::recursive_mutex lock =
ceph::make_recursive_mutex("ConfigProxy::lock");
+ class CallGate {
+ private:
+ uint32_t call_count = 0;
+ ceph::mutex lock;
+ ceph::condition_variable cond;
+ public:
+ CallGate()
+ : lock(ceph::make_mutex("call::gate::lock")) {
+ }
+
+ void enter() {
+ std::lock_guard<ceph::mutex> locker(lock);
+ ++call_count;
+ }
+ void leave() {
+ std::lock_guard<ceph::mutex> locker(lock);
+ ceph_assert(call_count > 0);
+ if (--call_count == 0) {
+ cond.notify_all();
+ }
+ }
+ void close() {
+ std::unique_lock<ceph::mutex> locker(lock);
+ while (call_count != 0) {
+ cond.wait(locker);
+ }
+ }
+ };
+
+ void call_gate_enter(md_config_obs_t *obs) {
+ auto p = obs_call_gate.find(obs);
+ ceph_assert(p != obs_call_gate.end());
+ p->second->enter();
+ }
+ void call_gate_leave(md_config_obs_t *obs) {
+ auto p = obs_call_gate.find(obs);
+ ceph_assert(p != obs_call_gate.end());
+ p->second->leave();
+ }
+ void call_gate_close(md_config_obs_t *obs) {
+ auto p = obs_call_gate.find(obs);
+ ceph_assert(p != obs_call_gate.end());
+ p->second->close();
+ }
+
+ using rev_obs_map_t = ObserverMgr<md_config_obs_t>::rev_obs_map;
+ typedef std::unique_ptr<CallGate> CallGateRef;
+
+ std::map<md_config_obs_t*, CallGateRef> obs_call_gate;
+
+ void call_observers(rev_obs_map_t &rev_obs) {
+ for (auto& [obs, keys] : rev_obs) {
+ obs->handle_conf_change(*this, keys);
+ // this can be done outside the lock as call_gate_enter()
+ // and remove_observer() are serialized via lock
+ call_gate_leave(obs);
+ }
+ }
+
+ void map_observer_changes(md_config_obs_t *obs, const std::string &key,
+ rev_obs_map_t *rev_obs) {
+ ceph_assert(ceph_mutex_is_locked(lock));
+
+ auto [it, new_entry] = rev_obs->emplace(obs, std::set<std::string>{});
+ it->second.emplace(key);
+ if (new_entry) {
+ // this needs to be done under lock as once this lock is
+ // dropped (before calling observers) a remove_observer()
+ // can sneak in and cause havoc.
+ call_gate_enter(obs);
+ }
+ }
+
public:
explicit ConfigProxy(bool is_daemon)
: config{values, obs_mgr, is_daemon}
}
// for those want to reexpand special meta, e.g, $pid
void finalize_reexpand_meta() {
- std::lock_guard l(lock);
- if (config.finalize_reexpand_meta(values, obs_mgr)) {
- obs_mgr.apply_changes(values.changed, *this, nullptr);
- values.changed.clear();
+ rev_obs_map_t rev_obs;
+ {
+ std::lock_guard l(lock);
+ if (config.finalize_reexpand_meta(values, obs_mgr)) {
+ _gather_changes(values.changed, &rev_obs, nullptr);
+ values.changed.clear();
+ }
}
+
+ call_observers(rev_obs);
}
void add_observer(md_config_obs_t* obs) {
std::lock_guard l(lock);
obs_mgr.add_observer(obs);
+ obs_call_gate.emplace(obs, std::make_unique<CallGate>());
}
void remove_observer(md_config_obs_t* obs) {
std::lock_guard l(lock);
+ call_gate_close(obs);
+ obs_call_gate.erase(obs);
obs_mgr.remove_observer(obs);
}
void call_all_observers() {
- std::lock_guard l(lock);
- // Have the scope of the lock extend to the scope of
- // handle_conf_change since that function expects to be called with
- // the lock held. (And the comment in config.h says that is the
- // expected behavior.)
- //
- // An alternative might be to pass a std::unique_lock to
- // handle_conf_change and have a version of get_var that can take it
- // by reference and lock as appropriate.
- obs_mgr.call_all_observers(*this);
+ rev_obs_map_t rev_obs;
+ {
+ std::lock_guard l(lock);
+ obs_mgr.for_each_observer(
+ [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
+ map_observer_changes(obs, key, &rev_obs);
+ });
+ }
+
+ call_observers(rev_obs);
}
void set_safe_to_start_threads() {
config.set_safe_to_start_threads();
}
// Expand all metavariables. Make any pending observer callbacks.
void apply_changes(std::ostream* oss) {
- std::lock_guard l{lock};
- // apply changes until the cluster name is assigned
- if (!values.cluster.empty()) {
- // meta expands could have modified anything. Copy it all out again.
- obs_mgr.apply_changes(values.changed, *this, oss);
- values.changed.clear();
+ rev_obs_map_t rev_obs;
+ {
+ std::lock_guard l{lock};
+ // apply changes until the cluster name is assigned
+ if (!values.cluster.empty()) {
+ // meta expands could have modified anything. Copy it all out again.
+ _gather_changes(values.changed, &rev_obs, oss);
+ values.changed.clear();
+ }
}
+
+ call_observers(rev_obs);
+ }
+ void _gather_changes(std::set<std::string> &changes,
+ rev_obs_map_t *rev_obs, std::ostream* oss) {
+ obs_mgr.for_each_change(
+ changes, *this,
+ [this, rev_obs](md_config_obs_t *obs, const std::string &key) {
+ map_observer_changes(obs, key, rev_obs);
+ }, oss);
}
int set_val(const std::string& key, const std::string& s,
std::stringstream* err_ss=nullptr) {
int set_mon_vals(CephContext *cct,
const map<std::string,std::string>& kv,
md_config_t::config_callback config_cb) {
- std::lock_guard l{lock};
- int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
- obs_mgr.apply_changes(values.changed, *this, nullptr);
- values.changed.clear();
+ int ret;
+ rev_obs_map_t rev_obs;
+ {
+ std::lock_guard l{lock};
+ ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb);
+ _gather_changes(values.changed, &rev_obs, nullptr);
+ values.changed.clear();
+ }
+
+ call_observers(rev_obs);
return ret;
}
int injectargs(const std::string &s, std::ostream *oss) {
- std::lock_guard l{lock};
- int ret = config.injectargs(values, obs_mgr, s, oss);
- obs_mgr.apply_changes(values.changed, *this, oss);
- values.changed.clear();
+ int ret;
+ rev_obs_map_t rev_obs;
+ {
+ std::lock_guard l{lock};
+ ret = config.injectargs(values, obs_mgr, s, oss);
+ _gather_changes(values.changed, &rev_obs, oss);
+ values.changed.clear();
+ }
+
+ call_observers(rev_obs);
return ret;
}
void parse_env(const char *env_var = "CEPH_ARGS") {
// always apply the new settings synchronously on the owner shard, to
// avoid racings with other do_change() calls in parallel.
+ ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
owner.values.reset(new_values);
- owner.obs_mgr.apply_changes(owner.values->changed,
- owner, nullptr);
+ owner.obs_mgr.for_each_change(owner.values->changed, owner,
+ [&rev_obs](ConfigObserver *obs,
+ const std::string &key) {
+ rev_obs[obs].insert(key);
+ }, nullptr);
+ for (auto& [obs, keys] : rev_obs) {
+ obs->handle_conf_change(owner, keys);
+ }
return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count),
[&owner, new_values] (auto cpu) {
[foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable {
proxy.values.reset();
proxy.values = std::move(foreign_values);
- proxy.obs_mgr.apply_changes(proxy.values->changed,
- proxy, nullptr);
+
+ ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
+ proxy.obs_mgr.for_each_change(proxy.values->changed, proxy,
+ [&rev_obs](md_config_obs_t *obs,
+ const std::string &key) {
+ rev_obs[obs].insert(key);
+ }, nullptr);
+ for (auto& [obs, keys] : rev_obs) {
+ obs->handle_conf_change(proxy, keys);
+ }
});
}).finally([new_values] {
new_values->changed.clear();