]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: revamp subscribe protocol [backward compatible protocol change]
authorSage Weil <sage@newdream.net>
Mon, 26 Jul 2010 22:03:11 +0000 (15:03 -0700)
committerSage Weil <sage@newdream.net>
Mon, 26 Jul 2010 22:03:11 +0000 (15:03 -0700)
Before, we would provide "have" and a bool "onetime" flag.  The struct was
also screwed up with an extra __le64.  Then have=0 was a special case
that meant "give me the latest".

The problem is this is ambiguous between the usual "give me everything
since X" and "give me your latest", because you might actually have 0 and
want 1..current.

Changes protocol and cleans up the struct:

 - now "start" and "flags", where only 1 flag (ONETIME) is defined
 - clean up sub_want_* methods throughout
 - fix all sub_want callers to ask for _start_ (not have) epoch, or 0 for
   any/latest
 - add a feature bit; talks old clients w/o that bit

Signed-off-by: Sage Weil <sage@newdream.net>
14 files changed:
src/client/Client.cc
src/include/ceph_fs.h
src/include/types.h
src/mds/MDS.cc
src/messages/MMonSubscribe.h
src/mon/MDSMonitor.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/Monitor.cc
src/mon/OSDMonitor.cc
src/mon/Session.h
src/msg/SimpleMessenger.h
src/osd/OSD.cc
src/osdc/Objecter.cc

index 8319d926dcb17de3ae8d38266a65078ede1999ba..7bcab7be4b1cff95ea423ee3d247b98c33fe7277 100644 (file)
@@ -264,8 +264,8 @@ void Client::init()
 
   monclient->init();
   monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
-  monclient->sub_want("mdsmap", mdsmap->get_epoch());
-  monclient->sub_want_onetime("osdmap", 0);
+  monclient->sub_want("mdsmap", 0, 0);
+  monclient->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
 
   // do logger crap only once per process.
   static bool did_init = false;
index bf7dea673237c1c02168b833004678aaa45d663f..f8823e13386b581ace3cb3feeb0eae645fe3020e 100644 (file)
@@ -43,6 +43,7 @@
 #define CEPH_FEATURE_NOSRCADDR      (1<<1)
 #define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
 #define CEPH_FEATURE_FLOCK          (1<<3)
+#define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
 
 
 /*
@@ -201,9 +202,11 @@ struct ceph_client_mount {
        struct ceph_mon_request_header monhdr;
 } __attribute__ ((packed));
 
+#define CEPH_SUBSCRIBE_ONETIME    1  /* i want only 1 update after have */
+
 struct ceph_mon_subscribe_item {
-       __le64 have_version;    __le64 have;
-       __u8 onetime;
+       __le64 start;
+       __u8 flags;
 } __attribute__ ((packed));
 
 struct ceph_mon_subscribe_ack {
index 00734438dd7ca883efec8569eeee9f39e598b767..fbed468b90f062bbe08cf43467bd0d8090faaeb0 100644 (file)
@@ -472,7 +472,8 @@ inline ostream& operator<<(ostream& out, const kb_t& kb)
 
 inline ostream& operator<<(ostream& out, const ceph_mon_subscribe_item& i)
 {
-  return out << i.have << (i.onetime ? "" : "+");
+  return out << i.start
+            << ((i.flags & CEPH_SUBSCRIBE_ONETIME) ? "" : "+");
 }
 
 #endif
index 6e4b60442c0d8b4533b5b11af8703c0ea98f513f..a2b9cb14e564fd06a7c82cde2892b15f88443a3f 100644 (file)
@@ -456,7 +456,7 @@ int MDS::init()
 
   objecter->init();
 
-  monc->sub_want("mdsmap", 0);
+  monc->sub_want("mdsmap", 0, 0);
   monc->renew_subs();
 
   // schedule tick
index a57f759af71eac697d2e55fa3f3998d6a9480250..0d94ff05c319a62f3cb37df40074d2833a4bca4f 100644 (file)
 
 #include "msg/Message.h"
 
+/*
+ * compatibility with old crap
+ */
+struct ceph_mon_subscribe_item_old {
+       __le64 unused;
+       __le64 have;
+       __u8 onetime;
+} __attribute__ ((packed));
+WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old)
+
+
 struct MMonSubscribe : public Message {
   map<string, ceph_mon_subscribe_item> what;
   
@@ -25,13 +36,9 @@ private:
   ~MMonSubscribe() {}
 
 public:  
-  void sub_onetime(const char *w, version_t have) {
-    what[w].onetime = true;
-    what[w].have = have;
-  }
-  void sub_persistent(const char *w, version_t have) {
-    what[w].onetime = false;
-    what[w].have = have;
+  void sub_want(const char *w, version_t start, unsigned flags) {
+    what[w].start = start;
+    what[w].flags = flags;
   }
 
   const char *get_type_name() { return "mon_subscribe"; }
@@ -41,10 +48,43 @@ public:
 
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
-    ::decode(what, p);
+    if (header.version < 2) {
+      map<string, ceph_mon_subscribe_item_old> oldwhat;
+      ::decode(oldwhat, p);
+      what.clear();
+      for (map<string, ceph_mon_subscribe_item_old>::iterator q = oldwhat.begin();
+          q != oldwhat.end();
+          q++) {
+       if (q->second.have)
+         what[q->first].start = q->second.have + 1;
+       else
+         what[q->first].start = 0;
+       what[q->first].flags = 0;
+       if (q->second.onetime)
+         what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME;
+      }
+    } else {
+      ::decode(what, p);
+    }
   }
   void encode_payload() {
-    ::encode(what, payload);
+    if (get_connection()->has_feature(CEPH_FEATURE_SUBSCRIBE2)) {
+      header.version = 2;
+      ::encode(what, payload);
+    } else {
+      map<string, ceph_mon_subscribe_item_old> oldwhat;
+      for (map<string, ceph_mon_subscribe_item>::iterator q = what.begin();
+          q != what.end();
+          q++) {
+       if (q->second.start)
+         // warning: start=1 -> have=0, which was ambiguous
+         oldwhat[q->first].have = q->second.start - 1;
+       else
+         oldwhat[q->first].have = 0;
+       oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME;
+      }
+      ::encode(oldwhat, payload);
+    }
   }
 };
 
index 0e7bd2a71054f99c5d3ecf226730fba338a2c597..814c9e1976e69c176376db96cf4efe3b56a45b6c 100644 (file)
@@ -599,13 +599,13 @@ void MDSMonitor::check_subs()
 
 void MDSMonitor::check_sub(Subscription *sub)
 {
-  if (sub->last < mdsmap.get_epoch()) {
+  if (sub->next <= mdsmap.get_epoch()) {
     mon->messenger->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap),
                                 sub->session->inst);
     if (sub->onetime)
       mon->session_map.remove_sub(sub);
     else
-      sub->last = mdsmap.get_epoch();
+      sub->next = mdsmap.get_epoch() + 1;
   }
 }
 
index 8ba8b0bb311b6d85928f0d522eec68d8ef18f5f7..91d33d4aefffdd3cd98de064987cfeca8f9d1a7c 100644 (file)
@@ -115,7 +115,7 @@ int MonClient::get_monmap()
   dout(10) << "get_monmap" << dendl;
   Mutex::Locker l(monc_lock);
   
-  _sub_want("monmap", monmap.get_epoch());
+  _sub_want("monmap", 0, 0);
   if (cur_mon < 0)
     _reopen_session();
 
@@ -266,7 +266,7 @@ int MonClient::authenticate(double timeout)
     return 0;
   }
 
-  _sub_want("monmap", monmap.get_epoch());
+  _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
   if (cur_mon < 0)
     _reopen_session();
 
index b191e83be86cdd6e790d76b5edcd543c28356278..336396d218c0685b673c5ddf29418f56143260ff 100644 (file)
@@ -121,25 +121,21 @@ private:
   void _renew_subs();
   void handle_subscribe_ack(MMonSubscribeAck* m);
 
-  void _sub_want(string what, version_t have) {
-    sub_have[what].have = have;
-    sub_have[what].onetime = false;
-  }
-  bool _sub_want_onetime(string what, version_t have) {
-    if (sub_have.count(what) == 0) {
-      sub_have[what].have = have;
-      sub_have[what].onetime = true;
-      return true;
-    } else
-      sub_have[what].have = have;
-    return false;
-  }
-  void _sub_got(string what, version_t have) {
+  bool _sub_want(string what, version_t start, unsigned flags) {
+    if (sub_have.count(what) &&
+       sub_have[what].start == start &&
+       sub_have[what].flags == flags)
+      return false;
+    sub_have[what].start = start;
+    sub_have[what].flags = flags;
+    return true;
+  }
+  void _sub_got(string what, version_t got) {
     if (sub_have.count(what)) {
-      if (sub_have[what].onetime)
+      if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME)
        sub_have.erase(what);
       else
-       sub_have[what].have = have;
+       sub_have[what].start = got + 1;
     }
   }
 
@@ -151,13 +147,9 @@ public:
     Mutex::Locker l(monc_lock);
     _renew_subs();
   }
-  void sub_want(string what, version_t have) {
-    Mutex::Locker l(monc_lock);
-    _sub_want(what, have);
-  }
-  bool sub_want_onetime(string what, version_t have) {
+  bool sub_want(string what, version_t start, unsigned flags) {
     Mutex::Locker l(monc_lock);
-    return _sub_want_onetime(what, have);
+    return _sub_want(what, start, flags);
   }
   void sub_got(string what, version_t have) {
     Mutex::Locker l(monc_lock);
index 6a98d0208cbcd8731e4df26f3b685374ae344875..68f0079c8d25bc6f1fbbb10567c2fa45428f8678 100644 (file)
@@ -775,9 +775,13 @@ void Monitor::handle_subscribe(MMonSubscribe *m)
   for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
        p != m->what.end();
        p++) {
-    if (!p->second.onetime)
+    // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
+    if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
       reply = true;
-    session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime);
+
+    session_map.add_update_sub(s, p->first, p->second.start, 
+                              p->second.flags & CEPH_SUBSCRIBE_ONETIME);
+
     if (p->first == "mdsmap") {
       if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) {
         mdsmon()->check_sub(s->sub_map["mdsmap"]);
@@ -836,13 +840,13 @@ void Monitor::check_subs()
 
 void Monitor::check_sub(Subscription *sub)
 {
-  dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl;
-  if (sub->last < monmap->get_epoch()) {
+  dout(0) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl;
+  if (sub->next <= monmap->get_epoch()) {
     send_latest_monmap(sub->session->inst);
     if (sub->onetime)
       session_map.remove_sub(sub);
     else
-      sub->last = monmap->get_epoch();
+      sub->next = monmap->get_epoch() + 1;
   }
 }
 
index f3a6b7649ebd8f89cd7e5434f915b6ded8a21df3..b6938e62128096b2cbc6c13ed4d6d2d2aa2480d0 100644 (file)
@@ -816,16 +816,16 @@ void OSDMonitor::check_subs()
 
 void OSDMonitor::check_sub(Subscription *sub)
 {
-  if (sub->last < osdmap.get_epoch()) {
-    if (sub->last)
-      send_incremental(sub->last, sub->session->inst);
+  if (sub->next <= osdmap.get_epoch()) {
+    if (sub->next > 1)
+      send_incremental(sub->next - 1, sub->session->inst);
     else
       mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap),
                                   sub->session->inst);
     if (sub->onetime)
       mon->session_map.remove_sub(sub);
     else
-      sub->last = osdmap.get_epoch();
+      sub->next = osdmap.get_epoch() + 1;
   }
 }
 
index ba9c119fd1d5b72ab128d72327f5332ff45815e5..0d537d0ddd17bdc585e5e8b8b9d42c498cacdc99 100644 (file)
@@ -28,10 +28,11 @@ struct Subscription {
   MonSession *session;
   string type;
   xlist<Subscription*>::item type_item;
-  version_t last;
+  version_t next;
   bool onetime;
   
-  Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this) {};
+  Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this),
+                                                next(0), onetime(false) {};
 };
 
 struct MonSession : public RefCountedObject {
@@ -106,7 +107,7 @@ struct MonSessionMap {
   }
 
 
-  void add_update_sub(MonSession *s, const string& what, version_t have, bool onetime) {
+  void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime) {
     Subscription *sub = 0;
     if (s->sub_map.count(what)) {
       sub = s->sub_map[what];
@@ -115,7 +116,7 @@ struct MonSessionMap {
       s->sub_map[what] = sub;
       subs[what].push_back(&sub->type_item);
     }
-    sub->last = have;
+    sub->next = start;
     sub->onetime = onetime;
   }
 
index 75d0b5a98b4ed5b3bb2cd2889f351df8d912ac0a..f629c9ba9502f74a80af306f597dc777d6377615 100644 (file)
@@ -52,7 +52,7 @@ using namespace __gnu_cxx;
  */
 
 // default feature(s) everyone gets
-#define MSGR_FEATURES_SUPPORTED  CEPH_FEATURE_NOSRCADDR
+#define MSGR_FEATURES_SUPPORTED  CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2
 
 class SimpleMessenger : public Messenger {
 public:
@@ -66,10 +66,12 @@ public:
 
     Policy() :
       lossy(false), server(false), throttler(NULL),
-      features_supported(MSGR_FEATURES_SUPPORTED), features_required(0) {}
+      features_supported(MSGR_FEATURES_SUPPORTED),
+      features_required(0) {}
     Policy(bool l, bool s, uint64_t sup, uint64_t req) :
       lossy(l), server(s), throttler(NULL),
-      features_supported(sup | MSGR_FEATURES_SUPPORTED), features_required(req) {}
+      features_supported(sup | MSGR_FEATURES_SUPPORTED),
+      features_required(req) {}
 
     static Policy stateful_server(uint64_t sup, uint64_t req) { return Policy(false, true, sup, req); }
     static Policy stateless_server(uint64_t sup, uint64_t req) { return Policy(true, true, sup, req); }
index b3c8a95e4b25157caf595c8d1ba25d860c095c24..a6602eb4fc5b1cb90216ab3bddd3f8e07414e212 100644 (file)
@@ -510,8 +510,6 @@ int OSD::init()
   monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
   monc->init();
 
-  monc->sub_want("monmap", 0);
-
   osd_lock.Unlock();
 
   monc->authenticate();
@@ -1417,7 +1415,7 @@ void OSD::heartbeat()
     if (now - last_mon_heartbeat > g_conf.osd_mon_heartbeat_interval) {
       last_mon_heartbeat = now;
       dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
-      monc->sub_want_onetime("osdmap", osdmap->get_epoch());
+      monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
       monc->renew_subs();
     }
   }
@@ -2091,7 +2089,7 @@ void OSD::wait_for_new_map(Message *m)
 {
   // ask?
   if (waiting_for_osdmap.empty()) {
-    monc->sub_want_onetime("osdmap", osdmap->get_epoch());
+    monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
     monc->renew_subs();
   }
   
@@ -2345,7 +2343,7 @@ void OSD::handle_osd_map(MOSDMap *m)
     }
     else {
       dout(10) << "handle_osd_map missing epoch " << cur+1 << dendl;
-      monc->sub_want_onetime("osdmap", cur);
+      monc->sub_want("osdmap", cur+1, CEPH_SUBSCRIBE_ONETIME);
       monc->renew_subs();
       break;
     }
index c60ffef176adc7fce0ae6ddbe41b092b457ce01f..eff5a0606f9b35591110e75baee7427a2c82977f 100644 (file)
@@ -143,7 +143,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
        }
        else {
          dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
-         monc->sub_want_onetime("osdmap", osdmap->get_epoch());
+         monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
          monc->renew_subs();
          break;
        }
@@ -176,7 +176,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
        scan_pgs(changed_pgs);
       } else {
        dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
-       monc->sub_want_onetime("osdmap", 0);
+       monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
       }
     }
@@ -213,7 +213,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
 void Objecter::maybe_request_map()
 {
   dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
-  if (monc->sub_want_onetime("osdmap", osdmap->get_epoch()))
+  if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME))
     monc->renew_subs();
 }