From fcc52298b0e116587fd360b38f8703e5d504e29d Mon Sep 17 00:00:00 2001 From: Nithya Balachandran Date: Thu, 27 Nov 2025 12:03:36 +0000 Subject: [PATCH] rgw/bucket-logging: support for EC pools Log buckets can now be created within erasure-coded (EC) pools. To support append operations, a temporary log record object is initially created in the replicated default.rgw.log pool. This object is then copied to the EC pool upon log record commitment. All implicit log commit operations will execute asynchronously. A new BucketLoggingManager class is responsible for processing these pending commits at set intervals. Explicit commit operations, however, will continue to be performed synchronously. Fixes: https://tracker.ceph.com/issues/71365 Signed-off-by: Nithya Balachandran --- PendingReleaseNotes | 2 + doc/rados/troubleshooting/log-and-debug.rst | 2 + doc/radosgw/bucket_logging.rst | 16 + src/common/subsys.h | 1 + src/rgw/CMakeLists.txt | 1 + src/rgw/driver/rados/rgw_bl_rados.cc | 832 ++++++++++++++++++++ src/rgw/driver/rados/rgw_bl_rados.h | 41 + src/rgw/driver/rados/rgw_rados.cc | 21 +- src/rgw/driver/rados/rgw_rados.h | 12 + src/rgw/driver/rados/rgw_sal_rados.cc | 177 ++++- src/rgw/driver/rados/rgw_sal_rados.h | 8 +- src/rgw/radosgw-admin/radosgw-admin.cc | 56 +- src/rgw/rgw_appmain.cc | 1 + src/rgw/rgw_bucket_logging.cc | 15 +- src/rgw/rgw_bucket_logging.h | 1 + src/rgw/rgw_object_expirer.cc | 2 +- src/rgw/rgw_realm_reloader.cc | 5 +- src/rgw/rgw_rest_bucket_logging.cc | 3 +- src/rgw/rgw_sal.cc | 5 +- src/rgw/rgw_sal.h | 14 +- src/rgw/rgw_sal_filter.h | 12 +- src/rgw/rgw_sal_store.h | 6 +- src/rgw/rgw_zone.cc | 4 + src/rgw/rgw_zone.h | 11 +- src/test/cli/radosgw-admin/help.t | 1 + src/test/rgw/rgw_cr_test.cc | 2 +- 26 files changed, 1204 insertions(+), 47 deletions(-) create mode 100644 src/rgw/driver/rados/rgw_bl_rados.cc create mode 100644 src/rgw/driver/rados/rgw_bl_rados.h diff --git a/PendingReleaseNotes b/PendingReleaseNotes index f056f8e8c61..1bdbd1a7e3a 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -1,3 +1,5 @@ +* RGW: Bucket Logging suppports creating log buckets in EC pools. + Implicit logging object commits are now performed asynchronously. * RGW: OpenSSL engine support is deprecated in favor of provider support. - Removed the `openssl_engine_opts` configuration option. OpenSSL engine configuration in string format is no longer supported. - Added the `openssl_conf` configuration option for loading specified providers as default providers. diff --git a/doc/rados/troubleshooting/log-and-debug.rst b/doc/rados/troubleshooting/log-and-debug.rst index 66996b5e4ae..89d0f4c9988 100644 --- a/doc/rados/troubleshooting/log-and-debug.rst +++ b/doc/rados/troubleshooting/log-and-debug.rst @@ -306,6 +306,8 @@ values to their defaults or to a level suitable for normal operations. +--------------------------+-----------+--------------+ | ``rgw notification`` | 1 | 5 | +--------------------------+-----------+--------------+ +| ``rgw bucket logging`` | 1 | 5 | ++--------------------------+-----------+--------------+ | ``javaclient`` | 1 | 5 | +--------------------------+-----------+--------------+ | ``asok`` | 1 | 5 | diff --git a/doc/radosgw/bucket_logging.rst b/doc/radosgw/bucket_logging.rst index 78313fe084a..3915ae59a71 100644 --- a/doc/radosgw/bucket_logging.rst +++ b/doc/radosgw/bucket_logging.rst @@ -54,6 +54,22 @@ them, regardless if enough time passed or if no more records are written to the object. Flushing will happen automatically when logging is disabled on a bucket, or its logging configuration is changed, or the bucket is deleted. +The process of adding a new log object to the log bucket is asynchronous when +triggered by a log record being written. Consequently, the operation that +generated the log is completed immediately and does not wait for the log object +addition to finish; this addition occurs later. +The log object is added to the log bucket immediately when flushed. Consequently, +this action may result in temporary gaps in the log records within the bucket +until any pending log objects are also added. + +To check which log objects for a source bucket are currently pending addition to +the log bucket, execute the following command: + +.. prompt:: bash # + + radosgw-admin bucket logging list --bucket + + Standard ```````` If the logging type is set to "Standard" (the default) the log records are diff --git a/src/common/subsys.h b/src/common/subsys.h index f307929f621..523f7cf3118 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -69,6 +69,7 @@ SUBSYS(rgw_flight, 1, 5) SUBSYS(rgw_lifecycle, 1, 5) SUBSYS(rgw_restore, 1, 5) SUBSYS(rgw_notification, 1, 5) +SUBSYS(rgw_bucket_logging, 1, 5) SUBSYS(javaclient, 1, 5) SUBSYS(asok, 1, 5) SUBSYS(throttle, 1, 1) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index f52dd9e5864..5c84d92cbf0 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -174,6 +174,7 @@ if(WITH_RADOSGW_RADOS) driver/rados/group.cc driver/rados/groups.cc driver/rados/rgw_bucket.cc + driver/rados/rgw_bl_rados.cc driver/rados/rgw_cr_rados.cc driver/rados/rgw_cr_tools.cc driver/rados/rgw_d3n_datacache.cc diff --git a/src/rgw/driver/rados/rgw_bl_rados.cc b/src/rgw/driver/rados/rgw_bl_rados.cc new file mode 100644 index 00000000000..14b38533429 --- /dev/null +++ b/src/rgw/driver/rados/rgw_bl_rados.cc @@ -0,0 +1,832 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rgw_bucket_logging.h" +#include "rgw_bl_rados.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/async/yield_context.h" +#include "common/async/yield_waiter.h" +#include "common/ceph_time.h" +#include "common/dout.h" +#include "cls/lock/cls_lock_client.h" +#include "include/common_fwd.h" +#include "include/function2.hpp" +#include "librados/AioCompletionImpl.h" +#include "services/svc_zone.h" +#include "rgw_common.h" +#include "rgw_perf_counters.h" +#include "rgw_sal_rados.h" +#include "rgw_zone_features.h" + + +#define dout_subsys ceph_subsys_rgw_bucket_logging + +namespace rgw::bucketlogging { + +using commit_list_t = std::set; + +// use mmap/mprotect to allocate 128k coroutine stacks +auto make_stack_allocator() { + return boost::context::protected_fixedsize_stack{128*1024}; +} + +const std::string COMMIT_LIST_OBJECT_NAME = "bucket_logging_global_commit_list"; +static const std::string TEMP_POOL_ATTR = "temp_logging_pool"; + +static std::string get_commit_list_name (const rgw::sal::Bucket* log_bucket, + const std::string& prefix) { + return fmt::format("{}/{}/{}/{}", log_bucket->get_tenant(), + log_bucket->get_name(), + log_bucket->get_bucket_id(), prefix); +} + +int add_commit_target_entry(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + const rgw::sal::Bucket* log_bucket, + const std::string& prefix, + const std::string& obj_name, + const std::string& tail_obj_name, + const rgw_pool& temp_data_pool, + optional_yield y) { + + auto& ioctx_logging = store->getRados()->get_logging_pool_ctx(); + std::string target_obj_name = get_commit_list_name (log_bucket, prefix); + { + librados::ObjectWriteOperation op; + op.create(false); + + bufferlist bl; + bl.append(tail_obj_name); + std::map new_commit_list{{obj_name, bl}}; + op.omap_set(new_commit_list); + + bufferlist pool_bl; + encode(temp_data_pool, pool_bl); + op.setxattr(TEMP_POOL_ATTR.c_str(), pool_bl); + + auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name, std::move(op), y); + if (ret < 0){ + ldpp_dout(dpp, 1) << "ERROR: failed to add logging object entry " << obj_name + << " to commit list object:" << target_obj_name + << ". ret = " << ret << dendl; + return ret; + } + } + + bufferlist empty_bl; + std::map new_commit_list{{target_obj_name, empty_bl}}; + + librados::ObjectWriteOperation op; + op.create(false); + op.omap_set(new_commit_list); + + auto ret = rgw_rados_operate(dpp, ioctx_logging, COMMIT_LIST_OBJECT_NAME, + std::move(op), y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to add entry " << target_obj_name + << " to global bucket logging list object: " + << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "INFO: added entry " << target_obj_name + << " to global bucket logging object: " + << COMMIT_LIST_OBJECT_NAME << dendl; + return 0; +} + +int list_pending_commit_objects(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + const rgw::sal::Bucket* log_bucket, + const std::string& prefix, + std::set& entries, + optional_yield y) { + + auto& ioctx_logging = store->getRados()->get_logging_pool_ctx(); + std::string target_obj_name = get_commit_list_name (log_bucket, prefix); + + entries.clear(); + + constexpr auto max_chunk = 1024U; + std::string start_after; + bool more = true; + int rval; + while (more) { + librados::ObjectReadOperation op; + std::set entries_chunk; + op.omap_get_keys2(start_after, max_chunk, &entries_chunk, &more, &rval); + const auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name, + std::move(op), nullptr, y); + if (ret == -ENOENT) { + // log commit list object was not created - nothing to do + return 0; + } + if (ret < 0) { + // TODO: do we need to check on rval as well as ret? + ldpp_dout(dpp, 1) << "ERROR: failed to read logging commit list " + << target_obj_name << " ret: " << ret << dendl; + return ret; + } + entries.merge(entries_chunk); + if (more) { + start_after = *entries.rbegin(); + } + } + return 0; +} + +class BucketLoggingManager : public DoutPrefixProvider { + using Executor = boost::asio::io_context::executor_type; + bool m_shutdown = false; + static constexpr auto m_log_commits_update_period = std::chrono::milliseconds(10000); // 10s + static constexpr auto m_log_commits_update_retry = std::chrono::milliseconds(1000); // 1s + static constexpr auto m_log_commits_idle_sleep = std::chrono::milliseconds(5000); // 5s + const utime_t m_failover_time = utime_t(m_log_commits_update_period*9); // 90s + CephContext* const m_cct; + static constexpr auto COOKIE_LEN = 16; + const std::string m_lock_cookie; + const std::string m_lock_name = "bl_mgr_lock"; + boost::asio::io_context m_io_context; + boost::asio::executor_work_guard m_work_guard; + std::thread m_worker; + const SiteConfig& m_site; + rgw::sal::RadosStore* const m_rados_store; + +private: + + CephContext *get_cct() const override { return m_cct; } + + unsigned get_subsys() const override { return dout_subsys; } + + std::ostream& gen_prefix(std::ostream& out) const override { + return out << "rgw bucket_logging: "; + } + + int parse_list_name(std::string list_obj_name, std::string &tenant_name, + std::string &bucket_name, std::string &bucket_id, + std::string &prefix) { + // /// + size_t pstart = 0, pend = 0; + pend =list_obj_name.find('/'); + if (pend == std::string::npos) { + return -EINVAL; + } + tenant_name = list_obj_name.substr(pstart, pend); + + pstart = pend + 1; + pend =list_obj_name.find('/', pstart); + if (pend == std::string::npos) { + return -EINVAL; + } + bucket_name = list_obj_name.substr(pstart, pend - pstart); + if (bucket_name.empty()) { + return -EINVAL; + } + + pstart = pend + 1; + pend =list_obj_name.find('/', pstart); + if (pend == std::string::npos) { + return -EINVAL; + } + bucket_id = list_obj_name.substr(pstart, pend - pstart); + if (bucket_id.empty()) { + return -EINVAL; + } + prefix = list_obj_name.substr(pend + 1); + + /* + //TODO: Fix this + std::regex pattern(R"(([^/]+)/([^/]+)/([^/]+)/(.*))"); + std::smatch matches; + + if (std::regex_match(list_obj_name, matches, pattern)) { + tenant_name = matches[1].str(); + bucket_name = matches[2].str(); + bucket_id = matches[3].str(); + prefix = matches[4].str(); + } else { + return -EINVAL; + } +*/ + return 0; + } + + // read the list of commit lists from the global list object + int read_global_logging_list(commit_list_t& commits, optional_yield y) { + constexpr auto max_chunk = 1024U; + std::string start_after; + bool more = true; + int rval; + while (more) { + librados::ObjectReadOperation op; + commit_list_t commits_chunk; + op.omap_get_keys2(start_after, max_chunk, &commits_chunk, &more, &rval); + const auto ret = rgw_rados_operate(this, m_rados_store->getRados()->get_logging_pool_ctx(), + COMMIT_LIST_OBJECT_NAME, std::move(op), + nullptr, y); + if (ret == -ENOENT) { + // global commit list object was not created - nothing to do + return 0; + } + if (ret < 0) { + // TODO: do we need to check on rval as well as ret? + ldpp_dout(this, 5) << "ERROR: failed to read bucket logging global commit list. error: " + << ret << dendl; + return ret; + } + commits.merge(commits_chunk); + if (more) { + start_after = *commits.rbegin(); + } + } + return 0; + } + + class tokens_waiter { + size_t pending_tokens = 0; + DoutPrefixProvider* const dpp; + ceph::async::yield_waiter waiter; + + public: + class token{ + tokens_waiter* tw; + public: + token(const token& other) = delete; + token(token&& other) : tw(other.tw) { + other.tw = nullptr; // mark as moved + } + token& operator=(const token& other) = delete; + token(tokens_waiter* _tw) : tw(_tw) { + ++tw->pending_tokens; + } + + ~token() { + if (!tw) { + return; // already moved + } + --tw->pending_tokens; + if (tw->pending_tokens == 0 && tw->waiter) { + tw->waiter.complete(boost::system::error_code{}); + } + } + }; + + tokens_waiter(DoutPrefixProvider* _dpp) : dpp(_dpp) {} + tokens_waiter(const tokens_waiter& other) = delete; + tokens_waiter& operator=(const tokens_waiter& other) = delete; + + void async_wait(boost::asio::yield_context yield) { + if (pending_tokens == 0) { + return; + } + ldpp_dout(dpp, 20) << "INFO: tokens waiter is waiting on " + << pending_tokens << " tokens" << dendl; + boost::system::error_code ec; + waiter.async_wait(yield[ec]); + ldpp_dout(dpp, 20) << "INFO: tokens waiter finished waiting for all tokens" << dendl; + } + }; + + // processing of a specific entry + // return whether processing was successful (true) or not (false) + int process_entry( + const ConfigProxy& conf, + const std::string& entry, + const std::string& temp_obj_name, + const std::string& tenant_name, + const std::string& bucket_name, + const std::string& bucket_id, + const std::string& prefix, + const rgw_pool& obj_pool, + bool& bucket_deleted, + boost::asio::yield_context yield) { + + std::unique_ptr log_bucket; + rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id}; + int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(this, 1) << "ERROR: failed to load bucket : " + << bucket_name << " id : " << bucket_id + << ", error: " << ret << dendl; + return ret; + } + if (ret == 0) { + ldpp_dout(this, 20) << "INFO: loaded bucket : " << bucket_name + << ", id: "<< bucket_id << dendl; + ret = log_bucket->commit_logging_object(entry, yield, this, prefix, + nullptr, false); + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to commit logging object : " + << entry << " , error: " << ret << dendl; + return ret; + } + ldpp_dout(this, 20) << "INFO: committed logging object : " << entry + << " to bucket " << bucket_name + << ", id: "<< bucket_id << dendl; + return 0; + } + // The log bucket has been deleted. Clean up pending objects. + // This is done because the bucket_deletion_cleanup only removes + // the current temp log objects. + bucket_deleted = true; + ldpp_dout(this, 20) << "INFO: log bucket : " << bucket_name + << " no longer exists. Removing pending logging object " + << temp_obj_name << " from pool " << obj_pool + << dendl; + ret = rgw_delete_system_obj(this, m_rados_store->svc()->sysobj, obj_pool, + temp_obj_name, nullptr, yield); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(this, 1) << "ERROR: failed to delete pending logging object : " + << temp_obj_name << " in pool " << obj_pool + << " for deleted log bucket : " << bucket_name + << " . ret = "<< ret << dendl; + return ret; + } + ldpp_dout(this, 20) << "INFO: Deleted pending logging object " + << temp_obj_name << " from pool " << obj_pool + << " for deleted log bucket : " << bucket_name + << dendl; + return 0; + } + + void async_sleep(boost::asio::yield_context yield, const std::chrono::milliseconds& duration) { + using Clock = ceph::coarse_mono_clock; + using Timer = boost::asio::basic_waitable_timer, Executor>; + Timer timer(m_io_context); + timer.expires_after(duration); + boost::system::error_code ec; + timer.async_wait(yield[ec]); + if (ec) { + ldpp_dout(this, 10) << "ERROR: async_sleep failed with error: " + << ec.message() << dendl; + } + } + + // unlock (lose ownership) list object + int unlock_list_object(librados::IoCtx& rados_ioctx, + const std::string& list_obj_name, + boost::asio::yield_context yield) { + + librados::ObjectWriteOperation op; + op.assert_exists(); + rados::cls::lock::unlock(&op, m_lock_name, m_lock_cookie); + const auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, + std::move(op), yield); + if (ret == -ENOENT) { + ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name + << " was removed. Nothing to unlock." << dendl; + return 0; + } + if (ret == -EBUSY) { + ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name + << " already owned by another RGW. No need to unlock." + << dendl; + return 0; + } + return ret; + } + + int remove_commit_list(librados::IoCtx& rados_ioctx, + const std::string& commit_list_name, + optional_yield y) { + { + librados::ObjectWriteOperation op; + rados::cls::lock::assert_locked(&op, m_lock_name, + ClsLockType::EXCLUSIVE, + m_lock_cookie, + "" /*no tag*/); + op.remove(); + auto ret = rgw_rados_operate(this, rados_ioctx, commit_list_name, std::move(op), y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :" + << commit_list_name << ". ret = " << ret << dendl; + return ret; + } + } + + librados::ObjectWriteOperation op; + op.omap_rm_keys({commit_list_name}); + auto ret = rgw_rados_operate(this, rados_ioctx, COMMIT_LIST_OBJECT_NAME, + std::move(op), y); + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to remove entry " << commit_list_name + << " from the global bucket logging list : " + << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl; + return ret; + } + ldpp_dout(this, 20) << "INFO: removed entry " << commit_list_name + << " from the global bucket logging list : " + << COMMIT_LIST_OBJECT_NAME << dendl; + + return 0; + } + + // processing of a specific target list + void process_commit_list(librados::IoCtx& rados_ioctx, + const std::string& list_obj_name, + boost::asio::yield_context yield) { + auto is_idle = false; + auto bucket_deleted = false; + auto skip_bucket_deletion = false; + const std::string start_marker; + + std::string tenant_name, bucket_name, bucket_id, prefix; + int ret = parse_list_name(list_obj_name, tenant_name, bucket_name, + bucket_id, prefix); + if (ret) { + ldpp_dout(this, 1) << "ERROR: commit list name: " << list_obj_name + << " is invalid. Processing will stop." << dendl; + return; + } + ldpp_dout(this, 20) << "INFO: parse commit list name: " << list_obj_name + << ", bucket_name: " << bucket_name + << ", bucket_id: " << bucket_id + << ", tenant_name: " << tenant_name + << ", prefix: " << prefix << dendl; + + while (!m_shutdown) { + // if the list was empty the last time, sleep for idle timeout + if (is_idle) { + async_sleep(yield, m_log_commits_idle_sleep); + } + + // Get the entries in the list after locking it + is_idle = true; + skip_bucket_deletion = false; + std::map entries; + rgw_pool temp_data_pool; + auto total_entries = 0U; + { + librados::ObjectReadOperation op; + op.assert_exists(); + bufferlist obl; + int rval; + bufferlist pool_obl; + int pool_rval; + constexpr auto max_chunk = 1024U; + std::string start_after; + bool more = true; + rados::cls::lock::assert_locked(&op, m_lock_name, + ClsLockType::EXCLUSIVE, + m_lock_cookie, + "" /*no tag*/); + op.omap_get_vals2(start_after, max_chunk, &entries, &more, &rval); + op.getxattr(TEMP_POOL_ATTR.c_str(), &pool_obl, &pool_rval); + // check ownership and list entries in one batch + auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, + std::move(op), nullptr, yield); + if (ret == -ENOENT) { + // list object was deleted + ldpp_dout(this, 20) << "INFO: object: " << list_obj_name + << " was removed. Processing will stop" << dendl; + return; + } + if (ret == -EBUSY) { + ldpp_dout(this, 10) << "WARNING: commit list : " << list_obj_name + << " ownership moved to another daemon. Processing will stop" + << dendl; + return; + } + if (ret < 0) { + ldpp_dout(this, 10) << "WARNING: failed to get list of entries in " + << "and/or lock object: " << list_obj_name + << ". error: " << ret << " (will retry)" << dendl; + continue; + } + if(pool_rval < 0) { + ldpp_dout(this, 10) << "WARNING: failed to get pool information" + << "from commit list: " << list_obj_name << dendl; + return; + } + // Get temp logging object pool information - this is required for + // deleting the objects when the log bucket is deleted. + try { + auto p = pool_obl.cbegin(); + decode(temp_data_pool, p); + } catch (buffer::error& err) { + ldpp_dout(this, 1) << "ERROR: failed to get pool information from object: " + << list_obj_name << dendl; + return; + } + } + total_entries = entries.size(); + if (total_entries == 0) { + // nothing in the list object + // Delete the list object if the bucket has been deleted + std::unique_ptr log_bucket; + rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id}; + int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield); + if (ret == -ENOENT) { + ldpp_dout(this, 20) << "ERROR: failed to load bucket : " + << bucket_name << ", id : " << bucket_id + << ", error: " << ret << dendl; + bucket_deleted = true; + } + } else { + // log when not idle + ldpp_dout(this, 20) << "INFO: found: " << total_entries + << " entries in commit list: " << list_obj_name + << dendl; + } + auto stop_processing = false; + std::set remove_entries; + tokens_waiter tw(this); + for (auto const& [key, val] : entries) { + if (stop_processing) { + break; + } + std::string temp_obj_name = val.to_str(); + + ldpp_dout(this, 20) << "INFO: processing entry: " << key + << ", value : " << temp_obj_name << dendl; + + tokens_waiter::token token(&tw); + boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(), + [this, &is_idle, &list_obj_name, &remove_entries, &stop_processing, + &tenant_name, &bucket_name, &bucket_id, &prefix, token = std::move(token), + key, temp_obj_name, temp_data_pool, &bucket_deleted , &skip_bucket_deletion] + (boost::asio::yield_context yield) { + auto result = + process_entry(this->get_cct()->_conf, key, temp_obj_name, tenant_name, + bucket_name, bucket_id, prefix, temp_data_pool, + bucket_deleted, yield); + if (result == 0) { + ldpp_dout(this, 20) << "INFO: processing of entry: " << key + << " from: " << list_obj_name + << " was successful." << dendl; + remove_entries.insert(key); + is_idle = false; + return; + } else { + is_idle = true; + // Don't delete the bucket if we couldn't process the entry + // and the bucket has been deleted + skip_bucket_deletion = true; + ldpp_dout(this, 20) << "INFO: failed to process entry: " << key + << " from: " << list_obj_name << dendl; + } + stop_processing = true; + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); + } + + if (!entries.empty()) { + // wait for all pending work to finish + tw.async_wait(yield); + } + + if (bucket_deleted && !skip_bucket_deletion) { + ret = remove_commit_list(rados_ioctx, list_obj_name, yield); + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :" + << list_obj_name << ". ret = " << ret << dendl; + return; + } + ldpp_dout(this, 10) << "INFO: bucket :" << bucket_name + << ". was removed. processing will stop" << dendl; + return; + } + // delete all processed entries from list + if (!remove_entries.empty()) { + // Delete the entries even if the lock no longer belongs + // to this instance as we have processed the entries. + librados::ObjectWriteOperation op; + op.omap_rm_keys(remove_entries); + auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield); + if (ret == -ENOENT) { + // list was deleted + ldpp_dout(this, 10) << "INFO: commit list " << list_obj_name + << ". was removed. processing will stop" << dendl; + return; + } + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to remove entries from commit list: " + << list_obj_name << ". error: " << ret + << ". This could cause some log records to be lost." + << dendl; + return; + } else { + ldpp_dout(this, 20) << "INFO: removed entries from commit list: " + << list_obj_name << dendl; + } + } + } + ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped processing : " + << list_obj_name << dendl; + } + + // process all commit log objects + // find which of the commit log objects is owned by this daemon and process it + void process_global_logging_list(boost::asio::yield_context yield) { + auto has_error = false; + std::unordered_set owned_commit_logs; + std::atomic processed_list_count = 0; + + std::vector commit_list_gc; + std::mutex commit_list_gc_lock; + auto& rados_ioctx = m_rados_store->getRados()->get_logging_pool_ctx(); + auto next_check_time = ceph::coarse_real_clock::zero(); + while (!m_shutdown) { + // check if log commit list needs to be refreshed + if (ceph::coarse_real_clock::now() > next_check_time) { + next_check_time = ceph::coarse_real_clock::now() + m_log_commits_update_period; + const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time); + ldpp_dout(this, 20) << "INFO: processing global logging commit list. next " + << "processing will happen at: " << std::ctime(&tp) + << dendl; + } else { + // short sleep duration to prevent busy wait when refreshing list + // or retrying after error + ldpp_dout(this, 20) << "INFO: processing global logging commit list. Sleep now." << dendl; + async_sleep(yield, m_log_commits_update_retry); + if (!has_error) { + // in case of error we will retry + continue; + } + } + + std::set commit_lists; + auto ret = read_global_logging_list(commit_lists, yield); + if (ret < 0) { + has_error = true; + ldpp_dout(this, 1) << "ERROR: failed to read global logging list. Retry. ret:" + << ret << dendl; + continue; + } + + for (const auto& list_obj_name : commit_lists) { + // try to lock the object to check if it is owned by this rgw + // or if ownership needs to be taken + ldpp_dout(this, 20) << "INFO: processing commit list: " << list_obj_name + << dendl; + librados::ObjectWriteOperation op; + op.assert_exists(); + rados::cls::lock::lock(&op, m_lock_name, ClsLockType::EXCLUSIVE, + m_lock_cookie, "" /*no tag*/, + "" /*no description*/, m_failover_time, + LOCK_FLAG_MAY_RENEW); + + ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield); + if (ret == -EBUSY) { + // lock is already taken by another RGW + ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name + << " owned (locked) by another daemon" << dendl; + // if the list was owned by this RGW, processing should be stopped, would be deleted from list afterwards + continue; + } + if (ret == -ENOENT) { + // commit list is deleted - processing will stop the next time we try to read from it + ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name + << " should not be locked - already deleted" << dendl; + continue; + } + if (ret < 0) { + // failed to lock for another reason, continue to process other lists + ldpp_dout(this, 10) << "ERROR: failed to lock commit list: " + << list_obj_name << ". error: " << ret << dendl; + has_error = true; + continue; + } + // add the commit list to the list of owned commit_logs + if (owned_commit_logs.insert(list_obj_name).second) { + ldpp_dout(this, 20) << "INFO: " << list_obj_name << " now owned (locked) by this daemon" << dendl; + // start processing this list + boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(), + [this, &commit_list_gc, &commit_list_gc_lock, &rados_ioctx, + list_obj_name, &processed_list_count](boost::asio::yield_context yield) { + ++processed_list_count; + process_commit_list(rados_ioctx, list_obj_name, yield); + // if list processing ended, it means that the list object was removed or not owned anymore + const auto ret = unlock_list_object(rados_ioctx, list_obj_name, yield); + if (ret < 0) { + ldpp_dout(this, 15) << "WARNING: failed to unlock commit list: " + << list_obj_name << " with error: " << ret + << " (ownership would still move if not renewed)" + << dendl; + } else { + ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name + << " not locked (ownership can move)" << dendl; + } + // mark it for deletion + std::lock_guard lock_guard(commit_list_gc_lock); + commit_list_gc.push_back(list_obj_name); + --processed_list_count; + ldpp_dout(this, 20) << "INFO: " << list_obj_name << " marked for removal" << dendl; + }, [this, list_obj_name] (std::exception_ptr eptr) { + ldpp_dout(this, 10) << "ERROR: " << list_obj_name << " processing failed" << dendl; + if (eptr) std::rethrow_exception(eptr); + }); + } else { + ldpp_dout(this, 20) << "INFO: " << list_obj_name << " ownership (lock) renewed" << dendl; + } + } + // erase all list objects that were deleted + { + std::lock_guard lock_guard(commit_list_gc_lock); + std::for_each(commit_list_gc.begin(), commit_list_gc.end(), [this, &owned_commit_logs](const std::string& list_obj_name) { + owned_commit_logs.erase(list_obj_name); + ldpp_dout(this, 20) << "INFO: commit list object: " << list_obj_name << " was removed" << dendl; + }); + commit_list_gc.clear(); + } + } + while (processed_list_count > 0) { + ldpp_dout(this, 20) << "INFO: BucketLogging manager stopped. " + << processed_list_count + << " commit lists are still being processed" << dendl; + async_sleep(yield, m_log_commits_update_retry); + } + ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped. Done processing all commit lists" << dendl; + } + +public: + + ~BucketLoggingManager() { + } + + void stop() { + ldpp_dout(this, 5) << "INFO: BucketLogging manager received stop signal. shutting down..." << dendl; + m_shutdown = true; + m_work_guard.reset(); + if (m_worker.joinable()) { + // try graceful shutdown first + auto future = std::async(std::launch::async, [this]() {m_worker.join();}); + if (future.wait_for(m_log_commits_update_retry*2) == std::future_status::timeout) { + // force stop if graceful shutdown takes too long + if (!m_io_context.stopped()) { + ldpp_dout(this, 5) << "INFO: force shutdown of BucketLogging Manager" << dendl; + m_io_context.stop(); + } + m_worker.join(); + } + } + ldpp_dout(this, 5) << "INFO: BucketLogging manager shutdown complete" << dendl; + } + + void init() { + boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(), + [this](boost::asio::yield_context yield) { + process_global_logging_list(yield); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); + // start the worker threads to do the actual list processing + m_worker = std::thread([this]() { + ceph_pthread_setname("bucket-logging-worker"); + try { + ldpp_dout(this, 20) << "INFO: bucket logging worker started" << dendl; + m_io_context.run(); + ldpp_dout(this, 20) << "INFO: bucket logging worker ended" << dendl; + } catch (const std::exception& err) { + ldpp_dout(this, 10) << "ERROR: bucket logging worker failed with error: " + << err.what() << dendl; + throw err; + } + }); + ldpp_dout(this, 10) << "INFO: started bucket logging manager" << dendl; + } + + BucketLoggingManager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) : + m_cct(_cct), + m_lock_cookie(gen_rand_alphanumeric(m_cct, COOKIE_LEN)), + m_work_guard(boost::asio::make_work_guard(m_io_context)), + m_site(site), + m_rados_store(store) + { + ldpp_dout(this, 5) << "INFO: BucketLoggingManager() constructor" << dendl; + } +}; + +std::unique_ptr s_manager; + +bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store, + const SiteConfig& site) { + if (s_manager) { + ldpp_dout(dpp, 1) << "ERROR: failed to init BucketLogging manager: already exists" << dendl; + return false; + } + // TODO: take conf from CephContext + ldpp_dout(dpp, 1) << "INFO: initialising BucketLogging manager" << dendl; + s_manager = std::make_unique(dpp->get_cct(), store, site); + s_manager->init(); + return true; +} + +void shutdown() { + if (!s_manager) return; + s_manager->stop(); + s_manager.reset(); +} + +} // namespace rgw::bucket_logging diff --git a/src/rgw/driver/rados/rgw_bl_rados.h b/src/rgw/driver/rados/rgw_bl_rados.h new file mode 100644 index 00000000000..1075adb8aa5 --- /dev/null +++ b/src/rgw/driver/rados/rgw_bl_rados.h @@ -0,0 +1,41 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include + +// forward declarations +namespace rgw { class SiteConfig; } +namespace rgw::sal { + class RadosStore; +} +class DoutPrefixProvider; + +namespace rgw::bucketlogging { + + // initialize the bucket logging commit manager + bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store, + const rgw::SiteConfig& site); + + // shutdown the bucket logging commit manager + void shutdown(); + + int add_commit_target_entry(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + const rgw::sal::Bucket* log_bucket, + const std::string& prefix, + const std::string& obj_name, + const std::string& tail_obj_name, + const rgw_pool& temp_data_pool, + optional_yield y); + +int list_pending_commit_objects(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + const rgw::sal::Bucket* log_bucket, + const std::string& prefix, + std::set& entries, + optional_yield y); + +} + diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 2a1b5d541ea..de1ed2c99c7 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -57,6 +57,7 @@ #include "rgw_etag_verifier.h" #include "rgw_worker.h" #include "rgw_notify.h" +#include "rgw_bl_rados.h" #include "rgw_http_errors.h" #include "rgw_multipart_meta_filter.h" @@ -1149,6 +1150,10 @@ void RGWRados::finalize() v1_topic_migration.stop(); } + if (run_bucket_logging_thread) { + rgw::bucketlogging::shutdown(); + } + if (use_restore_thread) { restore->stop_processor(); } @@ -1276,6 +1281,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw if (ret < 0) return ret; + ret = open_logging_pool_ctx(dpp); + if (ret < 0) + return ret; + pools_initialized = true; if (use_gc) { @@ -1428,6 +1437,12 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw index_completion_manager = new RGWIndexCompletionManager(this); + if (run_bucket_logging_thread) { + if (!rgw::bucketlogging::init(dpp, this->driver, *svc.site)) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket logging manager" << dendl; + } + ldpp_dout(dpp, 0) << "INFO: initialized bucket logging manager" << dendl; + } if (run_notification_thread) { if (!rgw::notify::init(dpp, driver, *svc.site)) { ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl; @@ -1448,7 +1463,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw v1_topic_migration.start(1); } } - return ret; } @@ -1552,6 +1566,11 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp) return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true); } +int RGWRados::open_logging_pool_ctx(const DoutPrefixProvider *dpp) +{ + return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().bucket_logging_pool, logging_pool_ctx, true, true); +} + int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk) { diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 38e2f3dcb7d..a1dea906724 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -341,6 +341,7 @@ class RGWRados int open_objexp_pool_ctx(const DoutPrefixProvider *dpp); int open_reshard_pool_ctx(const DoutPrefixProvider *dpp); int open_notif_pool_ctx(const DoutPrefixProvider *dpp); + int open_logging_pool_ctx(const DoutPrefixProvider *dpp); int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk); @@ -361,6 +362,7 @@ class RGWRados bool run_sync_thread{false}; bool run_reshard_thread{false}; bool run_notification_thread{false}; + bool run_bucket_logging_thread{false}; RGWMetaNotifier* meta_notifier{nullptr}; RGWDataNotifier* data_notifier{nullptr}; @@ -436,6 +438,7 @@ protected: librados::IoCtx objexp_pool_ctx; librados::IoCtx reshard_pool_ctx; librados::IoCtx notif_pool_ctx; // .rgw.notif + librados::IoCtx logging_pool_ctx; // .rgw.logging bool pools_initialized{false}; @@ -523,6 +526,11 @@ public: return *this; } + RGWRados& set_run_bucket_logging_thread(bool _run_bucket_logging_thread) { + run_bucket_logging_thread = _run_bucket_logging_thread; + return *this; + } + librados::IoCtx* get_lc_pool_ctx() { return &lc_pool_ctx; } @@ -538,6 +546,10 @@ public: librados::IoCtx& get_notif_pool_ctx() { return notif_pool_ctx; } + + librados::IoCtx& get_logging_pool_ctx() { + return logging_pool_ctx; + } void set_context(CephContext *_cct) { cct = _cct; diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index b193a0a4c5c..5228f1f1cad 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -41,6 +41,7 @@ #include "rgw_aio_throttle.h" #include "rgw_bucket.h" #include "rgw_bucket_logging.h" +#include "rgw_bl_rados.h" #include "rgw_lc.h" #include "rgw_lc_tier.h" #include "rgw_lc_tier.h" @@ -1097,6 +1098,7 @@ int RadosBucket::remove_topics(RGWObjVersionTracker* objv_tracker, #define RGW_ATTR_COMMITTED_LOGGING_OBJ RGW_ATTR_PREFIX "committed-logging-obj" +#define RGW_ATTR_LOGGING_EC_POOL RGW_ATTR_PREFIX "logging-ec-pool" int get_committed_logging_object(RadosStore* store, const std::string& obj_name_oid, @@ -1147,6 +1149,88 @@ int set_committed_logging_object(RadosStore* store, return rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y); } +int get_logging_temp_object_pool (const DoutPrefixProvider *dpp, + RadosBucket* bucket, + RadosStore* rados_store, + const std::string& obj_name_oid, + rgw_pool& data_pool, bool& is_ec_pool, optional_yield y) { + + int ret; + rgw_obj obj {bucket->get_key(), "dummy"}; + if (!rados_store->getRados()->get_obj_data_pool(bucket->get_placement_rule(), obj, &data_pool)) { + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << bucket->get_key() << + "' when writing logging object" << dendl; + return -EIO; + } + + // Find out if the log bucket is on an EC pool + // Use an attribute on the logging name object instead of the bucket so + // it is not synced in a multi-site setup + + librados::Rados* rados = rados_store->getRados()->get_rados_handle(); + librados::IoCtx io_ctx; + if (ret = rgw_init_ioctx(dpp, rados, data_pool, io_ctx); ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: get data pool ioctx for bucket '" << bucket->get_key() << + "' when writing logging object. ret = " << ret << dendl; + return ret; + } + if (!io_ctx.is_valid()) { + ldpp_dout(dpp, 1) << "ERROR: invalid data pool ioctx for bucket '" << bucket->get_key() << + "' when writing logging object" << dendl; + return ret; + } + + is_ec_pool = false; + bufferlist bl; + librados::ObjectReadOperation op; + int rval; + op.getxattr(RGW_ATTR_LOGGING_EC_POOL, &bl, &rval); + ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), nullptr, y); + if (ret == 0) { + try { + ceph::decode(is_ec_pool, bl); + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "ERROR: failed to decode " << RGW_ATTR_LOGGING_EC_POOL + << " attr for bucket '" << bucket->get_key() + << "'. error = " << err.what() << dendl; + return -EINVAL; + } + } else if (ret < 0){ + // Try to find out if it is an EC pool + ret = rados->mon_command( + "{\"prefix\": \"osd pool get\", \"pool\": \"" + + data_pool.name + "\", \"var\": \"erasure_code_profile\"}", + {}, NULL, NULL); + if (ret == -EACCES) { + is_ec_pool = false; + } else if (ret == 0){ + is_ec_pool = true; + } else { + ldpp_dout(dpp, 10) << __func__ << " ERROR: failed to find out if pool" + << data_pool.name << " is erasure coded. ret = " + << ret << dendl; + return ret; + } + bufferlist bl; + ceph::encode(is_ec_pool, bl); + librados::ObjectWriteOperation op; + // if object does not exist, we should not create the attribute + op.assert_exists(); + op.setxattr(RGW_ATTR_LOGGING_EC_POOL, std::move(bl)); + if (const int ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y); ret < 0){ + // Don't return an error as we know if it is an ec pool + ldpp_dout(dpp, 10) << "ERROR: failed to set is_ec_pool attr on :" << obj_name_oid + << " ret: " << ret << dendl; + } + } + // If the target log bucket is on an EC pool, the temp + // logging object will be created in the default.rgw.log pool + if (is_ec_pool) { + data_pool = rados_store->svc()->zone->get_zone_params().bucket_logging_pool; + } + return 0; +} + int RadosBucket::get_logging_object_name(std::string& obj_name, const std::string& prefix, optional_yield y, @@ -1243,14 +1327,14 @@ std::string to_temp_object_name(const rgw::sal::Bucket* bucket, const std::strin obj_name); } -int RadosBucket::remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) { +int RadosBucket::remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) { rgw_pool data_pool; - const rgw_obj head_obj{get_key(), obj_name}; - const auto placement_rule = get_placement_rule(); - - if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) { - ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() << - "' when deleting logging object" << dendl; + const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix); + bool is_ec_pool; + auto ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to get temp data pool for bucket '" << get_key() << + "' when deleting logging object: " << obj_name << ". ret: " << ret << dendl; return -EIO; } @@ -1266,7 +1350,7 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, - std::string* last_committed) { + std::string* last_committed, bool async) { rgw_pool data_pool; const rgw_obj head_obj{get_key(), obj_name}; const auto placement_rule = get_placement_rule(); @@ -1276,9 +1360,10 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, "' when committing logging object" << dendl; return -EIO; } + int ret; const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix); if (last_committed) { - if (const int ret = get_committed_logging_object(store, + if (ret = get_committed_logging_object(store, obj_name_oid, data_pool, y, @@ -1296,11 +1381,34 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, } const auto temp_obj_name = to_temp_object_name(this, obj_name); + rgw_pool temp_data_pool; + bool is_ec_pool = false; + + ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, temp_data_pool, is_ec_pool, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" + << get_key() << "' when committing logging object: " + << obj_name << ". ret: " << ret << dendl; + return -EIO; + } + if (async) { + ret = rgw::bucketlogging::add_commit_target_entry(dpp, store, this, prefix, + obj_name, temp_obj_name, + temp_data_pool, y); + if (ret < 0){ + return ret; + } + + ldpp_dout(dpp, 20) << "INFO: added logging object entry " << obj_name + << " to commit list object." << dendl; + return 0; + } + std::map obj_attrs; ceph::real_time mtime; bufferlist bl_data; - if (auto ret = rgw_get_system_obj(store->svc()->sysobj, - data_pool, + if (ret = rgw_get_system_obj(store->svc()->sysobj, + temp_data_pool, temp_obj_name, bl_data, nullptr, @@ -1316,6 +1424,36 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, } mtime = ceph::real_time::clock::now(); ldpp_dout(dpp, 20) << "INFO: temporary logging object '" << temp_obj_name << "' does not exist. committing it empty" << dendl; + } else if (is_ec_pool) { + if (ret = rgw_put_system_obj(dpp, store->svc()->sysobj, + data_pool, + temp_obj_name, + bl_data, + true, + nullptr, + mtime, + y, + &obj_attrs); ret < 0){ + + if (ret == -EEXIST) { + ldpp_dout(dpp, 5) << "WARNING: race detected in committing logging object '" << temp_obj_name << dendl; + } else { + ldpp_dout(dpp, 1) << "ERROR: failed to write logging data when committing object '" << temp_obj_name + << ". error: " << ret << dendl; + } + return ret; + } + ldpp_dout(dpp, 20) << "INFO: wrote logging data when committing object '" << temp_obj_name << dendl; + + if (ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, + temp_data_pool, + temp_obj_name, + nullptr, + y); ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 1) << "ERROR: failed to delete temp logging object when " + << "committing object '" << temp_obj_name + << ". error: " << ret << dendl; + } } uint64_t size = bl_data.length(); @@ -1424,22 +1562,29 @@ void bucket_logging_completion(rados_completion_t completion, void* args) { int RadosBucket::write_logging_object(const std::string& obj_name, const std::string& record, + const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) { const auto temp_obj_name = to_temp_object_name(this, obj_name); rgw_pool data_pool; - rgw_obj obj{get_key(), obj_name}; - if (!store->getRados()->get_obj_data_pool(get_placement_rule(), obj, &data_pool)) { - ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() << - "' when writing logging object" << dendl; + + bool is_ec_pool = false; + const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix); + int ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" + << get_key() << "' when writing logging object " + << temp_obj_name << ", ret: " << ret << dendl; return -EIO; } + librados::IoCtx io_ctx; - if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) { + if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx, true); ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to get IO context for logging object from data pool:" << data_pool.to_str() << dendl; return -EIO; } + bufferlist bl; bl.append(record); bl.append("\n"); diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 5fa42b6c4d4..42f4331192e 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -804,9 +804,11 @@ class RadosBucket : public StoreBucket { optional_yield y, const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) override; - int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override; - int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override; - int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override; + int commit_logging_object(const std::string& obj_name, optional_yield y, + const DoutPrefixProvider *dpp, const std::string& prefix, + std::string* last_committed, bool async) override; + int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override; + int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override; private: int link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr); diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index 769b7cc5e26..95ee9adcf2b 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -91,6 +91,7 @@ extern "C" { #include "driver/rados/rgw_bucket.h" #include "driver/rados/rgw_sal_rados.h" +#include "driver/rados/rgw_bl_rados.h" #include @@ -192,6 +193,7 @@ void usage() cout << " bucket radoslist list rados objects backing bucket's objects\n"; cout << " bucket logging flush flush pending log records object of source bucket to the log bucket\n"; cout << " bucket logging info get info on bucket logging configuration on source bucket or list of sources in log bucket\n"; + cout << " bucket logging list list the log objects pending commit for the source bucket\n"; cout << " bi get retrieve bucket index object entries\n"; cout << " bi put store bucket index object entries\n"; cout << " bi list list raw bucket index entries\n"; @@ -738,6 +740,7 @@ enum class OPT { BUCKET_RESYNC_ENCRYPTED_MULTIPART, BUCKET_LOGGING_FLUSH, BUCKET_LOGGING_INFO, + BUCKET_LOGGING_LIST, POLICY, LOG_LIST, LOG_SHOW, @@ -990,6 +993,7 @@ static SimpleCmd::Commands all_cmds = { { "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART }, { "bucket logging flush", OPT::BUCKET_LOGGING_FLUSH }, { "bucket logging info", OPT::BUCKET_LOGGING_INFO }, + { "bucket logging list", OPT::BUCKET_LOGGING_LIST }, { "policy", OPT::POLICY }, { "log list", OPT::LOG_LIST }, { "log show", OPT::LOG_SHOW }, @@ -4703,6 +4707,7 @@ int main(int argc, const char **argv) false, false, false, + false, false, // No background tasks! null_yield, cfgstore.get(), @@ -7900,7 +7905,7 @@ int main(int argc, const char **argv) } std::string old_obj; const auto region = driver->get_zone()->get_zonegroup().get_api_name(); - ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, &old_obj); + ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, false, &old_obj); if (ret < 0) { cerr << "ERROR: failed to flush pending logging object '" << obj_name << "' to target bucket '" << configuration.target_bucket << "'. error: " << cpp_strerror(-ret) << std::endl; @@ -7952,6 +7957,55 @@ int main(int argc, const char **argv) return 0; } + if (opt_cmd == OPT::BUCKET_LOGGING_LIST) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + if (driver->get_name() != "rados") { + cerr << "ERROR: this command is only available with the RADOS driver." << std::endl; + return EINVAL; + } + + int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + return -ret; + } + + rgw::bucketlogging::configuration configuration; + std::unique_ptr target_bucket; + ret = rgw::bucketlogging::get_target_and_conf_from_source(dpp(), + driver, bucket.get(), tenant, configuration, target_bucket, null_yield); + if (ret < 0 && ret != -ENODATA) { + cerr << "ERROR: failed to get target bucket and logging conf from source bucket '" + << bucket_name << "': " << cpp_strerror(-ret) << std::endl; + return -ret; + } else if (ret == -ENODATA) { + cerr << "ERROR: bucket '" << bucket_name << "' does not have logging enabled" << std::endl; + return 0; + } + std::string target_prefix = configuration.target_prefix; + std::set entries; + + ret = rgw::bucketlogging::list_pending_commit_objects(dpp(), + static_cast(driver), target_bucket.get(), + target_prefix, entries, null_yield); + + if (ret < 0) { + cerr << "ERROR: failed to get pending log entries for bucket '" << bucket_name + << "': " << cpp_strerror(-ret) << std::endl; + return ret; + } + + formatter->open_array_section("pending_logs"); + for (auto &entry: entries) { + formatter->dump_string("log", entry); + } + formatter->close_section(); // objs + formatter->flush(cout); + return 0; + } + if (opt_cmd == OPT::LOG_LIST) { // filter by date? if (date.size() && date.size() != 10) { diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 3ccd2c18a04..a5926cf9024 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -258,6 +258,7 @@ int rgw::AppMain::init_storage() run_sync, g_conf().get_val("rgw_dynamic_resharding"), true, // run notification thread + true, // run bucket-logging thread true, null_yield, env.cfgstore, g_conf()->rgw_cache_enabled); if (!env.driver) { diff --git a/src/rgw/rgw_bucket_logging.cc b/src/rgw/rgw_bucket_logging.cc index eedc822e9b1..bfb76a46c77 100644 --- a/src/rgw/rgw_bucket_logging.cc +++ b/src/rgw/rgw_bucket_logging.cc @@ -334,7 +334,7 @@ int commit_logging_object(const configuration& conf, target_bucket->get_key() << "'. ret = " << ret << dendl; return ret; } - if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) { + if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed, false); ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of logging bucket '" << target_bucket->get_key() << "'. ret = " << ret << dendl; return ret; @@ -351,6 +351,7 @@ int rollover_logging_object(const configuration& conf, optional_yield y, bool must_commit, RGWObjVersionTracker* objv_tracker, + bool async, std::string* last_committed, std::string* err_message) { std::string target_bucket_name; @@ -393,7 +394,7 @@ int rollover_logging_object(const configuration& conf, return ret; } } - if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) { + if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed, async); ret < 0) { if (must_commit) { if (err_message) { *err_message = fmt::format("Failed to commit logging object of logging bucket '{}'", target_bucket->get_name()); @@ -516,7 +517,7 @@ int log_record(rgw::sal::Driver* driver, if (ceph::coarse_real_time::clock::now() > time_to_commit) { ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to logging bucket '" << target_bucket_id << "'" << dendl; - if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) { + if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) { set_journal_err(err_message); return ret; } @@ -656,6 +657,7 @@ int log_record(rgw::sal::Driver* driver, if (ret = target_bucket->write_logging_object(obj_name, record, + conf.target_prefix, y, dpp, async_completion); ret < 0 && ret != -EFBIG) { @@ -667,12 +669,13 @@ int log_record(rgw::sal::Driver* driver, if (ret == -EFBIG) { ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to logging bucket '" << target_bucket->get_key() << "'" << dendl; - if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) { + if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) { set_journal_err(err_message); return ret; } if (ret = target_bucket->write_logging_object(obj_name, record, + conf.target_prefix, y, dpp, async_completion); ret < 0) { @@ -849,7 +852,7 @@ int bucket_deletion_cleanup(const DoutPrefixProvider* dpp, } ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted logging bucket '" << bucket->get_key() << "'" << dendl; - if (const int ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) { + if (const int ret = bucket->remove_logging_object(obj_name, conf.target_prefix, y, dpp); ret < 0) { ldpp_dout(dpp, 5) << "WARNING: failed to delete pending logging object '" << obj_name << "' for logging bucket '" << bucket->get_key() << "' during cleanup. ret = " << ret << dendl; continue; @@ -948,7 +951,7 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp, ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name for bucket '" << bucket->get_key() << "' during source cleanup" << dendl; // since the object holding the name was deleted, we cannot fetch the last commited name from it - if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr); ret < 0) { + if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr, false); ret < 0) { ldpp_dout(dpp, 5) << "WARNING: could not commit pending logging object of bucket '" << bucket->get_key() << "' during source cleanup. ret = " << ret << dendl; return 0; diff --git a/src/rgw/rgw_bucket_logging.h b/src/rgw/rgw_bucket_logging.h index 1238ef4f359..6d5b70bc9c9 100644 --- a/src/rgw/rgw_bucket_logging.h +++ b/src/rgw/rgw_bucket_logging.h @@ -185,6 +185,7 @@ int rollover_logging_object(const configuration& conf, optional_yield y, bool must_commit, RGWObjVersionTracker* objv_tracker, + bool async, std::string* last_committed, std::string* err_message = nullptr); diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index 02a5ca85e96..be2546d61ec 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -104,7 +104,7 @@ int main(const int argc, const char **argv) exit(1); } - driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield, cfgstore.get()); + driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, false, true, null_yield, cfgstore.get()); if (!driver) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 1f206a1ca8e..d8c034fc571 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -128,7 +128,10 @@ void RGWRealmReloader::reload() cct->_conf->rgw_enable_quota_threads, cct->_conf->rgw_run_sync_thread, cct->_conf.get_val("rgw_dynamic_resharding"), - true, true, null_yield, env.cfgstore, // run notification thread + true, + true, // run notification thread + true, // run bucket logging thread + null_yield, env.cfgstore, cct->_conf->rgw_cache_enabled); } diff --git a/src/rgw/rgw_rest_bucket_logging.cc b/src/rgw/rgw_rest_bucket_logging.cc index ba0374a94fe..9aa03204c3c 100644 --- a/src/rgw/rgw_rest_bucket_logging.cc +++ b/src/rgw/rgw_rest_bucket_logging.cc @@ -327,6 +327,7 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp { y, false, // rollover should happen even if commit failed &objv_tracker, + false, &old_obj); ret < 0) { ldpp_dout(this, 1) << "WARNING: failed to flush pending logging object '" << obj_name << "'" << " to target bucket '" << target_bucket_id << "'. " @@ -435,7 +436,7 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp { ldpp_dout(this, 5) << "INFO: no pending logging object in logging bucket '" << target_bucket_id << "'. new object should be created" << dendl; } const auto region = driver->get_zone()->get_zonegroup().get_api_name(); - op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, &old_obj); + op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, false, &old_obj); if (op_ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << obj_name << "'" << " to logging bucket '" << target_bucket_id << "'. " diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 8670d37d645..7fe620955c1 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -106,7 +106,8 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* bool quota_threads, bool run_sync_thread, bool run_reshard_thread, - bool run_notification_thread, + bool run_notification_thread, + bool run_bucket_logging_thread, bool use_cache, bool use_gc, bool background_tasks, @@ -129,6 +130,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* .set_run_sync_thread(run_sync_thread) .set_run_reshard_thread(run_reshard_thread) .set_run_notification_thread(run_notification_thread) + .set_run_bucket_logging_thread(run_bucket_logging_thread) .init_begin(cct, dpp, background_tasks, site_config, cfgstore) < 0) { delete driver; return nullptr; @@ -163,6 +165,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* .set_run_sync_thread(run_sync_thread) .set_run_reshard_thread(run_reshard_thread) .set_run_notification_thread(run_notification_thread) + .set_run_bucket_logging_thread(run_bucket_logging_thread) .init_begin(cct, dpp, background_tasks, site_config, cfgstore) < 0) { delete driver; return nullptr; diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 2d820af5dea..9aff21eada5 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -1050,12 +1050,15 @@ class Bucket { RGWObjVersionTracker* objv_tracker) = 0; /** Move the pending bucket logging object into the bucket if "last_committed" is not null, it will be set to the name of the last committed object + if async is true, write the entry to the commit lists to be processed by the BucketLoggingManager * */ - virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) = 0; + virtual int commit_logging_object(const std::string& obj_name, optional_yield y, + const DoutPrefixProvider *dpp, const std::string& prefix, + std::string* last_committed, bool async) = 0; //** Remove the pending bucket logging object */ - virtual int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0; + virtual int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) = 0; /** Write a record to the pending bucket logging object */ - virtual int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0; + virtual int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0; /* dang - This is temporary, until the API is completed */ virtual rgw_bucket& get_key() = 0; @@ -1946,6 +1949,7 @@ public: bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, + bool run_bucket_logging_thread, bool background_tasks, optional_yield y, rgw::sal::ConfigStore* cfgstore, @@ -1961,6 +1965,7 @@ public: run_sync_thread, run_reshard_thread, run_notification_thread, + run_bucket_logging_thread, use_cache, use_gc, background_tasks, y, cfgstore, admin); return driver; @@ -1989,7 +1994,8 @@ public: bool quota_threads, bool run_sync_thread, bool run_reshard_thread, - bool run_notification_thread, + bool run_notification_thread, + bool run_bucket_logging_thread, bool use_metadata_cache, bool use_gc, bool background_tasks, optional_yield y, rgw::sal::ConfigStore* cfgstore, bool admin); diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index c3f628baded..a7b7e4b9642 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -697,14 +697,14 @@ public: RGWObjVersionTracker* objv_tracker) override { return next->remove_logging_object_name(prefix, y, dpp, objv_tracker); } - int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override { - return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed); + int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override { + return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed, async); } - int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { - return next->remove_logging_object(obj_name, y, dpp); + int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override { + return next->remove_logging_object(obj_name, prefix, y, dpp); } - int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override { - return next->write_logging_object(obj_name, record, y, dpp, async_completion); + int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override { + return next->write_logging_object(obj_name, record, prefix, y, dpp, async_completion); } virtual rgw_bucket& get_key() override { return next->get_key(); } diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index d64403d7f1d..0f2f91f5fef 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -267,9 +267,9 @@ class StoreBucket : public Bucket { optional_yield y, const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) override { return 0; } - int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override { return 0; } - int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; } - int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override { + int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override { return 0; } + int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; } + int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override { return 0; } diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index 9c5cb513939..ad08e4b69be 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -166,6 +166,7 @@ void RGWZoneParams::decode_json(JSONObj *obj) JSONDecoder::decode_json("topics_pool", topics_pool, obj); JSONDecoder::decode_json("account_pool", account_pool, obj); JSONDecoder::decode_json("group_pool", group_pool, obj); + JSONDecoder::decode_json("bucket_logging_pool", bucket_logging_pool, obj); JSONDecoder::decode_json("system_key", system_key, obj); JSONDecoder::decode_json("placement_pools", placement_pools, obj); JSONDecoder::decode_json("tier_config", tier_config, obj); @@ -196,6 +197,7 @@ void RGWZoneParams::dump(Formatter *f) const encode_json("topics_pool", topics_pool, f); encode_json("account_pool", account_pool, f); encode_json("group_pool", group_pool, f); + encode_json("bucket_logging_pool", bucket_logging_pool, f); encode_json_plain("system_key", system_key, f); encode_json("placement_pools", placement_pools, f); encode_json("tier_config", tier_config, f); @@ -243,6 +245,7 @@ void add_zone_pools(const RGWZoneParams& info, pools.insert(info.dedup_pool); pools.insert(info.gc_pool); pools.insert(info.log_pool); + pools.insert(info.bucket_logging_pool); pools.insert(info.intent_log_pool); pools.insert(info.usage_log_pool); pools.insert(info.user_keys_pool); @@ -845,6 +848,7 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y, info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool); info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool); info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool); + info.bucket_logging_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:logging", info.bucket_logging_pool); info.topics_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool); info.account_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:accounts", info.account_pool); diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 3095b921f1d..27e0919c924 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -33,6 +33,7 @@ struct RGWZoneParams { rgw_pool account_pool; rgw_pool group_pool; rgw_pool dedup_pool; + rgw_pool bucket_logging_pool; RGWAccessKey system_key; @@ -62,7 +63,7 @@ struct RGWZoneParams { const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const; void encode(bufferlist& bl) const { - ENCODE_START(17, 1, bl); + ENCODE_START(18, 1, bl); encode(domain_root, bl); encode(control_pool, bl); encode(gc_pool, bl); @@ -100,11 +101,12 @@ struct RGWZoneParams { encode(group_pool, bl); encode(restore_pool, bl); encode(dedup_pool, bl); + encode(bucket_logging_pool, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(17, bl); + DECODE_START(18, bl); decode(domain_root, bl); decode(control_pool, bl); decode(gc_pool, bl); @@ -199,6 +201,11 @@ struct RGWZoneParams { } else { dedup_pool = name + ".rgw.dedup"; } + if (struct_v >= 18) { + decode(bucket_logging_pool, bl); + } else { + bucket_logging_pool = log_pool.name + ":logging"; + } DECODE_FINISH(bl); } void dump(Formatter *f) const; diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 998b99860ff..12448ec219a 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -53,6 +53,7 @@ bucket radoslist list rados objects backing bucket's objects bucket logging flush flush pending log records object of source bucket to the log bucket bucket logging info get info on bucket logging configuration on source bucket or list of sources in log bucket + bucket logging list list the log objects pending commit for the source bucket bi get retrieve bucket index object entries bi put store bucket index object entries bi list list raw bucket index entries diff --git a/src/test/rgw/rgw_cr_test.cc b/src/test/rgw/rgw_cr_test.cc index 2fa0a536f20..ae9cd683f7f 100644 --- a/src/test/rgw/rgw_cr_test.cc +++ b/src/test/rgw/rgw_cr_test.cc @@ -351,7 +351,7 @@ int main(int argc, const char **argv) false, false, false, - true, true, null_yield, cfgstore.get(), + true, true, true, null_yield, cfgstore.get(), false)); if (!store) { std::cerr << "couldn't init storage provider" << std::endl; -- 2.47.3