#define CEPH_COND_H
#include "include/Context.h"
-
-class Cond {
- // my bits
- pthread_cond_t _c;
-
- Mutex *waiter_mutex;
-
- // don't allow copying.
- void operator=(Cond &C);
- Cond(const Cond &C);
-
- public:
- Cond() : waiter_mutex(NULL) {
- int r = pthread_cond_init(&_c,NULL);
- assert(r == 0);
- }
- virtual ~Cond() {
- pthread_cond_destroy(&_c);
- }
-
- int Wait(Mutex &mutex) {
- // make sure this cond is used with one mutex only
- assert(waiter_mutex == NULL || waiter_mutex == &mutex);
- waiter_mutex = &mutex;
-
- assert(mutex.is_locked());
-
- mutex._pre_unlock();
- int r = pthread_cond_wait(&_c, &mutex._m);
- mutex._post_lock();
- return r;
- }
-
- int WaitUntil(Mutex &mutex, utime_t when) {
- // make sure this cond is used with one mutex only
- assert(waiter_mutex == NULL || waiter_mutex == &mutex);
- waiter_mutex = &mutex;
-
- assert(mutex.is_locked());
-
- struct timespec ts;
- when.to_timespec(&ts);
-
- mutex._pre_unlock();
- int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
- mutex._post_lock();
-
- return r;
- }
-
- int WaitInterval(Mutex &mutex, utime_t interval) {
- utime_t when = ceph_clock_now();
- when += interval;
- return WaitUntil(mutex, when);
- }
-
- template<typename Duration>
- int WaitInterval(Mutex &mutex, Duration interval) {
- ceph::real_time when(ceph::real_clock::now());
- when += interval;
-
- struct timespec ts = ceph::real_clock::to_timespec(when);
-
- mutex._pre_unlock();
- int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
- mutex._post_lock();
-
- return r;
- }
-
- int SloppySignal() {
- int r = pthread_cond_broadcast(&_c);
- return r;
- }
- int Signal() {
- // make sure signaler is holding the waiter's lock.
- assert(waiter_mutex == NULL ||
- waiter_mutex->is_locked());
-
- int r = pthread_cond_broadcast(&_c);
- return r;
- }
- int SignalOne() {
- // make sure signaler is holding the waiter's lock.
- assert(waiter_mutex == NULL ||
- waiter_mutex->is_locked());
-
- int r = pthread_cond_signal(&_c);
- return r;
- }
- int SignalAll() {
- // make sure signaler is holding the waiter's lock.
- assert(waiter_mutex == NULL ||
- waiter_mutex->is_locked());
-
- int r = pthread_cond_broadcast(&_c);
- return r;
- }
-};
+#include "CondVar.h"
/**
* context to signal a cond
--- /dev/null
+#ifndef CEPH_COND_VAR_H
+#define CEPH_COND_VAR_H
+
+#include "include/utime.h"
+
+#include "Clock.h"
+#include "Mutex.h"
+#include "pthread.h"
+
+class Cond {
+ // my bits
+ pthread_cond_t _c;
+
+ Mutex *waiter_mutex;
+
+ // don't allow copying.
+ void operator=(Cond &C);
+ Cond(const Cond &C);
+
+ public:
+ Cond() : waiter_mutex(NULL) {
+ int r = pthread_cond_init(&_c,NULL);
+ assert(r == 0);
+ }
+ virtual ~Cond() {
+ pthread_cond_destroy(&_c);
+ }
+
+ int Wait(Mutex &mutex) {
+ // make sure this cond is used with one mutex only
+ assert(waiter_mutex == NULL || waiter_mutex == &mutex);
+ waiter_mutex = &mutex;
+
+ assert(mutex.is_locked());
+
+ mutex._pre_unlock();
+ int r = pthread_cond_wait(&_c, &mutex._m);
+ mutex._post_lock();
+ return r;
+ }
+
+ int WaitUntil(Mutex &mutex, utime_t when) {
+ // make sure this cond is used with one mutex only
+ assert(waiter_mutex == NULL || waiter_mutex == &mutex);
+ waiter_mutex = &mutex;
+
+ assert(mutex.is_locked());
+
+ struct timespec ts;
+ when.to_timespec(&ts);
+
+ mutex._pre_unlock();
+ int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
+ mutex._post_lock();
+
+ return r;
+ }
+
+ int WaitInterval(Mutex &mutex, utime_t interval) {
+ utime_t when = ceph_clock_now();
+ when += interval;
+ return WaitUntil(mutex, when);
+ }
+
+ template<typename Duration>
+ int WaitInterval(Mutex &mutex, Duration interval) {
+ ceph::real_time when(ceph::real_clock::now());
+ when += interval;
+
+ struct timespec ts = ceph::real_clock::to_timespec(when);
+
+ mutex._pre_unlock();
+ int r = pthread_cond_timedwait(&_c, &mutex._m, &ts);
+ mutex._post_lock();
+
+ return r;
+ }
+
+ int SloppySignal() {
+ int r = pthread_cond_broadcast(&_c);
+ return r;
+ }
+ int Signal() {
+ // make sure signaler is holding the waiter's lock.
+ assert(waiter_mutex == NULL ||
+ waiter_mutex->is_locked());
+
+ int r = pthread_cond_broadcast(&_c);
+ return r;
+ }
+ int SignalOne() {
+ // make sure signaler is holding the waiter's lock.
+ assert(waiter_mutex == NULL ||
+ waiter_mutex->is_locked());
+
+ int r = pthread_cond_signal(&_c);
+ return r;
+ }
+ int SignalAll() {
+ // make sure signaler is holding the waiter's lock.
+ assert(waiter_mutex == NULL ||
+ waiter_mutex->is_locked());
+
+ int r = pthread_cond_broadcast(&_c);
+ return r;
+ }
+};
+
+#endif // CEPH_COND_VAR_H
#include "osd/osd_types.h"
#include "common/errno.h"
#include "common/hostname.h"
+#include "common/backport14.h"
#include <boost/type_traits.hpp>
obs_map_t::value_type val(*k, observer_);
observers.insert(val);
}
+ obs_call_gate.emplace(observer_, ceph::make_unique<CallGate>());
}
void md_config_t::remove_observer(md_config_obs_t* observer_)
{
Mutex::Locker l(lock);
+
+ call_gate_close(observer_);
+ obs_call_gate.erase(observer_);
+
bool found_obs = false;
for (obs_map_t::iterator o = observers.begin(); o != observers.end(); ) {
if (o->second == observer_) {
void md_config_t::apply_changes(std::ostream *oss)
{
- Mutex::Locker l(lock);
- /*
- * apply changes until the cluster name is assigned
- */
- if (cluster.size())
- _apply_changes(oss);
+ rev_obs_map_t rev_obs;
+ {
+ Mutex::Locker l(lock);
+ /*
+ * apply changes until the cluster name is assigned
+ */
+ if (cluster.size()) {
+ for_each_change(
+ oss, [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
+ map_observer_changes(obs, key, &rev_obs);
+ });
+ }
+ }
+
+ call_observers(rev_obs);
}
bool md_config_t::_internal_field(const string& s)
return false;
}
-void md_config_t::_apply_changes(std::ostream *oss)
+void md_config_t::for_each_change(std::ostream *oss, config_gather_cb callback)
{
- /* Maps observers to the configuration options that they care about which
- * have changed. */
- typedef std::map < md_config_obs_t*, std::set <std::string> > rev_obs_map_t;
-
expand_all_meta();
// expand_all_meta could have modified anything. Copy it all out again.
update_legacy_val(option, ptr);
}
- // create the reverse observer mapping, mapping observers to the set of
- // changed keys that they'll get.
- rev_obs_map_t robs;
std::set <std::string> empty_set;
char buf[128];
char *bufptr = (char*)buf;
}
}
for (obs_map_t::iterator r = range.first; r != range.second; ++r) {
- rev_obs_map_t::value_type robs_val(r->second, empty_set);
- pair < rev_obs_map_t::iterator, bool > robs_ret(robs.insert(robs_val));
- std::set <std::string> &keys(robs_ret.first->second);
- keys.insert(key);
+ callback(r->second, key);
}
}
changed.clear();
-
- // Make any pending observer callbacks
- for (rev_obs_map_t::const_iterator r = robs.begin(); r != robs.end(); ++r) {
- md_config_obs_t *obs = r->first;
- obs->handle_conf_change(this, r->second);
- }
-
}
void md_config_t::call_all_observers()
{
- std::map<md_config_obs_t*,std::set<std::string> > obs;
+ rev_obs_map_t rev_obs;
{
Mutex::Locker l(lock);
expand_all_meta();
for (auto r = observers.begin(); r != observers.end(); ++r) {
- obs[r->second].insert(r->first);
+ map_observer_changes(r->second, r->first, &rev_obs);
}
}
- for (auto p = obs.begin();
- p != obs.end();
- ++p) {
- p->first->handle_conf_change(this, p->second);
- }
+
+ call_observers(rev_obs);
}
int md_config_t::injectargs(const std::string& s, std::ostream *oss)
{
int ret;
- Mutex::Locker l(lock);
- char b[s.length()+1];
- strcpy(b, s.c_str());
- std::vector<const char*> nargs;
- char *p = b;
- while (*p) {
- nargs.push_back(p);
- while (*p && *p != ' ') p++;
- if (!*p)
- break;
- *p++ = 0;
- while (*p && *p == ' ') p++;
- }
- ret = parse_injectargs(nargs, oss);
- if (!nargs.empty()) {
- *oss << " failed to parse arguments: ";
- std::string prefix;
- for (std::vector<const char*>::const_iterator i = nargs.begin();
- i != nargs.end(); ++i) {
- *oss << prefix << *i;
- prefix = ",";
+ rev_obs_map_t rev_obs;
+ {
+ Mutex::Locker l(lock);
+
+ char b[s.length()+1];
+ strcpy(b, s.c_str());
+ std::vector<const char*> nargs;
+ char *p = b;
+ while (*p) {
+ nargs.push_back(p);
+ while (*p && *p != ' ') p++;
+ if (!*p)
+ break;
+ *p++ = 0;
+ while (*p && *p == ' ') p++;
+ }
+ ret = parse_injectargs(nargs, oss);
+ if (!nargs.empty()) {
+ *oss << " failed to parse arguments: ";
+ std::string prefix;
+ for (std::vector<const char*>::const_iterator i = nargs.begin();
+ i != nargs.end(); ++i) {
+ *oss << prefix << *i;
+ prefix = ",";
+ }
+ *oss << "\n";
+ ret = -EINVAL;
}
- *oss << "\n";
- ret = -EINVAL;
+
+ for_each_change(
+ oss, [this, &rev_obs](md_config_obs_t *obs, const std::string &key) {
+ map_observer_changes(obs, key, &rev_obs);
+ });
}
- _apply_changes(oss);
+
+ call_observers(rev_obs);
return ret;
}
::complain_about_parse_errors(cct, &parse_errors);
}
+void md_config_t::call_observers(rev_obs_map_t &rev_obs) {
+ for (auto p : rev_obs) {
+ p.first->handle_conf_change(this, p.second);
+ // this can be done outside the lock as call_gate_enter()
+ // and remove_observer() are serialized via lock
+ call_gate_leave(p.first);
+ }
+}
+
+void md_config_t::map_observer_changes(md_config_obs_t *obs, const std::string &key,
+ rev_obs_map_t *rev_obs) {
+ ceph_assert(lock.is_locked());
+
+ auto p = rev_obs->emplace(obs, std::set<std::string>{});
+
+ p.first->second.emplace(key);
+ if (p.second) {
+ // this needs to be done under lock as once this lock is
+ // dropped (before calling observers) a remove_observer()
+ // can sneak in and cause havoc.
+ call_gate_enter(p.first->first);
+ }
+}
#include "common/entity_name.h"
#include "common/code_environment.h"
#include "common/Mutex.h"
+#include "common/CondVar.h"
#include "log/SubsystemMap.h"
#include "common/config_obs.h"
#include "common/options.h"
* while another thread is reading them, either.
*/
struct md_config_t {
+private:
+ class CallGate {
+ private:
+ uint32_t call_count = 0;
+ Mutex lock;
+ Cond cond;
+ public:
+ CallGate()
+ : lock("call::gate::lock", false, true) {
+ }
+
+ void enter() {
+ Mutex::Locker locker(lock);
+ ++call_count;
+ }
+ void leave() {
+ Mutex::Locker locker(lock);
+ ceph_assert(call_count > 0);
+ if (--call_count == 0) {
+ cond.Signal();
+ }
+ }
+ void close() {
+ Mutex::Locker locker(lock);
+ while (call_count != 0) {
+ cond.Wait(lock);
+ }
+ }
+ };
+
+ void call_gate_enter(md_config_obs_t *obs) {
+ auto p = obs_call_gate.find(obs);
+ ceph_assert(p != obs_call_gate.end());
+ p->second->enter();
+ }
+ void call_gate_leave(md_config_obs_t *obs) {
+ auto p = obs_call_gate.find(obs);
+ ceph_assert(p != obs_call_gate.end());
+ p->second->leave();
+ }
+ void call_gate_close(md_config_obs_t *obs) {
+ auto p = obs_call_gate.find(obs);
+ ceph_assert(p != obs_call_gate.end());
+ p->second->close();
+ }
+
+ typedef std::unique_ptr<CallGate> CallGateRef;
+ std::map<md_config_obs_t*, CallGateRef> obs_call_gate;
+
+ typedef std::map<md_config_obs_t*, std::set<std::string>> rev_obs_map_t;
+ typedef std::function<void(md_config_obs_t*, const std::string&)> config_gather_cb;
+
+ void call_observers(rev_obs_map_t &rev_obs);
+ void map_observer_changes(md_config_obs_t *obs, const std::string &key,
+ rev_obs_map_t *rev_obs);
+
public:
typedef boost::variant<int64_t md_config_t::*,
uint64_t md_config_t::*,
// Expand all metavariables. Make any pending observer callbacks.
void apply_changes(std::ostream *oss);
- void _apply_changes(std::ostream *oss);
+ void for_each_change(std::ostream *oss, config_gather_cb callback);
bool _internal_field(const string& k);
void call_all_observers();