From: Cory Snyder Date: Wed, 22 Sep 2021 14:14:12 +0000 (-0400) Subject: rgw: add abstraction for ops log destination and add file logger X-Git-Tag: v17.1.0~565^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c4c9f391aaa6983cf4ce0af144b70ecd27e76f45;p=ceph.git rgw: add abstraction for ops log destination and add file logger Adds an OpsLogSink abstraction for ops log destinations. Also implements this abstraction for a file logger since it's easier to use files vs. sockets with containers. Fixes: https://tracker.ceph.com/issues/48752 Signed-off-by: Cory Snyder --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 961d22b9e6d7..d116b05b67f8 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -1438,6 +1438,7 @@ options: - rgw_log_object_name - rgw_ops_log_rados - rgw_ops_log_socket_path + - rgw_ops_log_file_path with_legacy: true # enable logging bandwidth usage - name: rgw_enable_usage_log @@ -1478,6 +1479,18 @@ options: - rgw_enable_ops_log - rgw_ops_log_data_backlog with_legacy: true +# path to file where ops log can go +- name: rgw_ops_log_file_path + type: str + level: advanced + desc: File-system path for ops log. + long_desc: Path to file that RGW will log ops logs to. + fmt_desc: The file-system path for writing operations logs. + services: + - rgw + see_also: + - rgw_enable_ops_log + with_legacy: true # max data backlog for ops log - name: rgw_ops_log_data_backlog type: size diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index e22010cb452d..18c4140e60b3 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -340,8 +340,7 @@ namespace rgw { << e.what() << dendl; } if (should_log) { - rgw_log_op(store, nullptr /* !rest */, s, - (op ? op->name() : "unknown"), olog); + rgw_log_op(nullptr /* !rest */, s, (op ? op->name() : "unknown"), olog); } int http_ret = s->err.http_ret; @@ -589,10 +588,20 @@ namespace rgw { // XXX ex-RGWRESTMgr_lib, mgr->set_logging(true) + OpsLogManifold* olog_manifold = new OpsLogManifold(); if (!g_conf()->rgw_ops_log_socket_path.empty()) { - olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog); - olog->init(g_conf()->rgw_ops_log_socket_path); + OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog); + olog_socket->init(g_conf()->rgw_ops_log_socket_path); + olog_manifold->add_sink(olog_socket); } + OpsLogFile* ops_log_file; + if (!g_conf()->rgw_ops_log_file_path.empty()) { + ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog); + ops_log_file->start(); + olog_manifold->add_sink(ops_log_file); + } + olog_manifold->add_sink(new OpsLogRados(store)); + olog = olog_manifold; int port = 80; RGWProcessEnv env = { store, &rest, olog, port }; @@ -653,7 +662,7 @@ namespace rgw { shutdown_async_signal_handler(); rgw_log_usage_finalize(); - + delete olog; StoreManager::close_storage(store); diff --git a/src/rgw/rgw_lib.h b/src/rgw/rgw_lib.h index 710c76c80315..0c5e9e721685 100644 --- a/src/rgw/rgw_lib.h +++ b/src/rgw/rgw_lib.h @@ -20,7 +20,7 @@ #define dout_subsys ceph_subsys_rgw -class OpsLogSocket; +class OpsLogSink; namespace rgw { @@ -29,7 +29,7 @@ namespace rgw { class RGWLib : public DoutPrefixProvider { RGWFrontendConfig* fec; RGWLibFrontend* fe; - OpsLogSocket* olog; + OpsLogSink* olog; rgw::LDAPHelper* ldh{nullptr}; RGWREST rest; // XXX needed for RGWProcessEnv rgw::sal::Store* store; diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc index da6eb1e6ad33..aee079daf7f0 100644 --- a/src/rgw/rgw_log.cc +++ b/src/rgw/rgw_log.cc @@ -17,6 +17,9 @@ #include "services/svc_zone.h" +#include +#include + #define dout_subsys ceph_subsys_rgw using namespace std; @@ -317,32 +320,131 @@ void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter) formatter->close_section(); } -void OpsLogSocket::formatter_to_bl(bufferlist& bl) +OpsLogManifold::~OpsLogManifold() { - stringstream ss; - formatter->flush(ss); - const string& s = ss.str(); + for (const auto &sink : sinks) { + delete sink; + } +} - bl.append(s); +void OpsLogManifold::add_sink(OpsLogSink* sink) +{ + sinks.push_back(sink); } -void OpsLogSocket::init_connection(bufferlist& bl) +int OpsLogManifold::log(struct req_state* s, struct rgw_log_entry& entry) { - bl.append("["); + int ret = 0; + for (const auto &sink : sinks) { + if (sink->log(s, entry) < 0) { + ret = -1; + } + } + return ret; } -OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog) +OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) : + cct(cct), file(path, std::ofstream::app), data_size(0), max_data_size(max_data_size) { - formatter = new JSONFormatter; - delim.append(",\n"); } -OpsLogSocket::~OpsLogSocket() +void OpsLogFile::flush() +{ + std::scoped_lock flush_lock(flush_mutex); + { + std::scoped_lock log_lock(log_mutex); + assert(flush_buffer.empty()); + flush_buffer.swap(log_buffer); + data_size = 0; + } + for (auto bl : flush_buffer) { + int try_num = 0; + while (true) { + bl.write_stream(file); + if (!file) { + ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl; + file.clear(); + if (stopped) { + break; + } + int sleep_time_secs = std::min((int) pow(2, try_num), 60); + std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs)); + try_num++; + } else { + break; + } + } + } + flush_buffer.clear(); + file << std::endl; +} + +void* OpsLogFile::entry() { + std::unique_lock lock(log_mutex); + while (!stopped) { + if (!log_buffer.empty()) { + lock.unlock(); + flush(); + lock.lock(); + continue; + } + cond_flush.wait(lock); + } + flush(); + return NULL; +} + +void OpsLogFile::start() { + stopped = false; + create("ops_log_file"); +} + +void OpsLogFile::stop() { + { + cond_flush.notify_one(); + stopped = true; + } + join(); +} + +OpsLogFile::~OpsLogFile() { + if (!stopped) { + stop(); + } + file.close(); +} + +int OpsLogFile::log_json(struct req_state* s, bufferlist& bl) +{ + std::unique_lock lock(log_mutex); + if (data_size + bl.length() >= max_data_size) { + ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl; + return -1; + } + log_buffer.push_back(bl); + data_size += bl.length(); + cond_flush.notify_all(); + return 0; +} + +JsonOpsLogSink::JsonOpsLogSink() { + formatter = new JSONFormatter; +} + +JsonOpsLogSink::~JsonOpsLogSink() { delete formatter; } -void OpsLogSocket::log(struct rgw_log_entry& entry) +void JsonOpsLogSink::formatter_to_bl(bufferlist& bl) +{ + stringstream ss; + formatter->flush(ss); + const string& s = ss.str(); + bl.append(s); +} + +int JsonOpsLogSink::log(struct req_state* s, struct rgw_log_entry& entry) { bufferlist bl; @@ -351,11 +453,53 @@ void OpsLogSocket::log(struct rgw_log_entry& entry) formatter_to_bl(bl); lock.unlock(); + return log_json(s, bl); +} + +void OpsLogSocket::init_connection(bufferlist& bl) +{ + bl.append("["); +} + +OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog) +{ + delim.append(",\n"); +} + +int OpsLogSocket::log_json(struct req_state* s, bufferlist& bl) +{ append_output(bl); + return 0; } -int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state *s, - const string& op_name, OpsLogSocket *olog) +OpsLogRados::OpsLogRados(rgw::sal::Store* store): store(store) +{ +} + +int OpsLogRados::log(struct req_state* s, struct rgw_log_entry& entry) +{ + if (!s->cct->_conf->rgw_ops_log_rados) { + return 0; + } + bufferlist bl; + encode(entry, bl); + + struct tm bdt; + time_t t = req_state::Clock::to_time_t(entry.time); + if (s->cct->_conf->rgw_log_object_name_utc) + gmtime_r(&t, &bdt); + else + localtime_r(&t, &bdt); + string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt, + entry.bucket_id, entry.bucket); + if (store->log_op(s, oid, bl) < 0) { + ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl; + return -1; + } + return 0; +} + +int rgw_log_op(RGWREST* const rest, struct req_state *s, const string& op_name, OpsLogSink *olog) { struct rgw_log_entry entry; string bucket_id; @@ -473,37 +617,14 @@ int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state *s, char buf[16]; snprintf(buf, sizeof(buf), "%d", s->err.http_ret); entry.http_status = buf; - } else + } else { entry.http_status = "200"; // default - + } entry.error_code = s->err.err_code; entry.bucket_id = bucket_id; entry.trans_id = s->trans_id; - - bufferlist bl; - encode(entry, bl); - - struct tm bdt; - time_t t = req_state::Clock::to_time_t(entry.time); - if (s->cct->_conf->rgw_log_object_name_utc) - gmtime_r(&t, &bdt); - else - localtime_r(&t, &bdt); - - int ret = 0; - - if (s->cct->_conf->rgw_ops_log_rados) { - string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt, - entry.bucket_id, entry.bucket); - ret = store->log_op(s, oid, bl); - } - if (olog) { - olog->log(entry); + return olog->log(s, entry); } - if (ret < 0) - ldpp_dout(s, 0) << "ERROR: failed to log entry" << dendl; - - return ret; + return 0; } - diff --git a/src/rgw/rgw_log.h b/src/rgw/rgw_log.h index 2469282b3301..5ee411e8cfae 100644 --- a/src/rgw/rgw_log.h +++ b/src/rgw/rgw_log.h @@ -7,6 +7,10 @@ #include #include "rgw_common.h" #include "common/OutputDataSocket.h" +#include +#include + +#define dout_subsys ceph_subsys_rgw namespace rgw { namespace sal { class Store; @@ -132,26 +136,79 @@ struct rgw_log_entry { }; WRITE_CLASS_ENCODER(rgw_log_entry) -class OpsLogSocket : public OutputDataSocket { +class OpsLogSink { +public: + virtual int log(struct req_state* s, struct rgw_log_entry& entry) = 0; + virtual ~OpsLogSink() = default; +}; + +class OpsLogManifold: public OpsLogSink { + std::vector sinks; +public: + ~OpsLogManifold() override; + void add_sink(OpsLogSink* sink); + int log(struct req_state* s, struct rgw_log_entry& entry) override; +}; + +class JsonOpsLogSink : public OpsLogSink { ceph::Formatter *formatter; - ceph::mutex lock = ceph::make_mutex("OpsLogSocket"); + ceph::mutex lock = ceph::make_mutex("JsonOpsLogSink"); void formatter_to_bl(bufferlist& bl); +protected: + virtual int log_json(struct req_state* s, bufferlist& bl) = 0; +public: + JsonOpsLogSink(); + ~JsonOpsLogSink() override; + int log(struct req_state* s, struct rgw_log_entry& entry) override; +}; + +class OpsLogFile : public JsonOpsLogSink, public Thread, public DoutPrefixProvider { + CephContext* cct; + ceph::mutex log_mutex = ceph::make_mutex("OpsLogFile_log"); + ceph::mutex flush_mutex = ceph::make_mutex("OpsLogFile_flush"); + std::vector log_buffer; + std::vector flush_buffer; + std::condition_variable cond_flush; + std::ofstream file; + bool stopped; + uint64_t data_size; + uint64_t max_data_size; + + void flush(); +protected: + int log_json(struct req_state* s, bufferlist& bl) override; + void *entry() override; +public: + OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size); + ~OpsLogFile() override; + CephContext *get_cct() const override { return cct; } + unsigned get_subsys() const override { return dout_subsys; } + std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw OpsLogFile: "; } + void start(); + void stop(); +}; +class OpsLogSocket : public OutputDataSocket, public JsonOpsLogSink { protected: + int log_json(struct req_state* s, bufferlist& bl) override; void init_connection(bufferlist& bl) override; public: OpsLogSocket(CephContext *cct, uint64_t _backlog); - ~OpsLogSocket() override; +}; - void log(struct rgw_log_entry& entry); +class OpsLogRados : public OpsLogSink { + rgw::sal::Store* store; +public: + OpsLogRados(rgw::sal::Store* store); + int log(struct req_state* s, struct rgw_log_entry& entry) override; }; class RGWREST; -int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state* s, - const std::string& op_name, OpsLogSocket* olog); +int rgw_log_op(RGWREST* const rest, struct req_state* s, + const std::string& op_name, OpsLogSink* olog); void rgw_log_usage_init(CephContext* cct, rgw::sal::Store* store); void rgw_log_usage_finalize(); void rgw_format_ops_log_entry(struct rgw_log_entry& entry, diff --git a/src/rgw/rgw_lua_request.cc b/src/rgw/rgw_lua_request.cc index a8ea4ee19b2c..7d6d72e0368b 100644 --- a/src/rgw/rgw_lua_request.cc +++ b/src/rgw/rgw_lua_request.cc @@ -24,16 +24,15 @@ constexpr const char* RequestLogAction{"Log"}; int RequestLog(lua_State* L) { - const auto store = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(1))); - const auto rest = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(2))); - const auto olog = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(3))); - const auto s = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(4))); - const std::string op_name(reinterpret_cast(lua_touserdata(L, lua_upvalueindex(5)))); - if (store && s) { - const auto rc = rgw_log_op(store, rest, s, op_name, olog); + const auto rest = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(1))); + const auto olog = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(2))); + const auto s = reinterpret_cast(lua_touserdata(L, lua_upvalueindex(3))); + const std::string op_name(reinterpret_cast(lua_touserdata(L, lua_upvalueindex(4)))); + if (s) { + const auto rc = rgw_log_op(rest, s, op_name, olog); lua_pushinteger(L, rc); } else { - ldpp_dout(s, 1) << "Lua ERROR: missing rados store, cannot use ops log" << dendl; + ldpp_dout(s, 1) << "Lua ERROR: missing request state, cannot use ops log" << dendl; lua_pushinteger(L, -EINVAL); } @@ -777,7 +776,7 @@ struct RequestMetaTable : public EmptyMetaTable { int execute( rgw::sal::Store* store, RGWREST* rest, - OpsLogSocket* olog, + OpsLogSink* olog, req_state* s, const char* op_name, const std::string& script) @@ -799,12 +798,11 @@ int execute( lua_getglobal(L, RequestMetaTable::TableName().c_str()); ceph_assert(lua_istable(L, -1)); pushstring(L, RequestLogAction); - lua_pushlightuserdata(L, store); lua_pushlightuserdata(L, rest); lua_pushlightuserdata(L, olog); lua_pushlightuserdata(L, s); lua_pushlightuserdata(L, const_cast(op_name)); - lua_pushcclosure(L, RequestLog, FIVE_UPVALS); + lua_pushcclosure(L, RequestLog, FOUR_UPVALS); lua_rawset(L, -3); try { diff --git a/src/rgw/rgw_lua_request.h b/src/rgw/rgw_lua_request.h index d31e81b5a4fd..fcf0673f3ba7 100644 --- a/src/rgw/rgw_lua_request.h +++ b/src/rgw/rgw_lua_request.h @@ -5,7 +5,7 @@ class req_state; class RGWREST; -class OpsLogSocket; +class OpsLogSink; namespace rgw::sal { class Store; } @@ -16,7 +16,7 @@ namespace rgw::lua::request { int execute( rgw::sal::Store* store, RGWREST* rest, - OpsLogSocket* olog, + OpsLogSink* olog, req_state *s, const char* op_name, const std::string& script); diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index c44b0cba61ca..507c07ef2f97 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -518,12 +518,19 @@ int radosgw_Main(int argc, const char **argv) rgw::dmclock::SchedulerCtx sched_ctx{cct.get()}; - OpsLogSocket *olog = NULL; - + OpsLogManifold *olog = new OpsLogManifold(); if (!g_conf()->rgw_ops_log_socket_path.empty()) { - olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog); - olog->init(g_conf()->rgw_ops_log_socket_path); + OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog); + olog_socket->init(g_conf()->rgw_ops_log_socket_path); + olog->add_sink(olog_socket); + } + OpsLogFile* ops_log_file; + if (!g_conf()->rgw_ops_log_file_path.empty()) { + ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog); + ops_log_file->start(); + olog->add_sink(ops_log_file); } + olog->add_sink(new OpsLogRados(store)); r = signal_fd_init(); if (r < 0) { @@ -671,7 +678,6 @@ int radosgw_Main(int argc, const char **argv) shutdown_async_signal_handler(); rgw_log_usage_finalize(); - delete olog; StoreManager::close_storage(store); diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc index 9cf78fe8809b..3f8c9c33efe6 100644 --- a/src/rgw/rgw_process.cc +++ b/src/rgw/rgw_process.cc @@ -189,7 +189,7 @@ int process_request(rgw::sal::Store* const store, const std::string& frontend_prefix, const rgw_auth_registry_t& auth_registry, RGWRestfulIO* const client_io, - OpsLogSocket* const olog, + OpsLogSink* const olog, optional_yield yield, rgw::dmclock::Scheduler *scheduler, string* user, @@ -356,7 +356,7 @@ done: } if (should_log) { - rgw_log_op(store, rest, s, (op ? op->name() : "unknown"), olog); + rgw_log_op(rest, s, (op ? op->name() : "unknown"), olog); } if (http_ret != nullptr) { diff --git a/src/rgw/rgw_process.h b/src/rgw/rgw_process.h index f8a97d16c29d..1aac4a6cdf1b 100644 --- a/src/rgw/rgw_process.h +++ b/src/rgw/rgw_process.h @@ -34,7 +34,7 @@ namespace rgw::dmclock { struct RGWProcessEnv { rgw::sal::Store* store; RGWREST *rest; - OpsLogSocket *olog; + OpsLogSink *olog; int port; std::string uri_prefix; std::shared_ptr auth_registry; @@ -49,7 +49,7 @@ protected: CephContext *cct; rgw::sal::Store* store; rgw_auth_registry_ptr_t auth_registry; - OpsLogSocket* olog; + OpsLogSink* olog; ThreadPool m_tp; Throttle req_throttle; RGWREST* rest; @@ -169,7 +169,7 @@ extern int process_request(rgw::sal::Store* store, const std::string& frontend_prefix, const rgw_auth_registry_t& auth_registry, RGWRestfulIO* client_io, - OpsLogSocket* olog, + OpsLogSink* olog, optional_yield y, rgw::dmclock::Scheduler *scheduler, std::string* user, diff --git a/src/test/fio/fio_librgw.cc b/src/test/fio/fio_librgw.cc index 0def8aac7240..b088b68f96c5 100644 --- a/src/test/fio/fio_librgw.cc +++ b/src/test/fio/fio_librgw.cc @@ -380,7 +380,7 @@ namespace { { librgw_data* data = static_cast(td->io_ops_data); const char* object = io_u->file->file_name; - struct rgw_file_handle* object_fh; + struct rgw_file_handle* object_fh = nullptr; size_t nbytes; int r = 0;