--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Prashant D <pdhange@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#ifndef CEPH_MMGRUPDATE_H_
+#define CEPH_MMGRUPDATE_H_
+
+#include "msg/Message.h"
+
+class MMgrUpdate : public Message {
+private:
+ static constexpr int HEAD_VERSION = 3;
+ static constexpr int COMPAT_VERSION = 1;
+
+public:
+
+ std::string daemon_name;
+ std::string service_name; // optional; otherwise infer from entity type
+
+ bool service_daemon = false;
+ std::map<std::string,std::string> daemon_metadata;
+ std::map<std::string,std::string> daemon_status;
+
+ bool need_metadata_update = false;
+
+ void decode_payload() override
+ {
+ using ceph::decode;
+ auto p = payload.cbegin();
+ decode(daemon_name, p);
+ if (header.version >= 2) {
+ decode(service_name, p);
+ decode(service_daemon, p);
+ if (service_daemon) {
+ decode(daemon_metadata, p);
+ decode(daemon_status, p);
+ }
+ }
+ if (header.version >= 3) {
+ decode(need_metadata_update, p);
+ }
+ }
+
+ void encode_payload(uint64_t features) override {
+ using ceph::encode;
+ encode(daemon_name, payload);
+ encode(service_name, payload);
+ encode(service_daemon, payload);
+ if (service_daemon) {
+ encode(daemon_metadata, payload);
+ encode(daemon_status, payload);
+ }
+ encode(need_metadata_update, payload);
+ }
+
+ std::string_view get_type_name() const override { return "mgrupdate"; }
+ void print(std::ostream& out) const override {
+ out << get_type_name() << "(";
+ if (service_name.length()) {
+ out << service_name;
+ } else {
+ out << ceph_entity_type_name(get_source().type());
+ }
+ out << "." << daemon_name;
+ if (service_daemon) {
+ out << " daemon";
+ }
+ out << ")";
+ }
+
+private:
+ MMgrUpdate()
+ : Message{MSG_MGR_UPDATE, HEAD_VERSION, COMPAT_VERSION}
+ {}
+ using RefCountedObject::put;
+ using RefCountedObject::get;
+ template<class T, typename... Args>
+ friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
+
+#endif
+
#include "mon/MonCommand.h"
#include "messages/MMgrOpen.h"
+#include "messages/MMgrUpdate.h"
#include "messages/MMgrClose.h"
#include "messages/MMgrConfigure.h"
#include "messages/MMonMgrReport.h"
return handle_report(ref_cast<MMgrReport>(m));
case MSG_MGR_OPEN:
return handle_open(ref_cast<MMgrOpen>(m));
+ case MSG_MGR_UPDATE:
+ return handle_update(ref_cast<MMgrUpdate>(m));
case MSG_MGR_CLOSE:
return handle_close(ref_cast<MMgrClose>(m));
case MSG_COMMAND:
return true;
}
+bool DaemonServer::handle_update(const ref_t<MMgrUpdate>& m)
+{
+ DaemonKey key;
+ if (!m->service_name.empty()) {
+ key.type = m->service_name;
+ } else {
+ key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
+ }
+ key.name = m->daemon_name;
+
+ dout(10) << "from " << m->get_connection() << " " << key << dendl;
+
+ if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+ m->service_name.empty()) {
+ // Clients should not be sending us update request
+ dout(10) << "rejecting update request from non-daemon client " << m->daemon_name
+ << dendl;
+ clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
+ << " at " << m->get_connection()->get_peer_addrs();
+ m->get_connection()->mark_down();
+ return true;
+ }
+
+
+ {
+ std::unique_lock locker(lock);
+
+ DaemonStatePtr daemon;
+ // Look up the DaemonState
+ if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << key << dendl;
+
+ daemon = daemon_state.get(key);
+ if (m->need_metadata_update == true &&
+ !m->daemon_metadata.empty()) {
+ daemon_state.update_metadata(daemon, m->daemon_metadata);
+ }
+ }
+ }
+
+ return true;
+}
+
bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
{
std::lock_guard l(lock);
class MMgrReport;
class MMgrOpen;
+class MMgrUpdate;
class MMgrClose;
class MMonMgrReport;
class MCommand;
void fetch_missing_metadata(const DaemonKey& key, const entity_addr_t& addr);
bool handle_open(const ceph::ref_t<MMgrOpen>& m);
+ bool handle_update(const ceph::ref_t<MMgrUpdate>& m);
bool handle_close(const ceph::ref_t<MMgrClose>& m);
bool handle_report(const ceph::ref_t<MMgrReport>& m);
bool handle_command(const ceph::ref_t<MCommand>& m);
#include "messages/MMgrMap.h"
#include "messages/MMgrReport.h"
#include "messages/MMgrOpen.h"
+#include "messages/MMgrUpdate.h"
#include "messages/MMgrClose.h"
#include "messages/MMgrConfigure.h"
#include "messages/MCommand.h"
}
}
+void MgrClient::_send_update()
+{
+ if (session && session->con) {
+ auto update = make_message<MMgrUpdate>();
+ if (!service_name.empty()) {
+ update->service_name = service_name;
+ update->daemon_name = daemon_name;
+ } else {
+ update->daemon_name = cct->_conf->name.get_id();
+ }
+ if (service_daemon) {
+ update->service_daemon = service_daemon;
+ update->daemon_metadata = daemon_metadata;
+ }
+ update->need_metadata_update = need_metadata_update;
+ session->con->send_message2(update);
+ }
+}
+
bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
{
ceph_assert(ceph_mutex_is_locked_by_me(lock));
return true;
}
+int MgrClient::update_daemon_metadata(
+ const std::string& service,
+ const std::string& name,
+ const std::map<std::string,std::string>& metadata)
+{
+ std::lock_guard l(lock);
+ if (service_daemon) {
+ return -EEXIST;
+ }
+ ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
+ service_daemon = true;
+ service_name = service;
+ daemon_name = name;
+ daemon_metadata = metadata;
+ daemon_dirty_status = true;
+
+ if (need_metadata_update == true &&
+ !daemon_metadata.empty()) {
+ _send_update();
+ need_metadata_update = false;
+ }
+
+ return 0;
+}
+
int MgrClient::service_daemon_register(
const std::string& service,
const std::string& name,
bool service_daemon = false;
bool daemon_dirty_status = false;
bool task_dirty_status = false;
+ bool need_metadata_update = true;
std::string service_name, daemon_name;
std::map<std::string,std::string> daemon_metadata;
std::map<std::string,std::string> daemon_status;
void reconnect();
void _send_open();
+ void _send_update();
// In pre-luminous clusters, the ceph-mgr service is absent or optional,
// so we must not block in start_command waiting for it.
ceph::buffer::list *outbl, std::string *outs,
Context *onfinish);
+ int update_daemon_metadata(
+ const std::string& service,
+ const std::string& name,
+ const std::map<std::string,std::string>& metadata);
int service_daemon_register(
const std::string& service,
const std::string& name,
}
}
+void Monitor::update_pending_metadata()
+{
+ Metadata metadata;
+ collect_metadata(&metadata);
+ size_t version_size = mon_metadata[rank]["ceph_version_short"].size();
+ const std::string current_version = mon_metadata[rank]["ceph_version_short"];
+ const std::string pending_version = metadata["ceph_version_short"];
+
+ if (current_version.compare(0, version_size, pending_version) < 0) {
+ mgr_client.update_daemon_metadata("mon", name, metadata);
+ }
+}
+
void Monitor::get_cluster_status(stringstream &ss, Formatter *f,
MonSession *session)
{
const health_check_map_t& previous,
MonitorDBStore::TransactionRef t);
+ void update_pending_metadata();
+
protected:
class HealthCheckLogStatus {
apply_mon_features(mon.get_quorum_mon_features(),
mon.quorum_min_mon_release);
+
+ mon.update_pending_metadata();
}
bool MonmapMonitor::preprocess_query(MonOpRequestRef op)
#include "messages/MMgrDigest.h"
#include "messages/MMgrReport.h"
#include "messages/MMgrOpen.h"
+#include "messages/MMgrUpdate.h"
#include "messages/MMgrClose.h"
#include "messages/MMgrConfigure.h"
#include "messages/MMonMgrReport.h"
m = make_message<MMgrOpen>();
break;
+ case MSG_MGR_UPDATE:
+ m = make_message<MMgrUpdate>();
+ break;
+
case MSG_MGR_CLOSE:
m = make_message<MMgrClose>();
break;
#define MSG_MGR_COMMAND 0x709
#define MSG_MGR_COMMAND_REPLY 0x70a
+// *** ceph-mgr <-> MON daemons ***
+#define MSG_MGR_UPDATE 0x70b
+
// ======================================================
// abstract Message class
class MMgrDigest;
class MMgrMap;
class MMgrOpen;
+class MMgrUpdate;
class MMgrReport;
class MMonCommandAck;
class MMonCommand;