monclient->init();
monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
- monclient->sub_want("mdsmap", mdsmap->get_epoch());
- monclient->sub_want_onetime("osdmap", 0);
+ monclient->sub_want("mdsmap", 0, 0);
+ monclient->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
// do logger crap only once per process.
static bool did_init = false;
#define CEPH_FEATURE_NOSRCADDR (1<<1)
#define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
#define CEPH_FEATURE_FLOCK (1<<3)
+#define CEPH_FEATURE_SUBSCRIBE2 (1<<4)
/*
struct ceph_mon_request_header monhdr;
} __attribute__ ((packed));
+#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */
+
struct ceph_mon_subscribe_item {
- __le64 have_version; __le64 have;
- __u8 onetime;
+ __le64 start;
+ __u8 flags;
} __attribute__ ((packed));
struct ceph_mon_subscribe_ack {
inline ostream& operator<<(ostream& out, const ceph_mon_subscribe_item& i)
{
- return out << i.have << (i.onetime ? "" : "+");
+ return out << i.start
+ << ((i.flags & CEPH_SUBSCRIBE_ONETIME) ? "" : "+");
}
#endif
objecter->init();
- monc->sub_want("mdsmap", 0);
+ monc->sub_want("mdsmap", 0, 0);
monc->renew_subs();
// schedule tick
#include "msg/Message.h"
+/*
+ * compatibility with old crap
+ */
+struct ceph_mon_subscribe_item_old {
+ __le64 unused;
+ __le64 have;
+ __u8 onetime;
+} __attribute__ ((packed));
+WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old)
+
+
struct MMonSubscribe : public Message {
map<string, ceph_mon_subscribe_item> what;
~MMonSubscribe() {}
public:
- void sub_onetime(const char *w, version_t have) {
- what[w].onetime = true;
- what[w].have = have;
- }
- void sub_persistent(const char *w, version_t have) {
- what[w].onetime = false;
- what[w].have = have;
+ void sub_want(const char *w, version_t start, unsigned flags) {
+ what[w].start = start;
+ what[w].flags = flags;
}
const char *get_type_name() { return "mon_subscribe"; }
void decode_payload() {
bufferlist::iterator p = payload.begin();
- ::decode(what, p);
+ if (header.version < 2) {
+ map<string, ceph_mon_subscribe_item_old> oldwhat;
+ ::decode(oldwhat, p);
+ what.clear();
+ for (map<string, ceph_mon_subscribe_item_old>::iterator q = oldwhat.begin();
+ q != oldwhat.end();
+ q++) {
+ if (q->second.have)
+ what[q->first].start = q->second.have + 1;
+ else
+ what[q->first].start = 0;
+ what[q->first].flags = 0;
+ if (q->second.onetime)
+ what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME;
+ }
+ } else {
+ ::decode(what, p);
+ }
}
void encode_payload() {
- ::encode(what, payload);
+ if (get_connection()->has_feature(CEPH_FEATURE_SUBSCRIBE2)) {
+ header.version = 2;
+ ::encode(what, payload);
+ } else {
+ map<string, ceph_mon_subscribe_item_old> oldwhat;
+ for (map<string, ceph_mon_subscribe_item>::iterator q = what.begin();
+ q != what.end();
+ q++) {
+ if (q->second.start)
+ // warning: start=1 -> have=0, which was ambiguous
+ oldwhat[q->first].have = q->second.start - 1;
+ else
+ oldwhat[q->first].have = 0;
+ oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME;
+ }
+ ::encode(oldwhat, payload);
+ }
}
};
void MDSMonitor::check_sub(Subscription *sub)
{
- if (sub->last < mdsmap.get_epoch()) {
+ if (sub->next <= mdsmap.get_epoch()) {
mon->messenger->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap),
sub->session->inst);
if (sub->onetime)
mon->session_map.remove_sub(sub);
else
- sub->last = mdsmap.get_epoch();
+ sub->next = mdsmap.get_epoch() + 1;
}
}
dout(10) << "get_monmap" << dendl;
Mutex::Locker l(monc_lock);
- _sub_want("monmap", monmap.get_epoch());
+ _sub_want("monmap", 0, 0);
if (cur_mon < 0)
_reopen_session();
return 0;
}
- _sub_want("monmap", monmap.get_epoch());
+ _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
if (cur_mon < 0)
_reopen_session();
void _renew_subs();
void handle_subscribe_ack(MMonSubscribeAck* m);
- void _sub_want(string what, version_t have) {
- sub_have[what].have = have;
- sub_have[what].onetime = false;
- }
- bool _sub_want_onetime(string what, version_t have) {
- if (sub_have.count(what) == 0) {
- sub_have[what].have = have;
- sub_have[what].onetime = true;
- return true;
- } else
- sub_have[what].have = have;
- return false;
- }
- void _sub_got(string what, version_t have) {
+ bool _sub_want(string what, version_t start, unsigned flags) {
+ if (sub_have.count(what) &&
+ sub_have[what].start == start &&
+ sub_have[what].flags == flags)
+ return false;
+ sub_have[what].start = start;
+ sub_have[what].flags = flags;
+ return true;
+ }
+ void _sub_got(string what, version_t got) {
if (sub_have.count(what)) {
- if (sub_have[what].onetime)
+ if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME)
sub_have.erase(what);
else
- sub_have[what].have = have;
+ sub_have[what].start = got + 1;
}
}
Mutex::Locker l(monc_lock);
_renew_subs();
}
- void sub_want(string what, version_t have) {
- Mutex::Locker l(monc_lock);
- _sub_want(what, have);
- }
- bool sub_want_onetime(string what, version_t have) {
+ bool sub_want(string what, version_t start, unsigned flags) {
Mutex::Locker l(monc_lock);
- return _sub_want_onetime(what, have);
+ return _sub_want(what, start, flags);
}
void sub_got(string what, version_t have) {
Mutex::Locker l(monc_lock);
for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
p != m->what.end();
p++) {
- if (!p->second.onetime)
+ // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
+ if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
reply = true;
- session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime);
+
+ session_map.add_update_sub(s, p->first, p->second.start,
+ p->second.flags & CEPH_SUBSCRIBE_ONETIME);
+
if (p->first == "mdsmap") {
if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) {
mdsmon()->check_sub(s->sub_map["mdsmap"]);
void Monitor::check_sub(Subscription *sub)
{
- dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl;
- if (sub->last < monmap->get_epoch()) {
+ dout(0) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl;
+ if (sub->next <= monmap->get_epoch()) {
send_latest_monmap(sub->session->inst);
if (sub->onetime)
session_map.remove_sub(sub);
else
- sub->last = monmap->get_epoch();
+ sub->next = monmap->get_epoch() + 1;
}
}
void OSDMonitor::check_sub(Subscription *sub)
{
- if (sub->last < osdmap.get_epoch()) {
- if (sub->last)
- send_incremental(sub->last, sub->session->inst);
+ if (sub->next <= osdmap.get_epoch()) {
+ if (sub->next > 1)
+ send_incremental(sub->next - 1, sub->session->inst);
else
mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap),
sub->session->inst);
if (sub->onetime)
mon->session_map.remove_sub(sub);
else
- sub->last = osdmap.get_epoch();
+ sub->next = osdmap.get_epoch() + 1;
}
}
MonSession *session;
string type;
xlist<Subscription*>::item type_item;
- version_t last;
+ version_t next;
bool onetime;
- Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this) {};
+ Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this),
+ next(0), onetime(false) {};
};
struct MonSession : public RefCountedObject {
}
- void add_update_sub(MonSession *s, const string& what, version_t have, bool onetime) {
+ void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime) {
Subscription *sub = 0;
if (s->sub_map.count(what)) {
sub = s->sub_map[what];
s->sub_map[what] = sub;
subs[what].push_back(&sub->type_item);
}
- sub->last = have;
+ sub->next = start;
sub->onetime = onetime;
}
*/
// default feature(s) everyone gets
-#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR
+#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2
class SimpleMessenger : public Messenger {
public:
Policy() :
lossy(false), server(false), throttler(NULL),
- features_supported(MSGR_FEATURES_SUPPORTED), features_required(0) {}
+ features_supported(MSGR_FEATURES_SUPPORTED),
+ features_required(0) {}
Policy(bool l, bool s, uint64_t sup, uint64_t req) :
lossy(l), server(s), throttler(NULL),
- features_supported(sup | MSGR_FEATURES_SUPPORTED), features_required(req) {}
+ features_supported(sup | MSGR_FEATURES_SUPPORTED),
+ features_required(req) {}
static Policy stateful_server(uint64_t sup, uint64_t req) { return Policy(false, true, sup, req); }
static Policy stateless_server(uint64_t sup, uint64_t req) { return Policy(true, true, sup, req); }
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
monc->init();
- monc->sub_want("monmap", 0);
-
osd_lock.Unlock();
monc->authenticate();
if (now - last_mon_heartbeat > g_conf.osd_mon_heartbeat_interval) {
last_mon_heartbeat = now;
dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
- monc->sub_want_onetime("osdmap", osdmap->get_epoch());
+ monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
}
{
// ask?
if (waiting_for_osdmap.empty()) {
- monc->sub_want_onetime("osdmap", osdmap->get_epoch());
+ monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
}
else {
dout(10) << "handle_osd_map missing epoch " << cur+1 << dendl;
- monc->sub_want_onetime("osdmap", cur);
+ monc->sub_want("osdmap", cur+1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
break;
}
}
else {
dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
- monc->sub_want_onetime("osdmap", osdmap->get_epoch());
+ monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
break;
}
scan_pgs(changed_pgs);
} else {
dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
- monc->sub_want_onetime("osdmap", 0);
+ monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
}
void Objecter::maybe_request_map()
{
dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
- if (monc->sub_want_onetime("osdmap", osdmap->get_epoch()))
+ if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME))
monc->renew_subs();
}