delete oldmap;
delete m;
- monclient->update_sub("mdsmap", mdsmap->get_epoch());
+ monclient->sub_got("mdsmap", mdsmap->get_epoch());
}
void Client::send_reconnect(int mds)
<< " and mdsmap " << mdsmap->get_epoch()
<< dendl;
- monclient->update_sub("mdsmap", mdsmap->get_epoch());
+ monclient->sub_want("mdsmap", mdsmap->get_epoch());
+ monclient->renew_subs();
// hack: get+pin root inode.
#include "msg/Message.h"
struct MMonSubscribe : public Message {
- map<nstring, version_t> what;
+ struct sub_rec {
+ version_t have;
+ bool onetime; // just one version, or keep sending them?
+
+ void encode(bufferlist& bl) const {
+ ::encode(have, bl);
+ ::encode(onetime, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ ::decode(have, bl);
+ ::decode(onetime, bl);
+ }
+ };
+ WRITE_CLASS_ENCODER(sub_rec)
+
+ map<nstring, sub_rec> what;
MMonSubscribe() : Message(CEPH_MSG_MON_SUBSCRIBE) {}
+ void sub_onetime(const char *w, version_t have) {
+ what[w].onetime = true;
+ what[w].have = have;
+ }
+ void sub_persistent(const char *w, version_t have) {
+ what[w].onetime = false;
+ what[w].have = have;
+ }
+
const char *get_type_name() { return "mon_subscribe"; }
void print(ostream& o) {
o << "mon_subscribe(" << what << ")";
::encode(what, payload);
}
};
+WRITE_CLASS_ENCODER(MMonSubscribe::sub_rec)
+
+static inline ostream& operator<<(ostream& out, const MMonSubscribe::sub_rec& r)
+{
+ return out << r.have << (r.onetime ? "(onetime)":"(persistent)");
+}
#endif
send_full(p->first);
p->second.last = mdsmap.get_epoch();
}
- }
+ }
+ subs.trim_onetime();
}
#include "common/Timer.h"
+#include "messages/MMonSubscribe.h"
class MonMap;
class MMonMap;
// mon subscriptions
private:
- map<nstring,version_t> sub_have; // my subs, and current versions
+ map<nstring,MMonSubscribe::sub_rec> sub_have; // my subs, and current versions
utime_t sub_renew_sent, sub_renew_after;
public:
void renew_subs();
- void update_sub(nstring what, version_t have) {
- bool had = sub_have.count(what);
- sub_have[what] = have;
- if (!had)
- renew_subs();
+ void sub_want(nstring what, version_t have) {
+ sub_have[what].have = have;
+ sub_have[what].onetime = false;
+ }
+ void sub_want_onetime(nstring what, version_t have) {
+ sub_have[what].have = have;
+ sub_have[what].onetime = true;
+ }
+ void sub_got(nstring what, version_t have) {
+ if (sub_have.count(what)) {
+ if (sub_have[what].onetime)
+ sub_have.erase(what);
+ else
+ sub_have[what].have = have;
+ }
}
void handle_subscribe_ack(MMonSubscribeAck* m);
utime_t until = g_clock.now();
until += g_conf.mon_subscribe_interval;
- for (map<nstring,version_t>::iterator p = m->what.begin();
+ for (map<nstring,MMonSubscribe::sub_rec>::iterator p = m->what.begin();
p != m->what.end();
p++) {
if (p->first == "osdmap")
- osdmon()->subscribe(m->get_source_inst(), p->second, until);
+ osdmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until);
else if (p->first == "mdsmap")
- mdsmon()->subscribe(m->get_source_inst(), p->second, until);
+ mdsmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until);
else
dout(10) << " ignoring sub for '" << p->first << "'" << dendl;
}
send_latest(p->first, p->second.last);
p->second.last = osdmap.get_epoch();
}
- }
+ }
+ subs.trim_onetime();
}
void trim(utime_t now) {
map<entity_inst_t, sub_info>::iterator p = subs.begin();
while (p != subs.end())
- if (p->second.until < now)
+ if (p->second.until != utime_t() &&
+ p->second.until < now)
+ subs.erase(p++);
+ else
+ p++;
+ }
+ void trim_onetime() {
+ map<entity_inst_t, sub_info>::iterator p = subs.begin();
+ while (p != subs.end())
+ if (p->second.until == utime_t())
subs.erase(p++);
else
p++;
lockdep = 1
debug mon = 20
debug paxos = 20
- debug ms = 1'
+ debug ms = 20'
COSDDEBUG='
lockdep = 1
debug ms = 1
debug filestore = 10'
CMDSDEBUG='
lockdep = 1
- debug ms = 1
+ debug ms = 20
debug mds = 20
mds log max segments = 2'
fi