]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/bucket-logging: support for EC pools
authorN Balachandran <nithya.balachandran@ibm.com>
Mon, 5 Jan 2026 15:48:01 +0000 (21:18 +0530)
committerN Balachandran <nithya.balachandran@ibm.com>
Mon, 5 Jan 2026 15:48:01 +0000 (21:18 +0530)
Log buckets can now be created within erasure-coded (EC) pools.
To support append operations, a temporary log record object is initially
created in the replicated default.rgw.log pool. This object is then copied
to the EC pool upon log record commitment.
All implicit log commit operations will execute asynchronously. A new
BucketLoggingManager class is responsible for processing these pending
commits at set intervals. Explicit commit operations, however, will
continue to be performed synchronously.

Fixes: https://tracker.ceph.com/issues/71365
Signed-off-by: Nithya Balachandran <nithya.balachandran@ibm.com>
(cherry picked from commit fcc5229)

 Conflicts:
src/rgw/CMakeLists.txt
src/rgw/driver/rados/rgw_zone.h
src/rgw/rgw_appmain.cc
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_realm_reloader.cc
src/rgw/rgw_sal.cc
src/rgw/rgw_sal.h
src/test/rgw/rgw_cr_test.cc

Signed-off-by: N Balachandran <nithya.balachandran@ibm.com>
26 files changed:
PendingReleaseNotes
doc/rados/troubleshooting/log-and-debug.rst
doc/radosgw/bucket_logging.rst
src/common/subsys.h
src/rgw/CMakeLists.txt
src/rgw/driver/rados/rgw_bl_rados.cc [new file with mode: 0644]
src/rgw/driver/rados/rgw_bl_rados.h [new file with mode: 0644]
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/driver/rados/rgw_zone.h
src/rgw/radosgw-admin/radosgw-admin.cc
src/rgw/rgw_appmain.cc
src/rgw/rgw_bucket_logging.cc
src/rgw/rgw_bucket_logging.h
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_realm_reloader.cc
src/rgw/rgw_rest_bucket_logging.cc
src/rgw/rgw_sal.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h
src/rgw/rgw_zone.cc
src/test/cli/radosgw-admin/help.t
src/test/rgw/rgw_cr_test.cc

index 8f92a8a9801d86b333b7ece9ae24d84d3930613b..982cbe8a8ad1f2f82611171cbde2116984e43dc9 100644 (file)
@@ -1,5 +1,7 @@
 >=20.0.0
 
+* RGW: Bucket Logging suppports creating log buckets in EC pools.
+  Implicit logging object commits are now performed asynchronously.
 * RADOS: leader monitor and stretch mode status are now included in the `ceph status` output.
   Related Tracker: https://tracker.ceph.com/issues/70406
 * RGW: The User Account feature introduced in Squid provides first-class support for
index 46775df9d051f9f63da66c1cd442f15211dd23ba..fbb776e09af9562d92e8b89c96fa14ca9bf43928 100644 (file)
@@ -306,6 +306,8 @@ values to their defaults or to a level suitable for normal operations.
 +--------------------------+-----------+--------------+
 | ``rgw notification``     |     1     |      5       |
 +--------------------------+-----------+--------------+
+| ``rgw bucket logging``   |     1     |      5       |
++--------------------------+-----------+--------------+
 | ``javaclient``           |     1     |      5       |
 +--------------------------+-----------+--------------+
 | ``asok``                 |     1     |      5       |
index 307817d71e0da76e4b4070552c7c0b484e418198..d0f1dec2fe858abd5ac46f0b0a6aa7c1299e41ac 100644 (file)
@@ -54,6 +54,22 @@ them, regardless if enough time passed or if no more records are written to the
 object. Flushing will happen automatically when logging is disabled on a
 bucket, or its logging configuration is changed, or the bucket is deleted.
 
+The process of adding a new log object to the log bucket is asynchronous when
+triggered by a log record being written. Consequently, the operation that
+generated the log is completed immediately and does not wait for the log object
+addition to finish; this addition occurs later.
+The log object is added to the log bucket immediately when flushed. Consequently,
+this action may result in temporary gaps in the log records within the bucket
+until any pending log objects are also added.
+
+To check which log objects for a source bucket are currently pending addition to
+the log bucket, execute the following command:
+
+.. prompt:: bash #
+
+   radosgw-admin bucket logging list --bucket <source bucket>
+
+
 Standard
 ````````
 If the logging type is set to "Standard" (the default) the log records are
index d756124f1a13e5e916a8fe5379f62be241d8f7c9..49eee0b7bb15d7cb5edc58002b8b4dfb29248fe8 100644 (file)
@@ -68,6 +68,7 @@ SUBSYS(rgw_flight, 1, 5)
 SUBSYS(rgw_lifecycle, 1, 5)
 SUBSYS(rgw_restore, 1, 5)
 SUBSYS(rgw_notification, 1, 5)
+SUBSYS(rgw_bucket_logging, 1, 5)
 SUBSYS(javaclient, 1, 5)
 SUBSYS(asok, 1, 5)
 SUBSYS(throttle, 1, 1)
index f6ae85fd4e56c8caea0d8b85d45e7b761ca8a46e..f11a3913ce5ce15f292a74cbf2ae9097c23377f7 100644 (file)
@@ -174,6 +174,7 @@ set(librgw_common_srcs
   driver/rados/groups.cc
   driver/rados/rgw_bucket.cc
   driver/rados/rgw_bucket_sync.cc
+  driver/rados/rgw_bl_rados.cc
   driver/rados/rgw_cr_rados.cc
   driver/rados/rgw_cr_tools.cc
   driver/rados/rgw_d3n_datacache.cc
diff --git a/src/rgw/driver/rados/rgw_bl_rados.cc b/src/rgw/driver/rados/rgw_bl_rados.cc
new file mode 100644 (file)
index 0000000..14b3853
--- /dev/null
@@ -0,0 +1,832 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_bucket_logging.h"
+#include "rgw_bl_rados.h"
+#include <chrono>
+#include <fmt/format.h>
+#include <future>
+#include <memory>
+#include <thread>
+#include <boost/algorithm/hex.hpp>
+#include <boost/asio/basic_waitable_timer.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/context/protected_fixedsize_stack.hpp>
+#include "common/async/yield_context.h"
+#include "common/async/yield_waiter.h"
+#include "common/ceph_time.h"
+#include "common/dout.h"
+#include "cls/lock/cls_lock_client.h"
+#include "include/common_fwd.h"
+#include "include/function2.hpp"
+#include "librados/AioCompletionImpl.h"
+#include "services/svc_zone.h"
+#include "rgw_common.h"
+#include "rgw_perf_counters.h"
+#include "rgw_sal_rados.h"
+#include "rgw_zone_features.h"
+
+
+#define dout_subsys ceph_subsys_rgw_bucket_logging
+
+namespace rgw::bucketlogging {
+
+using commit_list_t = std::set<std::string>;
+
+// use mmap/mprotect to allocate 128k coroutine stacks
+auto make_stack_allocator() {
+  return boost::context::protected_fixedsize_stack{128*1024};
+}
+
+const std::string COMMIT_LIST_OBJECT_NAME = "bucket_logging_global_commit_list";
+static const std::string TEMP_POOL_ATTR = "temp_logging_pool";
+
+static std::string get_commit_list_name (const rgw::sal::Bucket* log_bucket,
+                                         const std::string& prefix) {
+  return fmt::format("{}/{}/{}/{}", log_bucket->get_tenant(),
+                     log_bucket->get_name(),
+                     log_bucket->get_bucket_id(), prefix);
+}
+
+int add_commit_target_entry(const DoutPrefixProvider* dpp,
+                            rgw::sal::RadosStore* store,
+                            const rgw::sal::Bucket* log_bucket,
+                            const std::string& prefix,
+                            const std::string& obj_name,
+                            const std::string& tail_obj_name,
+                            const rgw_pool& temp_data_pool,
+                            optional_yield y) {
+
+  auto& ioctx_logging = store->getRados()->get_logging_pool_ctx();
+  std::string target_obj_name = get_commit_list_name (log_bucket, prefix);
+  {
+    librados::ObjectWriteOperation op;
+    op.create(false);
+
+    bufferlist bl;
+    bl.append(tail_obj_name);
+    std::map<std::string, bufferlist> new_commit_list{{obj_name, bl}};
+    op.omap_set(new_commit_list);
+
+    bufferlist pool_bl;
+    encode(temp_data_pool, pool_bl);
+    op.setxattr(TEMP_POOL_ATTR.c_str(), pool_bl);
+
+    auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name, std::move(op), y);
+    if (ret < 0){
+      ldpp_dout(dpp, 1) << "ERROR: failed to add logging object entry " << obj_name
+                        << " to commit list object:" << target_obj_name
+                        << ". ret = " << ret << dendl;
+      return ret;
+    }
+  }
+
+  bufferlist empty_bl;
+  std::map<std::string, bufferlist> new_commit_list{{target_obj_name, empty_bl}};
+
+  librados::ObjectWriteOperation op;
+  op.create(false);
+  op.omap_set(new_commit_list);
+
+  auto ret = rgw_rados_operate(dpp, ioctx_logging, COMMIT_LIST_OBJECT_NAME,
+                               std::move(op), y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to add entry " << target_obj_name
+                      << " to global bucket logging list object: "
+                      << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl;
+    return ret;
+  }
+  ldpp_dout(dpp, 20) << "INFO: added entry " << target_obj_name
+                     << " to global bucket logging object: "
+                     << COMMIT_LIST_OBJECT_NAME << dendl;
+  return 0;
+}
+
+int list_pending_commit_objects(const DoutPrefixProvider* dpp,
+                                rgw::sal::RadosStore* store,
+                                const rgw::sal::Bucket* log_bucket,
+                                const std::string& prefix,
+                                std::set<std::string>& entries,
+                                optional_yield y) {
+
+  auto& ioctx_logging = store->getRados()->get_logging_pool_ctx();
+  std::string target_obj_name = get_commit_list_name (log_bucket, prefix);
+
+  entries.clear();
+
+  constexpr auto max_chunk = 1024U;
+  std::string start_after;
+  bool more = true;
+  int rval;
+  while (more) {
+    librados::ObjectReadOperation op;
+    std::set <std::string> entries_chunk;
+    op.omap_get_keys2(start_after, max_chunk, &entries_chunk, &more, &rval);
+    const auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name,
+                                       std::move(op), nullptr, y);
+    if (ret == -ENOENT) {
+      // log commit list object was not created - nothing to do
+      return 0;
+    }
+    if (ret < 0) {
+      // TODO: do we need to check on rval as well as ret?
+      ldpp_dout(dpp, 1) << "ERROR: failed to read logging commit list "
+                        <<  target_obj_name << " ret: " << ret << dendl;
+      return ret;
+    }
+    entries.merge(entries_chunk);
+    if (more) {
+      start_after = *entries.rbegin();
+    }
+  }
+  return 0;
+}
+
+class BucketLoggingManager : public DoutPrefixProvider {
+  using Executor = boost::asio::io_context::executor_type;
+  bool m_shutdown = false;
+  static constexpr auto m_log_commits_update_period = std::chrono::milliseconds(10000); // 10s
+  static constexpr auto m_log_commits_update_retry = std::chrono::milliseconds(1000); // 1s
+  static constexpr auto m_log_commits_idle_sleep = std::chrono::milliseconds(5000); // 5s
+  const utime_t m_failover_time = utime_t(m_log_commits_update_period*9); // 90s
+  CephContext* const m_cct;
+  static constexpr auto COOKIE_LEN = 16;
+  const std::string m_lock_cookie;
+  const std::string m_lock_name = "bl_mgr_lock";
+  boost::asio::io_context m_io_context;
+  boost::asio::executor_work_guard<Executor> m_work_guard;
+  std::thread m_worker;
+  const SiteConfig& m_site;
+  rgw::sal::RadosStore* const m_rados_store;
+
+private:
+
+  CephContext *get_cct() const override { return m_cct; }
+
+  unsigned get_subsys() const override { return dout_subsys; }
+
+  std::ostream& gen_prefix(std::ostream& out) const override {
+    return out << "rgw bucket_logging: ";
+  }
+
+  int parse_list_name(std::string list_obj_name, std::string &tenant_name,
+                      std::string &bucket_name, std::string &bucket_id,
+                      std::string &prefix) {
+    // <tenant_name>/<bucket_name>/<bucket_id>/<prefix>
+    size_t pstart = 0, pend = 0;
+    pend =list_obj_name.find('/');
+    if (pend == std::string::npos) {
+      return -EINVAL;
+    }
+    tenant_name = list_obj_name.substr(pstart, pend);
+
+    pstart = pend + 1;
+    pend =list_obj_name.find('/', pstart);
+    if (pend == std::string::npos) {
+      return -EINVAL;
+    }
+    bucket_name = list_obj_name.substr(pstart, pend - pstart);
+    if (bucket_name.empty()) {
+    return -EINVAL;
+    }
+
+    pstart = pend + 1;
+    pend =list_obj_name.find('/', pstart);
+    if (pend == std::string::npos) {
+      return -EINVAL;
+    }
+    bucket_id = list_obj_name.substr(pstart, pend - pstart);
+    if (bucket_id.empty()) {
+      return -EINVAL;
+    }
+    prefix = list_obj_name.substr(pend + 1);
+
+  /*
+    //TODO: Fix this
+    std::regex pattern(R"(([^/]+)/([^/]+)/([^/]+)/(.*))");
+    std::smatch matches;
+
+    if (std::regex_match(list_obj_name, matches, pattern)) {
+        tenant_name = matches[1].str();
+        bucket_name = matches[2].str();
+        bucket_id = matches[3].str();
+        prefix = matches[4].str();
+    } else {
+        return -EINVAL;
+    }
+*/
+    return 0;
+  }
+
+  // read the list of commit lists from the global list object
+  int read_global_logging_list(commit_list_t& commits, optional_yield y) {
+    constexpr auto max_chunk = 1024U;
+    std::string start_after;
+    bool more = true;
+    int rval;
+    while (more) {
+      librados::ObjectReadOperation op;
+      commit_list_t commits_chunk;
+      op.omap_get_keys2(start_after, max_chunk, &commits_chunk, &more, &rval);
+      const auto ret = rgw_rados_operate(this, m_rados_store->getRados()->get_logging_pool_ctx(),
+                                         COMMIT_LIST_OBJECT_NAME, std::move(op),
+                                         nullptr, y);
+      if (ret == -ENOENT) {
+        // global commit list object was not created - nothing to do
+        return 0;
+      }
+      if (ret < 0) {
+        // TODO: do we need to check on rval as well as ret?
+        ldpp_dout(this, 5) << "ERROR: failed to read bucket logging global commit list. error: "
+                           << ret << dendl;
+        return ret;
+      }
+      commits.merge(commits_chunk);
+      if (more) {
+        start_after = *commits.rbegin();
+      }
+    }
+    return 0;
+  }
+
+  class tokens_waiter {
+    size_t pending_tokens = 0;
+    DoutPrefixProvider* const dpp;
+    ceph::async::yield_waiter<void> waiter;
+
+  public:
+    class token{
+      tokens_waiter* tw;
+    public:
+      token(const token& other) = delete;
+      token(token&& other) : tw(other.tw) {
+        other.tw = nullptr; // mark as moved
+      }
+      token& operator=(const token& other) = delete;
+      token(tokens_waiter* _tw) : tw(_tw) {
+        ++tw->pending_tokens;
+      }
+
+      ~token() {
+        if (!tw) {
+          return; // already moved
+        }
+        --tw->pending_tokens;
+        if (tw->pending_tokens == 0 && tw->waiter) {
+          tw->waiter.complete(boost::system::error_code{});
+        }
+      }
+    };
+
+    tokens_waiter(DoutPrefixProvider* _dpp) : dpp(_dpp) {}
+    tokens_waiter(const tokens_waiter& other) = delete;
+    tokens_waiter& operator=(const tokens_waiter& other) = delete;
+
+    void async_wait(boost::asio::yield_context yield) {
+      if (pending_tokens == 0) {
+        return;
+      }
+      ldpp_dout(dpp, 20) << "INFO: tokens waiter is waiting on "
+                         << pending_tokens << " tokens" << dendl;
+      boost::system::error_code ec;
+      waiter.async_wait(yield[ec]);
+      ldpp_dout(dpp, 20) << "INFO: tokens waiter finished waiting for all tokens" << dendl;
+    }
+  };
+
+  // processing of a specific entry
+  // return whether processing was successful (true) or not (false)
+  int process_entry(
+      const ConfigProxy& conf,
+      const std::string& entry,
+      const std::string& temp_obj_name,
+      const std::string& tenant_name,
+      const std::string& bucket_name,
+      const std::string& bucket_id,
+      const std::string& prefix,
+      const rgw_pool& obj_pool,
+      bool& bucket_deleted,
+      boost::asio::yield_context yield) {
+
+    std::unique_ptr<rgw::sal::Bucket> log_bucket;
+    rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id};
+    int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield);
+    if (ret < 0 && ret != -ENOENT) {
+      ldpp_dout(this, 1) << "ERROR: failed to load bucket : "
+                         << bucket_name << " id : " << bucket_id
+                         << ", error: " << ret << dendl;
+      return ret;
+    }
+    if (ret == 0) {
+      ldpp_dout(this, 20) << "INFO: loaded bucket : " << bucket_name
+                          << ", id: "<< bucket_id << dendl;
+      ret = log_bucket->commit_logging_object(entry, yield, this, prefix,
+                                              nullptr, false);
+      if (ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: failed to commit logging object : "
+                            << entry << " , error: " << ret << dendl;
+        return ret;
+      }
+      ldpp_dout(this, 20) << "INFO: committed logging object : " << entry
+                          << " to bucket " << bucket_name
+                          << ", id: "<< bucket_id << dendl;
+      return 0;
+    }
+    // The log bucket has been deleted. Clean up pending objects.
+    // This is done because the bucket_deletion_cleanup only removes
+    // the current temp log objects.
+    bucket_deleted = true;
+    ldpp_dout(this, 20) << "INFO: log bucket : " << bucket_name
+                        << " no longer exists. Removing pending logging object "
+                        << temp_obj_name << " from pool " << obj_pool
+                        << dendl;
+    ret = rgw_delete_system_obj(this, m_rados_store->svc()->sysobj, obj_pool,
+                                temp_obj_name, nullptr, yield);
+    if (ret < 0 && ret != -ENOENT) {
+      ldpp_dout(this, 1) << "ERROR: failed to delete pending logging object : "
+                         << temp_obj_name << " in pool " << obj_pool
+                         << " for deleted log bucket : " << bucket_name
+                         << " . ret = "<< ret << dendl;
+      return ret;
+    }
+    ldpp_dout(this, 20) << "INFO: Deleted pending logging object "
+                        << temp_obj_name << " from pool " << obj_pool
+                        << " for deleted log bucket : " << bucket_name
+                        << dendl;
+    return 0;
+  }
+
+  void async_sleep(boost::asio::yield_context yield, const std::chrono::milliseconds& duration) {
+    using Clock = ceph::coarse_mono_clock;
+    using Timer = boost::asio::basic_waitable_timer<Clock,
+        boost::asio::wait_traits<Clock>, Executor>;
+    Timer timer(m_io_context);
+    timer.expires_after(duration);
+    boost::system::error_code ec;
+    timer.async_wait(yield[ec]);
+    if (ec) {
+      ldpp_dout(this, 10) << "ERROR: async_sleep failed with error: "
+                         << ec.message() << dendl;
+    }
+  }
+
+  // unlock (lose ownership) list object
+  int unlock_list_object(librados::IoCtx& rados_ioctx,
+                         const std::string& list_obj_name,
+                         boost::asio::yield_context yield) {
+
+    librados::ObjectWriteOperation op;
+    op.assert_exists();
+    rados::cls::lock::unlock(&op, m_lock_name, m_lock_cookie);
+    const auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name,
+                                       std::move(op), yield);
+    if (ret == -ENOENT) {
+      ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name
+                          << " was removed. Nothing to unlock." << dendl;
+      return 0;
+    }
+    if (ret == -EBUSY) {
+      ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name
+                          << " already owned by another RGW. No need to unlock."
+                          << dendl;
+      return 0;
+    }
+    return ret;
+  }
+
+  int remove_commit_list(librados::IoCtx& rados_ioctx,
+                         const std::string& commit_list_name,
+                         optional_yield y) {
+    {
+      librados::ObjectWriteOperation op;
+      rados::cls::lock::assert_locked(&op, m_lock_name,
+                                      ClsLockType::EXCLUSIVE,
+                                      m_lock_cookie,
+                                      "" /*no tag*/);
+      op.remove();
+      auto ret = rgw_rados_operate(this, rados_ioctx, commit_list_name, std::move(op), y);
+      if (ret < 0 && ret != -ENOENT) {
+        ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :"
+                           << commit_list_name << ". ret = " << ret << dendl;
+        return ret;
+      }
+    }
+
+    librados::ObjectWriteOperation op;
+    op.omap_rm_keys({commit_list_name});
+    auto ret = rgw_rados_operate(this, rados_ioctx, COMMIT_LIST_OBJECT_NAME,
+                                 std::move(op), y);
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to remove entry " << commit_list_name
+                         << " from the global bucket logging list : "
+                         << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl;
+      return ret;
+    }
+    ldpp_dout(this, 20) << "INFO: removed entry " << commit_list_name
+                        << " from the global bucket logging list : "
+                        << COMMIT_LIST_OBJECT_NAME << dendl;
+
+    return 0;
+  }
+
+  // processing of a specific target list
+  void process_commit_list(librados::IoCtx& rados_ioctx,
+                           const std::string& list_obj_name,
+                           boost::asio::yield_context yield) {
+    auto is_idle = false;
+    auto bucket_deleted = false;
+    auto skip_bucket_deletion = false;
+    const std::string start_marker;
+
+    std::string tenant_name, bucket_name, bucket_id, prefix;
+    int ret = parse_list_name(list_obj_name, tenant_name, bucket_name,
+                              bucket_id, prefix);
+    if (ret) {
+      ldpp_dout(this, 1) << "ERROR: commit list name: " << list_obj_name
+                          << " is invalid. Processing will stop." << dendl;
+      return;
+    }
+    ldpp_dout(this, 20) << "INFO: parse commit list name: " << list_obj_name
+                        << ", bucket_name: " << bucket_name
+                        << ", bucket_id: " << bucket_id
+                        << ", tenant_name: " << tenant_name
+                        << ", prefix: " << prefix << dendl;
+
+    while (!m_shutdown) {
+      // if the list was empty the last time, sleep for idle timeout
+      if (is_idle) {
+        async_sleep(yield, m_log_commits_idle_sleep);
+      }
+
+      // Get the entries in the list after locking it
+      is_idle = true;
+      skip_bucket_deletion = false;
+      std::map<std::string, bufferlist> entries;
+      rgw_pool temp_data_pool;
+      auto total_entries = 0U;
+      {
+        librados::ObjectReadOperation op;
+        op.assert_exists();
+        bufferlist obl;
+        int rval;
+        bufferlist pool_obl;
+        int pool_rval;
+        constexpr auto max_chunk = 1024U;
+        std::string start_after;
+        bool more = true;
+        rados::cls::lock::assert_locked(&op, m_lock_name,
+                                        ClsLockType::EXCLUSIVE,
+                                        m_lock_cookie,
+                                        "" /*no tag*/);
+        op.omap_get_vals2(start_after, max_chunk, &entries, &more, &rval);
+        op.getxattr(TEMP_POOL_ATTR.c_str(), &pool_obl, &pool_rval);
+        // check ownership and list entries in one batch
+        auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name,
+                                     std::move(op), nullptr, yield);
+        if (ret == -ENOENT) {
+          // list object was deleted
+          ldpp_dout(this, 20) << "INFO: object: " << list_obj_name
+                              << " was removed. Processing will stop" << dendl;
+          return;
+        }
+        if (ret == -EBUSY) {
+          ldpp_dout(this, 10) << "WARNING: commit list : " << list_obj_name
+                              << " ownership moved to another daemon. Processing will stop"
+                              << dendl;
+          return;
+        }
+        if (ret < 0) {
+          ldpp_dout(this, 10) << "WARNING: failed to get list of entries in "
+                              << "and/or lock object: " << list_obj_name
+                              << ". error: " << ret << " (will retry)" << dendl;
+          continue;
+        }
+        if(pool_rval < 0) {
+          ldpp_dout(this, 10) << "WARNING: failed to get pool information"
+                              << "from commit list: " << list_obj_name << dendl;
+          return;
+        }
+        // Get temp logging object pool information - this is required for
+        // deleting the objects when the log bucket is deleted.
+        try {
+          auto p = pool_obl.cbegin();
+          decode(temp_data_pool, p);
+        } catch (buffer::error& err) {
+          ldpp_dout(this, 1) << "ERROR:  failed to get pool information from object: "
+                             << list_obj_name << dendl;
+          return;
+        }
+      }
+      total_entries = entries.size();
+      if (total_entries == 0) {
+        // nothing in the list object
+        // Delete the list object if the bucket has been deleted
+        std::unique_ptr<rgw::sal::Bucket> log_bucket;
+        rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id};
+        int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield);
+        if (ret == -ENOENT) {
+          ldpp_dout(this, 20) << "ERROR: failed to load bucket : "
+                              << bucket_name << ", id : " << bucket_id
+                              << ", error: " << ret << dendl;
+          bucket_deleted = true;
+        }
+      } else {
+      // log when not idle
+        ldpp_dout(this, 20) << "INFO: found: " << total_entries
+                            << " entries in commit list: " << list_obj_name
+                            << dendl;
+      }
+      auto stop_processing = false;
+      std::set<std::string> remove_entries;
+      tokens_waiter tw(this);
+      for (auto const& [key, val] : entries) {
+        if (stop_processing) {
+          break;
+        }
+        std::string temp_obj_name = val.to_str();
+
+        ldpp_dout(this, 20) << "INFO: processing entry: " << key
+                            << ", value : " << temp_obj_name << dendl;
+
+        tokens_waiter::token token(&tw);
+        boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
+          [this, &is_idle, &list_obj_name, &remove_entries, &stop_processing,
+           &tenant_name, &bucket_name, &bucket_id, &prefix, token = std::move(token),
+           key, temp_obj_name, temp_data_pool, &bucket_deleted , &skip_bucket_deletion]
+           (boost::asio::yield_context yield) {
+            auto result =
+                process_entry(this->get_cct()->_conf, key, temp_obj_name, tenant_name,
+                              bucket_name, bucket_id, prefix, temp_data_pool,
+                              bucket_deleted, yield);
+            if (result == 0) {
+              ldpp_dout(this, 20) << "INFO: processing of entry: " << key
+                                  << " from: " << list_obj_name
+                                  << " was successful." << dendl;
+              remove_entries.insert(key);
+              is_idle = false;
+              return;
+            } else {
+              is_idle = true;
+              // Don't delete the bucket if we couldn't process the entry
+              // and the bucket has been deleted
+              skip_bucket_deletion = true;
+              ldpp_dout(this, 20) << "INFO: failed to process entry: " << key
+                                  << " from: " << list_obj_name << dendl;
+            }
+            stop_processing = true;
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
+      }
+
+      if (!entries.empty()) {
+        // wait for all pending work to finish
+        tw.async_wait(yield);
+      }
+
+      if (bucket_deleted && !skip_bucket_deletion) {
+        ret = remove_commit_list(rados_ioctx, list_obj_name, yield);
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :"
+                              << list_obj_name << ". ret = " << ret << dendl;
+          return;
+        }
+        ldpp_dout(this, 10) << "INFO: bucket :" << bucket_name
+                            << ". was removed. processing will stop" << dendl;
+        return;
+      }
+      // delete all processed entries from list
+      if (!remove_entries.empty()) {
+        // Delete the entries even if the lock no longer belongs
+        // to this instance as we have processed the entries.
+        librados::ObjectWriteOperation op;
+        op.omap_rm_keys(remove_entries);
+        auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield);
+        if (ret == -ENOENT) {
+          // list was deleted
+          ldpp_dout(this, 10) << "INFO: commit list " << list_obj_name
+                              << ". was removed. processing will stop" << dendl;
+          return;
+        }
+        if (ret < 0) {
+          ldpp_dout(this, 1) << "ERROR: failed to remove entries from commit list: "
+                             << list_obj_name << ". error: " << ret
+                             << ". This could cause some log records to be lost."
+                             << dendl;
+          return;
+        } else {
+          ldpp_dout(this, 20) << "INFO: removed entries from commit list: "
+                              << list_obj_name << dendl;
+        }
+      }
+    }
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped processing : "
+                       << list_obj_name << dendl;
+  }
+
+  // process all commit log objects
+  // find which of the commit log objects is owned by this daemon and process it
+  void process_global_logging_list(boost::asio::yield_context yield) {
+    auto has_error = false;
+    std::unordered_set<std::string> owned_commit_logs;
+    std::atomic<size_t> processed_list_count = 0;
+
+    std::vector<std::string> commit_list_gc;
+    std::mutex commit_list_gc_lock;
+    auto& rados_ioctx = m_rados_store->getRados()->get_logging_pool_ctx();
+    auto next_check_time = ceph::coarse_real_clock::zero();
+    while (!m_shutdown) {
+      // check if log commit list needs to be refreshed
+      if (ceph::coarse_real_clock::now() > next_check_time) {
+        next_check_time = ceph::coarse_real_clock::now() + m_log_commits_update_period;
+        const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+        ldpp_dout(this, 20) << "INFO: processing global logging commit list. next "
+                            << "processing will happen at: " << std::ctime(&tp)
+                            << dendl;
+      } else {
+        // short sleep duration to prevent busy wait when refreshing list
+        // or retrying after error
+        ldpp_dout(this, 20) << "INFO: processing global logging commit list. Sleep now." << dendl;
+        async_sleep(yield, m_log_commits_update_retry);
+        if (!has_error) {
+          // in case of error we will retry
+          continue;
+        }
+      }
+
+      std::set<std::string> commit_lists;
+      auto ret = read_global_logging_list(commit_lists, yield);
+      if (ret < 0) {
+        has_error = true;
+        ldpp_dout(this, 1) << "ERROR: failed to read global logging list. Retry. ret:"
+                           << ret << dendl;
+        continue;
+      }
+
+      for (const auto& list_obj_name : commit_lists) {
+        // try to lock the object to check if it is owned by this rgw
+        // or if ownership needs to be taken
+        ldpp_dout(this, 20) << "INFO: processing commit list: " << list_obj_name
+                            << dendl;
+        librados::ObjectWriteOperation op;
+        op.assert_exists();
+        rados::cls::lock::lock(&op, m_lock_name, ClsLockType::EXCLUSIVE,
+                               m_lock_cookie, "" /*no tag*/,
+                               "" /*no description*/, m_failover_time,
+                               LOCK_FLAG_MAY_RENEW);
+
+        ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield);
+        if (ret == -EBUSY) {
+          // lock is already taken by another RGW
+          ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+                              << " owned (locked) by another daemon" << dendl;
+          // if the list was owned by this RGW, processing should be stopped, would be deleted from list afterwards
+          continue;
+        }
+        if (ret == -ENOENT) {
+          // commit list is deleted - processing will stop the next time we try to read from it
+          ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+                              << " should not be locked - already deleted" << dendl;
+          continue;
+        }
+        if (ret < 0) {
+          // failed to lock for another reason, continue to process other lists
+          ldpp_dout(this, 10) << "ERROR: failed to lock commit list: "
+                              << list_obj_name << ". error: " << ret << dendl;
+          has_error = true;
+          continue;
+        }
+        // add the commit list to the list of owned commit_logs
+        if (owned_commit_logs.insert(list_obj_name).second) {
+          ldpp_dout(this, 20) << "INFO: " << list_obj_name << " now owned (locked) by this daemon" << dendl;
+          // start processing this list
+          boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(),
+                             [this, &commit_list_gc, &commit_list_gc_lock, &rados_ioctx,
+                             list_obj_name, &processed_list_count](boost::asio::yield_context yield) {
+            ++processed_list_count;
+            process_commit_list(rados_ioctx, list_obj_name, yield);
+            // if list processing ended, it means that the list object was removed or not owned anymore
+            const auto ret = unlock_list_object(rados_ioctx, list_obj_name, yield);
+            if (ret < 0) {
+              ldpp_dout(this, 15) << "WARNING: failed to unlock commit list: "
+                                  << list_obj_name << " with error: " << ret
+                                  << " (ownership would still move if not renewed)"
+                                  << dendl;
+            } else {
+              ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+                                  << " not locked (ownership can move)" << dendl;
+            }
+            // mark it for deletion
+            std::lock_guard lock_guard(commit_list_gc_lock);
+            commit_list_gc.push_back(list_obj_name);
+            --processed_list_count;
+            ldpp_dout(this, 20) << "INFO: " << list_obj_name << " marked for removal" << dendl;
+          }, [this, list_obj_name] (std::exception_ptr eptr) {
+            ldpp_dout(this, 10) << "ERROR: " << list_obj_name << " processing failed" << dendl;
+            if (eptr) std::rethrow_exception(eptr);
+          });
+        } else {
+          ldpp_dout(this, 20) << "INFO: " << list_obj_name << " ownership (lock) renewed" << dendl;
+        }
+      }
+      // erase all list objects that were deleted
+      {
+        std::lock_guard lock_guard(commit_list_gc_lock);
+        std::for_each(commit_list_gc.begin(), commit_list_gc.end(), [this, &owned_commit_logs](const std::string& list_obj_name) {
+          owned_commit_logs.erase(list_obj_name);
+          ldpp_dout(this, 20) << "INFO: commit list object: " << list_obj_name << " was removed" << dendl;
+        });
+        commit_list_gc.clear();
+      }
+    }
+    while (processed_list_count > 0) {
+      ldpp_dout(this, 20) << "INFO: BucketLogging manager stopped. "
+                          << processed_list_count
+                          << " commit lists are still being processed" << dendl;
+      async_sleep(yield, m_log_commits_update_retry);
+    }
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped. Done processing all commit lists" << dendl;
+  }
+
+public:
+
+  ~BucketLoggingManager() {
+  }
+
+  void stop() {
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager received stop signal. shutting down..." << dendl;
+    m_shutdown = true;
+    m_work_guard.reset();
+    if (m_worker.joinable()) {
+      // try graceful shutdown first
+      auto future = std::async(std::launch::async, [this]() {m_worker.join();});
+      if (future.wait_for(m_log_commits_update_retry*2) == std::future_status::timeout) {
+        // force stop if graceful shutdown takes too long
+        if (!m_io_context.stopped()) {
+          ldpp_dout(this, 5) << "INFO: force shutdown of BucketLogging Manager" << dendl;
+          m_io_context.stop();
+        }
+        m_worker.join();
+      }
+    }
+    ldpp_dout(this, 5) << "INFO: BucketLogging manager shutdown complete" << dendl;
+  }
+
+  void init() {
+    boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(),
+        [this](boost::asio::yield_context yield) {
+          process_global_logging_list(yield);
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
+    // start the worker threads to do the actual list processing
+    m_worker = std::thread([this]() {
+      ceph_pthread_setname("bucket-logging-worker");
+      try {
+        ldpp_dout(this, 20) << "INFO: bucket logging worker started" << dendl;
+        m_io_context.run();
+        ldpp_dout(this, 20) << "INFO: bucket logging worker ended" << dendl;
+      } catch (const std::exception& err) {
+        ldpp_dout(this, 10) << "ERROR: bucket logging worker failed with error: "
+                           << err.what() << dendl;
+        throw err;
+      }
+    });
+    ldpp_dout(this, 10) << "INFO: started bucket logging manager" << dendl;
+  }
+
+  BucketLoggingManager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) :
+    m_cct(_cct),
+    m_lock_cookie(gen_rand_alphanumeric(m_cct, COOKIE_LEN)),
+    m_work_guard(boost::asio::make_work_guard(m_io_context)),
+    m_site(site),
+    m_rados_store(store)
+    {
+      ldpp_dout(this, 5) << "INFO: BucketLoggingManager() constructor" << dendl;
+    }
+};
+
+std::unique_ptr<BucketLoggingManager> s_manager;
+
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+          const SiteConfig& site) {
+  if (s_manager) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to init BucketLogging manager: already exists" << dendl;
+    return false;
+  }
+  // TODO: take conf from CephContext
+  ldpp_dout(dpp, 1) << "INFO: initialising BucketLogging manager" << dendl;
+  s_manager = std::make_unique<BucketLoggingManager>(dpp->get_cct(), store, site);
+  s_manager->init();
+  return true;
+}
+
+void shutdown() {
+  if (!s_manager) return;
+  s_manager->stop();
+  s_manager.reset();
+}
+
+} // namespace rgw::bucket_logging
diff --git a/src/rgw/driver/rados/rgw_bl_rados.h b/src/rgw/driver/rados/rgw_bl_rados.h
new file mode 100644 (file)
index 0000000..1075adb
--- /dev/null
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <string>
+
+// forward declarations
+namespace rgw { class SiteConfig; }
+namespace rgw::sal {
+  class RadosStore;
+}
+class DoutPrefixProvider;
+
+namespace rgw::bucketlogging {
+
+  // initialize the bucket logging commit manager
+  bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+            const rgw::SiteConfig& site);
+
+  // shutdown the bucket logging commit manager
+  void shutdown();
+
+  int add_commit_target_entry(const DoutPrefixProvider* dpp,
+                              rgw::sal::RadosStore* store,
+                              const rgw::sal::Bucket* log_bucket,
+                              const std::string& prefix,
+                              const std::string& obj_name,
+                              const std::string& tail_obj_name,
+                              const rgw_pool& temp_data_pool,
+                              optional_yield y);
+
+int list_pending_commit_objects(const DoutPrefixProvider* dpp,
+                                rgw::sal::RadosStore* store,
+                                const rgw::sal::Bucket* log_bucket,
+                                const std::string& prefix,
+                                std::set<std::string>& entries,
+                                optional_yield y);
+
+}
+
index 6d7df6163a96d1a2037ab22391c73e002193a25f..bf103295a3b3392867d27e3028e0b465e36915c3 100644 (file)
@@ -57,6 +57,7 @@
 #include "rgw_etag_verifier.h"
 #include "rgw_worker.h"
 #include "rgw_notify.h"
+#include "rgw_bl_rados.h"
 #include "rgw_http_errors.h"
 
 #undef fork // fails to compile RGWPeriod::fork() below
@@ -1144,6 +1145,10 @@ void RGWRados::finalize()
     v1_topic_migration.stop();
   }
 
+  if (run_bucket_logging_thread) {
+    rgw::bucketlogging::shutdown();
+  }
+
   if (use_restore_thread) {
     restore->stop_processor();
   }
@@ -1271,6 +1276,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   if (ret < 0)
     return ret;
 
+  ret = open_logging_pool_ctx(dpp);
+  if (ret < 0)
+    return ret;
+
   pools_initialized = true;
 
   if (use_gc) {
@@ -1423,6 +1432,12 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
 
   index_completion_manager = new RGWIndexCompletionManager(this);
 
+  if (run_bucket_logging_thread) {
+    if (!rgw::bucketlogging::init(dpp, this->driver, *svc.site)) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket logging manager" << dendl;
+    }
+      ldpp_dout(dpp, 0) << "INFO: initialized bucket logging manager" << dendl;
+  }
   if (run_notification_thread) {
     if (!rgw::notify::init(dpp, driver, *svc.site)) {
       ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
@@ -1443,7 +1458,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
       v1_topic_migration.start(1);
     }
   }
-
   return ret;
 }
 
@@ -1547,6 +1561,11 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp)
   return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
 }
 
+int RGWRados::open_logging_pool_ctx(const DoutPrefixProvider *dpp)
+{
+  return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().bucket_logging_pool, logging_pool_ctx, true, true);
+}
+
 int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
                            bool mostly_omap, bool bulk)
 {
index 79e42359b2e5e78290467673a7c230aacb5fe7b4..fd870b945eba1534e24463dd75d8674a6f3bcef7 100644 (file)
@@ -364,6 +364,7 @@ class RGWRados
   int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
   int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
   int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
+  int open_logging_pool_ctx(const DoutPrefixProvider *dpp);
 
   int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx&  io_ctx,
                    bool mostly_omap, bool bulk);
@@ -384,6 +385,7 @@ class RGWRados
   bool run_sync_thread{false};
   bool run_reshard_thread{false};
   bool run_notification_thread{false};
+  bool run_bucket_logging_thread{false};
 
   RGWMetaNotifier* meta_notifier{nullptr};
   RGWDataNotifier* data_notifier{nullptr};
@@ -459,6 +461,7 @@ protected:
   librados::IoCtx objexp_pool_ctx;
   librados::IoCtx reshard_pool_ctx;
   librados::IoCtx notif_pool_ctx;     // .rgw.notif
+  librados::IoCtx logging_pool_ctx;     // .rgw.logging
 
   bool pools_initialized{false};
 
@@ -546,6 +549,11 @@ public:
     return *this;
   }
 
+  RGWRados& set_run_bucket_logging_thread(bool _run_bucket_logging_thread) {
+    run_bucket_logging_thread = _run_bucket_logging_thread;
+    return *this;
+  }
+
   librados::IoCtx* get_lc_pool_ctx() {
     return &lc_pool_ctx;
   }
@@ -561,6 +569,10 @@ public:
   librados::IoCtx& get_notif_pool_ctx() {
     return notif_pool_ctx;
   }
+
+  librados::IoCtx& get_logging_pool_ctx() {
+    return logging_pool_ctx;
+  }
   
   void set_context(CephContext *_cct) {
     cct = _cct;
index 5f0f962de22621f0612d203779b9c86722d814cd..994de64419f0d381985860920620c2ca7964eae0 100644 (file)
@@ -41,6 +41,7 @@
 #include "rgw_aio_throttle.h"
 #include "rgw_bucket.h"
 #include "rgw_bucket_logging.h"
+#include "rgw_bl_rados.h"
 #include "rgw_lc.h"
 #include "rgw_lc_tier.h"
 #include "rgw_lc_tier.h"
@@ -1096,6 +1097,7 @@ int RadosBucket::remove_topics(RGWObjVersionTracker* objv_tracker,
 
 
 #define RGW_ATTR_COMMITTED_LOGGING_OBJ RGW_ATTR_PREFIX "committed-logging-obj"
+#define RGW_ATTR_LOGGING_EC_POOL RGW_ATTR_PREFIX "logging-ec-pool"
 
 int get_committed_logging_object(RadosStore* store,
     const std::string& obj_name_oid,
@@ -1146,6 +1148,88 @@ int set_committed_logging_object(RadosStore* store,
   return rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y);
 }
 
+int get_logging_temp_object_pool (const DoutPrefixProvider *dpp,
+    RadosBucket* bucket,
+    RadosStore* rados_store,
+    const std::string& obj_name_oid,
+    rgw_pool& data_pool, bool& is_ec_pool, optional_yield y) {
+
+  int ret;
+  rgw_obj obj {bucket->get_key(), "dummy"};
+  if (!rados_store->getRados()->get_obj_data_pool(bucket->get_placement_rule(), obj, &data_pool)) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << bucket->get_key() <<
+        "' when writing logging object" << dendl;
+    return -EIO;
+  }
+
+  // Find out if the log bucket is on an EC pool
+  // Use an attribute on the logging name object instead of the bucket so
+  // it is not synced in a multi-site setup
+
+  librados::Rados* rados = rados_store->getRados()->get_rados_handle();
+  librados::IoCtx io_ctx;
+  if (ret = rgw_init_ioctx(dpp, rados, data_pool, io_ctx); ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: get data pool ioctx for bucket '" << bucket->get_key() <<
+        "' when writing logging object. ret = " << ret << dendl;
+    return ret;
+  }
+  if (!io_ctx.is_valid()) {
+    ldpp_dout(dpp, 1) << "ERROR: invalid data pool ioctx for bucket '" << bucket->get_key() <<
+        "' when writing logging object" << dendl;
+    return ret;
+  }
+
+  is_ec_pool = false;
+  bufferlist bl;
+  librados::ObjectReadOperation op;
+  int rval;
+  op.getxattr(RGW_ATTR_LOGGING_EC_POOL, &bl, &rval);
+  ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), nullptr, y);
+  if (ret == 0) {
+    try {
+      ceph::decode(is_ec_pool, bl);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to decode " << RGW_ATTR_LOGGING_EC_POOL
+                        << " attr for bucket '" << bucket->get_key()
+                        << "'. error = " << err.what() << dendl;
+      return -EINVAL;
+    }
+  } else if (ret < 0){
+    // Try to find out if it is an EC pool
+    ret = rados->mon_command(
+      "{\"prefix\": \"osd pool get\", \"pool\": \"" +
+      data_pool.name + "\", \"var\": \"erasure_code_profile\"}",
+      {}, NULL, NULL);
+    if (ret == -EACCES) {
+      is_ec_pool = false;
+    } else if (ret == 0){
+      is_ec_pool = true;
+    } else {
+      ldpp_dout(dpp, 10) << __func__ << " ERROR: failed to find out if pool"
+                         << data_pool.name << " is erasure coded. ret = "
+                         << ret << dendl;
+      return ret;
+    }
+    bufferlist bl;
+    ceph::encode(is_ec_pool, bl);
+    librados::ObjectWriteOperation op;
+    // if object does not exist, we should not create the attribute
+    op.assert_exists();
+    op.setxattr(RGW_ATTR_LOGGING_EC_POOL, std::move(bl));
+    if (const int ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y); ret < 0){
+      // Don't return an error as we know if it is an ec pool
+      ldpp_dout(dpp, 10) << "ERROR: failed to set is_ec_pool attr on :" << obj_name_oid
+                         << " ret: " << ret << dendl;
+    }
+  }
+  // If the target log bucket is on an EC pool, the temp
+  // logging object will be created in the default.rgw.log pool
+  if (is_ec_pool) {
+    data_pool = rados_store->svc()->zone->get_zone_params().bucket_logging_pool;
+  }
+  return 0;
+}
+
 int RadosBucket::get_logging_object_name(std::string& obj_name,
     const std::string& prefix,
     optional_yield y,
@@ -1242,14 +1326,14 @@ std::string to_temp_object_name(const rgw::sal::Bucket* bucket, const std::strin
       obj_name);
 }
 
-int RadosBucket::remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) {
+int RadosBucket::remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) {
   rgw_pool data_pool;
-  const rgw_obj head_obj{get_key(), obj_name};
-  const auto placement_rule = get_placement_rule();
-
-  if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
-      "' when deleting logging object << dendl;
+  const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+  bool is_ec_pool;
+  auto ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get temp data pool for bucket '" << get_key() <<
+      "' when deleting logging object: " << obj_name << ". ret: " << ret << dendl;
     return -EIO;
   }
 
@@ -1265,7 +1349,7 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
     optional_yield y,
     const DoutPrefixProvider *dpp,
     const std::string& prefix,
-    std::string* last_committed) {
+    std::string* last_committed, bool async) {
   rgw_pool data_pool;
   const rgw_obj head_obj{get_key(), obj_name};
   const auto placement_rule = get_placement_rule();
@@ -1275,9 +1359,10 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
       "' when committing logging object"  << dendl;
     return -EIO;
   }
+  int ret;
   const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
   if (last_committed) {
-    if (const int ret = get_committed_logging_object(store,
+    if (ret = get_committed_logging_object(store,
           obj_name_oid,
           data_pool,
           y,
@@ -1295,11 +1380,34 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
   }
 
   const auto temp_obj_name = to_temp_object_name(this, obj_name);
+  rgw_pool temp_data_pool;
+  bool is_ec_pool = false;
+
+  ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, temp_data_pool, is_ec_pool, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '"
+                      << get_key() << "' when committing logging object: "
+                      << obj_name << ". ret: " << ret << dendl;
+    return -EIO;
+  }
+  if (async) {
+    ret = rgw::bucketlogging::add_commit_target_entry(dpp, store, this, prefix,
+                                                      obj_name, temp_obj_name,
+                                                      temp_data_pool, y);
+    if (ret < 0){
+      return ret;
+    }
+
+    ldpp_dout(dpp, 20) << "INFO: added logging object entry " << obj_name
+                       << " to commit list object." << dendl;
+    return 0;
+  }
+
   std::map<string, bufferlist> obj_attrs;
   ceph::real_time mtime;
   bufferlist bl_data;
-  if (auto ret = rgw_get_system_obj(store->svc()->sysobj,
-                     data_pool,
+  if (ret = rgw_get_system_obj(store->svc()->sysobj,
+                     temp_data_pool,
                      temp_obj_name,
                      bl_data,
                      nullptr,
@@ -1315,6 +1423,36 @@ int RadosBucket::commit_logging_object(const std::string& obj_name,
     }
     mtime = ceph::real_time::clock::now();
     ldpp_dout(dpp, 20) << "INFO: temporary logging object '" << temp_obj_name << "' does not exist. committing it empty" << dendl;
+  } else if (is_ec_pool) {
+    if (ret = rgw_put_system_obj(dpp, store->svc()->sysobj,
+                             data_pool,
+                             temp_obj_name,
+                             bl_data,
+                             true,
+                             nullptr,
+                             mtime,
+                             y,
+                             &obj_attrs); ret < 0){
+
+      if (ret == -EEXIST) {
+        ldpp_dout(dpp, 5) << "WARNING: race detected in committing logging object '" << temp_obj_name << dendl;
+      } else {
+        ldpp_dout(dpp, 1) << "ERROR: failed to write logging data when committing object '" << temp_obj_name
+                          << ". error: " << ret << dendl;
+      }
+      return ret;
+    }
+    ldpp_dout(dpp, 20) << "INFO: wrote logging data when committing object '" << temp_obj_name << dendl;
+
+    if (ret = rgw_delete_system_obj(dpp, store->svc()->sysobj,
+                                temp_data_pool,
+                                temp_obj_name,
+                                nullptr,
+                                y); ret < 0 && ret != -ENOENT) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to delete temp logging object when "
+                        << "committing object '" << temp_obj_name
+                        << ". error: " << ret << dendl;
+    }
   }
 
   uint64_t size = bl_data.length();
@@ -1423,22 +1561,29 @@ void bucket_logging_completion(rados_completion_t completion, void* args) {
 
 int RadosBucket::write_logging_object(const std::string& obj_name,
     const std::string& record,
+    const std::string& prefix,
     optional_yield y,
     const DoutPrefixProvider *dpp,
     bool async_completion) {
   const auto temp_obj_name = to_temp_object_name(this, obj_name);
   rgw_pool data_pool;
-  rgw_obj obj{get_key(), obj_name};
-  if (!store->getRados()->get_obj_data_pool(get_placement_rule(), obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
-      "' when writing logging object" << dendl;
+
+  bool is_ec_pool = false;
+  const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+  int ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '"
+                      << get_key() << "' when writing logging object "
+                      << temp_obj_name << ", ret: " << ret << dendl;
     return -EIO;
   }
+
   librados::IoCtx io_ctx;
-  if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) {
+  if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx, true); ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to get IO context for logging object from data pool:" << data_pool.to_str() << dendl;
     return -EIO;
   }
+
   bufferlist bl;
   bl.append(record);
   bl.append("\n");
index 6b7f5a8e7ff68cf78253cf1bad384f3b57b18685..eeedd0b6513e6d35c0ffabce42fd7fd093087338 100644 (file)
@@ -803,9 +803,11 @@ class RadosBucket : public StoreBucket {
         optional_yield y,
         const DoutPrefixProvider *dpp,
         RGWObjVersionTracker* objv_tracker) override;
-    int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override;
-    int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override;
-    int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
+    int commit_logging_object(const std::string& obj_name, optional_yield y,
+       const DoutPrefixProvider *dpp, const std::string& prefix,
+       std::string* last_committed, bool async) override;
+    int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override;
+    int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
 
   private:
     int link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
index f99b813c2a9f4faa72511dbc67236d8375c63523..125c4fc39e62230d40624aac1815df55f2c8119e 100644 (file)
@@ -118,6 +118,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
   rgw_pool account_pool;
   rgw_pool group_pool;
   rgw_pool dedup_pool;
+  rgw_pool bucket_logging_pool;
 
   RGWAccessKey system_key;
 
@@ -156,7 +157,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
   const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
   
   void encode(bufferlist& bl) const override {
-    ENCODE_START(17, 1, bl);
+    ENCODE_START(18, 1, bl);
     encode(domain_root, bl);
     encode(control_pool, bl);
     encode(gc_pool, bl);
@@ -187,11 +188,12 @@ struct RGWZoneParams : RGWSystemMetaObj {
     encode(group_pool, bl);
     encode(restore_pool, bl);
     encode(dedup_pool, bl);
+    encode(bucket_logging_pool, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) override {
-    DECODE_START(17, bl);
+    DECODE_START(18, bl);
     decode(domain_root, bl);
     decode(control_pool, bl);
     decode(gc_pool, bl);
@@ -279,6 +281,11 @@ struct RGWZoneParams : RGWSystemMetaObj {
     } else {
       dedup_pool = name + ".rgw.dedup";
     }
+    if (struct_v >= 18) {
+      decode(bucket_logging_pool, bl);
+    } else {
+      bucket_logging_pool = log_pool.name + ":logging";
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
index eb5c8057fdf53965147b3f86c203a9e8c586f3ed..c4cc935d844ce6db0089f526b3c93e41923b2f0b 100644 (file)
@@ -90,6 +90,7 @@ extern "C" {
 
 #include "driver/rados/rgw_bucket.h"
 #include "driver/rados/rgw_sal_rados.h"
+#include "driver/rados/rgw_bl_rados.h"
 
 #include <iomanip>
 
@@ -190,6 +191,7 @@ void usage()
   cout << "  bucket radoslist                 list rados objects backing bucket's objects\n";
   cout << "  bucket logging flush             flush pending log records object of source bucket to the log bucket\n";
   cout << "  bucket logging info              get info on bucket logging configuration on source bucket or list of sources in log bucket\n";
+  cout << "  bucket logging list              list the log objects pending commit for the source bucket\n";
   cout << "  bi get                           retrieve bucket index object entries\n";
   cout << "  bi put                           store bucket index object entries\n";
   cout << "  bi list                          list raw bucket index entries\n";
@@ -725,6 +727,7 @@ enum class OPT {
   BUCKET_RESYNC_ENCRYPTED_MULTIPART,
   BUCKET_LOGGING_FLUSH,
   BUCKET_LOGGING_INFO,
+  BUCKET_LOGGING_LIST,
   POLICY,
   LOG_LIST,
   LOG_SHOW,
@@ -974,6 +977,7 @@ static SimpleCmd::Commands all_cmds = {
   { "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART },
   { "bucket logging flush", OPT::BUCKET_LOGGING_FLUSH },
   { "bucket logging info", OPT::BUCKET_LOGGING_INFO },
+  { "bucket logging list", OPT::BUCKET_LOGGING_LIST },
   { "policy", OPT::POLICY },
   { "log list", OPT::LOG_LIST },
   { "log show", OPT::LOG_SHOW },
@@ -4620,6 +4624,7 @@ int main(int argc, const char **argv)
                                        false,
                                        false,
                                         false,
+                                        false,
                                        false, // No background tasks!
                                         null_yield,
                                        need_cache && g_conf()->rgw_cache_enabled,
@@ -7832,7 +7837,7 @@ int main(int argc, const char **argv)
     }
     std::string old_obj;
     const auto region = driver->get_zone()->get_zonegroup().get_api_name();
-    ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, &old_obj);
+    ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, false, &old_obj);
     if (ret < 0) {
       cerr << "ERROR: failed to flush pending logging object '" << obj_name << "' to target bucket '" << configuration.target_bucket
         << "'. error: " << cpp_strerror(-ret) << std::endl;
@@ -7884,6 +7889,55 @@ int main(int argc, const char **argv)
     return 0;
   }
 
+  if (opt_cmd == OPT::BUCKET_LOGGING_LIST) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+    if (driver->get_name() != "rados") {
+      cerr << "ERROR: this command is only available with the RADOS driver." << std::endl;
+      return EINVAL;
+    }
+
+    int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      return -ret;
+    }
+
+    rgw::bucketlogging::configuration configuration;
+    std::unique_ptr<rgw::sal::Bucket> target_bucket;
+    ret =  rgw::bucketlogging::get_target_and_conf_from_source(dpp(),
+         driver, bucket.get(), tenant, configuration, target_bucket, null_yield);
+    if (ret < 0 && ret != -ENODATA) {
+      cerr << "ERROR: failed to get target bucket and logging conf from source bucket '"
+        << bucket_name << "': " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    } else if (ret == -ENODATA) {
+      cerr << "ERROR: bucket '" << bucket_name << "' does not have logging enabled" << std::endl;
+      return 0;
+    }
+    std::string target_prefix = configuration.target_prefix;
+    std::set<std::string> entries;
+
+    ret = rgw::bucketlogging::list_pending_commit_objects(dpp(),
+        static_cast<rgw::sal::RadosStore*>(driver), target_bucket.get(),
+        target_prefix, entries, null_yield);
+
+    if (ret < 0) {
+      cerr << "ERROR: failed to get pending log entries for bucket '" << bucket_name
+           << "': " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    formatter->open_array_section("pending_logs");
+    for (auto &entry: entries) {
+        formatter->dump_string("log", entry);
+    }
+    formatter->close_section(); // objs
+    formatter->flush(cout);
+    return 0;
+  }
+
   if (opt_cmd == OPT::LOG_LIST) {
     // filter by date?
     if (date.size() && date.size() != 10) {
index 07561ca55f6b0461a72d6215572fbec98b798548..f2a28a47c3e61d4bbf36c3e515a659a1b44946ec 100644 (file)
@@ -258,7 +258,9 @@ int rgw::AppMain::init_storage()
           run_quota,
           run_sync,
           g_conf().get_val<bool>("rgw_dynamic_resharding"),
-         true, true, null_yield, // run notification thread
+         true, // run notification thread
+         true, // run bucket-logging thread
+         true, null_yield,
           g_conf()->rgw_cache_enabled);
   if (!env.driver) {
     return -EIO;
index ba9da5eed39087d56595f0547d7efb7545c9506b..bf7a4ef738d6d4100e9f6daf6f93b1b1c7912075 100644 (file)
@@ -334,7 +334,7 @@ int commit_logging_object(const configuration& conf,
       target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
   }
-  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) {
+  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed, false); ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of logging bucket '" <<
       target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
@@ -351,6 +351,7 @@ int rollover_logging_object(const configuration& conf,
     optional_yield y,
     bool must_commit,
     RGWObjVersionTracker* objv_tracker,
+    bool async,
     std::string* last_committed,
     std::string* err_message) {
   std::string target_bucket_name;
@@ -393,7 +394,7 @@ int rollover_logging_object(const configuration& conf,
       return ret;
     }
   }
-  if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) {
+  if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed, async); ret < 0) {
     if (must_commit) {
       if (err_message) {
         *err_message = fmt::format("Failed to commit logging object of logging bucket '{}'", target_bucket->get_name());
@@ -516,7 +517,7 @@ int log_record(rgw::sal::Driver* driver,
     if (ceph::coarse_real_time::clock::now() > time_to_commit) {
       ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to logging bucket '" <<
         target_bucket_id << "'" << dendl;
-      if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
+      if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
         set_journal_err(err_message);
         return ret;
       }
@@ -656,6 +657,7 @@ int log_record(rgw::sal::Driver* driver,
 
   if (ret = target_bucket->write_logging_object(obj_name,
         record,
+        conf.target_prefix,
         y,
         dpp,
         async_completion); ret < 0 && ret != -EFBIG) {
@@ -667,12 +669,13 @@ int log_record(rgw::sal::Driver* driver,
   if (ret == -EFBIG) {
     ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to logging bucket '" <<
       target_bucket->get_key() << "'" << dendl;
-    if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
+    if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
       set_journal_err(err_message);
       return ret;
     }
     if (ret = target_bucket->write_logging_object(obj_name,
         record,
+        conf.target_prefix,
         y,
         dpp,
         async_completion); ret < 0) {
@@ -849,7 +852,7 @@ int bucket_deletion_cleanup(const DoutPrefixProvider* dpp,
         }
         ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted logging bucket '" <<
           bucket->get_key() << "'" << dendl;
-        if (const int ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) {
+        if (const int ret = bucket->remove_logging_object(obj_name, conf.target_prefix, y, dpp); ret < 0) {
           ldpp_dout(dpp, 5) << "WARNING: failed to delete pending logging object '" << obj_name << "' for logging bucket '" <<
             bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
           continue;
@@ -948,7 +951,7 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
   ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name for bucket '" <<
       bucket->get_key() << "' during source cleanup" << dendl;
   // since the object holding the name was deleted, we cannot fetch the last commited name from it
-  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr); ret < 0) {
+  if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr, false); ret < 0) {
     ldpp_dout(dpp, 5) << "WARNING: could not commit pending logging object of bucket '" <<
       bucket->get_key() << "' during source cleanup. ret = " << ret << dendl;
     return 0;
index 9f010e873c73212b5d660386a02e2e79531a21b9..c06967f426f9d4f0088ce46919f1124a213a0a68 100644 (file)
@@ -183,6 +183,7 @@ int rollover_logging_object(const configuration& conf,
     optional_yield y,
     bool must_commit,
     RGWObjVersionTracker* objv_tracker,
+    bool async,
     std::string* last_committed,
     std::string* err_message = nullptr);
 
index f12737feffdd3ac4f5f00ff025420d35cfe4af84..9ec1ea0a967f96a40f0c3a6ba11bbe6b62320460 100644 (file)
@@ -105,7 +105,7 @@ int main(const int argc, const char **argv)
     exit(1);
   }
 
-  driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield);
+  driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, false, true, null_yield);
   if (!driver) {
     std::cerr << "couldn't init storage provider" << std::endl;
     return EIO;
index d63c8fe3c5dda7686211afdd3c11292eba065cba..0542faaa09932cf2fe7884b7ec0c5ffa638f4b75 100644 (file)
@@ -129,7 +129,10 @@ void RGWRealmReloader::reload()
           cct->_conf->rgw_enable_quota_threads,
           cct->_conf->rgw_run_sync_thread,
           cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
-         true, true, null_yield, // run notification thread
+          true,
+          true, // run notification thread
+          true, // run bucket logging thread
+          null_yield,
           cct->_conf->rgw_cache_enabled);
     }
 
index b2584eba1647cf621c85fb65a6b36860816aa8d6..07e6e2ec73230658c1fe41cf325f5d00c9f4f02d 100644 (file)
@@ -327,6 +327,7 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
             y,
             false, // rollover should happen even if commit failed
             &objv_tracker,
+            false,
             &old_obj); ret < 0) {
         ldpp_dout(this, 1) << "WARNING: failed to flush pending logging object '" << obj_name << "'"
             << " to target bucket '" << target_bucket_id << "'. "
@@ -435,7 +436,7 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp {
       ldpp_dout(this, 5) << "INFO: no pending logging object in logging bucket '" << target_bucket_id << "'. new object should be created" << dendl;
     }
     const auto region = driver->get_zone()->get_zonegroup().get_api_name();
-    op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, &old_obj);
+    op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, false, &old_obj);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << obj_name << "'"
           << " to logging bucket '" << target_bucket_id << "'. "
index ebf9220510a6a0b7923835aeb00d8855ef5827e5..087d30fc18aca1686117c6b43748815cb968ceee 100644 (file)
@@ -94,7 +94,8 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                                                     bool quota_threads,
                                                     bool run_sync_thread,
                                                     bool run_reshard_thread,
-                                                     bool run_notification_thread,
+                                                    bool run_notification_thread,
+                                                    bool run_bucket_logging_thread,
                                                     bool use_cache,
                                                     bool use_gc,
                                                     bool background_tasks,
@@ -116,6 +117,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                 .set_run_sync_thread(run_sync_thread)
                 .set_run_reshard_thread(run_reshard_thread)
                 .set_run_notification_thread(run_notification_thread)
+                .set_run_bucket_logging_thread(run_bucket_logging_thread)
                .init_begin(cct, dpp, background_tasks, site_config) < 0) {
       delete driver;
       return nullptr;
@@ -148,6 +150,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                 .set_run_sync_thread(run_sync_thread)
                 .set_run_reshard_thread(run_reshard_thread)
                 .set_run_notification_thread(run_notification_thread)
+                .set_run_bucket_logging_thread(run_bucket_logging_thread)
                .init_begin(cct, dpp, background_tasks, site_config) < 0) {
       delete driver;
       return nullptr;
index 1991882d9d344f39f3555c8b9d6d899bb6b7ce50..0e6c237708d3e2848a9fe94972f2b5de068430cd 100644 (file)
@@ -1053,12 +1053,15 @@ class Bucket {
         RGWObjVersionTracker* objv_tracker) = 0;
     /** Move the pending bucket logging object into the bucket
      if "last_committed" is not null, it will be set to the name of the last committed object
+     if async is true, write the entry to the commit lists to be processed by the BucketLoggingManager
      * */
-    virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) = 0;
+    virtual int commit_logging_object(const std::string& obj_name, optional_yield y,
+       const DoutPrefixProvider *dpp, const std::string& prefix,
+       std::string* last_committed, bool async) = 0;
     //** Remove the pending bucket logging object */
-    virtual int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0;
+    virtual int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) = 0;
     /** Write a record to the pending bucket logging object */
-    virtual int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0;
+    virtual int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0;
 
     /* dang - This is temporary, until the API is completed */
     virtual rgw_bucket& get_key() = 0;
@@ -1943,6 +1946,7 @@ public:
                                      bool run_sync_thread,
                                      bool run_reshard_thread,
                                      bool run_notification_thread,
+                                     bool run_bucket_logging_thread,
                                      bool background_tasks,
                                      optional_yield y,
                                      bool use_cache = true,
@@ -1956,6 +1960,7 @@ public:
                                                   run_sync_thread,
                                                   run_reshard_thread,
                                                    run_notification_thread,
+                                                  run_bucket_logging_thread,
                                                   use_cache, use_gc,
                                                   background_tasks, y);
     return driver;
@@ -1983,6 +1988,7 @@ public:
                                                bool run_sync_thread,
                                                bool run_reshard_thread,
                                                 bool run_notification_thread,
+                                                bool run_bucket_logging_thread,
                                                bool use_metadata_cache,
                                                bool use_gc, bool background_tasks,
                                                optional_yield y);
index f0624bdcf927698822b59834918fcbf251ebda5a..a9eb9b5264ac075507fcdd93e648dcae236d52df 100644 (file)
@@ -697,14 +697,14 @@ public:
       RGWObjVersionTracker* objv_tracker) override {
     return next->remove_logging_object_name(prefix, y, dpp, objv_tracker);
   }
-  int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override {
-    return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed);
+  int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override {
+    return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed, async);
   }
-  int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override {
-    return next->remove_logging_object(obj_name, y, dpp);
+  int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override {
+    return next->remove_logging_object(obj_name, prefix, y, dpp);
   }
-  int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
-    return next->write_logging_object(obj_name, record, y, dpp, async_completion);
+  int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
+    return next->write_logging_object(obj_name, record, prefix, y, dpp, async_completion);
   }
 
   virtual rgw_bucket& get_key() override { return next->get_key(); }
index 08499b0bb543cbd4b4b24988ec02bc0684dde764..f2ab7686d278fcdcd502bdc8f775caf6e391efb1 100644 (file)
@@ -266,9 +266,9 @@ class StoreBucket : public Bucket {
         optional_yield y,
         const DoutPrefixProvider *dpp,
         RGWObjVersionTracker* objv_tracker) override { return 0; }
-    int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override { return 0; }
-    int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
-    int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
+    int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override { return 0; }
+    int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
+    int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
       return 0;
     }
 
index 90ff7cef106c4ccf2776468c03efb8a4145fb363..2036a9c8057709c981d087078082bdaa635f63b9 100644 (file)
@@ -301,6 +301,7 @@ void RGWZoneParams::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("topics_pool", topics_pool, obj);
   JSONDecoder::decode_json("account_pool", account_pool, obj);
   JSONDecoder::decode_json("group_pool", group_pool, obj);
+  JSONDecoder::decode_json("bucket_logging_pool", bucket_logging_pool, obj);
   JSONDecoder::decode_json("system_key", system_key, obj);
   JSONDecoder::decode_json("placement_pools", placement_pools, obj);
   JSONDecoder::decode_json("tier_config", tier_config, obj);
@@ -330,6 +331,7 @@ void RGWZoneParams::dump(Formatter *f) const
   encode_json("topics_pool", topics_pool, f);
   encode_json("account_pool", account_pool, f);
   encode_json("group_pool", group_pool, f);
+  encode_json("bucket_logging_pool", bucket_logging_pool, f);
   encode_json_plain("system_key", system_key, f);
   encode_json("placement_pools", placement_pools, f);
   encode_json("tier_config", tier_config, f);
@@ -479,6 +481,7 @@ void add_zone_pools(const RGWZoneParams& info,
   pools.insert(info.dedup_pool);
   pools.insert(info.gc_pool);
   pools.insert(info.log_pool);
+  pools.insert(info.bucket_logging_pool);
   pools.insert(info.intent_log_pool);
   pools.insert(info.usage_log_pool);
   pools.insert(info.user_keys_pool);
@@ -1295,6 +1298,7 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y,
   info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool);
   info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool);
   info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool);
+  info.bucket_logging_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:logging", info.bucket_logging_pool);
   info.topics_pool =
       fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool);
   info.account_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:accounts", info.account_pool);
index 2146de6ed8a93d1eb2b40b650d0e14c4265dc964..0786868d78cc2a59b6e842638427c9401ecb3577 100644 (file)
@@ -52,6 +52,7 @@
     bucket radoslist                 list rados objects backing bucket's objects
     bucket logging flush             flush pending log records object of source bucket to the log bucket
     bucket logging info              get info on bucket logging configuration on source bucket or list of sources in log bucket
+    bucket logging list              list the log objects pending commit for the source bucket
     bi get                           retrieve bucket index object entries
     bi put                           store bucket index object entries
     bi list                          list raw bucket index entries
index f7be1abd2d43f07a7b0e4dda539e0a67775549f3..c369919ba81d1737ceba628f49cca1250541907e 100644 (file)
@@ -351,7 +351,7 @@ int main(int argc, const char **argv)
                              false,
                              false,
                              false,
-                              true, true, null_yield, 
+                              true, true, true, null_yield, 
                              false));
   if (!store) {
     std::cerr << "couldn't init storage provider" << std::endl;