// @param func a functor which accepts @c "ConfigValues&"
template<typename Func>
seastar::future<> do_change(Func&& func) {
- auto new_values = seastar::make_lw_shared(*values);
- func(*new_values);
- return seastar::do_with(seastar::make_foreign(new_values),
- [this](auto& foreign_values) {
- return container().invoke_on_all([&foreign_values](ConfigProxy& proxy) {
- return foreign_values.copy().then([&proxy](auto foreign_values) {
- proxy.values.reset();
- proxy.values = std::move(foreign_values);
- proxy.obs_mgr.apply_changes(proxy.values->changed,
- proxy, nullptr);
+ return container().invoke_on(values.get_owner_shard(),
+ [func = std::move(func)](ConfigProxy& owner) {
+ // apply the changes to a copy of the values
+ auto new_values = seastar::make_lw_shared(*owner.values);
+ new_values->changed.clear();
+ func(*new_values);
+
+ // always apply the new settings synchronously on the owner shard, to
+ // avoid racings with other do_change() calls in parallel.
+ owner.values.reset(new_values);
+
+ return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count),
+ [&owner, new_values] (auto cpu) {
+ return owner.container().invoke_on(cpu,
+ [foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable {
+ proxy.values.reset();
+ proxy.values = std::move(foreign_values);
+ proxy.obs_mgr.apply_changes(proxy.values->changed,
+ proxy, nullptr);
+ });
+ }).finally([new_values] {
+ new_values->changed.clear();
});
});
- }).then([this] {
- values->changed.clear();
- });
}
public:
ConfigProxy();
return ceph::common::sharded_conf().start().then([] {
return ceph::common::sharded_conf().invoke_on(0, &Config::start);
}).then([] {
- return ceph::common::sharded_conf().invoke_on_all([](auto& config) {
+ return ceph::common::sharded_conf().invoke_on_all([](Config& config) {
return config.set_val("osd_tracing", "true");
});
}).then([] {
- return ceph::common::local_conf().get_val<bool>("osd_tracing");
- }).then([](bool osd_tracing) {
- if (osd_tracing) {
- return seastar::make_ready_future<>();
- } else {
- throw std::runtime_error("run osd_tracing");
- }
+ return ceph::common::sharded_conf().invoke_on_all([](Config& config) {
+ if (!config.get_val<bool>("osd_tracing")) {
+ throw std::runtime_error("run osd_tracing");
+ }
+ });
}).then([] {
return ceph::common::sharded_conf().stop();
});