- rgw_log_object_name
- rgw_ops_log_rados
- rgw_ops_log_socket_path
+ - rgw_ops_log_file_path
with_legacy: true
# enable logging bandwidth usage
- name: rgw_enable_usage_log
- rgw_enable_ops_log
- rgw_ops_log_data_backlog
with_legacy: true
+# path to file where ops log can go
+- name: rgw_ops_log_file_path
+ type: str
+ level: advanced
+ desc: File-system path for ops log.
+ long_desc: Path to file that RGW will log ops logs to.
+ fmt_desc: The file-system path for writing operations logs.
+ services:
+ - rgw
+ see_also:
+ - rgw_enable_ops_log
+ with_legacy: true
# max data backlog for ops log
- name: rgw_ops_log_data_backlog
type: size
<< e.what() << dendl;
}
if (should_log) {
- rgw_log_op(store, nullptr /* !rest */, s,
- (op ? op->name() : "unknown"), olog);
+ rgw_log_op(nullptr /* !rest */, s, (op ? op->name() : "unknown"), olog);
}
int http_ret = s->err.http_ret;
// XXX ex-RGWRESTMgr_lib, mgr->set_logging(true)
+ OpsLogManifold* olog_manifold = new OpsLogManifold();
if (!g_conf()->rgw_ops_log_socket_path.empty()) {
- olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
- olog->init(g_conf()->rgw_ops_log_socket_path);
+ OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
+ olog_socket->init(g_conf()->rgw_ops_log_socket_path);
+ olog_manifold->add_sink(olog_socket);
}
+ OpsLogFile* ops_log_file;
+ if (!g_conf()->rgw_ops_log_file_path.empty()) {
+ ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog);
+ ops_log_file->start();
+ olog_manifold->add_sink(ops_log_file);
+ }
+ olog_manifold->add_sink(new OpsLogRados(store));
+ olog = olog_manifold;
int port = 80;
RGWProcessEnv env = { store, &rest, olog, port };
shutdown_async_signal_handler();
rgw_log_usage_finalize();
-
+
delete olog;
StoreManager::close_storage(store);
#define dout_subsys ceph_subsys_rgw
-class OpsLogSocket;
+class OpsLogSink;
namespace rgw {
class RGWLib : public DoutPrefixProvider {
RGWFrontendConfig* fec;
RGWLibFrontend* fe;
- OpsLogSocket* olog;
+ OpsLogSink* olog;
rgw::LDAPHelper* ldh{nullptr};
RGWREST rest; // XXX needed for RGWProcessEnv
rgw::sal::Store* store;
#include "services/svc_zone.h"
+#include <chrono>
+#include <math.h>
+
#define dout_subsys ceph_subsys_rgw
using namespace std;
formatter->close_section();
}
-void OpsLogSocket::formatter_to_bl(bufferlist& bl)
+OpsLogManifold::~OpsLogManifold()
{
- stringstream ss;
- formatter->flush(ss);
- const string& s = ss.str();
+ for (const auto &sink : sinks) {
+ delete sink;
+ }
+}
- bl.append(s);
+void OpsLogManifold::add_sink(OpsLogSink* sink)
+{
+ sinks.push_back(sink);
}
-void OpsLogSocket::init_connection(bufferlist& bl)
+int OpsLogManifold::log(struct req_state* s, struct rgw_log_entry& entry)
{
- bl.append("[");
+ int ret = 0;
+ for (const auto &sink : sinks) {
+ if (sink->log(s, entry) < 0) {
+ ret = -1;
+ }
+ }
+ return ret;
}
-OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
+OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) :
+ cct(cct), file(path, std::ofstream::app), data_size(0), max_data_size(max_data_size)
{
- formatter = new JSONFormatter;
- delim.append(",\n");
}
-OpsLogSocket::~OpsLogSocket()
+void OpsLogFile::flush()
+{
+ std::scoped_lock flush_lock(flush_mutex);
+ {
+ std::scoped_lock log_lock(log_mutex);
+ assert(flush_buffer.empty());
+ flush_buffer.swap(log_buffer);
+ data_size = 0;
+ }
+ for (auto bl : flush_buffer) {
+ int try_num = 0;
+ while (true) {
+ bl.write_stream(file);
+ if (!file) {
+ ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl;
+ file.clear();
+ if (stopped) {
+ break;
+ }
+ int sleep_time_secs = std::min((int) pow(2, try_num), 60);
+ std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs));
+ try_num++;
+ } else {
+ break;
+ }
+ }
+ }
+ flush_buffer.clear();
+ file << std::endl;
+}
+
+void* OpsLogFile::entry() {
+ std::unique_lock lock(log_mutex);
+ while (!stopped) {
+ if (!log_buffer.empty()) {
+ lock.unlock();
+ flush();
+ lock.lock();
+ continue;
+ }
+ cond_flush.wait(lock);
+ }
+ flush();
+ return NULL;
+}
+
+void OpsLogFile::start() {
+ stopped = false;
+ create("ops_log_file");
+}
+
+void OpsLogFile::stop() {
+ {
+ cond_flush.notify_one();
+ stopped = true;
+ }
+ join();
+}
+
+OpsLogFile::~OpsLogFile()
{
+ if (!stopped) {
+ stop();
+ }
+ file.close();
+}
+
+int OpsLogFile::log_json(struct req_state* s, bufferlist& bl)
+{
+ std::unique_lock lock(log_mutex);
+ if (data_size + bl.length() >= max_data_size) {
+ ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl;
+ return -1;
+ }
+ log_buffer.push_back(bl);
+ data_size += bl.length();
+ cond_flush.notify_all();
+ return 0;
+}
+
+JsonOpsLogSink::JsonOpsLogSink() {
+ formatter = new JSONFormatter;
+}
+
+JsonOpsLogSink::~JsonOpsLogSink() {
delete formatter;
}
-void OpsLogSocket::log(struct rgw_log_entry& entry)
+void JsonOpsLogSink::formatter_to_bl(bufferlist& bl)
+{
+ stringstream ss;
+ formatter->flush(ss);
+ const string& s = ss.str();
+ bl.append(s);
+}
+
+int JsonOpsLogSink::log(struct req_state* s, struct rgw_log_entry& entry)
{
bufferlist bl;
formatter_to_bl(bl);
lock.unlock();
+ return log_json(s, bl);
+}
+
+void OpsLogSocket::init_connection(bufferlist& bl)
+{
+ bl.append("[");
+}
+
+OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
+{
+ delim.append(",\n");
+}
+
+int OpsLogSocket::log_json(struct req_state* s, bufferlist& bl)
+{
append_output(bl);
+ return 0;
}
-int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state *s,
- const string& op_name, OpsLogSocket *olog)
+OpsLogRados::OpsLogRados(rgw::sal::Store* store): store(store)
+{
+}
+
+int OpsLogRados::log(struct req_state* s, struct rgw_log_entry& entry)
+{
+ if (!s->cct->_conf->rgw_ops_log_rados) {
+ return 0;
+ }
+ bufferlist bl;
+ encode(entry, bl);
+
+ struct tm bdt;
+ time_t t = req_state::Clock::to_time_t(entry.time);
+ if (s->cct->_conf->rgw_log_object_name_utc)
+ gmtime_r(&t, &bdt);
+ else
+ localtime_r(&t, &bdt);
+ string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
+ entry.bucket_id, entry.bucket);
+ if (store->log_op(s, oid, bl) < 0) {
+ ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl;
+ return -1;
+ }
+ return 0;
+}
+
+int rgw_log_op(RGWREST* const rest, struct req_state *s, const string& op_name, OpsLogSink *olog)
{
struct rgw_log_entry entry;
string bucket_id;
char buf[16];
snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
entry.http_status = buf;
- } else
+ } else {
entry.http_status = "200"; // default
-
+ }
entry.error_code = s->err.err_code;
entry.bucket_id = bucket_id;
entry.trans_id = s->trans_id;
-
- bufferlist bl;
- encode(entry, bl);
-
- struct tm bdt;
- time_t t = req_state::Clock::to_time_t(entry.time);
- if (s->cct->_conf->rgw_log_object_name_utc)
- gmtime_r(&t, &bdt);
- else
- localtime_r(&t, &bdt);
-
- int ret = 0;
-
- if (s->cct->_conf->rgw_ops_log_rados) {
- string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
- entry.bucket_id, entry.bucket);
- ret = store->log_op(s, oid, bl);
- }
-
if (olog) {
- olog->log(entry);
+ return olog->log(s, entry);
}
- if (ret < 0)
- ldpp_dout(s, 0) << "ERROR: failed to log entry" << dendl;
-
- return ret;
+ return 0;
}
-
#include <boost/container/flat_map.hpp>
#include "rgw_common.h"
#include "common/OutputDataSocket.h"
+#include <vector>
+#include <fstream>
+
+#define dout_subsys ceph_subsys_rgw
namespace rgw { namespace sal {
class Store;
};
WRITE_CLASS_ENCODER(rgw_log_entry)
-class OpsLogSocket : public OutputDataSocket {
+class OpsLogSink {
+public:
+ virtual int log(struct req_state* s, struct rgw_log_entry& entry) = 0;
+ virtual ~OpsLogSink() = default;
+};
+
+class OpsLogManifold: public OpsLogSink {
+ std::vector<OpsLogSink*> sinks;
+public:
+ ~OpsLogManifold() override;
+ void add_sink(OpsLogSink* sink);
+ int log(struct req_state* s, struct rgw_log_entry& entry) override;
+};
+
+class JsonOpsLogSink : public OpsLogSink {
ceph::Formatter *formatter;
- ceph::mutex lock = ceph::make_mutex("OpsLogSocket");
+ ceph::mutex lock = ceph::make_mutex("JsonOpsLogSink");
void formatter_to_bl(bufferlist& bl);
+protected:
+ virtual int log_json(struct req_state* s, bufferlist& bl) = 0;
+public:
+ JsonOpsLogSink();
+ ~JsonOpsLogSink() override;
+ int log(struct req_state* s, struct rgw_log_entry& entry) override;
+};
+
+class OpsLogFile : public JsonOpsLogSink, public Thread, public DoutPrefixProvider {
+ CephContext* cct;
+ ceph::mutex log_mutex = ceph::make_mutex("OpsLogFile_log");
+ ceph::mutex flush_mutex = ceph::make_mutex("OpsLogFile_flush");
+ std::vector<bufferlist> log_buffer;
+ std::vector<bufferlist> flush_buffer;
+ std::condition_variable cond_flush;
+ std::ofstream file;
+ bool stopped;
+ uint64_t data_size;
+ uint64_t max_data_size;
+
+ void flush();
+protected:
+ int log_json(struct req_state* s, bufferlist& bl) override;
+ void *entry() override;
+public:
+ OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size);
+ ~OpsLogFile() override;
+ CephContext *get_cct() const override { return cct; }
+ unsigned get_subsys() const override { return dout_subsys; }
+ std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw OpsLogFile: "; }
+ void start();
+ void stop();
+};
+class OpsLogSocket : public OutputDataSocket, public JsonOpsLogSink {
protected:
+ int log_json(struct req_state* s, bufferlist& bl) override;
void init_connection(bufferlist& bl) override;
public:
OpsLogSocket(CephContext *cct, uint64_t _backlog);
- ~OpsLogSocket() override;
+};
- void log(struct rgw_log_entry& entry);
+class OpsLogRados : public OpsLogSink {
+ rgw::sal::Store* store;
+public:
+ OpsLogRados(rgw::sal::Store* store);
+ int log(struct req_state* s, struct rgw_log_entry& entry) override;
};
class RGWREST;
-int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state* s,
- const std::string& op_name, OpsLogSocket* olog);
+int rgw_log_op(RGWREST* const rest, struct req_state* s,
+ const std::string& op_name, OpsLogSink* olog);
void rgw_log_usage_init(CephContext* cct, rgw::sal::Store* store);
void rgw_log_usage_finalize();
void rgw_format_ops_log_entry(struct rgw_log_entry& entry,
int RequestLog(lua_State* L)
{
- const auto store = reinterpret_cast<rgw::sal::RadosStore*>(lua_touserdata(L, lua_upvalueindex(1)));
- const auto rest = reinterpret_cast<RGWREST*>(lua_touserdata(L, lua_upvalueindex(2)));
- const auto olog = reinterpret_cast<OpsLogSocket*>(lua_touserdata(L, lua_upvalueindex(3)));
- const auto s = reinterpret_cast<req_state*>(lua_touserdata(L, lua_upvalueindex(4)));
- const std::string op_name(reinterpret_cast<const char*>(lua_touserdata(L, lua_upvalueindex(5))));
- if (store && s) {
- const auto rc = rgw_log_op(store, rest, s, op_name, olog);
+ const auto rest = reinterpret_cast<RGWREST*>(lua_touserdata(L, lua_upvalueindex(1)));
+ const auto olog = reinterpret_cast<OpsLogSink*>(lua_touserdata(L, lua_upvalueindex(2)));
+ const auto s = reinterpret_cast<req_state*>(lua_touserdata(L, lua_upvalueindex(3)));
+ const std::string op_name(reinterpret_cast<const char*>(lua_touserdata(L, lua_upvalueindex(4))));
+ if (s) {
+ const auto rc = rgw_log_op(rest, s, op_name, olog);
lua_pushinteger(L, rc);
} else {
- ldpp_dout(s, 1) << "Lua ERROR: missing rados store, cannot use ops log" << dendl;
+ ldpp_dout(s, 1) << "Lua ERROR: missing request state, cannot use ops log" << dendl;
lua_pushinteger(L, -EINVAL);
}
int execute(
rgw::sal::Store* store,
RGWREST* rest,
- OpsLogSocket* olog,
+ OpsLogSink* olog,
req_state* s,
const char* op_name,
const std::string& script)
lua_getglobal(L, RequestMetaTable::TableName().c_str());
ceph_assert(lua_istable(L, -1));
pushstring(L, RequestLogAction);
- lua_pushlightuserdata(L, store);
lua_pushlightuserdata(L, rest);
lua_pushlightuserdata(L, olog);
lua_pushlightuserdata(L, s);
lua_pushlightuserdata(L, const_cast<char*>(op_name));
- lua_pushcclosure(L, RequestLog, FIVE_UPVALS);
+ lua_pushcclosure(L, RequestLog, FOUR_UPVALS);
lua_rawset(L, -3);
try {
class req_state;
class RGWREST;
-class OpsLogSocket;
+class OpsLogSink;
namespace rgw::sal {
class Store;
}
int execute(
rgw::sal::Store* store,
RGWREST* rest,
- OpsLogSocket* olog,
+ OpsLogSink* olog,
req_state *s,
const char* op_name,
const std::string& script);
rgw::dmclock::SchedulerCtx sched_ctx{cct.get()};
- OpsLogSocket *olog = NULL;
-
+ OpsLogManifold *olog = new OpsLogManifold();
if (!g_conf()->rgw_ops_log_socket_path.empty()) {
- olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
- olog->init(g_conf()->rgw_ops_log_socket_path);
+ OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
+ olog_socket->init(g_conf()->rgw_ops_log_socket_path);
+ olog->add_sink(olog_socket);
+ }
+ OpsLogFile* ops_log_file;
+ if (!g_conf()->rgw_ops_log_file_path.empty()) {
+ ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog);
+ ops_log_file->start();
+ olog->add_sink(ops_log_file);
}
+ olog->add_sink(new OpsLogRados(store));
r = signal_fd_init();
if (r < 0) {
shutdown_async_signal_handler();
rgw_log_usage_finalize();
-
delete olog;
StoreManager::close_storage(store);
const std::string& frontend_prefix,
const rgw_auth_registry_t& auth_registry,
RGWRestfulIO* const client_io,
- OpsLogSocket* const olog,
+ OpsLogSink* const olog,
optional_yield yield,
rgw::dmclock::Scheduler *scheduler,
string* user,
}
if (should_log) {
- rgw_log_op(store, rest, s, (op ? op->name() : "unknown"), olog);
+ rgw_log_op(rest, s, (op ? op->name() : "unknown"), olog);
}
if (http_ret != nullptr) {
struct RGWProcessEnv {
rgw::sal::Store* store;
RGWREST *rest;
- OpsLogSocket *olog;
+ OpsLogSink *olog;
int port;
std::string uri_prefix;
std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
CephContext *cct;
rgw::sal::Store* store;
rgw_auth_registry_ptr_t auth_registry;
- OpsLogSocket* olog;
+ OpsLogSink* olog;
ThreadPool m_tp;
Throttle req_throttle;
RGWREST* rest;
const std::string& frontend_prefix,
const rgw_auth_registry_t& auth_registry,
RGWRestfulIO* client_io,
- OpsLogSocket* olog,
+ OpsLogSink* olog,
optional_yield y,
rgw::dmclock::Scheduler *scheduler,
std::string* user,
{
librgw_data* data = static_cast<librgw_data*>(td->io_ops_data);
const char* object = io_u->file->file_name;
- struct rgw_file_handle* object_fh;
+ struct rgw_file_handle* object_fh = nullptr;
size_t nbytes;
int r = 0;