}
}
+void ActivePyModules::reregister_mds_perf_queries()
+{
+  server.reregister_mds_perf_queries();
+}
+
PyObject *ActivePyModules::get_mds_perf_counters(MetricQueryID query_id)
{
  MDSPerfCollector collector(query_id);
    f.close_section(); // i
  }
  f.close_section(); // counters
+
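+  // expose the (monotonic) time of the last metrics update so that
+  // consumers can tell fresh counters from stale ones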
+  f.open_array_section("last_updated");
+  f.dump_float("last_updated_mono", collector.last_updated_mono);
+  f.close_section(); // last_updated
+
  f.close_section(); // metrics
  return f.get();
      const MDSPerfMetricQuery &query,
      const std::optional<MDSPerfMetricLimit> &limit);
  void remove_mds_perf_query(MetricQueryID query_id);
+  void reregister_mds_perf_queries();
  PyObject *get_mds_perf_counters(MetricQueryID query_id);
  bool get_store(const std::string &module_name,
  Py_RETURN_NONE;
}
+static PyObject*
+ceph_reregister_mds_perf_queries(BaseMgrModule *self, PyObject *args)
+{
+  self->py_modules->reregister_mds_perf_queries();
+  Py_RETURN_NONE;
+}
+
static PyObject*
ceph_get_mds_perf_counters(BaseMgrModule *self, PyObject *args)
{
{"_ceph_remove_mds_perf_query", (PyCFunction)ceph_remove_mds_perf_query,
METH_VARARGS, "Remove an mds perf query"},
+ {"_ceph_reregister_mds_perf_queries", (PyCFunction)ceph_reregister_mds_perf_queries,
+ METH_NOARGS, "Re-register mds perf queries"},
+
{"_ceph_get_mds_perf_counters", (PyCFunction)ceph_get_mds_perf_counters,
METH_VARARGS, "Get mds perf counters"},
  return mds_perf_metric_collector.remove_query(query_id);
}
+void DaemonServer::reregister_mds_perf_queries()
+{
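+  // the collector pokes its listener, which pushes the current query
+  // set out to the MDS daemons again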
+  mds_perf_metric_collector.reregister_queries();
+}
+
int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
{
  return mds_perf_metric_collector.get_counters(collector);
  MetricQueryID add_mds_perf_query(const MDSPerfMetricQuery &query,
                                   const std::optional<MDSPerfMetricLimit> &limit);
  int remove_mds_perf_query(MetricQueryID query_id);
+  void reregister_mds_perf_queries();
  int get_mds_perf_counters(MDSPerfCollector *collector);
  virtual const char** get_tracked_conf_keys() const override;
  // update delayed rank set
  delayed_ranks = metric_report.rank_metrics_delayed;
  dout(20) << ": delayed ranks=[" << delayed_ranks << "]" << dendl;
+
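+  // record when this report arrived; a coarse monotonic clock is
+  // sufficient for the freshness check done on the python side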
+  clock_gettime(CLOCK_MONOTONIC_COARSE, &last_updated_mono);
}
int MDSPerfMetricCollector::get_counters(PerfCollector *collector) {
  get_delayed_ranks(&c->delayed_ranks);
+  get_last_updated(&c->last_updated_mono);
  return r;
}
  ceph_assert(ceph_mutex_is_locked(lock));
  *ranks = delayed_ranks;
}
+
+void MDSPerfMetricCollector::get_last_updated(utime_t *ts) {
+  ceph_assert(ceph_mutex_is_locked(lock));
+  *ts = utime_t(last_updated_mono);
+}
                           MDSPerfMetrics> {
private:
  std::set<mds_rank_t> delayed_ranks;
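+  // timestamp of the most recent metrics report (CLOCK_MONOTONIC_COARSE)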
+  struct timespec last_updated_mono;
  void get_delayed_ranks(std::set<mds_rank_t> *ranks);
+  void get_last_updated(utime_t *ts);
public:
  MDSPerfMetricCollector(MetricListener &listener);
struct MDSPerfCollector : PerfCollector {
  std::map<MDSPerfMetricKey, PerformanceCounters> counters;
  std::set<mds_rank_t> delayed_ranks;
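+  // monotonic time of the last metrics update, filled in by the collector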
+  utime_t last_updated_mono;
  MDSPerfCollector(MetricQueryID query_id)
    : PerfCollector(query_id) {
  }
}
+template <typename Query, typename Limit, typename Key, typename Report>
+void MetricCollector<Query, Limit, Key, Report>::reregister_queries() {
+  dout(20) << dendl;
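+  // notify the listener so the currently registered queries are sent
+  // out to the daemons again (e.g. to a newly active rank0 MDS)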
+  listener.handle_query_updated();
+}
+
template <typename Query, typename Limit, typename Key, typename Report>
int MetricCollector<Query, Limit, Key, Report>::get_counters_generic(
    MetricQueryID query_id, std::map<Key, PerformanceCounters> *c) {
  void remove_all_queries();
+  void reregister_queries();
+
  std::map<Query, Limits> get_queries() const {
    std::lock_guard locker(lock);
    def _ceph_get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]: ...
    def _ceph_add_mds_perf_query(self, query: Dict[str, Dict[str, Any]]) -> Optional[int]: ...
    def _ceph_remove_mds_perf_query(self, query_id: int) -> None: ...
+    def _ceph_reregister_mds_perf_queries(self) -> None: ...
    def _ceph_get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]: ...
    def _ceph_unregister_client(self, addrs: str) -> None: ...
    def _ceph_register_client(self, addrs: str) -> None: ...
        return self._ceph_remove_mds_perf_query(query_id)
+
+    @API.expose
+    def reregister_mds_perf_queries(self) -> None:
+        """
+        Re-register MDS perf queries.
+        """
+        return self._ceph_reregister_mds_perf_queries()
+
    @API.expose
    def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
        """
        Get stats collected for an MDS perf query.
import uuid
import errno
import traceback
+import logging
from collections import OrderedDict
from typing import List, Dict, Set
from mgr_module import CommandResult
from datetime import datetime, timedelta
-from threading import Lock, Condition, Thread
+from threading import Lock, Condition, Thread, Timer
from ipaddress import ip_address
PERF_STATS_VERSION = 1
MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency', 'dentry_lease', 'opened_files', 'pinned_icaps', 'opened_inodes', 'read_io_sizes', 'write_io_sizes'] # type: List[str]
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+REREGISTER_TIMER_INTERVAL = 1 # seconds
CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
NON_EXISTENT_KEY_STR = "N/A"
+logger = logging.getLogger(__name__)
+
class FilterSpec(object):
"""
query filters encapsulated and used as key for query map
    user_queries = {} # type: Dict[str, Dict]
    meta_lock = Lock()
+    rqtimer = None
    client_metadata = {
        'metadata' : {},
        'to_purge' : set(),
    def __init__(self, module):
        self.module = module
        self.log = module.log
+        self.prev_rank0_gid = None
+        self.mx_last_updated = 0.0
        # report processor thread
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()
        if not key in result or not result[key] == meta:
            result[key] = meta
-    def notify(self, cmdtag):
+    def notify_cmd(self, cmdtag):
        self.log.debug("cmdtag={0}".format(cmdtag))
        with self.meta_lock:
            try:
            self.log.debug("client_metadata={0}, to_purge={1}".format(
                self.client_metadata['metadata'], self.client_metadata['to_purge']))
+    def notify_fsmap(self):
+        # re-register the user queries when a new rank0 mds shows up
+        with self.lock:
+            gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
+            if not gid_state:
+                return
+            rank0_gid, state = gid_state
+            if rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active':
+                # the new rank0 mds is up:active
+                ua_last_updated = time.monotonic()
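+                # (re)arm the short re-register timer; it fires once the new
+                # rank0 has had a chance to send its first metrics report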
+                if self.rqtimer and self.rqtimer.is_alive():
+                    self.rqtimer.cancel()
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    def re_register_queries(self, rank0_gid, ua_last_updated):
+        # re-register the queries if the metrics are newer than the rank0
+        # failover; otherwise reschedule the timer and wait for the first
+        # (empty) metrics report from the new rank0 mds
+        with self.lock:
+            if self.mx_last_updated >= ua_last_updated:
+                self.log.debug("reregistering queries...")
+                self.module.reregister_mds_perf_queries()
+                self.prev_rank0_gid = rank0_gid
+            else:
+                # reschedule the timer
+                self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+                                     self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+                self.rqtimer.start()
+
+    @staticmethod
+    def get_rank0_mds_gid_state(fsmap):
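+        """Return (gid, state) of the rank0 mds, or None if there is no rank0."""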
+        for fs in fsmap['filesystems']:
+            mds_map = fs['mdsmap']
+            if mds_map is not None:
+                for mds_status in mds_map['info'].values():
+                    if mds_status['rank'] == 0:
+                        return mds_status['gid'], mds_status['state']
+        logger.warning("No rank0 mds in the fsmap")
+
    def update_client_meta(self, rank_set):
        new_updates = {}
        pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
        # what's received from MDS
        incoming_metrics = result['metrics'][1]
+        # monotonic time of the last metrics update
+        self.mx_last_updated = result['metrics'][2][0]
+
        # cull missing MDSs and clients
        self.cull_missing_entries(raw_perf_counters, incoming_metrics)
        },
    ]
    MODULE_OPTIONS: List[Option] = []
-    NOTIFY_TYPES = [NotifyType.command]
+    NOTIFY_TYPES = [NotifyType.command, NotifyType.fs_map]
    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
    def notify(self, notify_type: NotifyType, notify_id):
        if notify_type == NotifyType.command:
-            self.fs_perf_stats.notify(notify_id)
+            self.fs_perf_stats.notify_cmd(notify_id)
+        elif notify_type == NotifyType.fs_map:
+            self.fs_perf_stats.notify_fsmap()
    def handle_command(self, inbuf, cmd):
        prefix = cmd['prefix']