using json_value = boost::json::value;
using json_array = boost::json::array;
-void DaemonMetricCollector::request_loop(boost::asio::steady_timer &timer) {
- timer.async_wait([&](const boost::system::error_code &e) {
- std::cerr << e << std::endl;
+void DaemonMetricCollector::request_loop() {
+ timer.async_wait([this](const boost::system::error_code &e) {
+ if (shutdown_flag) {
+ dout(1) << "Metric collector request loop cancelled" << dendl;
+ return;
+ }
+
+ if (e) return; // Exit on error or cancellation
+
+ dout(10) << "Getting metrics loop..." << dendl;
update_sockets();
bool sort_metrics = g_conf().get_val<bool>("exporter_sort_metrics");
auto stats_period = g_conf().get_val<int64_t>("exporter_stats_period");
// time to wait before sending requests again
timer.expires_from_now(std::chrono::seconds(stats_period));
- request_loop(timer);
+ request_loop();
});
}
void DaemonMetricCollector::main() {
- // time to wait before sending requests again
-
- boost::asio::io_context io;
- boost::asio::steady_timer timer{io, std::chrono::seconds(0)};
- request_loop(timer);
+ shutdown_flag = false;
+ timer.expires_from_now(std::chrono::seconds(0));
+ request_loop();
io.run();
}
+void DaemonMetricCollector::shutdown(){
+ shutdown_flag = true;
+ timer.cancel(); // Explicitly cancel the timer
+ dout(1) << "Collector shutdown initiated, timer canceled" << dendl;
+ io.stop();
+}
+
std::string DaemonMetricCollector::get_metrics() {
const std::lock_guard<std::mutex> lock(metrics_mutex);
return metrics;
static DaemonMetricCollector instance;
return instance;
}
+
#pragma once
#include "common/admin_socket_client.h"
+#include <atomic>
#include <map>
#include <string>
#include <vector>
#include <boost/asio/steady_timer.hpp>
+#include <boost/thread.hpp>
#include <boost/json/object.hpp>
#include <filesystem>
#include <map>
#include <string>
#include <vector>
+
struct pstat {
unsigned long utime;
unsigned long stime;
std::string metrics;
std::pair<labels_t, std::string> add_fixed_name_metrics(std::string metric_name);
void update_sockets();
+ void shutdown();
private:
std::mutex metrics_mutex;
std::unique_ptr<MetricsBuilder> builder;
- void request_loop(boost::asio::steady_timer &timer);
+ boost::asio::io_context io;
+ boost::asio::steady_timer timer{io};
+ std::atomic<bool> shutdown_flag{false};
+
+ void request_loop();
void dump_asok_metric(boost::json::object perf_info,
boost::json::value perf_values, std::string name,
};
DaemonMetricCollector &collector_instance();
+
#include "common/ceph_argparse.h"
#include "common/config.h"
-#include "exporter/DaemonMetricCollector.h"
-#include "exporter/web_server.h"
+#include "common/debug.h"
#include "global/global_init.h"
#include "global/global_context.h"
-
+#include "global/signal_handler.h"
+#include "exporter/DaemonMetricCollector.h"
+#include "exporter/web_server.h"
#include <boost/thread/thread.hpp>
#include <iostream>
#include <map>
#include <string>
+#include <atomic>
+#include <chrono>
+#include <thread>
#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_ceph_exporter
+
+DaemonMetricCollector &collector = collector_instance();
+
+static void handle_signal(int signum)
+{
+ ceph_assert(signum == SIGINT || signum == SIGTERM);
+ derr << "*** Got signal " << sig_str(signum) << " ***" << dendl;
+ // Finish the DaemonMetricCollector
+ collector.shutdown();
+}
static void usage() {
std::cout << "usage: ceph-exporter [options]\n"
}
int main(int argc, char **argv) {
-
auto args = argv_to_vec(argc, argv);
if (args.empty()) {
std::cerr << argv[0] << ": -h or --help for usage" << std::endl;
}
common_init_finish(g_ceph_context);
+ // Register signal handlers
+ init_async_signal_handler();
+ register_async_signal_handler(SIGHUP, sighup_handler);
+ register_async_signal_handler_oneshot(SIGINT, handle_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+
+ // Start the web server thread
boost::thread server_thread(web_server_thread_entrypoint);
- DaemonMetricCollector &collector = collector_instance();
+
+ // Start the DaemonMetricCollector
collector.main();
+
+ // Interrupted. Time to terminate
+ unregister_async_signal_handler(SIGHUP, sighup_handler);
+ unregister_async_signal_handler(SIGINT, handle_signal);
+ unregister_async_signal_handler(SIGTERM, handle_signal);
+ shutdown_async_signal_handler();
+
+ // Stop the web server thread by interrupting it
+ stop_web_server();
+ server_thread.interrupt(); // Interrupt the web server thread
server_thread.join();
+
+ dout(1) << "Ceph exporter stopped" << dendl;
+
+ return 0;
}
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
+//common io context for the web servers
+std::shared_ptr<net::io_context> global_ioc;
+
// Base class for common functionality
class web_connection {
public:
web_connection(net::any_io_executor executor, std::chrono::seconds timeout)
: deadline_(executor, timeout) {}
- // Common request processing logic
+ // Common request processing logic
void process_request() {
response_.version(request_.version());
response_.keep_alive(request_.keep_alive());
write_response();
}
- // Construct a response message based on the request target
+ // Construct a response message based on the request target
void create_response() {
if (request_.target() == "/") {
response_.result(http::status::moved_permanently);
}
}
- // Asynchronously transmit the response message
+ // Asynchronously transmit the response message
virtual void write_response() = 0;
// Check whether we have spent enough time on this connection
}
void run_http_server(const std::string& exporter_addr, short unsigned int port) {
- net::io_context ioc{1};
- tcp::acceptor acceptor{ioc, {net::ip::make_address(exporter_addr), port}};
- tcp::socket socket{ioc};
+ tcp::acceptor acceptor{*global_ioc, {net::ip::make_address(exporter_addr), port}};
+ tcp::socket socket{*global_ioc};
http_server(acceptor, socket);
dout(1) << "HTTP server running on " << exporter_addr << ":" << port << dendl;
- ioc.run();
+ global_ioc->run();
}
void run_https_server(const std::string& exporter_addr, short unsigned int port, const std::string& cert_file, const std::string& key_file) {
- net::io_context ioc{1};
ssl::context ssl_ctx(ssl::context::tlsv13);
ssl_ctx.use_certificate_chain_file(cert_file);
ssl_ctx.use_private_key_file(key_file, ssl::context::pem);
- tcp::acceptor acceptor{ioc, {net::ip::make_address(exporter_addr), port}};
+ tcp::acceptor acceptor{*global_ioc, {net::ip::make_address(exporter_addr), port}};
https_server(acceptor, ssl_ctx);
dout(1) << "HTTPS server running on " << exporter_addr << ":" << port << dendl;
- ioc.run();
+ global_ioc->run();
+}
+
+void stop_web_server() {
+ if (global_ioc) {
+ global_ioc->stop();
+ dout(1) << "Ceph exporter web server stopped" << dendl;
+ }
}
void web_server_thread_entrypoint() {
std::string cert_file = g_conf().get_val<std::string>("exporter_cert_file");
std::string key_file = g_conf().get_val<std::string>("exporter_key_file");
+ // Initialize global_ioc
+ global_ioc = std::make_shared<net::io_context>(1);
+
if (cert_file.empty() && key_file.empty()) {
run_http_server(exporter_addr, port);
} else {
try {
run_https_server(exporter_addr, port, cert_file, key_file);
} catch (const std::exception &e) {
- dout(1) << "Failed to start HTTPS server: " << e.what() << dendl;
+ derr << "Failed to start HTTPS server: " << e.what() << dendl;
exit(EXIT_FAILURE);
}
}
} catch (std::exception const &e) {
- dout(1) << "Error: " << e.what() << dendl;
+ derr << "Error: " << e.what() << dendl;
exit(EXIT_FAILURE);
}
}
#include <string>
void web_server_thread_entrypoint();
+void stop_web_server();