rgw/bucket-logging: support for EC pools
author    Nithya Balachandran <nithya.balachandran@ibm.com>
          Thu, 27 Nov 2025 12:03:36 +0000 (12:03 +0000)
committer Nithya Balachandran <nithya.balachandran@ibm.com>
          Mon, 1 Dec 2025 12:34:42 +0000 (12:34 +0000)
Log buckets can now be created in erasure-coded (EC) pools.
To support append operations, the temporary log record object is initially
created in the replicated default.rgw.log pool and then copied to the EC
pool when the log object is committed.
All implicit log commit operations now execute asynchronously. A new
BucketLoggingManager class processes these pending commits at set
intervals. Explicit commit operations continue to be performed
synchronously.

Fixes: https://tracker.ceph.com/issues/71365
Signed-off-by: Nithya Balachandran <nithya.balachandran@ibm.com>
26 files changed:
PendingReleaseNotes
doc/rados/troubleshooting/log-and-debug.rst
doc/radosgw/bucket_logging.rst
src/common/subsys.h
src/rgw/CMakeLists.txt
src/rgw/driver/rados/rgw_bl_rados.cc [new file with mode: 0644]
src/rgw/driver/rados/rgw_bl_rados.h [new file with mode: 0644]
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/radosgw-admin/radosgw-admin.cc
src/rgw/rgw_appmain.cc
src/rgw/rgw_bucket_logging.cc
src/rgw/rgw_bucket_logging.h
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_realm_reloader.cc
src/rgw/rgw_rest_bucket_logging.cc
src/rgw/rgw_sal.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h
src/rgw/rgw_zone.cc
src/rgw/rgw_zone.h
src/test/cli/radosgw-admin/help.t
src/test/rgw/rgw_cr_test.cc

index f056f8e8c61cace50740f70bef0da777cf35ca4f..1bdbd1a7e3a838f550a84342443e7fa1f83915f0 100644 (file)
@@ -1,3 +1,5 @@
+* RGW: Bucket Logging supports creating log buckets in EC pools.
+  Implicit logging object commits are now performed asynchronously.
 * RGW: OpenSSL engine support is deprecated in favor of provider support.
   - Removed the `openssl_engine_opts` configuration option. OpenSSL engine configuration in string format is no longer supported.
   - Added the `openssl_conf` configuration option for loading specified providers as default providers.
index 66996b5e4ae257db71a80987f2ce058df0d2bfce..89d0f4c9988d418e7584eee66704144e51356bcb 100644 (file)
@@ -306,6 +306,8 @@ values to their defaults or to a level suitable for normal operations.
 +--------------------------+-----------+--------------+
 | ``rgw notification``     |     1     |      5       |
 +--------------------------+-----------+--------------+
+| ``rgw bucket logging``   |     1     |      5       |
++--------------------------+-----------+--------------+
 | ``javaclient``           |     1     |      5       |
 +--------------------------+-----------+--------------+
 | ``asok``                 |     1     |      5       |
index 78313fe084a6d31df28e6e7fcb40628257691f1f..3915ae59a71ac09d1f0baa97f6535b739cff9597 100644 (file)
@@ -54,6 +54,22 @@ them, regardless if enough time passed or if no more records are written to the
 object. Flushing will happen automatically when logging is disabled on a
 bucket, or its logging configuration is changed, or the bucket is deleted.
 
+The process of adding a new log object to the log bucket is asynchronous when
+it is triggered by a log record being written. The operation that generated
+the record completes immediately and does not wait for the log object to be
+added; the addition happens later. When flushed, however, the log object is
+added to the log bucket immediately. As a result, there may be temporary gaps
+in the log records within the log bucket until any pending log objects are
+also added.
+
+To check which log objects for a source bucket are currently pending addition to
+the log bucket, execute the following command:
+
+.. prompt:: bash #
+
+   radosgw-admin bucket logging list --bucket <source bucket>
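+
+If pending records need to appear in the log bucket without waiting for the
+asynchronous commit, the log object can be flushed explicitly (an explicit
+flush is performed synchronously):
+
+.. prompt:: bash #
+
+   radosgw-admin bucket logging flush --bucket <source bucket>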
+
+
 Standard
 ````````
 If the logging type is set to "Standard" (the default) the log records are
index f307929f6212712cecd7e801b8d1d834f7c0ce10..523f7cf3118b357a919b9c9257152f74f4deea3f 100644 (file)
@@ -69,6 +69,7 @@ SUBSYS(rgw_flight, 1, 5)
 SUBSYS(rgw_lifecycle, 1, 5)
 SUBSYS(rgw_restore, 1, 5)
 SUBSYS(rgw_notification, 1, 5)
+SUBSYS(rgw_bucket_logging, 1, 5)
 SUBSYS(javaclient, 1, 5)
 SUBSYS(asok, 1, 5)
 SUBSYS(throttle, 1, 1)
index f52dd9e5864685e8197bf2781465f2bc79108d68..5c84d92cbf0b1fe7437fc24c1aa2d891671a2737 100644 (file)
@@ -174,6 +174,7 @@ if(WITH_RADOSGW_RADOS)
           driver/rados/group.cc
           driver/rados/groups.cc
           driver/rados/rgw_bucket.cc
+          driver/rados/rgw_bl_rados.cc
           driver/rados/rgw_cr_rados.cc
           driver/rados/rgw_cr_tools.cc
           driver/rados/rgw_d3n_datacache.cc
diff --git a/src/rgw/driver/rados/rgw_bl_rados.cc b/src/rgw/driver/rados/rgw_bl_rados.cc
new file mode 100644 (file)
index 0000000..14b3853
--- /dev/null
@@ -0,0 +1,832 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_bucket_logging.h"
+#include "rgw_bl_rados.h"
+#include <chrono>
+#include <fmt/format.h>
+#include <future>
+#include <memory>
+#include <thread>
+#include <boost/algorithm/hex.hpp>
+#include <boost/asio/basic_waitable_timer.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/context/protected_fixedsize_stack.hpp>
+#include "common/async/yield_context.h"
+#include "common/async/yield_waiter.h"
+#include "common/ceph_time.h"
+#include "common/dout.h"
+#include "cls/lock/cls_lock_client.h"
+#include "include/common_fwd.h"
+#include "include/function2.hpp"
+#include "librados/AioCompletionImpl.h"
+#include "services/svc_zone.h"
+#include "rgw_common.h"
+#include "rgw_perf_counters.h"
+#include "rgw_sal_rados.h"
+#include "rgw_zone_features.h"
+
+
+#define dout_subsys ceph_subsys_rgw_bucket_logging
+
+namespace rgw::bucketlogging {
+
+using commit_list_t = std::set<std::string>;
+
+// use mmap/mprotect to allocate 128k coroutine stacks
+auto make_stack_allocator() {
+  return boost::context::protected_fixedsize_stack{128*1024};
+}
+
+const std::string COMMIT_LIST_OBJECT_NAME = "bucket_logging_global_commit_list";
+static const std::string TEMP_POOL_ATTR = "temp_logging_pool";
+
+static std::string get_commit_list_name(const rgw::sal::Bucket* log_bucket,
+                                         const std::string& prefix) {
+  return fmt::format("{}/{}/{}/{}", log_bucket->get_tenant(),
+                     log_bucket->get_name(),
+                     log_bucket->get_bucket_id(), prefix);
+}
+
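+// Pending commits are tracked at two levels in the logging pool:
+// - a per-target commit list object whose omap maps each pending logging
+//   object name to its temporary (tail) object, with an xattr recording the
+//   pool that holds the temporary objects
+// - a global list object (COMMIT_LIST_OBJECT_NAME) holding one omap key per
+//   commit list, which the manager scans to discover lists to process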
+int add_commit_target_entry(const DoutPrefixProvider* dpp,
+                            rgw::sal::RadosStore* store,
+                            const rgw::sal::Bucket* log_bucket,
+                            const std::string& prefix,
+                            const std::string& obj_name,
+                            const std::string& tail_obj_name,
+                            const rgw_pool& temp_data_pool,
+                            optional_yield y) {
+
+  auto& ioctx_logging = store->getRados()->get_logging_pool_ctx();
+  std::string target_obj_name = get_commit_list_name(log_bucket, prefix);
+  {
+    librados::ObjectWriteOperation op;
+    op.create(false);
+
+    bufferlist bl;
+    bl.append(tail_obj_name);
+    std::map<std::string, bufferlist> new_commit_list{{obj_name, bl}};
+    op.omap_set(new_commit_list);
+
+    bufferlist pool_bl;
+    encode(temp_data_pool, pool_bl);
+    op.setxattr(TEMP_POOL_ATTR.c_str(), pool_bl);
+
+    auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name, std::move(op), y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to add logging object entry " << obj_name
+                        << " to commit list object:" << target_obj_name
+                        << ". ret = " << ret << dendl;
+      return ret;
+    }
+  }
+
+  bufferlist empty_bl;
+  std::map<std::string, bufferlist> new_commit_list{{target_obj_name, empty_bl}};
+
+  librados::ObjectWriteOperation op;
+  op.create(false);
+  op.omap_set(new_commit_list);
+
+  auto ret = rgw_rados_operate(dpp, ioctx_logging, COMMIT_LIST_OBJECT_NAME,
+                               std::move(op), y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to add entry " << target_obj_name
+                      << " to global bucket logging list object: "
+                      << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl;
+    return ret;
+  }
+  ldpp_dout(dpp, 20) << "INFO: added entry " << target_obj_name
+                     << " to global bucket logging object: "
+                     << COMMIT_LIST_OBJECT_NAME << dendl;
+  return 0;
+}
+
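+// list the logging objects of a given log bucket and prefix that are still
+// pending commit, by paging through the omap keys of its commit list object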
+int list_pending_commit_objects(const DoutPrefixProvider* dpp,
+                                rgw::sal::RadosStore* store,
+                                const rgw::sal::Bucket* log_bucket,
+                                const std::string& prefix,
+                                std::set<std::string>& entries,
+                                optional_yield y) {
+
+  auto& ioctx_logging = store->getRados()->get_logging_pool_ctx();
+  std::string target_obj_name = get_commit_list_name(log_bucket, prefix);
+
+  entries.clear();
+
+  constexpr auto max_chunk = 1024U;
+  std::string start_after;
+  bool more = true;
+  int rval;
+  while (more) {
+    librados::ObjectReadOperation op;
+    std::set<std::string> entries_chunk;
+    op.omap_get_keys2(start_after, max_chunk, &entries_chunk, &more, &rval);
+    const auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name,
+                                       std::move(op), nullptr, y);
+    if (ret == -ENOENT) {
+      // log commit list object was not created - nothing to do
+      return 0;
+    }
+    if (ret < 0) {
+      // TODO: do we need to check on rval as well as ret?
+      ldpp_dout(dpp, 1) << "ERROR: failed to read logging commit list "
+                        <<  target_obj_name << " ret: " << ret << dendl;
+      return ret;
+    }
+    entries.merge(entries_chunk);
+    if (more) {
+      start_after = *entries.rbegin();
+    }
+  }
+  return 0;
+}
+
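+// BucketLoggingManager owns a worker thread running an asio io_context.
+// A main coroutine periodically reads the global logging list and tries to
+// take (or renew) an exclusive cls lock on each commit list; for every list
+// owned by this daemon, a dedicated coroutine commits the pending logging
+// objects and removes the processed entries.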
+class BucketLoggingManager : public DoutPrefixProvider {
+  using Executor = boost::asio::io_context::executor_type;
+  bool m_shutdown = false;
+  static constexpr auto m_log_commits_update_period = std::chrono::milliseconds(10000); // 10s
+  static constexpr auto m_log_commits_update_retry = std::chrono::milliseconds(1000); // 1s
+  static constexpr auto m_log_commits_idle_sleep = std::chrono::milliseconds(5000); // 5s
+  const utime_t m_failover_time = utime_t(m_log_commits_update_period*9); // 90s
+  CephContext* const m_cct;
+  static constexpr auto COOKIE_LEN = 16;
+  const std::string m_lock_cookie;
+  const std::string m_lock_name = "bl_mgr_lock";
+  boost::asio::io_context m_io_context;
+  boost::asio::executor_work_guard<Executor> m_work_guard;
+  std::thread m_worker;
+  const SiteConfig& m_site;
+  rgw::sal::RadosStore* const m_rados_store;
+
+private:
+
+  CephContext *get_cct() const override { return m_cct; }
+
+  unsigned get_subsys() const override { return dout_subsys; }
+
+  std::ostream& gen_prefix(std::ostream& out) const override {
+    return out << "rgw bucket_logging: ";
+  }
+
+  int parse_list_name(const std::string& list_obj_name, std::string& tenant_name,
+                      std::string& bucket_name, std::string& bucket_id,
+                      std::string& prefix) {
+    // <tenant_name>/<bucket_name>/<bucket_id>/<prefix>
+    size_t pstart = 0, pend = 0;
+    pend = list_obj_name.find('/');
+    if (pend == std::string::npos) {
+      return -EINVAL;
+    }
+    tenant_name = list_obj_name.substr(pstart, pend - pstart);
+
+    pstart = pend + 1;
+    pend = list_obj_name.find('/', pstart);
+    if (pend == std::string::npos) {
+      return -EINVAL;
+    }
+    bucket_name = list_obj_name.substr(pstart, pend - pstart);
+    if (bucket_name.empty()) {
+      return -EINVAL;
+    }
+
+    pstart = pend + 1;
+    pend = list_obj_name.find('/', pstart);
+    if (pend == std::string::npos) {
+      return -EINVAL;
+    }
+    bucket_id = list_obj_name.substr(pstart, pend - pstart);
+    if (bucket_id.empty()) {
+      return -EINVAL;
+    }
+    prefix = list_obj_name.substr(pend + 1);
+
+    /*
+    // TODO: Fix this
+    std::regex pattern(R"(([^/]+)/([^/]+)/([^/]+)/(.*))");
+    std::smatch matches;
+
+    if (std::regex_match(list_obj_name, matches, pattern)) {
+      tenant_name = matches[1].str();
+      bucket_name = matches[2].str();
+      bucket_id = matches[3].str();
+      prefix = matches[4].str();
+    } else {
+      return -EINVAL;
+    }
+    */
+    return 0;
+  }
+
+  // read the list of commit lists from the global list object
+  int read_global_logging_list(commit_list_t& commits, optional_yield y) {
+    constexpr auto max_chunk = 1024U;
+    std::string start_after;
+    bool more = true;
+    int rval;
+    while (more) {
+      librados::ObjectReadOperation op;
+      commit_list_t commits_chunk;
+      op.omap_get_keys2(start_after, max_chunk, &commits_chunk, &more, &rval);
+      const auto ret = rgw_rados_operate(this, m_rados_store->getRados()->get_logging_pool_ctx(),
+                                         COMMIT_LIST_OBJECT_NAME, std::move(op),
+                                         nullptr, y);
+      if (ret == -ENOENT) {
+        // global commit list object was not created - nothing to do
+        return 0;
+      }
+      if (ret < 0) {
+        // TODO: do we need to check on rval as well as ret?
+        ldpp_dout(this, 5) << "ERROR: failed to read bucket logging global commit list. error: "
+                           << ret << dendl;
+        return ret;
+      }
+      commits.merge(commits_chunk);
+      if (more) {
+        start_after = *commits.rbegin();
+      }
+    }
+    return 0;
+  }
+
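+  // counts outstanding per-entry coroutines: each spawned coroutine holds a
+  // token, and destroying the last token completes the waiter, waking any
+  // pending async_wait()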
+  class tokens_waiter {
+    size_t pending_tokens = 0;
+    DoutPrefixProvider* const dpp;
+    ceph::async::yield_waiter<void> waiter;
+
+  public:
+    class token{
+      tokens_waiter* tw;
+    public:
+      token(const token& other) = delete;
+      token(token&& other) : tw(other.tw) {
+        other.tw = nullptr; // mark as moved
+      }
+      token& operator=(const token& other) = delete;
+      token(tokens_waiter* _tw) : tw(_tw) {
+        ++tw->pending_tokens;
+      }
+
+      ~token() {
+        if (!tw) {
+          return; // already moved
+        }
+        --tw->pending_tokens;
+        if (tw->pending_tokens == 0 && tw->waiter) {
+          tw->waiter.complete(boost::system::error_code{});
+        }
+      }
+    };
+
+    tokens_waiter(DoutPrefixProvider* _dpp) : dpp(_dpp) {}
+    tokens_waiter(const tokens_waiter& other) = delete;
+    tokens_waiter& operator=(const tokens_waiter& other) = delete;
+
+    void async_wait(boost::asio::yield_context yield) {
+      if (pending_tokens == 0) {
+        return;
+      }
+      ldpp_dout(dpp, 20) << "INFO: tokens waiter is waiting on "
+                         << pending_tokens << " tokens" << dendl;
+      boost::system::error_code ec;
+      waiter.async_wait(yield[ec]);
+      ldpp_dout(dpp, 20) << "INFO: tokens waiter finished waiting for all tokens" << dendl;
+    }
+  };
+
+  // process a single pending commit entry: commit the logging object if the
+  // log bucket still exists, otherwise delete the temporary object.
+  // returns 0 on success or a negative error code
+  int process_entry(
+      const ConfigProxy& conf,
+      const std::string& entry,
+      const std::string& temp_obj_name,
+      const std::string& tenant_name,
+      const std::string& bucket_name,
+      const std::string& bucket_id,
+      const std::string& prefix,
+      const rgw_pool& obj_pool,
+      bool& bucket_deleted,
+      boost::asio::yield_context yield) {
+
+    std::unique_ptr<rgw::sal::Bucket> log_bucket;
+    rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id};
+    int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield);
+    if (ret < 0 && ret != -ENOENT) {
+      ldpp_dout(this, 1) << "ERROR: failed to load bucket : "
+                         << bucket_name << " id : " << bucket_id
+                         << ", error: " << ret << dendl;
+      return ret;
+    }
+    if (ret == 0) {
+      ldpp_dout(this, 20) << "INFO: loaded bucket : " << bucket_name
+                          << ", id: "<< bucket_id << dendl;
+      ret = log_bucket->commit_logging_object(entry, yield, this, prefix,
+                                              nullptr, false);
+      if (ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: failed to commit logging object : "
+                            << entry << " , error: " << ret << dendl;
+        return ret;
+      }
+      ldpp_dout(this, 20) << "INFO: committed logging object : " << entry
+                          << " to bucket " << bucket_name
+                          << ", id: "<< bucket_id << dendl;
+      return 0;
+    }
+    // The log bucket has been deleted. Clean up pending objects.
+    // This is done because the bucket_deletion_cleanup only removes
+    // the current temp log objects.
+    bucket_deleted = true;
+    ldpp_dout(this, 20) << "INFO: log bucket : " << bucket_name
+                        << " no longer exists. Removing pending logging object "
+                        << temp_obj_name << " from pool " << obj_pool
+                        << dendl;
+    ret = rgw_delete_system_obj(this, m_rados_store->svc()->sysobj, obj_pool,
+                                temp_obj_name, nullptr, yield);
+    if (ret < 0 && ret != -ENOENT) {
+      ldpp_dout(this, 1) << "ERROR: failed to delete pending logging object : "
+                         << temp_obj_name << " in pool " << obj_pool
+                         << " for deleted log bucket : " << bucket_name
+                         << " . ret = "<< ret << dendl;
+      return ret;
+    }
+    ldpp_dout(this, 20) << "INFO: Deleted pending logging object "
+                        << temp_obj_name << " from pool " << obj_pool
+                        << " for deleted log bucket : " << bucket_name
+                        << dendl;
+    return 0;
+  }
+
+  void async_sleep(boost::asio::yield_context yield, const std::chrono::milliseconds& duration) {
+    using Clock = ceph::coarse_mono_clock;
+    using Timer = boost::asio::basic_waitable_timer<Clock,
+        boost::asio::wait_traits<Clock>, Executor>;
+    Timer timer(m_io_context);
+    timer.expires_after(duration);
+    boost::system::error_code ec;
+    timer.async_wait(yield[ec]);
+    if (ec) {
+      ldpp_dout(this, 10) << "ERROR: async_sleep failed with error: "
+                         << ec.message() << dendl;
+    }
+  }
+
+  // unlock (lose ownership) list object
+  int unlock_list_object(librados::IoCtx& rados_ioctx,
+                         const std::string& list_obj_name,
+                         boost::asio::yield_context yield) {
+
+    librados::ObjectWriteOperation op;
+    op.assert_exists();
+    rados::cls::lock::unlock(&op, m_lock_name, m_lock_cookie);
+    const auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name,
+                                       std::move(op), yield);
+    if (ret == -ENOENT) {
+      ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name
+                          << " was removed. Nothing to unlock." << dendl;
+      return 0;
+    }
+    if (ret == -EBUSY) {
+      ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name
+                          << " already owned by another RGW. No need to unlock."
+                          << dendl;
+      return 0;
+    }
+    return ret;
+  }
+
+  int remove_commit_list(librados::IoCtx& rados_ioctx,
+                         const std::string& commit_list_name,
+                         optional_yield y) {
+    {
+      librados::ObjectWriteOperation op;
+      rados::cls::lock::assert_locked(&op, m_lock_name,
+                                      ClsLockType::EXCLUSIVE,
+                                      m_lock_cookie,
+                                      "" /*no tag*/);
+      op.remove();
+      auto ret = rgw_rados_operate(this, rados_ioctx, commit_list_name, std::move(op), y);
+      if (ret < 0 && ret != -ENOENT) {
+        ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :"
+                           << commit_list_name << ". ret = " << ret << dendl;
+        return ret;
+      }
+    }
+
+    librados::ObjectWriteOperation op;
+    op.omap_rm_keys({commit_list_name});
+    auto ret = rgw_rados_operate(this, rados_ioctx, COMMIT_LIST_OBJECT_NAME,
+                                 std::move(op), y);
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to remove entry " << commit_list_name
+                         << " from the global bucket logging list : "
+                         << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl;
+      return ret;
+    }
+    ldpp_dout(this, 20) << "INFO: removed entry " << commit_list_name
+                        << " from the global bucket logging list : "
+                        << COMMIT_LIST_OBJECT_NAME << dendl;
+
+    return 0;
+  }
+
+  // processing of a specific target list
+  void process_commit_list(librados::IoCtx& rados_ioctx,
+                           const std::string& list_obj_name,
+                           boost::asio::yield_context yield) {
+    auto is_idle = false;
+    auto bucket_deleted = false;
+    auto skip_bucket_deletion = false;
+    const std::string start_marker;
+
+    std::string tenant_name, bucket_name, bucket_id, prefix;
+    int ret = parse_list_name(list_obj_name, tenant_name, bucket_name,
+                              bucket_id, prefix);
+    if (ret) {
+      ldpp_dout(this, 1) << "ERROR: commit list name: " << list_obj_name
+                          << " is invalid. Processing will stop." << dendl;
+      return;
+    }
+    ldpp_dout(this, 20) << "INFO: parse commit list name: " << list_obj_name
+                        << ", bucket_name: " << bucket_name
+                        << ", bucket_id: " << bucket_id
+                        << ", tenant_name: " << tenant_name
+                        << ", prefix: " << prefix << dendl;
+
+    while (!m_shutdown) {
+      // if the list was empty the last time, sleep for idle timeout
+      if (is_idle) {
+        async_sleep(yield, m_log_commits_idle_sleep);
+      }
+
+      // Get the entries in the list after locking it
+      is_idle = true;
+      skip_bucket_deletion = false;
+      std::map<std::string, bufferlist> entries;
+      rgw_pool temp_data_pool;
+      auto total_entries = 0U;
+      {
+        librados::ObjectReadOperation op;
+        op.assert_exists();
+        bufferlist obl;
+        int rval;
+        bufferlist pool_obl;
+        int pool_rval;
+        constexpr auto max_chunk = 1024U;
+        std::string start_after;
+        bool more = true;
+        rados::cls::lock::assert_locked(&op, m_lock_name,
+                                        ClsLockType::EXCLUSIVE,
+                                        m_lock_cookie,
+                                        "" /*no tag*/);
+        op.omap_get_vals2(start_after, max_chunk, &entries, &more, &rval);
+        op.getxattr(TEMP_POOL_ATTR.c_str(), &pool_obl, &pool_rval);
+        // check ownership and list entries in one batch
+        auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name,
+                                     std::move(op), nullptr, yield);
+        if (ret == -ENOENT) {
+          // list object was deleted
+          ldpp_dout(this, 20) << "INFO: object: " << list_obj_name
+                              << " was removed. Processing will stop" << dendl;
+          return;
+        }
+        if (ret == -EBUSY) {
+          ldpp_dout(this, 10) << "WARNING: commit list : " << list_obj_name
+                              << " ownership moved to another daemon. Processing will stop"
+                              << dendl;
+          return;
+        }
+        if (ret < 0) {
+          ldpp_dout(this, 10) << "WARNING: failed to get list of entries in "
+                              << "and/or lock object: " << list_obj_name
+                              << ". error: " << ret << " (will retry)" << dendl;
+          continue;
+        }
+        if (pool_rval < 0) {
+          ldpp_dout(this, 10) << "WARNING: failed to get pool information "
+                              << "from commit list: " << list_obj_name << dendl;
+          return;
+        }
+        // Get temp logging object pool information - this is required for
+        // deleting the objects when the log bucket is deleted.
+        try {
+          auto p = pool_obl.cbegin();
+          decode(temp_data_pool, p);
+        } catch (buffer::error& err) {
+          ldpp_dout(this, 1) << "ERROR:  failed to get pool information from object: "
+                             << list_obj_name << dendl;
+          return;
+        }
+      }
+      total_entries = entries.size();
+      if (total_entries == 0) {
+        // nothing in the list object
+        // Delete the list object if the bucket has been deleted
+        std::unique_ptr<rgw::sal::Bucket> log_bucket;
+        rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id};
+        int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield);
+        if (ret == -ENOENT) {
+          ldpp_dout(this, 20) << "ERROR: failed to load bucket : "
+                              << bucket_name << ", id : " << bucket_id
+                              << ", error: " << ret << dendl;
+          bucket_deleted = true;
+        }
+      } else {
+        // log when not idle
+        ldpp_dout(this, 20) << "INFO: found: " << total_entries
+                            << " entries in commit list: " << list_obj_name
+                            << dendl;
+      }
+      auto stop_processing = false;
+      std::set<std::string> remove_entries;
+      tokens_waiter tw(this);
+      for (auto const& [key, val] : entries) {
+        if (stop_processing) {
+          break;
+        }
+        std::string temp_obj_name = val.to_str();
+
+        ldpp_dout(this, 20) << "INFO: processing entry: " << key
+                            << ", value : " << temp_obj_name << dendl;
+
+        tokens_waiter::token token(&tw);
+        boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
+          [this, &is_idle, &list_obj_name, &remove_entries, &stop_processing,
+           &tenant_name, &bucket_name, &bucket_id, &prefix, token = std::move(token),
+           key, temp_obj_name, temp_data_pool, &bucket_deleted, &skip_bucket_deletion]
+           (boost::asio::yield_context yield) {
+            auto result =
+                process_entry(this->get_cct()->_conf, key, temp_obj_name, tenant_name,
+                              bucket_name, bucket_id, prefix, temp_data_pool,
+                              bucket_deleted, yield);
+            if (result == 0) {
+              ldpp_dout(this, 20) << "INFO: processing of entry: " << key
+                                  << " from: " << list_obj_name
+                                  << " was successful." << dendl;
+              remove_entries.insert(key);
+              is_idle = false;
+              return;
+            } else {
+              is_idle = true;
+              // Don't remove the commit list if an entry could not be
+              // processed, even if the log bucket has been deleted
+              skip_bucket_deletion = true;
+              ldpp_dout(this, 20) << "INFO: failed to process entry: " << key
+                                  << " from: " << list_obj_name << dendl;
+            }
+            stop_processing = true;
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
+      }
+
+      if (!entries.empty()) {
+        // wait for all pending work to finish
+        tw.async_wait(yield);
+      }
+
+      if (bucket_deleted && !skip_bucket_deletion) {
+        ret = remove_commit_list(rados_ioctx, list_obj_name, yield);
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :"
+                              << list_obj_name << ". ret = " << ret << dendl;
+          return;
+        }
+        ldpp_dout(this, 10) << "INFO: bucket :" << bucket_name
+                            << ". was removed. processing will stop" << dendl;
+        return;
+      }
+      // delete all processed entries from list
+      if (!remove_entries.empty()) {
+        // Delete the entries even if the lock no longer belongs
+        // to this instance as we have processed the entries.
+        librados::ObjectWriteOperation op;
+        op.omap_rm_keys(remove_entries);
+        auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield);
+        if (ret == -ENOENT) {
+          // list was deleted
+          ldpp_dout(this, 10) << "INFO: commit list " << list_obj_name
+                              << ". was removed. processing will stop" << dendl;
+          return;
+        }
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to remove entries from commit list: "
+                             << list_obj_name << ". error: " << ret
+                             << ". This could cause some log records to be lost."
+                             << dendl;
+          return;
+        } else {
+          ldpp_dout(this, 20) << "INFO: removed entries from commit list: "
+                              << list_obj_name << dendl;
+        }
+      }
+    }
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped processing : "
+                       << list_obj_name << dendl;
+  }
+
+  // process all commit log objects
+  // find which of the commit log objects is owned by this daemon and process it
+  void process_global_logging_list(boost::asio::yield_context yield) {
+    auto has_error = false;
+    std::unordered_set<std::string> owned_commit_logs;
+    std::atomic<size_t> processed_list_count = 0;
+
+    std::vector<std::string> commit_list_gc;
+    std::mutex commit_list_gc_lock;
+    auto& rados_ioctx = m_rados_store->getRados()->get_logging_pool_ctx();
+    auto next_check_time = ceph::coarse_real_clock::zero();
+    while (!m_shutdown) {
+      // check if log commit list needs to be refreshed
+      if (ceph::coarse_real_clock::now() > next_check_time) {
+        next_check_time = ceph::coarse_real_clock::now() + m_log_commits_update_period;
+        const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+        ldpp_dout(this, 20) << "INFO: processing global logging commit list. next "
+                            << "processing will happen at: " << std::ctime(&tp)
+                            << dendl;
+      } else {
+        // short sleep duration to prevent busy wait when refreshing list
+        // or retrying after error
+        ldpp_dout(this, 20) << "INFO: processing global logging commit list. Sleep now." << dendl;
+        async_sleep(yield, m_log_commits_update_retry);
+        if (!has_error) {
+          // in case of error we will retry
+          continue;
+        }
+      }
+
+      std::set<std::string> commit_lists;
+      auto ret = read_global_logging_list(commit_lists, yield);
+      if (ret < 0) {
+        has_error = true;
+        ldpp_dout(this, 1) << "ERROR: failed to read global logging list. Retry. ret:"
+                           << ret << dendl;
+        continue;
+      }
+
+      for (const auto& list_obj_name : commit_lists) {
+        // try to lock the object to check if it is owned by this rgw
+        // or if ownership needs to be taken
+        ldpp_dout(this, 20) << "INFO: processing commit list: " << list_obj_name
+                            << dendl;
+        librados::ObjectWriteOperation op;
+        op.assert_exists();
+        rados::cls::lock::lock(&op, m_lock_name, ClsLockType::EXCLUSIVE,
+                               m_lock_cookie, "" /*no tag*/,
+                               "" /*no description*/, m_failover_time,
+                               LOCK_FLAG_MAY_RENEW);
+
+        ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield);
+        if (ret == -EBUSY) {
+          // lock is already taken by another RGW
+          ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+                              << " owned (locked) by another daemon" << dendl;
+          // if this RGW previously owned the list, its processing coroutine
+          // will stop, and the list will be removed from the owned set afterwards
+          continue;
+        }
+        if (ret == -ENOENT) {
+          // commit list is deleted - processing will stop the next time we try to read from it
+          ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+                              << " should not be locked - already deleted" << dendl;
+          continue;
+        }
+        if (ret < 0) {
+          // failed to lock for another reason, continue to process other lists
+          ldpp_dout(this, 10) << "ERROR: failed to lock commit list: "
+                              << list_obj_name << ". error: " << ret << dendl;
+          has_error = true;
+          continue;
+        }
+        // add the commit list to the list of owned commit_logs
+        if (owned_commit_logs.insert(list_obj_name).second) {
+          ldpp_dout(this, 20) << "INFO: " << list_obj_name << " now owned (locked) by this daemon" << dendl;
+          // start processing this list
+          boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(),
+                             [this, &commit_list_gc, &commit_list_gc_lock, &rados_ioctx,
+                             list_obj_name, &processed_list_count](boost::asio::yield_context yield) {
+            ++processed_list_count;
+            process_commit_list(rados_ioctx, list_obj_name, yield);
+            // if list processing ended, it means that the list object was removed or not owned anymore
+            const auto ret = unlock_list_object(rados_ioctx, list_obj_name, yield);
+            if (ret < 0) {
+              ldpp_dout(this, 15) << "WARNING: failed to unlock commit list: "
+                                  << list_obj_name << " with error: " << ret
+                                  << " (ownership would still move if not renewed)"
+                                  << dendl;
+            } else {
+              ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+                                  << " not locked (ownership can move)" << dendl;
+            }
+            // mark it for deletion
+            std::lock_guard lock_guard(commit_list_gc_lock);
+            commit_list_gc.push_back(list_obj_name);
+            --processed_list_count;
+            ldpp_dout(this, 20) << "INFO: " << list_obj_name << " marked for removal" << dendl;
+          }, [this, list_obj_name] (std::exception_ptr eptr) {
+            ldpp_dout(this, 10) << "ERROR: " << list_obj_name << " processing failed" << dendl;
+            if (eptr) std::rethrow_exception(eptr);
+          });
+        } else {
+          ldpp_dout(this, 20) << "INFO: " << list_obj_name << " ownership (lock) renewed" << dendl;
+        }
+      }
+      // erase all list objects that were deleted
+      {
+        std::lock_guard lock_guard(commit_list_gc_lock);
+        std::for_each(commit_list_gc.begin(), commit_list_gc.end(), [this, &owned_commit_logs](const std::string& list_obj_name) {
+          owned_commit_logs.erase(list_obj_name);
+          ldpp_dout(this, 20) << "INFO: commit list object: " << list_obj_name << " was removed" << dendl;
+        });
+        commit_list_gc.clear();
+      }
+    }
+    while (processed_list_count > 0) {
+      ldpp_dout(this, 20) << "INFO: BucketLogging manager stopped. "
+                          << processed_list_count
+                          << " commit lists are still being processed" << dendl;
+      async_sleep(yield, m_log_commits_update_retry);
+    }
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped. Done processing all commit lists" << dendl;
+  }
+
+public:
+
+  ~BucketLoggingManager() = default;
+
+  void stop() {
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager received stop signal. shutting down..." << dendl;
+    m_shutdown = true;
+    m_work_guard.reset();
+    if (m_worker.joinable()) {
+      // try graceful shutdown first
+      auto future = std::async(std::launch::async, [this]() {m_worker.join();});
+      if (future.wait_for(m_log_commits_update_retry*2) == std::future_status::timeout) {
+        // force stop if graceful shutdown takes too long
+        if (!m_io_context.stopped()) {
+          ldpp_dout(this, 5) << "INFO: force shutdown of BucketLogging Manager" << dendl;
+          m_io_context.stop();
+        }
+        m_worker.join();
+      }
+    }
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager shutdown complete" << dendl;
+  }
+
+  void init() {
+    boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(),
+        [this](boost::asio::yield_context yield) {
+          process_global_logging_list(yield);
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
+    // start the worker threads to do the actual list processing
+    m_worker = std::thread([this]() {
+      ceph_pthread_setname("bucket-logging-worker");
+      try {
+        ldpp_dout(this, 20) << "INFO: bucket logging worker started" << dendl;
+        m_io_context.run();
+        ldpp_dout(this, 20) << "INFO: bucket logging worker ended" << dendl;
+      } catch (const std::exception& err) {
+        ldpp_dout(this, 10) << "ERROR: bucket logging worker failed with error: "
+                            << err.what() << dendl;
+        throw;
+      }
+    });
+    ldpp_dout(this, 10) << "INFO: started bucket logging manager" << dendl;
+  }
+
+  BucketLoggingManager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) :
+    m_cct(_cct),
+    m_lock_cookie(gen_rand_alphanumeric(m_cct, COOKIE_LEN)),
+    m_work_guard(boost::asio::make_work_guard(m_io_context)),
+    m_site(site),
+    m_rados_store(store)
+    {
+      ldpp_dout(this, 5) << "INFO: BucketLoggingManager() constructor" << dendl;
+    }
+};
+
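+// single global instance of the manager, created by init() and torn down by
+// shutdown()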
+std::unique_ptr<BucketLoggingManager> s_manager;
+
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+          const SiteConfig& site) {
+  if (s_manager) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to init BucketLogging manager: already exists" << dendl;
+    return false;
+  }
+  // TODO: take conf from CephContext
+  ldpp_dout(dpp, 1) << "INFO: initialising BucketLogging manager" << dendl;
+  s_manager = std::make_unique<BucketLoggingManager>(dpp->get_cct(), store, site);
+  s_manager->init();
+  return true;
+}
+
+void shutdown() {
+  if (!s_manager) return;
+  s_manager->stop();
+  s_manager.reset();
+}
+
+} // namespace rgw::bucketlogging
diff --git a/src/rgw/driver/rados/rgw_bl_rados.h b/src/rgw/driver/rados/rgw_bl_rados.h
new file mode 100644 (file)
index 0000000..1075adb
--- /dev/null
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "common/async/yield_context.h" // optional_yield
+
+// forward declarations
+struct rgw_pool;
+namespace rgw { class SiteConfig; }
+namespace rgw::sal {
+  class Bucket;
+  class RadosStore;
+}
+class DoutPrefixProvider;
+
+namespace rgw::bucketlogging {
+
+  // initialize the bucket logging commit manager
+  bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+            const rgw::SiteConfig& site);
+
+  // shutdown the bucket logging commit manager
+  void shutdown();
+
+  int add_commit_target_entry(const DoutPrefixProvider* dpp,
+                              rgw::sal::RadosStore* store,
+                              const rgw::sal::Bucket* log_bucket,
+                              const std::string& prefix,
+                              const std::string& obj_name,
+                              const std::string& tail_obj_name,
+                              const rgw_pool& temp_data_pool,
+                              optional_yield y);
+
+  int list_pending_commit_objects(const DoutPrefixProvider* dpp,
+                                  rgw::sal::RadosStore* store,
+                                  const rgw::sal::Bucket* log_bucket,
+                                  const std::string& prefix,
+                                  std::set<std::string>& entries,
+                                  optional_yield y);
+
+} // namespace rgw::bucketlogging
+
index 2a1b5d541ea9df288a06e70c64bb0bc370735be3..de1ed2c99c753e356c8b0f514e91782ada12b1bb 100644 (file)
@@ -57,6 +57,7 @@
 #include "rgw_etag_verifier.h"
 #include "rgw_worker.h"
 #include "rgw_notify.h"
+#include "rgw_bl_rados.h"
 #include "rgw_http_errors.h"
 #include "rgw_multipart_meta_filter.h"
 
@@ -1149,6 +1150,10 @@ void RGWRados::finalize()
     v1_topic_migration.stop();
   }
 
+  if (run_bucket_logging_thread) {
+    rgw::bucketlogging::shutdown();
+  }
+
   if (use_restore_thread) {
     restore->stop_processor();
   }
@@ -1276,6 +1281,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw
   if (ret < 0)
     return ret;
 
+  ret = open_logging_pool_ctx(dpp);
+  if (ret < 0)
+    return ret;
+
   pools_initialized = true;
 
   if (use_gc) {
@@ -1428,6 +1437,12 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw
 
   index_completion_manager = new RGWIndexCompletionManager(this);
 
+  if (run_bucket_logging_thread) {
+    if (!rgw::bucketlogging::init(dpp, this->driver, *svc.site)) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket logging manager" << dendl;
+    } else {
+      ldpp_dout(dpp, 0) << "INFO: initialized bucket logging manager" << dendl;
+    }
+  }
   if (run_notification_thread) {
     if (!rgw::notify::init(dpp, driver, *svc.site)) {
       ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
@@ -1448,7 +1463,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw
       v1_topic_migration.start(1);
     }
   }
-
   return ret;
 }
 
@@ -1552,6 +1566,11 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp)
   return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
 }
 
+int RGWRados::open_logging_pool_ctx(const DoutPrefixProvider *dpp)
+{
+  return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().bucket_logging_pool, logging_pool_ctx, true, true);
+}
+
 int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
                            bool mostly_omap, bool bulk)
 {
index 38e2f3dcb7da18cbbcb40692a22a799871644822..a1dea9067246c205bb6b33c71054a1e8b7b1016e 100644 (file)
@@ -341,6 +341,7 @@ class RGWRados
   int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
   int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
   int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
+  int open_logging_pool_ctx(const DoutPrefixProvider *dpp);
 
   int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx&  io_ctx,
                    bool mostly_omap, bool bulk);
@@ -361,6 +362,7 @@ class RGWRados
   bool run_sync_thread{false};
   bool run_reshard_thread{false};
   bool run_notification_thread{false};
+  bool run_bucket_logging_thread{false};
 
   RGWMetaNotifier* meta_notifier{nullptr};
   RGWDataNotifier* data_notifier{nullptr};
@@ -436,6 +438,7 @@ protected:
   librados::IoCtx objexp_pool_ctx;
   librados::IoCtx reshard_pool_ctx;
   librados::IoCtx notif_pool_ctx;     // .rgw.notif
+  librados::IoCtx logging_pool_ctx;     // .rgw.logging
 
   bool pools_initialized{false};
 
@@ -523,6 +526,11 @@ public:
     return *this;
   }
 
+  RGWRados& set_run_bucket_logging_thread(bool _run_bucket_logging_thread) {
+    run_bucket_logging_thread = _run_bucket_logging_thread;
+    return *this;
+  }
+
   librados::IoCtx* get_lc_pool_ctx() {
     return &lc_pool_ctx;
   }
@@ -538,6 +546,10 @@ public:
   librados::IoCtx& get_notif_pool_ctx() {
     return notif_pool_ctx;
   }
+
+  librados::IoCtx& get_logging_pool_ctx() {
+    return logging_pool_ctx;
+  }
   
   void set_context(CephContext *_cct) {
     cct = _cct;
index b193a0a4c5c70262d9028659919117185c3bfdeb..5228f1f1cad2b207a5c70df3750d352363b8f3f6 100644 (file)
@@ -41,6 +41,7 @@
 #include "rgw_aio_throttle.h"
 #include "rgw_bucket.h"
 #include "rgw_bucket_logging.h"
+#include "rgw_bl_rados.h"
 #include "rgw_lc.h"
 #include "rgw_lc_tier.h"
 #include "rgw_lc_tier.h"
@@ -1097,6 +1098,7 @@ int RadosBucket::remove_topics(RGWObjVersionTracker* objv_tracker,
 
 
 #define RGW_ATTR_COMMITTED_LOGGING_OBJ RGW_ATTR_PREFIX "committed-logging-obj"
+#define RGW_ATTR_LOGGING_EC_POOL RGW_ATTR_PREFIX "logging-ec-pool"
 
 int get_committed_logging_object(RadosStore* store,
     const std::string& obj_name_oid,
@@ -1147,6 +1149,88 @@ int set_committed_logging_object(RadosStore* store,
   return rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y);
 }
 
+// determine the pool that should hold the temporary logging object: the log
+// bucket's data pool when it is replicated, or the replicated
+// bucket_logging_pool from the zone params when the data pool is erasure
+// coded. Whether the data pool is EC is cached in an xattr on the logging
+// name object.
+int get_logging_temp_object_pool(const DoutPrefixProvider *dpp,
+    RadosBucket* bucket,
+    RadosStore* rados_store,
+    const std::string& obj_name_oid,
+    rgw_pool& data_pool, bool& is_ec_pool, optional_yield y) {
+
+  int ret;
+  rgw_obj obj {bucket->get_key(), "dummy"};
+  if (!rados_store->getRados()->get_obj_data_pool(bucket->get_placement_rule(), obj, &data_pool)) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << bucket->get_key() <<
+        "' when writing logging object" << dendl;
+    return -EIO;
+  }
+
+  // Find out if the log bucket is on an EC pool
+  // Use an attribute on the logging name object instead of the bucket so
+  // it is not synced in a multi-site setup
+
+  librados::Rados* rados = rados_store->getRados()->get_rados_handle();
+  librados::IoCtx io_ctx;
+  if (ret = rgw_init_ioctx(dpp, rados, data_pool, io_ctx); ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: get data pool ioctx for bucket '" << bucket->get_key() <<
+        "' when writing logging object. ret = " << ret << dendl;
+    return ret;
+  }
+  if (!io_ctx.is_valid()) {
+    ldpp_dout(dpp, 1) << "ERROR: invalid data pool ioctx for bucket '" << bucket->get_key() <<
+        "' when writing logging object" << dendl;
+    return -EIO;
+  }
+
+  is_ec_pool = false;
+  bufferlist bl;
+  librados::ObjectReadOperation op;
+  int rval;
+  op.getxattr(RGW_ATTR_LOGGING_EC_POOL, &bl, &rval);
+  ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), nullptr, y);
+  if (ret == 0) {
+    try {
+      ceph::decode(is_ec_pool, bl);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to decode " << RGW_ATTR_LOGGING_EC_POOL
+                        << " attr for bucket '" << bucket->get_key()
+                        << "'. error = " << err.what() << dendl;
+      return -EINVAL;
+    }
+  } else if (ret < 0) {
+    // attribute not set yet - ask the mon whether the pool is erasure coded:
+    // 'osd pool get <pool> erasure_code_profile' succeeds only for EC pools
+    // and is rejected (-EACCES) for replicated pools
+    ret = rados->mon_command(
+      "{\"prefix\": \"osd pool get\", \"pool\": \"" +
+      data_pool.name + "\", \"var\": \"erasure_code_profile\"}",
+      {}, NULL, NULL);
+    if (ret == -EACCES) {
+      is_ec_pool = false;
+    } else if (ret == 0) {
+      is_ec_pool = true;
+    } else {
+      ldpp_dout(dpp, 10) << __func__ << " ERROR: failed to find out if pool"
+                         << data_pool.name << " is erasure coded. ret = "
+                         << ret << dendl;
+      return ret;
+    }
+    bufferlist bl;
+    ceph::encode(is_ec_pool, bl);
+    librados::ObjectWriteOperation op;
+    // if object does not exist, we should not create the attribute
+    op.assert_exists();
+    op.setxattr(RGW_ATTR_LOGGING_EC_POOL, std::move(bl));
+    if (const int ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y); ret < 0) {
+      // Don't return an error as we know if it is an ec pool
+      ldpp_dout(dpp, 10) << "ERROR: failed to set is_ec_pool attr on :" << obj_name_oid
+                         << " ret: " << ret << dendl;
+    }
+  }
+  // If the target log bucket is on an EC pool, the temp
+  // logging object will be created in the default.rgw.log pool
+  if (is_ec_pool) {
+    data_pool = rados_store->svc()->zone->get_zone_params().bucket_logging_pool;
+  }
+  return 0;
+}
+
 int RadosBucket::get_logging_object_name(std::string& obj_name,
     const std::string& prefix,
     optional_yield y,
@@ -1243,14 +1327,14 @@ std::string to_temp_object_name(const rgw::sal::Bucket* bucket, const std::strin
       obj_name);
 }
 
-int RadosBucket::remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) {
+int RadosBucket::remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) {
   rgw_pool data_pool;
-  const rgw_obj head_obj{get_key(), obj_name};
-  const auto placement_rule = get_placement_rule();
-
-  if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
-      "' when deleting logging object << dendl;
+  const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+  bool is_ec_pool;
+  auto ret = get_logging_temp_object_pool(dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get temp data pool for bucket '" << get_key() <<
+      "' when deleting logging object: " << obj_name << ". ret: " << ret << dendl;
     return -EIO;
   }
 
@@ -1266,7 +1350,7 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
     optional_yield y,
     const DoutPrefixProvider *dpp,
     const std::string& prefix,
-    std::string* last_committed) {
+    std::string* last_committed, bool async) {
   rgw_pool data_pool;
   const rgw_obj head_obj{get_key(), obj_name};
   const auto placement_rule = get_placement_rule();
@@ -1276,9 +1360,10 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
       "' when committing logging object"  << dendl;
     return -EIO;
   }
+  int ret;
   const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
   if (last_committed) {
-    if (const int ret = get_committed_logging_object(store,
+    if (ret = get_committed_logging_object(store,
           obj_name_oid,
           data_pool,
           y,
@@ -1296,11 +1381,34 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
   }
 
   const auto temp_obj_name = to_temp_object_name(this, obj_name);
+  rgw_pool temp_data_pool;
+  bool is_ec_pool = false;
+
+  ret = get_logging_temp_object_pool(dpp, this, store, obj_name_oid, temp_data_pool, is_ec_pool, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '"
+                      << get_key() << "' when committing logging object: "
+                      << obj_name << ". ret: " << ret << dendl;
+    return -EIO;
+  }
+  if (async) {
+    ret = rgw::bucketlogging::add_commit_target_entry(dpp, store, this, prefix,
+                                                      obj_name, temp_obj_name,
+                                                      temp_data_pool, y);
+    if (ret < 0) {
+      return ret;
+    }
+
+    ldpp_dout(dpp, 20) << "INFO: added logging object entry " << obj_name
+                       << " to commit list object." << dendl;
+    return 0;
+  }
+
   std::map<string, bufferlist> obj_attrs;
   ceph::real_time mtime;
   bufferlist bl_data;
-  if (auto ret = rgw_get_system_obj(store->svc()->sysobj,
-                     data_pool,
+  if (ret = rgw_get_system_obj(store->svc()->sysobj,
+                     temp_data_pool,
                      temp_obj_name,
                      bl_data,
                      nullptr,
@@ -1316,6 +1424,36 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
     }
     mtime = ceph::real_time::clock::now();
     ldpp_dout(dpp, 20) << "INFO: temporary logging object '" << temp_obj_name << "' does not exist. committing it empty" << dendl;
+  } else if (is_ec_pool) {
+    if (ret = rgw_put_system_obj(dpp, store->svc()->sysobj,
+                             data_pool,
+                             temp_obj_name,
+                             bl_data,
+                             true,
+                             nullptr,
+                             mtime,
+                             y,
+                             &obj_attrs); ret < 0){
+
+      if (ret == -EEXIST) {
+        ldpp_dout(dpp, 5) << "WARNING: race detected in committing logging object '" << temp_obj_name << "'" << dendl;
+      } else {
+        ldpp_dout(dpp, 1) << "ERROR: failed to write logging data when committing object '" << temp_obj_name
+                          << "'. error: " << ret << dendl;
+      }
+      return ret;
+    }
+    ldpp_dout(dpp, 20) << "INFO: wrote logging data when committing object '" << temp_obj_name << "'" << dendl;
+
+    if (ret = rgw_delete_system_obj(dpp, store->svc()->sysobj,
+                                temp_data_pool,
+                                temp_obj_name,
+                                nullptr,
+                                y); ret < 0 && ret != -ENOENT) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to delete temp logging object when "
+                        << "committing object '" << temp_obj_name
+                        << ". error: " << ret << dendl;
+    }
   }
 
   uint64_t size = bl_data.length();
@@ -1424,22 +1562,29 @@ void bucket_logging_completion(rados_completion_t completion, void* args) {
 
 int RadosBucket::write_logging_object(const std::string& obj_name,
     const std::string& record,
+    const std::string& prefix,
     optional_yield y,
     const DoutPrefixProvider *dpp,
     bool async_completion) {
   const auto temp_obj_name = to_temp_object_name(this, obj_name);
   rgw_pool data_pool;
-  rgw_obj obj{get_key(), obj_name};
-  if (!store->getRados()->get_obj_data_pool(get_placement_rule(), obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
-      "' when writing logging object" << dendl;
+
+  bool is_ec_pool = false;
+  const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+  int ret = get_logging_temp_object_pool(dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '"
+                      << get_key() << "' when writing logging object "
+                      << temp_obj_name << ", ret: " << ret << dendl;
     return -EIO;
   }
+
   librados::IoCtx io_ctx;
-  if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) {
+  if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx, true); ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to get IO context for logging object from data pool:" << data_pool.to_str() << dendl;
     return -EIO;
   }
+
   bufferlist bl;
   bl.append(record);
   bl.append("\n");
index 5fa42b6c4d45a119b3e34d45d1b6e37dedabdcd7..42f4331192e35808cd301c2c18651be1beb0235e 100644 (file)
@@ -804,9 +804,11 @@ class RadosBucket : public StoreBucket {
         optional_yield y,
         const DoutPrefixProvider *dpp,
         RGWObjVersionTracker* objv_tracker) override;
-    int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override;
-    int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override;
-    int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
+    int commit_logging_object(const std::string& obj_name, optional_yield y,
+       const DoutPrefixProvider *dpp, const std::string& prefix,
+       std::string* last_committed, bool async) override;
+    int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override;
+    int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
 
   private:
     int link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
index 769b7cc5e2677740cac5853f84b5b5a96eca1c0f..95ee9adcf2b2ff56028bf163724e8dc861b4e442 100644 (file)
@@ -91,6 +91,7 @@ extern "C" {
 
 #include "driver/rados/rgw_bucket.h"
 #include "driver/rados/rgw_sal_rados.h"
+#include "driver/rados/rgw_bl_rados.h"
 
 #include <iomanip>
 
@@ -192,6 +193,7 @@ void usage()
   cout << "  bucket radoslist                 list rados objects backing bucket's objects\n";
   cout << "  bucket logging flush             flush pending log records object of source bucket to the log bucket\n";
   cout << "  bucket logging info              get info on bucket logging configuration on source bucket or list of sources in log bucket\n";
+  cout << "  bucket logging list              list the log objects pending commit for the source bucket\n";
   cout << "  bi get                           retrieve bucket index object entries\n";
   cout << "  bi put                           store bucket index object entries\n";
   cout << "  bi list                          list raw bucket index entries\n";
@@ -738,6 +740,7 @@ enum class OPT {
   BUCKET_RESYNC_ENCRYPTED_MULTIPART,
   BUCKET_LOGGING_FLUSH,
   BUCKET_LOGGING_INFO,
+  BUCKET_LOGGING_LIST,
   POLICY,
   LOG_LIST,
   LOG_SHOW,
@@ -990,6 +993,7 @@ static SimpleCmd::Commands all_cmds = {
   { "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART },
   { "bucket logging flush", OPT::BUCKET_LOGGING_FLUSH },
   { "bucket logging info", OPT::BUCKET_LOGGING_INFO },
+  { "bucket logging list", OPT::BUCKET_LOGGING_LIST },
   { "policy", OPT::POLICY },
   { "log list", OPT::LOG_LIST },
   { "log show", OPT::LOG_SHOW },
@@ -4703,6 +4707,7 @@ int main(int argc, const char **argv)
                                        false,
                                        false,
                                         false,
+                                        false,
                                        false, // No background tasks!
                                         null_yield,
                                        cfgstore.get(),
@@ -7900,7 +7905,7 @@ int main(int argc, const char **argv)
     }
     std::string old_obj;
     const auto region = driver->get_zone()->get_zonegroup().get_api_name();
-    ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, &old_obj);
+    ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, false, &old_obj);
     if (ret < 0) {
       cerr << "ERROR: failed to flush pending logging object '" << obj_name << "' to target bucket '" << configuration.target_bucket
         << "'. error: " << cpp_strerror(-ret) << std::endl;
@@ -7952,6 +7957,55 @@ int main(int argc, const char **argv)
     return 0;
   }
 
+  if (opt_cmd == OPT::BUCKET_LOGGING_LIST) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+    if (driver->get_name() != "rados") {
+      cerr << "ERROR: this command is only available with the RADOS driver." << std::endl;
+      return EINVAL;
+    }
+
+    int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      return -ret;
+    }
+
+    rgw::bucketlogging::configuration configuration;
+    std::unique_ptr<rgw::sal::Bucket> target_bucket;
+    ret = rgw::bucketlogging::get_target_and_conf_from_source(dpp(),
+         driver, bucket.get(), tenant, configuration, target_bucket, null_yield);
+    if (ret < 0 && ret != -ENODATA) {
+      cerr << "ERROR: failed to get target bucket and logging conf from source bucket '"
+        << bucket_name << "': " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    } else if (ret == -ENODATA) {
+      cerr << "ERROR: bucket '" << bucket_name << "' does not have logging enabled" << std::endl;
+      return ENODATA;
+    }
+    std::string target_prefix = configuration.target_prefix;
+    std::set<std::string> entries;
+
+    ret = rgw::bucketlogging::list_pending_commit_objects(dpp(),
+        static_cast<rgw::sal::RadosStore*>(driver), target_bucket.get(),
+        target_prefix, entries, null_yield);
+
+    if (ret < 0) {
+      cerr << "ERROR: failed to get pending log entries for bucket '" << bucket_name
+           << "': " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    formatter->open_array_section("pending_logs");
+    for (auto &entry: entries) {
+        formatter->dump_string("log", entry);
+    }
+    formatter->close_section(); // objs
+    formatter->flush(cout);
+    return 0;
+  }
+
   if (opt_cmd == OPT::LOG_LIST) {
     // filter by date?
     if (date.size() && date.size() != 10) {
index 3ccd2c18a04cc83270ed981a9691bba3d01981fd..a5926cf9024261969f3ef0d5dfc27dee3f3703fe 100644 (file)
@@ -258,6 +258,7 @@ int rgw::AppMain::init_storage()
           run_sync,
           g_conf().get_val<bool>("rgw_dynamic_resharding"),
          true, // run notification thread
+         true, // run bucket-logging thread
          true, null_yield, env.cfgstore,
           g_conf()->rgw_cache_enabled);
   if (!env.driver) {
index eedc822e9b1f364d4e649f95dff3f1924797c9c3..bfb76a46c77d8e4adfa1638976e0eec0751a0fc3 100644 (file)
@@ -334,7 +334,7 @@ int commit_logging_object(const configuration& conf,
       target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
   }
-  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) {
+  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed, false); ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of logging bucket '" <<
       target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
@@ -351,6 +351,7 @@ int rollover_logging_object(const configuration& conf,
     optional_yield y,
     bool must_commit,
     RGWObjVersionTracker* objv_tracker,
+    bool async,
     std::string* last_committed,
     std::string* err_message) {
   std::string target_bucket_name;
@@ -393,7 +394,7 @@ int rollover_logging_object(const configuration& conf,
       return ret;
     }
   }
-  if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) {
+  if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed, async); ret < 0) {
     if (must_commit) {
       if (err_message) {
         *err_message = fmt::format("Failed to commit logging object of logging bucket '{}'", target_bucket->get_name());
@@ -516,7 +517,7 @@ int log_record(rgw::sal::Driver* driver,
     if (ceph::coarse_real_time::clock::now() > time_to_commit) {
       ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to logging bucket '" <<
         target_bucket_id << "'" << dendl;
-      if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
+      if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
         set_journal_err(err_message);
         return ret;
       }
@@ -656,6 +657,7 @@ int log_record(rgw::sal::Driver* driver,
 
   if (ret = target_bucket->write_logging_object(obj_name,
         record,
+        conf.target_prefix,
         y,
         dpp,
         async_completion); ret < 0 && ret != -EFBIG) {
@@ -667,12 +669,13 @@ int log_record(rgw::sal::Driver* driver,
   if (ret == -EFBIG) {
     ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to logging bucket '" <<
       target_bucket->get_key() << "'" << dendl;
-    if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
+    if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
       set_journal_err(err_message);
       return ret;
     }
     if (ret = target_bucket->write_logging_object(obj_name,
         record,
+        conf.target_prefix,
         y,
         dpp,
         async_completion); ret < 0) {
@@ -849,7 +852,7 @@ int bucket_deletion_cleanup(const DoutPrefixProvider* dpp,
         }
         ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted logging bucket '" <<
           bucket->get_key() << "'" << dendl;
-        if (const int ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) {
+        if (const int ret = bucket->remove_logging_object(obj_name, conf.target_prefix, y, dpp); ret < 0) {
           ldpp_dout(dpp, 5) << "WARNING: failed to delete pending logging object '" << obj_name << "' for logging bucket '" <<
             bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
           continue;
@@ -948,7 +951,7 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
   ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name for bucket '" <<
       bucket->get_key() << "' during source cleanup" << dendl;
  // since the object holding the name was deleted, we cannot fetch the last committed name from it
-  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr); ret < 0) {
+  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr, false); ret < 0) {
     ldpp_dout(dpp, 5) << "WARNING: could not commit pending logging object of bucket '" <<
       bucket->get_key() << "' during source cleanup. ret = " << ret << dendl;
     return 0;
index 1238ef4f359b5eeed8c633f42916dc9f67330a79..6d5b70bc9c94147c7b10fb849b4818a311c7adce 100644 (file)
@@ -185,6 +185,7 @@ int rollover_logging_object(const configuration& conf,
     optional_yield y,
     bool must_commit,
     RGWObjVersionTracker* objv_tracker,
+    bool async,
     std::string* last_committed,
     std::string* err_message = nullptr);
 
index 02a5ca85e960ecc2913fb24ccb890b1760c93725..be2546d61ece4edc55ea1dc36c37ab11b5d0b87f 100644 (file)
@@ -104,7 +104,7 @@ int main(const int argc, const char **argv)
     exit(1);
   }
 
-  driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield, cfgstore.get());
+  driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, false, true, null_yield, cfgstore.get());
   if (!driver) {
     std::cerr << "couldn't init storage provider" << std::endl;
     return EIO;
index 1f206a1ca8e5254df8c35e90e6bc8f1745ae3748..d8c034fc571df3a13e6868e412c05e96a271ad6e 100644 (file)
@@ -128,7 +128,10 @@ void RGWRealmReloader::reload()
           cct->_conf->rgw_enable_quota_threads,
           cct->_conf->rgw_run_sync_thread,
           cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
-               true, true, null_yield, env.cfgstore, // run notification thread
+          true,
+          true, // run notification thread
+          true, // run bucket logging thread
+          null_yield, env.cfgstore,
           cct->_conf->rgw_cache_enabled);
     }
 
index ba0374a94fe645b09e0e539ebcb5f7dca70403aa..9aa03204c3cd15cab50eb92c79438f17f3e6cd59 100644 (file)
@@ -327,6 +327,7 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
             y,
             false, // rollover should happen even if commit failed
             &objv_tracker,
+            false,
             &old_obj); ret < 0) {
         ldpp_dout(this, 1) << "WARNING: failed to flush pending logging object '" << obj_name << "'"
             << " to target bucket '" << target_bucket_id << "'. "
@@ -435,7 +436,7 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp {
       ldpp_dout(this, 5) << "INFO: no pending logging object in logging bucket '" << target_bucket_id << "'. new object should be created" << dendl;
     }
     const auto region = driver->get_zone()->get_zonegroup().get_api_name();
-    op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, &old_obj);
+    op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, false, &old_obj);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << obj_name << "'"
           << " to logging bucket '" << target_bucket_id << "'. "
index 8670d37d64557ed41edb3f0f8c23332217ac1ed9..7fe620955c1fc0b96fe6250103b3b1438decf11a 100644 (file)
@@ -106,7 +106,8 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                                                     bool quota_threads,
                                                     bool run_sync_thread,
                                                     bool run_reshard_thread,
-                 bool run_notification_thread,
+                                                    bool run_notification_thread,
+                                                    bool run_bucket_logging_thread,
                                                     bool use_cache,
                                                     bool use_gc,
                                                     bool background_tasks,
@@ -129,6 +130,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                 .set_run_sync_thread(run_sync_thread)
                 .set_run_reshard_thread(run_reshard_thread)
                 .set_run_notification_thread(run_notification_thread)
+                .set_run_bucket_logging_thread(run_bucket_logging_thread)
                      .init_begin(cct, dpp, background_tasks, site_config, cfgstore) < 0) {
       delete driver;
       return nullptr;
@@ -163,6 +165,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                 .set_run_sync_thread(run_sync_thread)
                 .set_run_reshard_thread(run_reshard_thread)
                 .set_run_notification_thread(run_notification_thread)
+                .set_run_bucket_logging_thread(run_bucket_logging_thread)
                      .init_begin(cct, dpp, background_tasks, site_config, cfgstore) < 0) {
       delete driver;
       return nullptr;
index 2d820af5dea8a48c4dd210fceb6037de7f207602..9aff21eada505e4f83b04806c9a5a0b8c847e4f7 100644 (file)
@@ -1050,12 +1050,15 @@ class Bucket {
         RGWObjVersionTracker* objv_tracker) = 0;
     /** Move the pending bucket logging object into the bucket
      if "last_committed" is not null, it will be set to the name of the last committed object
+     if "async" is true, the commit is queued on the pending-commit list for the BucketLoggingManager to process, instead of being committed inline
      * */
-    virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) = 0;
+    virtual int commit_logging_object(const std::string& obj_name, optional_yield y,
+       const DoutPrefixProvider *dpp, const std::string& prefix,
+       std::string* last_committed, bool async) = 0;
     /** Remove the pending bucket logging object */
-    virtual int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0;
+    virtual int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) = 0;
     /** Write a record to the pending bucket logging object */
-    virtual int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0;
+    virtual int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0;
 
     /* dang - This is temporary, until the API is completed */
     virtual rgw_bucket& get_key() = 0;
@@ -1946,6 +1949,7 @@ public:
                                      bool run_sync_thread,
                                      bool run_reshard_thread,
                                      bool run_notification_thread,
+                                     bool run_bucket_logging_thread,
                                      bool background_tasks,
                                      optional_yield y,
               rgw::sal::ConfigStore* cfgstore,
@@ -1961,6 +1965,7 @@ public:
                                                   run_sync_thread,
                                                   run_reshard_thread,
                run_notification_thread,
+                                                  run_bucket_logging_thread,
                                                   use_cache, use_gc,
                                                   background_tasks, y, cfgstore, admin);
     return driver;
@@ -1989,7 +1994,8 @@ public:
                                                bool quota_threads,
                                                bool run_sync_thread,
                                                bool run_reshard_thread,
-            bool run_notification_thread,
+                                                bool run_notification_thread,
+                                                bool run_bucket_logging_thread,
                                                bool use_metadata_cache,
                                                bool use_gc, bool background_tasks,
                                                optional_yield y, rgw::sal::ConfigStore* cfgstore, bool admin);
index c3f628badedb5df7fc2c14f46eee4fa6f2813bd5..a7b7e4b964281069b89d07109bc333177f242126 100644 (file)
@@ -697,14 +697,14 @@ public:
       RGWObjVersionTracker* objv_tracker) override {
     return next->remove_logging_object_name(prefix, y, dpp, objv_tracker);
   }
-  int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override {
-    return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed);
+  int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override {
+    return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed, async);
   }
-  int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override {
-    return next->remove_logging_object(obj_name, y, dpp);
+  int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override {
+    return next->remove_logging_object(obj_name, prefix, y, dpp);
   }
-  int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
-    return next->write_logging_object(obj_name, record, y, dpp, async_completion);
+  int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
+    return next->write_logging_object(obj_name, record, prefix, y, dpp, async_completion);
   }
 
   virtual rgw_bucket& get_key() override { return next->get_key(); }
index d64403d7f1d06107c68447163f32897a2af6c299..0f2f91f5fef76749fb392c8ce8279001070f127f 100644 (file)
@@ -267,9 +267,9 @@ class StoreBucket : public Bucket {
         optional_yield y,
         const DoutPrefixProvider *dpp,
         RGWObjVersionTracker* objv_tracker) override { return 0; }
-    int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override { return 0; }
-    int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
-    int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
+    int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override { return 0; }
+    int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
+    int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
       return 0;
     }
 
index 9c5cb513939704e65cb7cc68e861e84fb008f800..ad08e4b69beed43f10e45d13f0166669ec69f9ba 100644 (file)
@@ -166,6 +166,7 @@ void RGWZoneParams::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("topics_pool", topics_pool, obj);
   JSONDecoder::decode_json("account_pool", account_pool, obj);
   JSONDecoder::decode_json("group_pool", group_pool, obj);
+  JSONDecoder::decode_json("bucket_logging_pool", bucket_logging_pool, obj);
   JSONDecoder::decode_json("system_key", system_key, obj);
   JSONDecoder::decode_json("placement_pools", placement_pools, obj);
   JSONDecoder::decode_json("tier_config", tier_config, obj);
@@ -196,6 +197,7 @@ void RGWZoneParams::dump(Formatter *f) const
   encode_json("topics_pool", topics_pool, f);
   encode_json("account_pool", account_pool, f);
   encode_json("group_pool", group_pool, f);
+  encode_json("bucket_logging_pool", bucket_logging_pool, f);
   encode_json_plain("system_key", system_key, f);
   encode_json("placement_pools", placement_pools, f);
   encode_json("tier_config", tier_config, f);
@@ -243,6 +245,7 @@ void add_zone_pools(const RGWZoneParams& info,
   pools.insert(info.dedup_pool);
   pools.insert(info.gc_pool);
   pools.insert(info.log_pool);
+  pools.insert(info.bucket_logging_pool);
   pools.insert(info.intent_log_pool);
   pools.insert(info.usage_log_pool);
   pools.insert(info.user_keys_pool);
@@ -845,6 +848,7 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y,
   info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool);
   info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool);
   info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool);
+  info.bucket_logging_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:logging", info.bucket_logging_pool);
   info.topics_pool =
       fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool);
   info.account_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:accounts", info.account_pool);
index 3095b921f1d1810f17308ef9bbe517463799fa24..27e0919c924374c08dc6b1a3ee0356f0f6e57cab 100644 (file)
@@ -33,6 +33,7 @@ struct RGWZoneParams {
   rgw_pool account_pool;
   rgw_pool group_pool;
   rgw_pool dedup_pool;
+  rgw_pool bucket_logging_pool;
 
   RGWAccessKey system_key;
 
@@ -62,7 +63,7 @@ struct RGWZoneParams {
   const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
   
   void encode(bufferlist& bl) const {
-    ENCODE_START(17, 1, bl);
+    ENCODE_START(18, 1, bl);
     encode(domain_root, bl);
     encode(control_pool, bl);
     encode(gc_pool, bl);
@@ -100,11 +101,12 @@ struct RGWZoneParams {
     encode(group_pool, bl);
     encode(restore_pool, bl);
     encode(dedup_pool, bl);
+    encode(bucket_logging_pool, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(17, bl);
+    DECODE_START(18, bl);
     decode(domain_root, bl);
     decode(control_pool, bl);
     decode(gc_pool, bl);
@@ -199,6 +201,11 @@ struct RGWZoneParams {
     } else {
       dedup_pool = name + ".rgw.dedup";
     }
+    if (struct_v >= 18) {
+      decode(bucket_logging_pool, bl);
+    } else {
+      bucket_logging_pool = log_pool.name + ":logging";
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
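
A worked example of the upgrade path: for a zone whose log pool is default.rgw.log, decoding pre-v18 zone params takes the else branch above, yielding bucket_logging_pool = "default.rgw.log:logging" (a namespace within the replicated log pool), which matches what init_zone_pool_names() assigns to newly created zones via ".rgw.log:logging". An illustrative fragment of radosgw-admin zone get output after this change:

    "bucket_logging_pool": "default.rgw.log:logging",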
index 998b99860ffad9d9fad96db04c82f2671b28ef3e..12448ec219a424ef57e91ef581a282f088d596d9 100644 (file)
@@ -53,6 +53,7 @@
     bucket radoslist                 list rados objects backing bucket's objects
     bucket logging flush             flush pending log records object of source bucket to the log bucket
     bucket logging info              get info on bucket logging configuration on source bucket or list of sources in log bucket
+    bucket logging list              list the log objects pending commit for the source bucket
     bi get                           retrieve bucket index object entries
     bi put                           store bucket index object entries
     bi list                          list raw bucket index entries
index 2fa0a536f20acf99b5df715e911efd9c7b358bb6..ae9cd683f7fdecd708e553f298480611618e2aea 100644 (file)
@@ -351,7 +351,7 @@ int main(int argc, const char **argv)
                              false,
                              false,
                              false,
-                              true, true, null_yield, cfgstore.get(), 
+                              true, true, true, null_yield, cfgstore.get(),
                              false));
   if (!store) {
     std::cerr << "couldn't init storage provider" << std::endl;