]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/MonClient: only send new subscriptions
authorSage Weil <sage@redhat.com>
Wed, 7 Oct 2015 04:09:18 +0000 (00:09 -0400)
committerSage Weil <sage@redhat.com>
Mon, 23 Nov 2015 13:38:50 +0000 (08:38 -0500)
Instead of resending all subscriptions, only send the new ones.  This
avoids races like

 - ask for 4+
 - mon sends maps 4-50
 - ask for 4+ and something else
 - mon has to resend same maps and the other thing

Signed-off-by: Sage Weil <sage@redhat.com>
src/mon/MonClient.cc
src/mon/MonClient.h

index ede6eebae59c9fa297cb88a34195ed9bd815b88b..718355142e6f75842dcc7169a6c5f8a76241e839 100644 (file)
@@ -659,7 +659,13 @@ void MonClient::_reopen_session(int rank, string name)
   ::encode(global_id, m->auth_payload);
   _send_mon_message(m, true);
 
-  if (!sub_have.empty())
+  for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
+       p != sub_sent.end();
+       ++p) {
+    if (sub_new.count(p->first) == 0)
+      sub_new[p->first] = p->second;
+  }
+  if (!sub_new.empty())
     _renew_subs();
 }
 
@@ -753,7 +759,7 @@ void MonClient::schedule_tick()
 void MonClient::_renew_subs()
 {
   assert(monc_lock.is_locked());
-  if (sub_have.empty()) {
+  if (sub_new.empty()) {
     ldout(cct, 10) << "renew_subs - empty" << dendl;
     return;
   }
@@ -766,8 +772,11 @@ void MonClient::_renew_subs()
       sub_renew_sent = ceph_clock_now(cct);
 
     MMonSubscribe *m = new MMonSubscribe;
-    m->what = sub_have;
+    m->what = sub_new;
     _send_mon_message(m);
+
+    sub_sent.insert(sub_new.begin(), sub_new.end());
+    sub_new.clear();
   }
 }
 
index a9761d1df4b64e345a09925b58929d8c79936380..70b84b3537d60209332d624c6b56e40a7ac4f0ca 100644 (file)
@@ -209,31 +209,42 @@ public:
 
   // mon subscriptions
 private:
-  map<string,ceph_mon_subscribe_item> sub_have;  // my subs, and current versions
+  map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
+  map<string,ceph_mon_subscribe_item> sub_new;  // unsent new subs
   utime_t sub_renew_sent, sub_renew_after;
 
   void _renew_subs();
   void handle_subscribe_ack(MMonSubscribeAck* m);
 
   bool _sub_want(string what, version_t start, unsigned flags) {
-    if (sub_have.count(what) &&
-       sub_have[what].start == start &&
-       sub_have[what].flags == flags)
+    if ((sub_new.count(what) == 0 &&
+        sub_sent.count(what) &&
+        sub_sent[what].start == start &&
+        sub_sent[what].flags == flags) ||
+       (sub_new.count(what) &&
+        sub_new[what].start == start &&
+        sub_new[what].flags == flags))
       return false;
-    sub_have[what].start = start;
-    sub_have[what].flags = flags;
+    sub_new[what].start = start;
+    sub_new[what].flags = flags;
     return true;
   }
   void _sub_got(string what, version_t got) {
-    if (sub_have.count(what)) {
-      if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME)
-       sub_have.erase(what);
+    if (sub_new.count(what)) {
+      if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
+       sub_new.erase(what);
       else
-       sub_have[what].start = got + 1;
+       sub_new[what].start = got + 1;
+    } else if (sub_sent.count(what)) {
+      if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
+       sub_sent.erase(what);
+      else
+       sub_sent[what].start = got + 1;
     }
   }
   void _sub_unwant(string what) {
-    sub_have.erase(what);
+    sub_sent.erase(what);
+    sub_new.erase(what);
   }
 
   // auth tickets
@@ -262,10 +273,18 @@ public:
    */
   bool sub_want_increment(string what, version_t start, unsigned flags) {
     Mutex::Locker l(monc_lock);
-    map<string,ceph_mon_subscribe_item>::iterator i =
-            sub_have.find(what);
-    if (i == sub_have.end() || i->second.start < start) {
-      ceph_mon_subscribe_item& item = sub_have[what];
+    map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
+    if (i != sub_new.end()) {
+      if (i->second.start >= start)
+       return false;
+      i->second.start = start;
+      i->second.flags = flags;
+      return true;
+    }
+
+    i = sub_sent.find(what);
+    if (i == sub_sent.end() || i->second.start < start) {
+      ceph_mon_subscribe_item& item = sub_new[what];
       item.start = start;
       item.flags = flags;
       return true;