%if %{with lttng}
%{_libdir}/libos_tp.so*
%{_libdir}/libosd_tp.so*
+%{_libdir}/libmgr_op_tp.so*
%endif
%config(noreplace) %{_sysconfdir}/logrotate.d/ceph
%if 0%{?fedora} || 0%{?rhel} || 0%{?openEuler}
level: dev
desc: Time to wait during shutdown to deregister service with mgr
default: 1
+- name: mgr_enable_op_tracker
+ type: bool
+ level: advanced
+ desc: Enable / disable MGR Op Tracker
+ default: true
+ with_legacy: true
+- name: mgr_num_op_tracker_shard
+ type: uint
+ level: advanced
+ desc: The number of shards for holding the ops
+ default: 32
+ with_legacy: true
+- name: mgr_op_complaint_time
+ type: float
+ level: advanced
+ default: 30
+ desc: An operation becomes complaint worthy after the specified number of seconds have elapsed.
+ with_legacy: true
+- name: mgr_op_log_threshold
+ type: int
+ level: advanced
+ default: 5
+ fmt_desc: How many operations logs to display at once.
+ with_legacy: true
+- name: mgr_op_history_size
+ type: uint
+ level: advanced
+ default: 20
+ fmt_desc: The maximum number of completed operations to track.
+ with_legacy: true
+- name: mgr_op_history_duration
+ type: uint
+ level: advanced
+ default: 600
+ desc: The oldest completed operation to track.
+ with_legacy: true
+- name: mgr_op_history_slow_op_size
+ type: uint
+ level: advanced
+ default: 20
+ desc: Max number of slow ops to track
+ with_legacy: true
+- name: mgr_op_history_slow_op_threshold
+ type: float
+ level: advanced
+ default: 10
+ desc: Duration of an op to be considered as a historical slow op
+ with_legacy: true
- name: throttler_perf_counter
type: bool
level: advanced
PyOSDMap.cc
StandbyPyModules.cc
mgr_commands.cc
+ MgrOpRequest.cc
+ ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
$<TARGET_OBJECTS:mgr_cap_obj>)
add_executable(ceph-mgr ${mgr_srcs})
target_compile_definitions(ceph-mgr PRIVATE PY_SSIZE_T_CLEAN)
${GSSAPI_LIBRARIES})
set_target_properties(ceph-mgr PROPERTIES
POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
+ if(WITH_LTTNG)
+ add_dependencies(ceph-mgr mgr_op_tp)
+ endif()
install(TARGETS ceph-mgr DESTINATION bin)
endif()
#include "mgr/DaemonHealthMetricCollector.h"
#include "mgr/OSDPerfMetricCollector.h"
#include "mgr/MDSPerfMetricCollector.h"
+#include "mgr/MgrOpRequest.h"
#include "mon/MonCommand.h"
#include "messages/MMgrOpen.h"
using namespace TOPNSPC::common;
using std::list;
+using std::ostream;
using std::ostringstream;
using std::string;
using std::stringstream;
py_modules(py_modules_),
clog(clog_),
audit_clog(audit_clog_),
+ asok_hook(nullptr),
pgmap_ready(false),
timer(g_ceph_context, lock),
tick_event(nullptr),
osd_perf_metric_collector_listener(this),
osd_perf_metric_collector(osd_perf_metric_collector_listener),
mds_perf_metric_collector_listener(this),
- mds_perf_metric_collector(mds_perf_metric_collector_listener)
+ mds_perf_metric_collector(mds_perf_metric_collector_listener),
+ op_tracker(g_ceph_context, g_ceph_context->_conf->mgr_enable_op_tracker,
+ g_ceph_context->_conf->mgr_num_op_tracker_shard)
{
g_conf().add_observer(this);
+ /* define op size and time for mgr daemon */
+ op_tracker.set_complaint_and_threshold(cct->_conf->mgr_op_complaint_time,
+ cct->_conf->mgr_op_log_threshold);
+ op_tracker.set_history_size_and_duration(cct->_conf->mgr_op_history_size,
+ cct->_conf->mgr_op_history_duration);
+ op_tracker.set_history_slow_op_size_and_threshold(cct->_conf->mgr_op_history_slow_op_size,
+ cct->_conf->mgr_op_history_slow_op_threshold);
}
DaemonServer::~DaemonServer() {
g_conf().remove_observer(this);
}
+class DaemonServerHook : public AdminSocketHook {
+ DaemonServer *daemon_server;
+public:
+ explicit DaemonServerHook(DaemonServer *o) : daemon_server(o) {}
+ int call(std::string_view admin_command,
+ const cmdmap_t& cmdmap,
+ const bufferlist&,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
+ stringstream outss;
+ int r = 0;
+ try {
+ r = daemon_server->asok_command(admin_command, cmdmap, f, outss);
+ out.append(outss);
+ } catch (const TOPNSPC::common::bad_cmd_get& e) {
+ errss << e.what();
+ r = -EINVAL;
+ }
+ return r;
+ }
+};
+
int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
// Initialize Messenger
schedule_tick_locked(
g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
+ op_tracker.set_tracking(cct->_conf->mgr_enable_op_tracker);
+
+ AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
+ asok_hook = new DaemonServerHook(this);
+ r = admin_socket->register_command("dump_ops_in_flight " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
+ "show the ops currently in flight");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_blocked_ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
+ "show the blocked ops currently in flight");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_blocked_ops_count " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
+ "show the count of blocked ops currently in flight");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_historic_ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
+ "show recent ops");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_historic_slow_ops " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
+ "show slowest recent ops");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_historic_ops_by_duration " \
+ "name=filterstr,type=CephString,n=N,req=false",
+ asok_hook,
+ "show slowest recent ops, sorted by duration");
+ ceph_assert(r == 0);
return 0;
}
*/
class ReplyOnFinish : public Context {
std::shared_ptr<CommandContext> cmdctx;
+ MgrOpRequestRef op;
public:
bufferlist from_mon;
string outs;
- explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
- : cmdctx(cmdctx_)
- {}
+ explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_,
+ MgrOpRequestRef op_)
+ : cmdctx(cmdctx_),
+ op(op_)
+ {
+ if (op) {
+ op->mark_finish_mon_command();
+ }
+ }
void finish(int r) override {
cmdctx->odata.claim_append(from_mon);
cmdctx->reply(r, outs);
return true;
}
+ // Track non-admin mgr ops only
+ MessageRef mref = m.get();
+ MgrOpRequestRef op = op_tracker.create_request<MgrOpRequest, MessageRef>(mref);
+
+ op->mark_started();
+
// ----------------
// service map commands
if (prefix == "service dump") {
"\"prefix\": \"osd reweightn\", "
"\"weights\": \"" + s + "\""
"}";
- auto on_finish = new ReplyOnFinish(cmdctx);
+ op->mark_start_mon_command();
+ auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, {},
&on_finish->from_mon, &on_finish->outs, on_finish);
return true;
"\"id\": " + stringify(osds) + ", "
"\"yes_i_really_mean_it\": true"
"}";
- auto on_finish = new ReplyOnFinish(cmdctx);
+ op->mark_start_mon_command();
+ auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
return true;
} else if (prefix == "osd ok-to-stop") {
"\"prefix\": \"config-key set\", "
"\"key\": \"device/" + devid + "\""
"}";
- auto on_finish = new ReplyOnFinish(cmdctx);
+ op->mark_start_mon_command();
+ auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
}
return true;
"\"key\": \"device/" + devid + "\""
"}";
}
- auto on_finish = new ReplyOnFinish(cmdctx);
+ op->mark_start_mon_command();
+ auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
} else {
cmdctx->reply(0, ss);
return true;
}
+ op->mark_queued_for_module();
+
dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
- mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
+
+ mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix, op]
(int r_) mutable {
std::stringstream ss;
std::stringstream ds;
bufferlist inbl = cmdctx->data;
+ op->mark_reached(py_command.module_name.c_str());
int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
inbl, &ds, &ss);
if (r == -EACCES) {
{
return mds_perf_metric_collector.get_counters(collector);
}
+
+bool DaemonServer::asok_command(
+ std::string_view admin_command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ ostream& ss)
+{
+ int ret = 0;
+ std::lock_guard l(lock);
+ if (admin_command == "dump_ops_in_flight" ||
+ admin_command == "dump_blocked_ops" ||
+ admin_command == "dump_blocked_ops_count" ||
+ admin_command == "dump_historic_ops" ||
+ admin_command == "dump_historic_ops_by_duration" ||
+ admin_command == "dump_historic_slow_ops") {
+
+ const string error_str = "op_tracker tracking is not enabled now, so no ops are tracked currently, \
+even those get stuck. Please enable \"mgr_enable_op_tracker\", and the tracker \
+will start to track new ops received afterwards.";
+
+ set<string> filters;
+ vector<string> filter_str;
+ if (cmd_getval(cmdmap, "filterstr", filter_str)) {
+ copy(filter_str.begin(), filter_str.end(),
+ inserter(filters, filters.end()));
+ }
+
+ if (admin_command == "dump_ops_in_flight") {
+ if (!op_tracker.dump_ops_in_flight(f, false, filters)) {
+ ss << error_str;
+ ret = -EINVAL;
+ goto out;
+ }
+ } else if (admin_command == "dump_blocked_ops") {
+ if (!op_tracker.dump_ops_in_flight(f, true, filters)) {
+ ss << error_str;
+ ret = -EINVAL;
+ goto out;
+ }
+ } else if (admin_command == "dump_blocked_ops_count") {
+ if (!op_tracker.dump_ops_in_flight(f, true, filters, true)) {
+ ss << error_str;
+ ret = -EINVAL;
+ goto out;
+ }
+ } else if (admin_command == "dump_historic_ops") {
+ if (!op_tracker.dump_historic_ops(f, false, filters)) {
+ ss << error_str;
+ ret = -EINVAL;
+ goto out;
+ }
+ } else if (admin_command == "dump_historic_ops_by_duration") {
+ if (!op_tracker.dump_historic_ops(f, true, filters)) {
+ ss << error_str;
+ ret = -EINVAL;
+ goto out;
+ }
+ } else if (admin_command == "dump_historic_slow_ops") {
+ if (!op_tracker.dump_historic_ops(f, true, filters)) {
+ ss << error_str;
+ ret = -EINVAL;
+ goto out;
+ }
+ }
+ }
+ dout(10) << "ret := " << ret << dendl;
+ return true;
+
+out:
+ return false;
+}
#include "MetricCollector.h"
#include "OSDPerfMetricCollector.h"
#include "MDSPerfMetricCollector.h"
+#include "MgrOpRequest.h"
class MMgrReport;
class MMgrOpen;
}
};
-
/**
* Server used in ceph-mgr to communicate with Ceph daemons like
* MDSs and OSDs.
const std::map<std::string,std::string>& param_str_map,
const MonCommand *this_cmd);
+ class DaemonServerHook *asok_hook;
+
private:
friend class ReplyOnFinish;
bool _reply(MCommand* m,
void update_task_status(DaemonKey key,
const std::map<std::string,std::string>& task_status);
+private:
+ // -- op tracking --
+ OpTracker op_tracker;
public:
int init(uint64_t gid, entity_addrvec_t client_addrs);
void log_access_denied(std::shared_ptr<CommandContext>& cmdctx,
MgrSession* session, std::stringstream& ss);
void dump_pg_ready(ceph::Formatter *f);
+
+ bool asok_command(std::string_view admin_command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& ss);
};
#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "MgrOpRequest.h"
+#include <iostream>
+#include <vector>
+#include "common/debug.h"
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "include/ceph_assert.h"
+#include "msg/Message.h"
+
+#ifdef WITH_LTTNG
+#define TRACEPOINT_DEFINE
+#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
+#include "tracing/mgroprequest.h"
+#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
+#undef TRACEPOINT_DEFINE
+#else
+#define tracepoint(...)
+#endif
+
+using std::ostream;
+using std::set;
+using std::string;
+using std::stringstream;
+
+using ceph::Formatter;
+
+MgrOpRequest::MgrOpRequest(MessageRef req, OpTracker* tracker)
+ : TrackedOp(tracker, req->get_recv_stamp()),
+ request(req) {
+ req_src_inst = req->get_source_inst();
+}
+
+void MgrOpRequest::_dump(Formatter *f) const
+{
+ MessageRef m = request;
+ f->dump_string("flag_point", state_string());
+ if (m->get_orig_source().is_client()) {
+ f->open_object_section("client_info");
+ stringstream client_name, client_addr;
+ client_name << req_src_inst.name;
+ client_addr << req_src_inst.addr;
+ f->dump_string("client", client_name.str());
+ f->dump_string("client_addr", client_addr.str());
+ f->dump_unsigned("tid", m->get_tid());
+ f->close_section(); // client_info
+ }
+
+ {
+ f->open_array_section("events");
+ std::lock_guard l(lock);
+
+ for (auto i = events.begin(); i != events.end(); ++i) {
+ f->open_object_section("event");
+ f->dump_string("event", i->str);
+ f->dump_stream("time") << i->stamp;
+
+ double duration = 0;
+
+ if (i != events.begin()) {
+ auto i_prev = i - 1;
+ duration = i->stamp - i_prev->stamp;
+ }
+
+ f->dump_float("duration", duration);
+ f->close_section();
+ }
+ f->close_section();
+ }
+}
+
+void MgrOpRequest::_dump_op_descriptor(ostream& stream) const
+{
+ get_req()->print(stream);
+}
+
+void MgrOpRequest::_unregistered() {
+ request->clear_data();
+ request->clear_payload();
+ request->release_message_throttle();
+ request->set_connection(nullptr);
+}
+
+void MgrOpRequest::mark_flag_point(uint8_t flag, const char *s) {
+ [[maybe_unused]] uint8_t old_flags = hit_flag_points;
+ mark_event(s);
+ last_event_detail = s;
+ hit_flag_points |= flag;
+ latest_flag_point = flag;
+
+ tracepoint(mgroprequest, mark_flag_point,
+ flag, s, old_flags, hit_flag_points);
+}
+
+void MgrOpRequest::mark_flag_point_string(uint8_t flag, const string& s) {
+ [[maybe_unused]] uint8_t old_flags = hit_flag_points;
+ mark_event(s);
+ hit_flag_points |= flag;
+ latest_flag_point = flag;
+
+ tracepoint(mgroprequest, mark_flag_point,
+ flag, s.c_str(), old_flags, hit_flag_points);
+}
+
+bool MgrOpRequest::filter_out(const set<string>& filters)
+{
+ set<entity_addr_t> addrs;
+ for (const auto& filter : filters) {
+ entity_addr_t addr;
+ if (addr.parse(filter.c_str())) {
+ addrs.insert(addr);
+ }
+ }
+ if (addrs.empty())
+ return true;
+
+ entity_addr_t cmp_addr = req_src_inst.addr;
+ if (addrs.count(cmp_addr)) {
+ return true;
+ }
+ cmp_addr.set_nonce(0);
+ if (addrs.count(cmp_addr)) {
+ return true;
+ }
+ cmp_addr.set_port(0);
+ if (addrs.count(cmp_addr)) {
+ return true;
+ }
+
+ return false;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef OPREQUEST_H_
+#define OPREQUEST_H_
+
+#include "common/TrackedOp.h"
+#include "common/tracer.h"
+/**
+ * The MgrOpRequest takes in a MessageRef and takes over a single reference
+ * to it, which it puts() when destroyed.
+ */
+struct MgrOpRequest : public TrackedOp {
+ friend class OpTracker;
+
+public:
+ void _dump(ceph::Formatter *f) const override;
+
+private:
+ MessageRef request; /// the logical request we are tracking
+ entity_inst_t req_src_inst;
+ uint8_t hit_flag_points;
+ uint8_t latest_flag_point;
+ const char* last_event_detail = nullptr;
+
+ static const uint8_t flag_started = 1 << 0;
+ static const uint8_t flag_queued_for_module = 1 << 1;
+ static const uint8_t flag_reached_module = 1 << 2;
+ static const uint8_t flag_start_mon_command = 1 << 3;
+ static const uint8_t flag_finish_mon_command = 1 << 4;
+
+ MgrOpRequest(MessageRef req, OpTracker *tracker);
+
+protected:
+ void _dump_op_descriptor(std::ostream& stream) const override;
+ void _unregistered() override;
+ bool filter_out(const std::set<std::string>& filters) override;
+
+public:
+ ~MgrOpRequest() override {
+ request->put();
+ }
+
+ template<class T>
+ const T* get_req() const { return static_cast<const T*>(request); }
+
+ const MessageRef get_req() const { return request; }
+ MessageRef get_nonconst_req() { return request; }
+
+ entity_name_t get_source() {
+ if (request) {
+ return request->get_source();
+ } else {
+ return {};
+ }
+ }
+ uint8_t state_flag() const {
+ return latest_flag_point;
+ }
+
+ std::string _get_state_string() const override {
+ switch(latest_flag_point) {
+ case flag_started: return "started";
+ case flag_queued_for_module: return "queued for module";
+ case flag_reached_module: return last_event_detail;
+ case flag_start_mon_command: return "start mon command";
+ case flag_finish_mon_command: return "mon command finished";
+ default: break;
+ }
+ return "no flag points reached";
+ }
+
+ static std::string get_state_string(uint8_t flag) {
+ std::string flag_point;
+ switch(flag) {
+ case flag_started:
+ flag_point = "started";
+ break;
+ case flag_queued_for_module:
+ flag_point = "queued for module";
+ break;
+ case flag_reached_module:
+ flag_point = "reached module";
+ break;
+ case flag_start_mon_command:
+ flag_point = "start mon command";
+ break;
+ case flag_finish_mon_command:
+ flag_point = "mon command finished";
+ break;
+ }
+ return flag_point;
+ }
+
+ void mark_started() {
+ mark_flag_point(flag_started, "started");
+ }
+ void mark_queued_for_module() {
+ mark_flag_point(flag_queued_for_module, "queued_for_module");
+ }
+ void mark_reached(const char *s) {
+ mark_flag_point(flag_reached_module, s);
+ }
+ void mark_start_mon_command() {
+ mark_flag_point(flag_start_mon_command, "start_mon_command");
+ }
+ void mark_finish_mon_command() {
+ mark_flag_point(flag_start_mon_command, "mon_command_finished");
+ }
+
+ typedef boost::intrusive_ptr<MgrOpRequest> Ref;
+
+private:
+ void mark_flag_point(uint8_t flag, const char *s);
+ void mark_flag_point_string(uint8_t flag, const std::string& s);
+};
+
+typedef MgrOpRequest::Ref MgrOpRequestRef;
+
+#endif /* OPREQUEST_H_ */
add_tracing_library(bluestore_tp bluestore.tp 1.0.0)
add_tracing_library(rgw_op_tp rgw_op.tp 2.0.0)
add_tracing_library(rgw_rados_tp rgw_rados.tp 2.0.0)
+add_tracing_library(mgr_op_tp mgroprequest.tp 1.0.0)
-install(TARGETS rados_tp osd_tp os_tp rgw_rados_tp rgw_op_tp DESTINATION ${CMAKE_INSTALL_LIBDIR})
+install(TARGETS rados_tp osd_tp os_tp rgw_rados_tp rgw_op_tp mgr_op_tp DESTINATION ${CMAKE_INSTALL_LIBDIR})
if(WITH_RBD)
add_tracing_library(rbd_tp librbd.tp 1.0.0)
install(TARGETS rbd_tp DESTINATION ${CMAKE_INSTALL_LIBDIR})
--- /dev/null
+#define TRACEPOINT_CREATE_PROBES
+/*
+ * The header containing our TRACEPOINT_EVENTs.
+ */
+#include "tracing/mgroprequest.h"
--- /dev/null
+#include "include/int_types.h"
+
+TRACEPOINT_EVENT(mgroprequest, set_rmw_flags,
+ TP_ARGS(
+ int, flag,
+ int, old_rmw_flags,
+ int, new_rmw_flags),
+ TP_FIELDS(
+ ctf_integer_hex(int, flag, flag)
+ ctf_integer_hex(int, old_rmw_flags, old_rmw_flags)
+ ctf_integer_hex(int, new_rmw_flags, new_rmw_flags)
+ )
+)
+
+TRACEPOINT_EVENT(mgroprequest, mark_flag_point,
+ TP_ARGS(
+ uint8_t, flag,
+ const char*, msg,
+ uint8_t, old_hit_flag_points,
+ uint8_t, new_hit_flag_points),
+ TP_FIELDS(
+ ctf_integer_hex(uint8_t, flag, flag)
+ ctf_string(msg, msg)
+ ctf_integer_hex(uint8_t, old_hit_flag_points, old_hit_flag_points)
+ ctf_integer_hex(uint8_t, new_hit_flag_points, new_hit_flag_points)
+ )
+)