::encode(global_id, m->auth_payload);
_send_mon_message(m, true);
- if (!sub_have.empty())
+ for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
+ p != sub_sent.end();
+ ++p) {
+ if (sub_new.count(p->first) == 0)
+ sub_new[p->first] = p->second;
+ }
+ if (!sub_new.empty())
_renew_subs();
}
void MonClient::_renew_subs()
{
assert(monc_lock.is_locked());
- if (sub_have.empty()) {
+ if (sub_new.empty()) {
ldout(cct, 10) << "renew_subs - empty" << dendl;
return;
}
sub_renew_sent = ceph_clock_now(cct);
MMonSubscribe *m = new MMonSubscribe;
- m->what = sub_have;
+ m->what = sub_new;
_send_mon_message(m);
+
+ sub_sent.insert(sub_new.begin(), sub_new.end());
+ sub_new.clear();
}
}
// mon subscriptions
private:
- map<string,ceph_mon_subscribe_item> sub_have; // my subs, and current versions
+ map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
+ map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
utime_t sub_renew_sent, sub_renew_after;
void _renew_subs();
void handle_subscribe_ack(MMonSubscribeAck* m);
bool _sub_want(string what, version_t start, unsigned flags) {
- if (sub_have.count(what) &&
- sub_have[what].start == start &&
- sub_have[what].flags == flags)
+ if ((sub_new.count(what) == 0 &&
+ sub_sent.count(what) &&
+ sub_sent[what].start == start &&
+ sub_sent[what].flags == flags) ||
+ (sub_new.count(what) &&
+ sub_new[what].start == start &&
+ sub_new[what].flags == flags))
return false;
- sub_have[what].start = start;
- sub_have[what].flags = flags;
+ sub_new[what].start = start;
+ sub_new[what].flags = flags;
return true;
}
void _sub_got(string what, version_t got) {
- if (sub_have.count(what)) {
- if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_have.erase(what);
+ if (sub_new.count(what)) {
+ if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
+ sub_new.erase(what);
else
- sub_have[what].start = got + 1;
+ sub_new[what].start = got + 1;
+ } else if (sub_sent.count(what)) {
+ if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
+ sub_sent.erase(what);
+ else
+ sub_sent[what].start = got + 1;
}
}
void _sub_unwant(string what) {
- sub_have.erase(what);
+ sub_sent.erase(what);
+ sub_new.erase(what);
}
// auth tickets
*/
bool sub_want_increment(string what, version_t start, unsigned flags) {
Mutex::Locker l(monc_lock);
- map<string,ceph_mon_subscribe_item>::iterator i =
- sub_have.find(what);
- if (i == sub_have.end() || i->second.start < start) {
- ceph_mon_subscribe_item& item = sub_have[what];
+ map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
+ if (i != sub_new.end()) {
+ if (i->second.start >= start)
+ return false;
+ i->second.start = start;
+ i->second.flags = flags;
+ return true;
+ }
+
+ i = sub_sent.find(what);
+ if (i == sub_sent.end() || i->second.start < start) {
+ ceph_mon_subscribe_item& item = sub_new[what];
item.start = start;
item.flags = flags;
return true;