:Default: None
+``rgw_ops_log_file_path``
+
+:Description: The file for writing operations logs.
+:Type: String
+:Default: None
+
+
``rgw_ops_log_data_backlog``
:Description: The maximum data backlog data size for operations logs written
OPTION(rgw_enable_ops_log, OPT_BOOL) // enable logging every rgw operation
OPTION(rgw_enable_usage_log, OPT_BOOL) // enable logging bandwidth usage
OPTION(rgw_ops_log_rados, OPT_BOOL) // whether ops log should go to rados
+OPTION(rgw_ops_log_file_path, OPT_STR) // path to file where ops log can go
OPTION(rgw_ops_log_socket_path, OPT_STR) // path to unix domain socket where ops log can go
OPTION(rgw_ops_log_data_backlog, OPT_INT) // max data backlog for ops log
OPTION(rgw_fcgi_socket_backlog, OPT_INT) // socket backlog for fcgi
"RGW will send ops log data through it.")
.add_see_also({"rgw_enable_ops_log", "rgw_ops_log_data_backlog"}),
+ Option("rgw_ops_log_file_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+ .set_default("")
+ .set_description("File-system path for ops log.")
+ .set_long_description("Path to file that RGW will log ops logs to.")
+ .add_see_also({"rgw_enable_ops_log", "rgw_ops_log_data_backlog"}),
+
Option("rgw_ops_log_data_backlog", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
.set_default(5 << 20)
.set_description("Ops log socket backlog")
"send info through unix domain socket. When data backlog is higher than this, "
"ops log entries will be lost. In order to avoid ops log information loss, the "
"listener needs to clear data (by reading it) quickly enough.")
- .add_see_also({"rgw_enable_ops_log", "rgw_ops_log_socket_path"}),
+ .add_see_also({"rgw_enable_ops_log", "rgw_ops_log_socket_path", "rgw_ops_log_file_path"}),
Option("rgw_fcgi_socket_backlog", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(1024)
<< e.what() << dendl;
}
if (should_log) {
- rgw_log_op(store->getRados(), nullptr /* !rest */, s,
- (op ? op->name() : "unknown"), olog);
+ rgw_log_op(nullptr /* !rest */, s, (op ? op->name() : "unknown"), olog);
}
int http_ret = s->err.http_ret;
// XXX ex-RGWRESTMgr_lib, mgr->set_logging(true)
+ OpsLogManifold* olog_manifold = new OpsLogManifold();
if (!g_conf()->rgw_ops_log_socket_path.empty()) {
- olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
- olog->init(g_conf()->rgw_ops_log_socket_path);
+ OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
+ olog_socket->init(g_conf()->rgw_ops_log_socket_path);
+ olog_manifold->add_sink(olog_socket);
}
+ OpsLogFile* ops_log_file;
+ if (!g_conf()->rgw_ops_log_file_path.empty()) {
+ ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog);
+ ops_log_file->start();
+ olog_manifold->add_sink(ops_log_file);
+ }
+ olog_manifold->add_sink(new OpsLogRados(store->getRados()));
+ olog = olog_manifold;
int port = 80;
RGWProcessEnv env = { store, &rest, olog, port };
shutdown_async_signal_handler();
rgw_log_usage_finalize();
-
+
delete olog;
RGWStoreManager::close_storage(store);
#define dout_subsys ceph_subsys_rgw
-class OpsLogSocket;
+class OpsLogSink;
namespace rgw {
class RGWLib : public DoutPrefixProvider {
RGWFrontendConfig* fec;
RGWLibFrontend* fe;
- OpsLogSocket* olog;
+ OpsLogSink* olog;
rgw::LDAPHelper* ldh{nullptr};
RGWREST rest; // XXX needed for RGWProcessEnv
rgw::sal::RGWRadosStore* store;
#include "services/svc_zone.h"
+#include <chrono>
+#include <math.h>
+
#define dout_subsys ceph_subsys_rgw
static void set_param_str(struct req_state *s, const char *name, string& str)
formatter->close_section();
}
-void OpsLogSocket::formatter_to_bl(bufferlist& bl)
+OpsLogManifold::~OpsLogManifold()
+{
+ for (const auto &sink : sinks) {
+ delete sink;
+ }
+}
+
+void OpsLogManifold::add_sink(OpsLogSink* sink)
+{
+ sinks.push_back(sink);
+}
+
+int OpsLogManifold::log(struct req_state* s, struct rgw_log_entry& entry)
+{
+ int ret = 0;
+ for (const auto &sink : sinks) {
+ if (sink->log(s, entry) < 0) {
+ ret = -1;
+ }
+ }
+ return ret;
+}
+
+OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) :
+ cct(cct), file(path, std::ofstream::app), data_size(0), max_data_size(max_data_size)
+{
+}
+
+void OpsLogFile::flush()
+{
+ std::scoped_lock flush_lock(flush_mutex);
+ {
+ std::scoped_lock log_lock(log_mutex);
+ assert(flush_buffer.empty());
+ flush_buffer.swap(log_buffer);
+ data_size = 0;
+ }
+ for (auto bl : flush_buffer) {
+ int try_num = 0;
+ while (true) {
+ bl.write_stream(file);
+ if (!file) {
+ ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl;
+ file.clear();
+ if (stopped) {
+ break;
+ }
+ int sleep_time_secs = std::min((int) pow(2, try_num), 60);
+ std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs));
+ try_num++;
+ } else {
+ break;
+ }
+ }
+ }
+ flush_buffer.clear();
+ file << std::endl;
+}
+
+void* OpsLogFile::entry() {
+ std::unique_lock lock(log_mutex);
+ while (!stopped) {
+ if (!log_buffer.empty()) {
+ lock.unlock();
+ flush();
+ lock.lock();
+ continue;
+ }
+ cond_flush.wait(lock);
+ }
+ flush();
+ return NULL;
+}
+
+void OpsLogFile::start() {
+ stopped = false;
+ create("ops_log_file");
+}
+
+void OpsLogFile::stop() {
+ {
+ cond_flush.notify_one();
+ stopped = true;
+ }
+ join();
+}
+
+OpsLogFile::~OpsLogFile()
+{
+ if (!stopped) {
+ stop();
+ }
+ file.close();
+}
+
+int OpsLogFile::log_json(struct req_state* s, bufferlist& bl)
+{
+ std::unique_lock lock(log_mutex);
+ if (data_size + bl.length() >= max_data_size) {
+ ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl;
+ return -1;
+ }
+ log_buffer.push_back(bl);
+ data_size += bl.length();
+ cond_flush.notify_all();
+ return 0;
+}
+
+JsonOpsLogSink::JsonOpsLogSink() {
+ formatter = new JSONFormatter;
+}
+
+JsonOpsLogSink::~JsonOpsLogSink() {
+ delete formatter;
+}
+
+void JsonOpsLogSink::formatter_to_bl(bufferlist& bl)
{
stringstream ss;
formatter->flush(ss);
const string& s = ss.str();
-
bl.append(s);
}
+int JsonOpsLogSink::log(struct req_state* s, struct rgw_log_entry& entry)
+{
+ bufferlist bl;
+
+ lock.lock();
+ rgw_format_ops_log_entry(entry, formatter);
+ formatter_to_bl(bl);
+ lock.unlock();
+
+ return log_json(s, bl);
+}
+
void OpsLogSocket::init_connection(bufferlist& bl)
{
bl.append("[");
OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
{
- formatter = new JSONFormatter;
delim.append(",\n");
}
-OpsLogSocket::~OpsLogSocket()
+int OpsLogSocket::log_json(struct req_state* s, bufferlist& bl)
{
- delete formatter;
+ append_output(bl);
+ return 0;
}
-void OpsLogSocket::log(struct rgw_log_entry& entry)
+OpsLogRados::OpsLogRados(RGWRados* store): store(store)
{
+}
+
+int OpsLogRados::log(struct req_state* s, struct rgw_log_entry& entry)
+{
+ if (!s->cct->_conf->rgw_ops_log_rados) {
+ return 0;
+ }
bufferlist bl;
+ encode(entry, bl);
- lock.lock();
- rgw_format_ops_log_entry(entry, formatter);
- formatter_to_bl(bl);
- lock.unlock();
+ struct tm bdt;
+ time_t t = req_state::Clock::to_time_t(entry.time);
+ if (s->cct->_conf->rgw_log_object_name_utc)
+ gmtime_r(&t, &bdt);
+ else
+ localtime_r(&t, &bdt);
- append_output(bl);
+ string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
+ entry.bucket_id, entry.bucket);
+ rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, oid);
+ int ret = store->append_async(s, obj, bl.length(), bl);
+ if (ret == -ENOENT) {
+ ret = store->create_pool(s, store->svc.zone->get_zone_params().log_pool);
+ if (ret < 0)
+ goto done;
+ // retry
+ ret = store->append_async(s, obj, bl.length(), bl);
+ }
+done:
+ if (ret < 0) {
+ ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl;
+ }
+ return ret;
}
-int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
- const string& op_name, OpsLogSocket *olog)
+int rgw_log_op(RGWREST* const rest, struct req_state *s, const string& op_name, OpsLogSink *olog)
{
struct rgw_log_entry entry;
string bucket_id;
char buf[16];
snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
entry.http_status = buf;
- } else
+ } else {
entry.http_status = "200"; // default
-
+ }
entry.error_code = s->err.err_code;
entry.bucket_id = bucket_id;
entry.trans_id = s->trans_id;
-
- bufferlist bl;
- encode(entry, bl);
-
- struct tm bdt;
- time_t t = req_state::Clock::to_time_t(entry.time);
- if (s->cct->_conf->rgw_log_object_name_utc)
- gmtime_r(&t, &bdt);
- else
- localtime_r(&t, &bdt);
-
- int ret = 0;
-
- if (s->cct->_conf->rgw_ops_log_rados) {
- string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
- entry.bucket_id, entry.bucket);
-
- rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, oid);
-
- ret = store->append_async(s, obj, bl.length(), bl);
- if (ret == -ENOENT) {
- ret = store->create_pool(s, store->svc.zone->get_zone_params().log_pool);
- if (ret < 0)
- goto done;
- // retry
- ret = store->append_async(s, obj, bl.length(), bl);
- }
- }
-
if (olog) {
- olog->log(entry);
+ return olog->log(s, entry);
}
-done:
- if (ret < 0)
- ldpp_dout(s, 0) << "ERROR: failed to log entry" << dendl;
-
- return ret;
+ return 0;
}
-
#include <boost/container/flat_map.hpp>
#include "rgw_common.h"
#include "common/OutputDataSocket.h"
+#include <vector>
+#include <fstream>
+
+#define dout_subsys ceph_subsys_rgw
class RGWRados;
};
WRITE_CLASS_ENCODER(rgw_log_entry)
-class OpsLogSocket : public OutputDataSocket {
+class OpsLogSink {
+public:
+ virtual int log(struct req_state* s, struct rgw_log_entry& entry) = 0;
+ virtual ~OpsLogSink() = default;
+};
+
+class OpsLogManifold: public OpsLogSink {
+ std::vector<OpsLogSink*> sinks;
+public:
+ ~OpsLogManifold() override;
+ void add_sink(OpsLogSink* sink);
+ int log(struct req_state* s, struct rgw_log_entry& entry) override;
+};
+
+class JsonOpsLogSink : public OpsLogSink {
ceph::Formatter *formatter;
- ceph::mutex lock = ceph::make_mutex("OpsLogSocket");
+ ceph::mutex lock = ceph::make_mutex("JsonOpsLogSink");
void formatter_to_bl(bufferlist& bl);
+protected:
+ virtual int log_json(struct req_state* s, bufferlist& bl) = 0;
+public:
+ JsonOpsLogSink();
+ ~JsonOpsLogSink() override;
+ int log(struct req_state* s, struct rgw_log_entry& entry) override;
+};
+
+class OpsLogFile : public JsonOpsLogSink, public Thread, public DoutPrefixProvider {
+ CephContext* cct;
+ ceph::mutex log_mutex = ceph::make_mutex("OpsLogFile_log");
+ ceph::mutex flush_mutex = ceph::make_mutex("OpsLogFile_flush");
+ std::vector<bufferlist> log_buffer;
+ std::vector<bufferlist> flush_buffer;
+ std::condition_variable cond_flush;
+ std::ofstream file;
+ bool stopped;
+ uint64_t data_size;
+ uint64_t max_data_size;
+
+ void flush();
+protected:
+ int log_json(struct req_state* s, bufferlist& bl) override;
+ void *entry() override;
+public:
+ OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size);
+ ~OpsLogFile() override;
+ CephContext *get_cct() const override { return cct; }
+ unsigned get_subsys() const override { return dout_subsys; }
+ std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw OpsLogFile: "; }
+ void start();
+ void stop();
+};
+class OpsLogSocket : public OutputDataSocket, public JsonOpsLogSink {
protected:
+ int log_json(struct req_state* s, bufferlist& bl) override;
void init_connection(bufferlist& bl) override;
public:
OpsLogSocket(CephContext *cct, uint64_t _backlog);
- ~OpsLogSocket() override;
+};
- void log(struct rgw_log_entry& entry);
+class OpsLogRados : public OpsLogSink {
+ RGWRados* store;
+public:
+ OpsLogRados(RGWRados* store);
+ int log(struct req_state* s, struct rgw_log_entry& entry) override;
};
class RGWREST;
-int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
- const string& op_name, OpsLogSocket *olog);
+int rgw_log_op(RGWREST* const rest, struct req_state* s,
+ const std::string& op_name, OpsLogSink* olog);
void rgw_log_usage_init(CephContext *cct, RGWRados *store);
void rgw_log_usage_finalize();
void rgw_format_ops_log_entry(struct rgw_log_entry& entry,
int RequestLog(lua_State* L)
{
- const auto store = reinterpret_cast<rgw::sal::RGWRadosStore*>(lua_touserdata(L, lua_upvalueindex(1)));
- const auto rest = reinterpret_cast<RGWREST*>(lua_touserdata(L, lua_upvalueindex(2)));
- const auto olog = reinterpret_cast<OpsLogSocket*>(lua_touserdata(L, lua_upvalueindex(3)));
- const auto s = reinterpret_cast<req_state*>(lua_touserdata(L, lua_upvalueindex(4)));
- const std::string op_name(reinterpret_cast<const char*>(lua_touserdata(L, lua_upvalueindex(5))));
- if (store && s) {
- const auto rc = rgw_log_op(store->getRados(), rest, s, op_name, olog);
+ const auto rest = reinterpret_cast<RGWREST*>(lua_touserdata(L, lua_upvalueindex(1)));
+ const auto olog = reinterpret_cast<OpsLogSink*>(lua_touserdata(L, lua_upvalueindex(2)));
+ const auto s = reinterpret_cast<req_state*>(lua_touserdata(L, lua_upvalueindex(3)));
+ const std::string op_name(reinterpret_cast<const char*>(lua_touserdata(L, lua_upvalueindex(4))));
+ if (s) {
+ const auto rc = rgw_log_op(rest, s, op_name, olog);
lua_pushinteger(L, rc);
} else {
- ldpp_dout(s, 1) << "Lua ERROR: missing rados store, cannot use ops log" << dendl;
+ ldpp_dout(s, 1) << "Lua ERROR: missing request state, cannot use ops log" << dendl;
lua_pushinteger(L, -EINVAL);
}
int execute(
rgw::sal::RGWRadosStore* store,
RGWREST* rest,
- OpsLogSocket* olog,
+ OpsLogSink* olog,
req_state* s,
const char* op_name,
const std::string& script)
lua_getglobal(L, RequestMetaTable::TableName().c_str());
ceph_assert(lua_istable(L, -1));
pushstring(L, RequestLogAction);
- lua_pushlightuserdata(L, store);
lua_pushlightuserdata(L, rest);
lua_pushlightuserdata(L, olog);
lua_pushlightuserdata(L, s);
lua_pushlightuserdata(L, const_cast<char*>(op_name));
- lua_pushcclosure(L, RequestLog, FIVE_UPVALS);
+ lua_pushcclosure(L, RequestLog, FOUR_UPVALS);
lua_rawset(L, -3);
try {
class req_state;
class RGWREST;
-class OpsLogSocket;
+class OpsLogSink;
namespace rgw::sal {
class RGWRadosStore;
}
int execute(
rgw::sal::RGWRadosStore* store,
RGWREST* rest,
- OpsLogSocket* olog,
+ OpsLogSink* olog,
req_state *s,
const char* op_name,
const std::string& script);
rgw::dmclock::SchedulerCtx sched_ctx{cct.get()};
- OpsLogSocket *olog = NULL;
-
+ OpsLogManifold *olog = new OpsLogManifold();
if (!g_conf()->rgw_ops_log_socket_path.empty()) {
- olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
- olog->init(g_conf()->rgw_ops_log_socket_path);
+ OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
+ olog_socket->init(g_conf()->rgw_ops_log_socket_path);
+ olog->add_sink(olog_socket);
+ }
+ OpsLogFile* ops_log_file;
+ if (!g_conf()->rgw_ops_log_file_path.empty()) {
+ ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog);
+ ops_log_file->start();
+ olog->add_sink(ops_log_file);
}
+ olog->add_sink(new OpsLogRados(store->getRados()));
r = signal_fd_init();
if (r < 0) {
shutdown_async_signal_handler();
rgw_log_usage_finalize();
-
delete olog;
RGWStoreManager::close_storage(store);
const std::string& frontend_prefix,
const rgw_auth_registry_t& auth_registry,
RGWRestfulIO* const client_io,
- OpsLogSocket* const olog,
+ OpsLogSink* const olog,
optional_yield yield,
rgw::dmclock::Scheduler *scheduler,
string* user,
}
if (should_log) {
- rgw_log_op(store->getRados(), rest, s, (op ? op->name() : "unknown"), olog);
+ rgw_log_op(rest, s, (op ? op->name() : "unknown"), olog);
}
if (http_ret != nullptr) {
struct RGWProcessEnv {
rgw::sal::RGWRadosStore *store;
RGWREST *rest;
- OpsLogSocket *olog;
+ OpsLogSink *olog;
int port;
std::string uri_prefix;
std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
CephContext *cct;
rgw::sal::RGWRadosStore* store;
rgw_auth_registry_ptr_t auth_registry;
- OpsLogSocket* olog;
+ OpsLogSink* olog;
ThreadPool m_tp;
Throttle req_throttle;
RGWREST* rest;
const std::string& frontend_prefix,
const rgw_auth_registry_t& auth_registry,
RGWRestfulIO* client_io,
- OpsLogSocket* olog,
+ OpsLogSink* olog,
optional_yield y,
rgw::dmclock::Scheduler *scheduler,
std::string* user,
{
librgw_data* data = static_cast<librgw_data*>(td->io_ops_data);
const char* object = io_u->file->file_name;
- struct rgw_file_handle* object_fh;
+ struct rgw_file_handle* object_fh = nullptr;
size_t nbytes;
int r = 0;