public:
// PGMapDigest is in data payload
list<pair<health_status_t,std::string>> health_summary, health_detail;
+ bufferlist service_map_bl; // encoded ServiceMap
MMonMgrReport()
: PaxosServiceMessage(MSG_MON_MGR_REPORT, 0, HEAD_VERSION, COMPAT_VERSION)
paxos_encode();
::encode(health_summary, payload);
::encode(health_detail, payload);
+ ::encode(service_map_bl, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(health_summary, p);
::decode(health_detail, p);
+ ::decode(service_map_bl, p);
}
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+#include "mgr/ServiceMap.h"
+
+class MServiceMap : public Message {
+public:
+ ServiceMap service_map;
+
+ MServiceMap() : Message(MSG_SERVICE_MAP) { }
+ explicit MServiceMap(const ServiceMap& sm)
+ : Message(MSG_SERVICE_MAP),
+ service_map(sm) {
+ }
+private:
+ ~MServiceMap() override {}
+
+public:
+ const char *get_type_name() const override { return "service_map"; }
+ void print(ostream& out) const override {
+ out << "service_map(e" << service_map.epoch << " "
+ << service_map.services.size() << " svc)";
+ }
+ void encode_payload(uint64_t features) override {
+ ::encode(service_map, payload, features);
+ }
+ void decode_payload() override {
+ bufferlist::iterator p = payload.begin();
+ ::decode(service_map, p);
+ }
+};
#include "messages/MMonMgrReport.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"
+#include "messages/MServiceMap.h"
class MgrPGStatService : public MonPGStatService {
PGMapDigest& digest;
{
dout(10) << dendl;
version = 0;
+ service_map.epoch = 1;
+ ::encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
}
void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
::decode(digest, p);
::decode(health_summary, p);
::decode(health_detail, p);
+ ::decode(service_map, p);
+ dout(10) << __func__ << " v" << version
+ << " service_map e" << service_map.epoch << dendl;
}
+ check_subs();
}
void MgrStatMonitor::create_pending()
pending_digest = digest;
pending_health_summary = health_summary;
pending_health_detail = health_detail;
+ pending_service_map_bl.clear();
+ ::encode(service_map, pending_service_map_bl, mon->get_quorum_con_features());
}
void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
::encode(pending_digest, bl, mon->get_quorum_con_features());
::encode(pending_health_summary, bl);
::encode(pending_health_detail, bl);
+ assert(pending_service_map_bl.length());
+ bl.append(pending_service_map_bl);
put_version(t, version, bl);
put_last_committed(t, version);
}
dout(10) << __func__ << " " << pending_digest << dendl;
pending_health_summary.swap(m->health_summary);
pending_health_detail.swap(m->health_detail);
+ if (m->service_map_bl.length()) {
+ pending_service_map_bl.swap(m->service_map_bl);
+ }
return true;
}
mon->send_reply(op, reply);
return true;
}
+
+void MgrStatMonitor::check_sub(Subscription *sub)
+{
+ const auto epoch = mon->monmap->get_epoch();
+ dout(10) << __func__
+ << " next " << sub->next
+ << " have " << epoch << dendl;
+ if (sub->next <= service_map.epoch) {
+ auto m = new MServiceMap(service_map);
+ sub->session->con->send_message(m);
+ if (sub->onetime) {
+ mon->with_session_map([this, sub](MonSessionMap& session_map) {
+ session_map.remove_sub(sub);
+ });
+ } else {
+ sub->next = epoch + 1;
+ }
+ }
+}
+
+void MgrStatMonitor::check_subs()
+{
+ dout(10) << __func__ << dendl;
+ if (!service_map.epoch) {
+ return;
+ }
+ auto subs = mon->session_map.subs.find("servicemap");
+ if (subs == mon->session_map.subs.end()) {
+ return;
+ }
+ auto p = subs->second->begin();
+ while (!p.end()) {
+ auto sub = *p;
+ ++p;
+ check_sub(sub);
+ }
+}
#include "include/Context.h"
#include "PaxosService.h"
#include "mon/PGMap.h"
+#include "mgr/ServiceMap.h"
class MonPGStatService;
class MgrPGStatService;
PGMapDigest digest;
list<pair<health_status_t,string>> health_summary;
list<pair<health_status_t,string>> health_detail;
+ ServiceMap service_map;
// pending commit
PGMapDigest pending_digest;
list<pair<health_status_t,string>> pending_health_summary;
list<pair<health_status_t,string>> pending_health_detail;
+ bufferlist pending_service_map_bl;
std::unique_ptr<MgrPGStatService> pgservice;
void print_summary(Formatter *f, std::ostream *ss) const;
MonPGStatService *get_pg_stat_service();
+ const ServiceMap& get_service_map() {
+ return service_map;
+ }
friend class C_Updated;
};
logmon()->check_sub(s->sub_map[p->first]);
} else if (p->first == "mgrmap" || p->first == "mgrdigest") {
mgrmon()->check_sub(s->sub_map[p->first]);
+ } else if (p->first == "servicemap") {
+ mgrstatmon()->check_sub(s->sub_map[p->first]);
}
}
#include "messages/MMgrOpen.h"
#include "messages/MMgrConfigure.h"
#include "messages/MMonMgrReport.h"
+#include "messages/MServiceMap.h"
#include "messages/MLock.h"
m = new MMonMgrReport();
break;
+ case MSG_SERVICE_MAP:
+ m = new MServiceMap();
+ break;
+
case MSG_MGR_MAP:
m = new MMgrMap();
break;
#define MSG_MGR_DIGEST 0x705
// *** cephmgr -> ceph-mon
#define MSG_MON_MGR_REPORT 0x706
+#define MSG_SERVICE_MAP 0x707
// ======================================================