]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add abstraction for ops log destination and add file logger 43329/head
authorCory Snyder <csnyder@iland.com>
Wed, 22 Sep 2021 14:14:12 +0000 (10:14 -0400)
committerCory Snyder <csnyder@iland.com>
Tue, 5 Oct 2021 08:46:24 +0000 (08:46 +0000)
Adds an OpsLogSink abstraction for ops log destinations. Also implements
this abstraction for a file logger since it's easier to use files vs.
sockets with containers.

Fixes: https://tracker.ceph.com/issues/48752
Signed-off-by: Cory Snyder <csnyder@iland.com>
src/common/options/rgw.yaml.in
src/rgw/librgw.cc
src/rgw/rgw_lib.h
src/rgw/rgw_log.cc
src/rgw/rgw_log.h
src/rgw/rgw_lua_request.cc
src/rgw/rgw_lua_request.h
src/rgw/rgw_main.cc
src/rgw/rgw_process.cc
src/rgw/rgw_process.h
src/test/fio/fio_librgw.cc

index 961d22b9e6d71f32729e8df3d3c50d9faad0ca0a..d116b05b67f8553716ec0bf2fa9951dbfb8671ba 100644 (file)
@@ -1438,6 +1438,7 @@ options:
   - rgw_log_object_name
   - rgw_ops_log_rados
   - rgw_ops_log_socket_path
+  - rgw_ops_log_file_path
   with_legacy: true
 # enable logging bandwidth usage
 - name: rgw_enable_usage_log
@@ -1478,6 +1479,18 @@ options:
   - rgw_enable_ops_log
   - rgw_ops_log_data_backlog
   with_legacy: true
+# path to file where ops log can go
+- name: rgw_ops_log_file_path
+  type: str
+  level: advanced
+  desc: File-system path for ops log.
+  long_desc: Path to file that RGW will log ops logs to.
+  fmt_desc: The file-system path for writing operations logs.
+  services:
+  - rgw
+  see_also:
+  - rgw_enable_ops_log
+  with_legacy: true
 # max data backlog for ops log
 - name: rgw_ops_log_data_backlog
   type: size
index e22010cb452d578cdbe88d6318928ba9f76c14c1..18c4140e60b31d6ada5113eb4e7f642b920eab80 100644 (file)
@@ -340,8 +340,7 @@ namespace rgw {
               << e.what() << dendl;
     }
     if (should_log) {
-      rgw_log_op(store, nullptr /* !rest */, s,
-                (op ? op->name() : "unknown"), olog);
+      rgw_log_op(nullptr /* !rest */, s, (op ? op->name() : "unknown"), olog);
     }
 
     int http_ret = s->err.http_ret;
@@ -589,10 +588,20 @@ namespace rgw {
 
     // XXX ex-RGWRESTMgr_lib, mgr->set_logging(true)
 
+    OpsLogManifold* olog_manifold = new OpsLogManifold();
     if (!g_conf()->rgw_ops_log_socket_path.empty()) {
-      olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
-      olog->init(g_conf()->rgw_ops_log_socket_path);
+      OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
+      olog_socket->init(g_conf()->rgw_ops_log_socket_path);
+      olog_manifold->add_sink(olog_socket);
     }
+    OpsLogFile* ops_log_file;
+    if (!g_conf()->rgw_ops_log_file_path.empty()) {
+      ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog);
+      ops_log_file->start();
+      olog_manifold->add_sink(ops_log_file);
+    }
+    olog_manifold->add_sink(new OpsLogRados(store));
+    olog = olog_manifold;
 
     int port = 80;
     RGWProcessEnv env = { store, &rest, olog, port };
@@ -653,7 +662,7 @@ namespace rgw {
     shutdown_async_signal_handler();
 
     rgw_log_usage_finalize();
-
+    
     delete olog;
 
     StoreManager::close_storage(store);
index 710c76c803158f040aca5b05e1aef95d215ff235..0c5e9e721685f42755c9dfcc3bb70f386ce0914f 100644 (file)
@@ -20,7 +20,7 @@
 
 #define dout_subsys ceph_subsys_rgw
 
-class OpsLogSocket;
+class OpsLogSink;
 
 namespace rgw {
 
@@ -29,7 +29,7 @@ namespace rgw {
   class RGWLib : public DoutPrefixProvider {
     RGWFrontendConfig* fec;
     RGWLibFrontend* fe;
-    OpsLogSocket* olog;
+    OpsLogSink* olog;
     rgw::LDAPHelper* ldh{nullptr};
     RGWREST rest; // XXX needed for RGWProcessEnv
     rgw::sal::Store* store;
index da6eb1e6ad337a9471e25406bea21b29391d3b85..aee079daf7f0e1e1c01607c2a0c3079136841138 100644 (file)
@@ -17,6 +17,9 @@
 
 #include "services/svc_zone.h"
 
+#include <chrono>
+#include <math.h>
+
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
@@ -317,32 +320,131 @@ void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
   formatter->close_section();
 }
 
-void OpsLogSocket::formatter_to_bl(bufferlist& bl)
+OpsLogManifold::~OpsLogManifold()
 {
-  stringstream ss;
-  formatter->flush(ss);
-  const string& s = ss.str();
+    for (const auto &sink : sinks) {
+        delete sink;
+    }
+}
 
-  bl.append(s);
+void OpsLogManifold::add_sink(OpsLogSink* sink)
+{
+    sinks.push_back(sink);
 }
 
-void OpsLogSocket::init_connection(bufferlist& bl)
+int OpsLogManifold::log(struct req_state* s, struct rgw_log_entry& entry)
 {
-  bl.append("[");
+  int ret = 0;
+  for (const auto &sink : sinks) {
+    if (sink->log(s, entry) < 0) {
+      ret = -1;
+    }
+  }
+  return ret;
 }
 
-OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
+OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) :
+  cct(cct), file(path, std::ofstream::app), data_size(0), max_data_size(max_data_size)
 {
-  formatter = new JSONFormatter;
-  delim.append(",\n");
 }
 
-OpsLogSocket::~OpsLogSocket()
+void OpsLogFile::flush()
+{
+  std::scoped_lock flush_lock(flush_mutex);
+  {
+    std::scoped_lock log_lock(log_mutex);
+    assert(flush_buffer.empty());
+    flush_buffer.swap(log_buffer);
+    data_size = 0;
+  }
+  for (auto bl : flush_buffer) {
+    int try_num = 0;
+    while (true) {
+      bl.write_stream(file);
+      if (!file) {
+        ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl;
+        file.clear();
+        if (stopped) {
+          break;
+        }
+        int sleep_time_secs = std::min((int) pow(2, try_num), 60);
+        std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs));
+        try_num++;
+      } else {
+        break;
+      }
+    }
+  }
+  flush_buffer.clear();
+  file << std::endl;
+}
+
+void* OpsLogFile::entry() {
+  std::unique_lock lock(log_mutex);
+  while (!stopped) {
+    if (!log_buffer.empty()) {
+      lock.unlock();
+      flush();
+      lock.lock();
+      continue;
+    }
+    cond_flush.wait(lock);
+  }
+  flush();
+  return NULL;
+}
+
+void OpsLogFile::start() {
+  stopped = false;
+  create("ops_log_file");
+}
+
+void OpsLogFile::stop() {
+  {
+    cond_flush.notify_one();
+    stopped = true;
+  }
+  join();
+}
+
+OpsLogFile::~OpsLogFile()
 {
+  if (!stopped) {
+    stop();
+  }
+  file.close();
+}
+
+int OpsLogFile::log_json(struct req_state* s, bufferlist& bl)
+{
+  std::unique_lock lock(log_mutex);
+  if (data_size + bl.length() >= max_data_size) {
+    ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl;
+    return -1;
+  }
+  log_buffer.push_back(bl);
+  data_size += bl.length();
+  cond_flush.notify_all();
+  return 0;
+}
+
+JsonOpsLogSink::JsonOpsLogSink() {
+  formatter = new JSONFormatter;
+}
+
+JsonOpsLogSink::~JsonOpsLogSink() {
   delete formatter;
 }
 
-void OpsLogSocket::log(struct rgw_log_entry& entry)
+void JsonOpsLogSink::formatter_to_bl(bufferlist& bl)
+{
+  stringstream ss;
+  formatter->flush(ss);
+  const string& s = ss.str();
+  bl.append(s);
+}
+
+int JsonOpsLogSink::log(struct req_state* s, struct rgw_log_entry& entry)
 {
   bufferlist bl;
 
@@ -351,11 +453,53 @@ void OpsLogSocket::log(struct rgw_log_entry& entry)
   formatter_to_bl(bl);
   lock.unlock();
 
+  return log_json(s, bl);
+}
+
+void OpsLogSocket::init_connection(bufferlist& bl)
+{
+  bl.append("[");
+}
+
+OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog)
+{
+  delim.append(",\n");
+}
+
+int OpsLogSocket::log_json(struct req_state* s, bufferlist& bl)
+{
   append_output(bl);
+  return 0;
 }
 
-int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state *s,
-              const string& op_name, OpsLogSocket *olog)
+OpsLogRados::OpsLogRados(rgw::sal::Store* store): store(store)
+{
+}
+
+int OpsLogRados::log(struct req_state* s, struct rgw_log_entry& entry)
+{
+  if (!s->cct->_conf->rgw_ops_log_rados) {
+    return 0;
+  }
+  bufferlist bl;
+  encode(entry, bl);
+
+  struct tm bdt;
+  time_t t = req_state::Clock::to_time_t(entry.time);
+  if (s->cct->_conf->rgw_log_object_name_utc)
+    gmtime_r(&t, &bdt);
+  else
+    localtime_r(&t, &bdt);
+  string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
+                                      entry.bucket_id, entry.bucket);
+  if (store->log_op(s, oid, bl) < 0) {
+    ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl;
+    return -1;
+  }
+  return 0;
+}
+
+int rgw_log_op(RGWREST* const rest, struct req_state *s, const string& op_name, OpsLogSink *olog)
 {
   struct rgw_log_entry entry;
   string bucket_id;
@@ -473,37 +617,14 @@ int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state *s,
     char buf[16];
     snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
     entry.http_status = buf;
-  } else
+  } else {
     entry.http_status = "200"; // default
-
+  }
   entry.error_code = s->err.err_code;
   entry.bucket_id = bucket_id;
   entry.trans_id = s->trans_id;
-
-  bufferlist bl;
-  encode(entry, bl);
-
-  struct tm bdt;
-  time_t t = req_state::Clock::to_time_t(entry.time);
-  if (s->cct->_conf->rgw_log_object_name_utc)
-    gmtime_r(&t, &bdt);
-  else
-    localtime_r(&t, &bdt);
-
-  int ret = 0;
-
-  if (s->cct->_conf->rgw_ops_log_rados) {
-    string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
-                                       entry.bucket_id, entry.bucket);
-    ret = store->log_op(s, oid, bl);
-  }
-
   if (olog) {
-    olog->log(entry);
+    return olog->log(s, entry);
   }
-  if (ret < 0)
-    ldpp_dout(s, 0) << "ERROR: failed to log entry" << dendl;
-
-  return ret;
+  return 0;
 }
-
index 2469282b33012603c4db659c384c4bfbe2acf583..5ee411e8cfaec57ba1dfaa1bcc012f9362fe4be2 100644 (file)
@@ -7,6 +7,10 @@
 #include <boost/container/flat_map.hpp>
 #include "rgw_common.h"
 #include "common/OutputDataSocket.h"
+#include <vector>
+#include <fstream>
+
+#define dout_subsys ceph_subsys_rgw
 
 namespace rgw { namespace sal {
   class Store;
@@ -132,26 +136,79 @@ struct rgw_log_entry {
 };
 WRITE_CLASS_ENCODER(rgw_log_entry)
 
-class OpsLogSocket : public OutputDataSocket {
+class OpsLogSink {
+public:
+  virtual int log(struct req_state* s, struct rgw_log_entry& entry) = 0;
+  virtual ~OpsLogSink() = default;
+};
+
+class OpsLogManifold: public OpsLogSink {
+  std::vector<OpsLogSink*> sinks;
+public:
+  ~OpsLogManifold() override;
+  void add_sink(OpsLogSink* sink);
+  int log(struct req_state* s, struct rgw_log_entry& entry) override;
+};
+
+class JsonOpsLogSink : public OpsLogSink {
   ceph::Formatter *formatter;
-  ceph::mutex lock = ceph::make_mutex("OpsLogSocket");
+  ceph::mutex lock = ceph::make_mutex("JsonOpsLogSink");
 
   void formatter_to_bl(bufferlist& bl);
+protected:
+  virtual int log_json(struct req_state* s, bufferlist& bl) = 0;
+public:
+  JsonOpsLogSink();
+  ~JsonOpsLogSink() override;
+  int log(struct req_state* s, struct rgw_log_entry& entry) override;
+};
+
+class OpsLogFile : public JsonOpsLogSink, public Thread, public DoutPrefixProvider {
+  CephContext* cct;
+  ceph::mutex log_mutex = ceph::make_mutex("OpsLogFile_log");
+  ceph::mutex flush_mutex = ceph::make_mutex("OpsLogFile_flush");
+  std::vector<bufferlist> log_buffer;
+  std::vector<bufferlist> flush_buffer;
+  std::condition_variable cond_flush;
+  std::ofstream file;
+  bool stopped;
+  uint64_t data_size;
+  uint64_t max_data_size;
+
+  void flush();
+protected:
+  int log_json(struct req_state* s, bufferlist& bl) override;
+  void *entry() override;
+public:
+  OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size);
+  ~OpsLogFile() override;
+  CephContext *get_cct() const override { return cct; }
+  unsigned get_subsys() const override { return dout_subsys; }
+  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw OpsLogFile: "; }
+  void start();
+  void stop();
+};
 
+class OpsLogSocket : public OutputDataSocket, public JsonOpsLogSink {
 protected:
+  int log_json(struct req_state* s, bufferlist& bl) override;
   void init_connection(bufferlist& bl) override;
 
 public:
   OpsLogSocket(CephContext *cct, uint64_t _backlog);
-  ~OpsLogSocket() override;
+};
 
-  void log(struct rgw_log_entry& entry);
+class OpsLogRados : public OpsLogSink {
+  rgw::sal::Store* store;
+public:
+  OpsLogRados(rgw::sal::Store* store);
+  int log(struct req_state* s, struct rgw_log_entry& entry) override;
 };
 
 class RGWREST;
 
-int rgw_log_op(rgw::sal::Store* store, RGWREST* const rest, struct req_state* s,
-              const std::string& op_name, OpsLogSocket* olog);
+int rgw_log_op(RGWREST* const rest, struct req_state* s,
+              const std::string& op_name, OpsLogSink* olog);
 void rgw_log_usage_init(CephContext* cct, rgw::sal::Store* store);
 void rgw_log_usage_finalize();
 void rgw_format_ops_log_entry(struct rgw_log_entry& entry,
index a8ea4ee19b2c9f4a3b88c24cd051c145ceb0a063..7d6d72e0368b78cd87ee1a0a24165e2a3b082292 100644 (file)
@@ -24,16 +24,15 @@ constexpr const char* RequestLogAction{"Log"};
 
 int RequestLog(lua_State* L) 
 {
-  const auto store = reinterpret_cast<rgw::sal::RadosStore*>(lua_touserdata(L, lua_upvalueindex(1)));
-  const auto rest = reinterpret_cast<RGWREST*>(lua_touserdata(L, lua_upvalueindex(2)));
-  const auto olog = reinterpret_cast<OpsLogSocket*>(lua_touserdata(L, lua_upvalueindex(3)));
-  const auto s = reinterpret_cast<req_state*>(lua_touserdata(L, lua_upvalueindex(4)));
-  const std::string op_name(reinterpret_cast<const char*>(lua_touserdata(L, lua_upvalueindex(5))));
-  if (store && s) {
-    const auto rc = rgw_log_op(store, rest, s, op_name, olog);
+  const auto rest = reinterpret_cast<RGWREST*>(lua_touserdata(L, lua_upvalueindex(1)));
+  const auto olog = reinterpret_cast<OpsLogSink*>(lua_touserdata(L, lua_upvalueindex(2)));
+  const auto s = reinterpret_cast<req_state*>(lua_touserdata(L, lua_upvalueindex(3)));
+  const std::string op_name(reinterpret_cast<const char*>(lua_touserdata(L, lua_upvalueindex(4))));
+  if (s) {
+    const auto rc = rgw_log_op(rest, s, op_name, olog);
     lua_pushinteger(L, rc);
   } else {
-    ldpp_dout(s, 1) << "Lua ERROR: missing rados store, cannot use ops log"  << dendl;
+    ldpp_dout(s, 1) << "Lua ERROR: missing request state, cannot use ops log"  << dendl;
     lua_pushinteger(L, -EINVAL);
   }
 
@@ -777,7 +776,7 @@ struct RequestMetaTable : public EmptyMetaTable {
 int execute(
     rgw::sal::Store* store,
     RGWREST* rest,
-    OpsLogSocket* olog,
+    OpsLogSink* olog,
     req_state* s, 
     const char* op_name,
     const std::string& script)
@@ -799,12 +798,11 @@ int execute(
   lua_getglobal(L, RequestMetaTable::TableName().c_str());
   ceph_assert(lua_istable(L, -1));
   pushstring(L, RequestLogAction);
-  lua_pushlightuserdata(L, store);
   lua_pushlightuserdata(L, rest);
   lua_pushlightuserdata(L, olog);
   lua_pushlightuserdata(L, s);
   lua_pushlightuserdata(L, const_cast<char*>(op_name));
-  lua_pushcclosure(L, RequestLog, FIVE_UPVALS);
+  lua_pushcclosure(L, RequestLog, FOUR_UPVALS);
   lua_rawset(L, -3);
 
   try {
index d31e81b5a4fd5f1e4e7f20674627b57aed20ef8f..fcf0673f3ba7b5bb71c9cca394c9cd5f1bf3b642 100644 (file)
@@ -5,7 +5,7 @@
 
 class req_state;
 class RGWREST;
-class OpsLogSocket;
+class OpsLogSink;
 namespace rgw::sal {
   class Store;
 }
@@ -16,7 +16,7 @@ namespace rgw::lua::request {
 int execute(
     rgw::sal::Store* store,
     RGWREST* rest,
-    OpsLogSocket* olog,
+    OpsLogSink* olog,
     req_state *s, 
     const char* op_name,
     const std::string& script);
index c44b0cba61cab73ba9fa68e0988f0cc2f837d717..507c07ef2f972a06eb6ba1673e6f200422de1228 100644 (file)
@@ -518,12 +518,19 @@ int radosgw_Main(int argc, const char **argv)
 
   rgw::dmclock::SchedulerCtx sched_ctx{cct.get()};
 
-  OpsLogSocket *olog = NULL;
-
+  OpsLogManifold *olog = new OpsLogManifold();
   if (!g_conf()->rgw_ops_log_socket_path.empty()) {
-    olog = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
-    olog->init(g_conf()->rgw_ops_log_socket_path);
+    OpsLogSocket* olog_socket = new OpsLogSocket(g_ceph_context, g_conf()->rgw_ops_log_data_backlog);
+    olog_socket->init(g_conf()->rgw_ops_log_socket_path);
+    olog->add_sink(olog_socket);
+  }
+  OpsLogFile* ops_log_file;
+  if (!g_conf()->rgw_ops_log_file_path.empty()) {
+    ops_log_file = new OpsLogFile(g_ceph_context, g_conf()->rgw_ops_log_file_path, g_conf()->rgw_ops_log_data_backlog);
+    ops_log_file->start();
+    olog->add_sink(ops_log_file);
   }
+  olog->add_sink(new OpsLogRados(store));
 
   r = signal_fd_init();
   if (r < 0) {
@@ -671,7 +678,6 @@ int radosgw_Main(int argc, const char **argv)
   shutdown_async_signal_handler();
 
   rgw_log_usage_finalize();
-
   delete olog;
 
   StoreManager::close_storage(store);
index 9cf78fe8809b7a5145a1292a82f37d5a03bca56e..3f8c9c33efe6aff713a5d7e603812ab102ac3d8b 100644 (file)
@@ -189,7 +189,7 @@ int process_request(rgw::sal::Store* const store,
                     const std::string& frontend_prefix,
                     const rgw_auth_registry_t& auth_registry,
                     RGWRestfulIO* const client_io,
-                    OpsLogSocket* const olog,
+                    OpsLogSink* const olog,
                     optional_yield yield,
                    rgw::dmclock::Scheduler *scheduler,
                     string* user,
@@ -356,7 +356,7 @@ done:
   }
 
   if (should_log) {
-    rgw_log_op(store, rest, s, (op ? op->name() : "unknown"), olog);
+    rgw_log_op(rest, s, (op ? op->name() : "unknown"), olog);
   }
 
   if (http_ret != nullptr) {
index f8a97d16c29d4498b5dfde11ce1efe8d62bc34b2..1aac4a6cdf1bc8e4cbe7a6cdf6ce4d788178594e 100644 (file)
@@ -34,7 +34,7 @@ namespace rgw::dmclock {
 struct RGWProcessEnv {
   rgw::sal::Store* store;
   RGWREST *rest;
-  OpsLogSocket *olog;
+  OpsLogSink *olog;
   int port;
   std::string uri_prefix;
   std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
@@ -49,7 +49,7 @@ protected:
   CephContext *cct;
   rgw::sal::Store* store;
   rgw_auth_registry_ptr_t auth_registry;
-  OpsLogSocket* olog;
+  OpsLogSink* olog;
   ThreadPool m_tp;
   Throttle req_throttle;
   RGWREST* rest;
@@ -169,7 +169,7 @@ extern int process_request(rgw::sal::Store* store,
                            const std::string& frontend_prefix,
                            const rgw_auth_registry_t& auth_registry,
                            RGWRestfulIO* client_io,
-                           OpsLogSocket* olog,
+                           OpsLogSink* olog,
                            optional_yield y,
                            rgw::dmclock::Scheduler *scheduler,
                            std::string* user,
index 0def8aac7240b63c49924b32654c6cca464acd57..b088b68f96c5ceb1296edc59a0e7d2fc1e700a3f 100644 (file)
@@ -380,7 +380,7 @@ namespace {
   {
     librgw_data* data = static_cast<librgw_data*>(td->io_ops_data);
     const char* object = io_u->file->file_name;
-    struct rgw_file_handle* object_fh;
+    struct rgw_file_handle* object_fh = nullptr;
     size_t nbytes;
     int r = 0;