From 997d67e5b1ce6df1b467a9c8b284f1289fbee22a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 26 Jul 2010 15:03:11 -0700 Subject: [PATCH] mon: revamp subscribe protocol [backward compatible protocol change] Before, we would provide "have" and a bool "onetime" flag. The struct was also screwed up with an extra __le64. Then have=0 was a special case that meant "give me the latest". The problem is this is ambiguous between the usual "give me everything since X" and "give me your latest", because you might actually have 0 and want 1..current. Changes protocol and cleans up the struct: - now "start" and "flags", where only 1 flag (ONETIME) is defined - clean up sub_want_* methods throughout - fix all sub_want callers to ask for _start_ (not have) epoch, or 0 for any/latest - add a feature bit; talks the old protocol to clients w/o that bit Signed-off-by: Sage Weil --- src/client/Client.cc | 4 +-- src/include/ceph_fs.h | 7 +++-- src/include/types.h | 3 +- src/mds/MDS.cc | 2 +- src/messages/MMonSubscribe.h | 58 ++++++++++++++++++++++++++++++------ src/mon/MDSMonitor.cc | 4 +-- src/mon/MonClient.cc | 4 +-- src/mon/MonClient.h | 36 +++++++++------------- src/mon/Monitor.cc | 14 +++++---- src/mon/OSDMonitor.cc | 8 ++--- src/mon/Session.h | 9 +++--- src/msg/SimpleMessenger.h | 8 +++-- src/osd/OSD.cc | 8 ++--- src/osdc/Objecter.cc | 6 ++-- 14 files changed, 106 insertions(+), 65 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 8319d926dcb17..7bcab7be4b1cf 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -264,8 +264,8 @@ void Client::init() monclient->init(); monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD); - monclient->sub_want("mdsmap", mdsmap->get_epoch()); - monclient->sub_want_onetime("osdmap", 0); + monclient->sub_want("mdsmap", 0, 0); + monclient->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); // do logger crap only once per process. 
static bool did_init = false; diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index bf7dea673237c..f8823e13386b5 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -43,6 +43,7 @@ #define CEPH_FEATURE_NOSRCADDR (1<<1) #define CEPH_FEATURE_MONCLOCKCHECK (1<<2) #define CEPH_FEATURE_FLOCK (1<<3) +#define CEPH_FEATURE_SUBSCRIBE2 (1<<4) /* @@ -201,9 +202,11 @@ struct ceph_client_mount { struct ceph_mon_request_header monhdr; } __attribute__ ((packed)); +#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update, beginning at start */ + struct ceph_mon_subscribe_item { - __le64 have_version; __le64 have; - __u8 onetime; + __le64 start; + __u8 flags; } __attribute__ ((packed)); struct ceph_mon_subscribe_ack { diff --git a/src/include/types.h b/src/include/types.h index 00734438dd7ca..fbed468b90f06 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -472,7 +472,8 @@ inline ostream& operator<<(ostream& out, const kb_t& kb) inline ostream& operator<<(ostream& out, const ceph_mon_subscribe_item& i) { - return out << i.have << (i.onetime ? "" : "+"); + return out << i.start + << ((i.flags & CEPH_SUBSCRIBE_ONETIME) ? 
"" : "+"); } #endif diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 6e4b60442c0d8..a2b9cb14e564f 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -456,7 +456,7 @@ int MDS::init() objecter->init(); - monc->sub_want("mdsmap", 0); + monc->sub_want("mdsmap", 0, 0); monc->renew_subs(); // schedule tick diff --git a/src/messages/MMonSubscribe.h b/src/messages/MMonSubscribe.h index a57f759af71ea..0d94ff05c319a 100644 --- a/src/messages/MMonSubscribe.h +++ b/src/messages/MMonSubscribe.h @@ -17,6 +17,17 @@ #include "msg/Message.h" +/* + * compatibility with old crap + */ +struct ceph_mon_subscribe_item_old { + __le64 unused; + __le64 have; + __u8 onetime; +} __attribute__ ((packed)); +WRITE_RAW_ENCODER(ceph_mon_subscribe_item_old) + + struct MMonSubscribe : public Message { map what; @@ -25,13 +36,9 @@ private: ~MMonSubscribe() {} public: - void sub_onetime(const char *w, version_t have) { - what[w].onetime = true; - what[w].have = have; - } - void sub_persistent(const char *w, version_t have) { - what[w].onetime = false; - what[w].have = have; + void sub_want(const char *w, version_t start, unsigned flags) { + what[w].start = start; + what[w].flags = flags; } const char *get_type_name() { return "mon_subscribe"; } @@ -41,10 +48,43 @@ public: void decode_payload() { bufferlist::iterator p = payload.begin(); - ::decode(what, p); + if (header.version < 2) { + map oldwhat; + ::decode(oldwhat, p); + what.clear(); + for (map::iterator q = oldwhat.begin(); + q != oldwhat.end(); + q++) { + if (q->second.have) + what[q->first].start = q->second.have + 1; + else + what[q->first].start = 0; + what[q->first].flags = 0; + if (q->second.onetime) + what[q->first].flags |= CEPH_SUBSCRIBE_ONETIME; + } + } else { + ::decode(what, p); + } } void encode_payload() { - ::encode(what, payload); + if (get_connection()->has_feature(CEPH_FEATURE_SUBSCRIBE2)) { + header.version = 2; + ::encode(what, payload); + } else { + map oldwhat; + for (map::iterator q = what.begin(); + q != what.end(); + 
q++) { + if (q->second.start) + // warning: start=1 -> have=0, which was ambiguous + oldwhat[q->first].have = q->second.start - 1; + else + oldwhat[q->first].have = 0; + oldwhat[q->first].onetime = q->second.flags & CEPH_SUBSCRIBE_ONETIME; + } + ::encode(oldwhat, payload); + } } }; diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 0e7bd2a71054f..814c9e1976e69 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -599,13 +599,13 @@ void MDSMonitor::check_subs() void MDSMonitor::check_sub(Subscription *sub) { - if (sub->last < mdsmap.get_epoch()) { + if (sub->next <= mdsmap.get_epoch()) { mon->messenger->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap), sub->session->inst); if (sub->onetime) mon->session_map.remove_sub(sub); else - sub->last = mdsmap.get_epoch(); + sub->next = mdsmap.get_epoch() + 1; } } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 8ba8b0bb311b6..91d33d4aefffd 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -115,7 +115,7 @@ int MonClient::get_monmap() dout(10) << "get_monmap" << dendl; Mutex::Locker l(monc_lock); - _sub_want("monmap", monmap.get_epoch()); + _sub_want("monmap", 0, 0); if (cur_mon < 0) _reopen_session(); @@ -266,7 +266,7 @@ int MonClient::authenticate(double timeout) return 0; } - _sub_want("monmap", monmap.get_epoch()); + _sub_want("monmap", monmap.get_epoch() ? 
monmap.get_epoch() + 1 : 0, 0); if (cur_mon < 0) _reopen_session(); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index b191e83be86cd..336396d218c06 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -121,25 +121,21 @@ private: void _renew_subs(); void handle_subscribe_ack(MMonSubscribeAck* m); - void _sub_want(string what, version_t have) { - sub_have[what].have = have; - sub_have[what].onetime = false; - } - bool _sub_want_onetime(string what, version_t have) { - if (sub_have.count(what) == 0) { - sub_have[what].have = have; - sub_have[what].onetime = true; - return true; - } else - sub_have[what].have = have; - return false; - } - void _sub_got(string what, version_t have) { + bool _sub_want(string what, version_t start, unsigned flags) { + if (sub_have.count(what) && + sub_have[what].start == start && + sub_have[what].flags == flags) + return false; + sub_have[what].start = start; + sub_have[what].flags = flags; + return true; + } + void _sub_got(string what, version_t got) { if (sub_have.count(what)) { - if (sub_have[what].onetime) + if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME) sub_have.erase(what); else - sub_have[what].have = have; + sub_have[what].start = got + 1; } } @@ -151,13 +147,9 @@ public: Mutex::Locker l(monc_lock); _renew_subs(); } - void sub_want(string what, version_t have) { - Mutex::Locker l(monc_lock); - _sub_want(what, have); - } - bool sub_want_onetime(string what, version_t have) { + bool sub_want(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); - return _sub_want_onetime(what, have); + return _sub_want(what, start, flags); } void sub_got(string what, version_t have) { Mutex::Locker l(monc_lock); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 6a98d0208cbcd..68f0079c8d25b 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -775,9 +775,13 @@ void Monitor::handle_subscribe(MMonSubscribe *m) for (map::iterator p = m->what.begin(); p != m->what.end(); p++) { - if 
(!p->second.onetime) + // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer + if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) reply = true; - session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime); + + session_map.add_update_sub(s, p->first, p->second.start, + p->second.flags & CEPH_SUBSCRIBE_ONETIME); + if (p->first == "mdsmap") { if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) { mdsmon()->check_sub(s->sub_map["mdsmap"]); @@ -836,13 +840,13 @@ void Monitor::check_subs() void Monitor::check_sub(Subscription *sub) { - dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl; - if (sub->last < monmap->get_epoch()) { + dout(0) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl; + if (sub->next <= monmap->get_epoch()) { send_latest_monmap(sub->session->inst); if (sub->onetime) session_map.remove_sub(sub); else - sub->last = monmap->get_epoch(); + sub->next = monmap->get_epoch() + 1; } } diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index f3a6b7649ebd8..b6938e6212809 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -816,16 +816,16 @@ void OSDMonitor::check_subs() void OSDMonitor::check_sub(Subscription *sub) { - if (sub->last < osdmap.get_epoch()) { - if (sub->last) - send_incremental(sub->last, sub->session->inst); + if (sub->next <= osdmap.get_epoch()) { + if (sub->next > 1) + send_incremental(sub->next - 1, sub->session->inst); else mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap), sub->session->inst); if (sub->onetime) mon->session_map.remove_sub(sub); else - sub->last = osdmap.get_epoch(); + sub->next = osdmap.get_epoch() + 1; } } diff --git a/src/mon/Session.h b/src/mon/Session.h index ba9c119fd1d5b..0d537d0ddd17b 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -28,10 +28,11 @@ struct Subscription { MonSession *session; string type; xlist::item type_item; - 
version_t last; + version_t next; bool onetime; - Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this) {}; + Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this), + next(0), onetime(false) {}; }; struct MonSession : public RefCountedObject { @@ -106,7 +107,7 @@ struct MonSessionMap { } - void add_update_sub(MonSession *s, const string& what, version_t have, bool onetime) { + void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime) { Subscription *sub = 0; if (s->sub_map.count(what)) { sub = s->sub_map[what]; @@ -115,7 +116,7 @@ struct MonSessionMap { s->sub_map[what] = sub; subs[what].push_back(&sub->type_item); } - sub->last = have; + sub->next = start; sub->onetime = onetime; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 75d0b5a98b4ed..f629c9ba9502f 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -52,7 +52,7 @@ using namespace __gnu_cxx; */ // default feature(s) everyone gets -#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR +#define MSGR_FEATURES_SUPPORTED CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2 class SimpleMessenger : public Messenger { public: @@ -66,10 +66,12 @@ public: Policy() : lossy(false), server(false), throttler(NULL), - features_supported(MSGR_FEATURES_SUPPORTED), features_required(0) {} + features_supported(MSGR_FEATURES_SUPPORTED), + features_required(0) {} Policy(bool l, bool s, uint64_t sup, uint64_t req) : lossy(l), server(s), throttler(NULL), - features_supported(sup | MSGR_FEATURES_SUPPORTED), features_required(req) {} + features_supported(sup | MSGR_FEATURES_SUPPORTED), + features_required(req) {} static Policy stateful_server(uint64_t sup, uint64_t req) { return Policy(false, true, sup, req); } static Policy stateless_server(uint64_t sup, uint64_t req) { return Policy(true, true, sup, req); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index b3c8a95e4b251..a6602eb4fc5b1 100644 --- 
a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -510,8 +510,6 @@ int OSD::init() monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); monc->init(); - monc->sub_want("monmap", 0); - osd_lock.Unlock(); monc->authenticate(); @@ -1417,7 +1415,7 @@ void OSD::heartbeat() if (now - last_mon_heartbeat > g_conf.osd_mon_heartbeat_interval) { last_mon_heartbeat = now; dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl; - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } @@ -2091,7 +2089,7 @@ void OSD::wait_for_new_map(Message *m) { // ask? if (waiting_for_osdmap.empty()) { - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } @@ -2345,7 +2343,7 @@ void OSD::handle_osd_map(MOSDMap *m) } else { dout(10) << "handle_osd_map missing epoch " << cur+1 << dendl; - monc->sub_want_onetime("osdmap", cur); + monc->sub_want("osdmap", cur+1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); break; } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index c60ffef176adc..eff5a0606f9b3 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -143,7 +143,7 @@ void Objecter::handle_osd_map(MOSDMap *m) } else { dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl; - monc->sub_want_onetime("osdmap", osdmap->get_epoch()); + monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); break; } @@ -176,7 +176,7 @@ void Objecter::handle_osd_map(MOSDMap *m) scan_pgs(changed_pgs); } else { dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl; - monc->sub_want_onetime("osdmap", 0); + monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } @@ -213,7 +213,7 @@ void Objecter::handle_osd_map(MOSDMap *m) void Objecter::maybe_request_map() { dout(10) << 
"maybe_request_map subscribing (onetime) to next osd map" << dendl; - if (monc->sub_want_onetime("osdmap", osdmap->get_epoch())) + if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME)) monc->renew_subs(); } -- 2.39.5