Log buckets can now be created within erasure-coded (EC) pools.
To support append operations, a temporary log record object is first
created in the replicated default.rgw.log pool and then copied to the
EC pool when the log object is committed.
Implicit log commit operations are now performed asynchronously. A new
BucketLoggingManager class processes the pending commits at regular
intervals. Explicit commit operations continue to be performed
synchronously.
Fixes: https://tracker.ceph.com/issues/71365
Signed-off-by: Nithya Balachandran <nithya.balachandran@ibm.com>
+* RGW: Bucket Logging supports creating log buckets in EC pools.
+ Implicit logging object commits are now performed asynchronously.
* RGW: OpenSSL engine support is deprecated in favor of provider support.
- Removed the `openssl_engine_opts` configuration option. OpenSSL engine configuration in string format is no longer supported.
- Added the `openssl_conf` configuration option for loading specified providers as default providers.
+--------------------------+-----------+--------------+
| ``rgw notification`` | 1 | 5 |
+--------------------------+-----------+--------------+
+| ``rgw bucket logging`` | 1 | 5 |
++--------------------------+-----------+--------------+
| ``javaclient`` | 1 | 5 |
+--------------------------+-----------+--------------+
| ``asok`` | 1 | 5 |
object. Flushing will happen automatically when logging is disabled on a
bucket, or its logging configuration is changed, or the bucket is deleted.
+Adding a new log object to the log bucket is asynchronous when it is triggered
+by a log record being written. The operation that generated the log record
+completes immediately and does not wait for the log object to be added; the
+addition happens later.
+
+When the log object is flushed, it is added to the log bucket immediately.
+As a result, there may be temporary gaps in the log records in the log bucket
+until any pending log objects are also added.
+
+To check which log objects for a source bucket are currently pending addition to
+the log bucket, execute the following command:
+
+.. prompt:: bash #
+
+ radosgw-admin bucket logging list --bucket <source bucket>
+
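+With the default JSON output the command prints the names of the pending log
+objects, for example (illustrative output; the actual object names depend on
+the configured target prefix and on when the objects were created)::
+
+ [
+ "<target prefix>YYYY-MM-DD-hh-mm-ss-<unique string>"
+ ]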
+
Standard
````````
If the logging type is set to "Standard" (the default) the log records are
SUBSYS(rgw_lifecycle, 1, 5)
SUBSYS(rgw_restore, 1, 5)
SUBSYS(rgw_notification, 1, 5)
+SUBSYS(rgw_bucket_logging, 1, 5)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)
driver/rados/group.cc
driver/rados/groups.cc
driver/rados/rgw_bucket.cc
+ driver/rados/rgw_bl_rados.cc
driver/rados/rgw_cr_rados.cc
driver/rados/rgw_cr_tools.cc
driver/rados/rgw_d3n_datacache.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_bucket_logging.h"
+#include "rgw_bl_rados.h"
+#include <chrono>
+#include <fmt/format.h>
+#include <future>
+#include <memory>
+#include <thread>
+#include <boost/algorithm/hex.hpp>
+#include <boost/asio/basic_waitable_timer.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/context/protected_fixedsize_stack.hpp>
+#include "common/async/yield_context.h"
+#include "common/async/yield_waiter.h"
+#include "common/ceph_time.h"
+#include "common/dout.h"
+#include "cls/lock/cls_lock_client.h"
+#include "include/common_fwd.h"
+#include "include/function2.hpp"
+#include "librados/AioCompletionImpl.h"
+#include "services/svc_zone.h"
+#include "rgw_common.h"
+#include "rgw_perf_counters.h"
+#include "rgw_sal_rados.h"
+#include "rgw_zone_features.h"
+
+
+#define dout_subsys ceph_subsys_rgw_bucket_logging
+
+namespace rgw::bucketlogging {
+
+using commit_list_t = std::set<std::string>;
+
+// use mmap/mprotect to allocate 128k coroutine stacks
+auto make_stack_allocator() {
+ return boost::context::protected_fixedsize_stack{128*1024};
+}
+
+const std::string COMMIT_LIST_OBJECT_NAME = "bucket_logging_global_commit_list";
+static const std::string TEMP_POOL_ATTR = "temp_logging_pool";
+
+static std::string get_commit_list_name (const rgw::sal::Bucket* log_bucket,
+ const std::string& prefix) {
+ return fmt::format("{}/{}/{}/{}", log_bucket->get_tenant(),
+ log_bucket->get_name(),
+ log_bucket->get_bucket_id(), prefix);
+}
+
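+// On-disk layout of the pending commit metadata in the logging pool:
+// - one "commit list" object per log bucket and prefix
+//   (<tenant>/<bucket>/<bucket id>/<prefix>), whose omap maps each pending
+//   log object name to the name of its temporary (tail) object, and whose
+//   xattr records the pool holding the temporary objects
+// - one global object (COMMIT_LIST_OBJECT_NAME) whose omap holds one key per
+//   commit list object, so that the manager can discover all of them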
+int add_commit_target_entry(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* store,
+ const rgw::sal::Bucket* log_bucket,
+ const std::string& prefix,
+ const std::string& obj_name,
+ const std::string& tail_obj_name,
+ const rgw_pool& temp_data_pool,
+ optional_yield y) {
+
+ auto& ioctx_logging = store->getRados()->get_logging_pool_ctx();
+ std::string target_obj_name = get_commit_list_name (log_bucket, prefix);
+ {
+ librados::ObjectWriteOperation op;
+ op.create(false);
+
+ bufferlist bl;
+ bl.append(tail_obj_name);
+ std::map<std::string, bufferlist> new_commit_list{{obj_name, bl}};
+ op.omap_set(new_commit_list);
+
+ bufferlist pool_bl;
+ encode(temp_data_pool, pool_bl);
+ op.setxattr(TEMP_POOL_ATTR.c_str(), pool_bl);
+
+ auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name, std::move(op), y);
+ if (ret < 0){
+ ldpp_dout(dpp, 1) << "ERROR: failed to add logging object entry " << obj_name
+ << " to commit list object:" << target_obj_name
+ << ". ret = " << ret << dendl;
+ return ret;
+ }
+ }
+
+ bufferlist empty_bl;
+ std::map<std::string, bufferlist> new_commit_list{{target_obj_name, empty_bl}};
+
+ librados::ObjectWriteOperation op;
+ op.create(false);
+ op.omap_set(new_commit_list);
+
+ auto ret = rgw_rados_operate(dpp, ioctx_logging, COMMIT_LIST_OBJECT_NAME,
+ std::move(op), y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to add entry " << target_obj_name
+ << " to global bucket logging list object: "
+ << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "INFO: added entry " << target_obj_name
+ << " to global bucket logging object: "
+ << COMMIT_LIST_OBJECT_NAME << dendl;
+ return 0;
+}
+
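+// read all pending log object names (the omap keys) from the commit list of
+// the given log bucket and prefix; an empty set is returned if the commit
+// list object does not exist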
+int list_pending_commit_objects(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* store,
+ const rgw::sal::Bucket* log_bucket,
+ const std::string& prefix,
+ std::set<std::string>& entries,
+ optional_yield y) {
+
+ auto& ioctx_logging = store->getRados()->get_logging_pool_ctx();
+ std::string target_obj_name = get_commit_list_name (log_bucket, prefix);
+
+ entries.clear();
+
+ constexpr auto max_chunk = 1024U;
+ std::string start_after;
+ bool more = true;
+ int rval;
+ while (more) {
+ librados::ObjectReadOperation op;
+ std::set <std::string> entries_chunk;
+ op.omap_get_keys2(start_after, max_chunk, &entries_chunk, &more, &rval);
+ const auto ret = rgw_rados_operate(dpp, ioctx_logging, target_obj_name,
+ std::move(op), nullptr, y);
+ if (ret == -ENOENT) {
+ // log commit list object was not created - nothing to do
+ return 0;
+ }
+ if (ret < 0) {
+ // TODO: do we need to check on rval as well as ret?
+ ldpp_dout(dpp, 1) << "ERROR: failed to read logging commit list "
+ << target_obj_name << " ret: " << ret << dendl;
+ return ret;
+ }
+ entries.merge(entries_chunk);
+ if (more) {
+ start_after = *entries.rbegin();
+ }
+ }
+ return 0;
+}
+
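+// The BucketLoggingManager runs on a dedicated worker thread and periodically
+// reads the global commit list object. Every commit list it manages to lock
+// (with an exclusive, renewable cls_lock) is processed by its own coroutine,
+// which commits the pending log objects of that list and removes the
+// processed entries. Processing of a list stops when the lock is lost, when
+// the commit list is removed, or when the log bucket is deleted.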
+class BucketLoggingManager : public DoutPrefixProvider {
+ using Executor = boost::asio::io_context::executor_type;
+ bool m_shutdown = false;
+ static constexpr auto m_log_commits_update_period = std::chrono::milliseconds(10000); // 10s
+ static constexpr auto m_log_commits_update_retry = std::chrono::milliseconds(1000); // 1s
+ static constexpr auto m_log_commits_idle_sleep = std::chrono::milliseconds(5000); // 5s
+ const utime_t m_failover_time = utime_t(m_log_commits_update_period*9); // 90s
+ CephContext* const m_cct;
+ static constexpr auto COOKIE_LEN = 16;
+ const std::string m_lock_cookie;
+ const std::string m_lock_name = "bl_mgr_lock";
+ boost::asio::io_context m_io_context;
+ boost::asio::executor_work_guard<Executor> m_work_guard;
+ std::thread m_worker;
+ const SiteConfig& m_site;
+ rgw::sal::RadosStore* const m_rados_store;
+
+private:
+
+ CephContext *get_cct() const override { return m_cct; }
+
+ unsigned get_subsys() const override { return dout_subsys; }
+
+ std::ostream& gen_prefix(std::ostream& out) const override {
+ return out << "rgw bucket_logging: ";
+ }
+
+ int parse_list_name(std::string list_obj_name, std::string &tenant_name,
+ std::string &bucket_name, std::string &bucket_id,
+ std::string &prefix) {
+ // <tenant_name>/<bucket_name>/<bucket_id>/<prefix>
+ size_t pstart = 0, pend = 0;
+ pend = list_obj_name.find('/');
+ if (pend == std::string::npos) {
+ return -EINVAL;
+ }
+ tenant_name = list_obj_name.substr(pstart, pend);
+
+ pstart = pend + 1;
+ pend = list_obj_name.find('/', pstart);
+ if (pend == std::string::npos) {
+ return -EINVAL;
+ }
+ bucket_name = list_obj_name.substr(pstart, pend - pstart);
+ if (bucket_name.empty()) {
+ return -EINVAL;
+ }
+
+ pstart = pend + 1;
+ pend = list_obj_name.find('/', pstart);
+ if (pend == std::string::npos) {
+ return -EINVAL;
+ }
+ bucket_id = list_obj_name.substr(pstart, pend - pstart);
+ if (bucket_id.empty()) {
+ return -EINVAL;
+ }
+ prefix = list_obj_name.substr(pend + 1);
+
+ /*
+ //TODO: Fix this
+ std::regex pattern(R"(([^/]+)/([^/]+)/([^/]+)/(.*))");
+ std::smatch matches;
+
+ if (std::regex_match(list_obj_name, matches, pattern)) {
+ tenant_name = matches[1].str();
+ bucket_name = matches[2].str();
+ bucket_id = matches[3].str();
+ prefix = matches[4].str();
+ } else {
+ return -EINVAL;
+ }
+*/
+ return 0;
+ }
+
+ // read the list of commit lists from the global list object
+ int read_global_logging_list(commit_list_t& commits, optional_yield y) {
+ constexpr auto max_chunk = 1024U;
+ std::string start_after;
+ bool more = true;
+ int rval;
+ while (more) {
+ librados::ObjectReadOperation op;
+ commit_list_t commits_chunk;
+ op.omap_get_keys2(start_after, max_chunk, &commits_chunk, &more, &rval);
+ const auto ret = rgw_rados_operate(this, m_rados_store->getRados()->get_logging_pool_ctx(),
+ COMMIT_LIST_OBJECT_NAME, std::move(op),
+ nullptr, y);
+ if (ret == -ENOENT) {
+ // global commit list object was not created - nothing to do
+ return 0;
+ }
+ if (ret < 0) {
+ // TODO: do we need to check on rval as well as ret?
+ ldpp_dout(this, 5) << "ERROR: failed to read bucket logging global commit list. error: "
+ << ret << dendl;
+ return ret;
+ }
+ commits.merge(commits_chunk);
+ if (more) {
+ start_after = *commits.rbegin();
+ }
+ }
+ return 0;
+ }
+
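+ // tracks the coroutines spawned for individual entries: each coroutine
+ // holds a token, and async_wait() suspends the caller until the last
+ // token has been destroyed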
+ class tokens_waiter {
+ size_t pending_tokens = 0;
+ DoutPrefixProvider* const dpp;
+ ceph::async::yield_waiter<void> waiter;
+
+ public:
+ class token{
+ tokens_waiter* tw;
+ public:
+ token(const token& other) = delete;
+ token(token&& other) : tw(other.tw) {
+ other.tw = nullptr; // mark as moved
+ }
+ token& operator=(const token& other) = delete;
+ token(tokens_waiter* _tw) : tw(_tw) {
+ ++tw->pending_tokens;
+ }
+
+ ~token() {
+ if (!tw) {
+ return; // already moved
+ }
+ --tw->pending_tokens;
+ if (tw->pending_tokens == 0 && tw->waiter) {
+ tw->waiter.complete(boost::system::error_code{});
+ }
+ }
+ };
+
+ tokens_waiter(DoutPrefixProvider* _dpp) : dpp(_dpp) {}
+ tokens_waiter(const tokens_waiter& other) = delete;
+ tokens_waiter& operator=(const tokens_waiter& other) = delete;
+
+ void async_wait(boost::asio::yield_context yield) {
+ if (pending_tokens == 0) {
+ return;
+ }
+ ldpp_dout(dpp, 20) << "INFO: tokens waiter is waiting on "
+ << pending_tokens << " tokens" << dendl;
+ boost::system::error_code ec;
+ waiter.async_wait(yield[ec]);
+ ldpp_dout(dpp, 20) << "INFO: tokens waiter finished waiting for all tokens" << dendl;
+ }
+ };
+
+ // process a single pending log object entry
+ // return 0 on success, a negative error code otherwise
+ int process_entry(
+ const ConfigProxy& conf,
+ const std::string& entry,
+ const std::string& temp_obj_name,
+ const std::string& tenant_name,
+ const std::string& bucket_name,
+ const std::string& bucket_id,
+ const std::string& prefix,
+ const rgw_pool& obj_pool,
+ bool& bucket_deleted,
+ boost::asio::yield_context yield) {
+
+ std::unique_ptr<rgw::sal::Bucket> log_bucket;
+ rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id};
+ int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(this, 1) << "ERROR: failed to load bucket : "
+ << bucket_name << " id : " << bucket_id
+ << ", error: " << ret << dendl;
+ return ret;
+ }
+ if (ret == 0) {
+ ldpp_dout(this, 20) << "INFO: loaded bucket : " << bucket_name
+ << ", id: "<< bucket_id << dendl;
+ ret = log_bucket->commit_logging_object(entry, yield, this, prefix,
+ nullptr, false);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to commit logging object : "
+ << entry << " , error: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(this, 20) << "INFO: committed logging object : " << entry
+ << " to bucket " << bucket_name
+ << ", id: "<< bucket_id << dendl;
+ return 0;
+ }
+ // The log bucket has been deleted. Clean up pending objects.
+ // This is done because the bucket_deletion_cleanup only removes
+ // the current temp log objects.
+ bucket_deleted = true;
+ ldpp_dout(this, 20) << "INFO: log bucket : " << bucket_name
+ << " no longer exists. Removing pending logging object "
+ << temp_obj_name << " from pool " << obj_pool
+ << dendl;
+ ret = rgw_delete_system_obj(this, m_rados_store->svc()->sysobj, obj_pool,
+ temp_obj_name, nullptr, yield);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(this, 1) << "ERROR: failed to delete pending logging object : "
+ << temp_obj_name << " in pool " << obj_pool
+ << " for deleted log bucket : " << bucket_name
+ << " . ret = "<< ret << dendl;
+ return ret;
+ }
+ ldpp_dout(this, 20) << "INFO: Deleted pending logging object "
+ << temp_obj_name << " from pool " << obj_pool
+ << " for deleted log bucket : " << bucket_name
+ << dendl;
+ return 0;
+ }
+
+ void async_sleep(boost::asio::yield_context yield, const std::chrono::milliseconds& duration) {
+ using Clock = ceph::coarse_mono_clock;
+ using Timer = boost::asio::basic_waitable_timer<Clock,
+ boost::asio::wait_traits<Clock>, Executor>;
+ Timer timer(m_io_context);
+ timer.expires_after(duration);
+ boost::system::error_code ec;
+ timer.async_wait(yield[ec]);
+ if (ec) {
+ ldpp_dout(this, 10) << "ERROR: async_sleep failed with error: "
+ << ec.message() << dendl;
+ }
+ }
+
+ // unlock (lose ownership) list object
+ int unlock_list_object(librados::IoCtx& rados_ioctx,
+ const std::string& list_obj_name,
+ boost::asio::yield_context yield) {
+
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ rados::cls::lock::unlock(&op, m_lock_name, m_lock_cookie);
+ const auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name,
+ std::move(op), yield);
+ if (ret == -ENOENT) {
+ ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name
+ << " was removed. Nothing to unlock." << dendl;
+ return 0;
+ }
+ if (ret == -EBUSY) {
+ ldpp_dout(this, 20) << "INFO: log commit list: " << list_obj_name
+ << " already owned by another RGW. No need to unlock."
+ << dendl;
+ return 0;
+ }
+ return ret;
+ }
+
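+ // remove a commit list object (only while this daemon still holds its lock)
+ // and drop its entry from the global commit list object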
+ int remove_commit_list(librados::IoCtx& rados_ioctx,
+ const std::string& commit_list_name,
+ optional_yield y) {
+ {
+ librados::ObjectWriteOperation op;
+ rados::cls::lock::assert_locked(&op, m_lock_name,
+ ClsLockType::EXCLUSIVE,
+ m_lock_cookie,
+ "" /*no tag*/);
+ op.remove();
+ auto ret = rgw_rados_operate(this, rados_ioctx, commit_list_name, std::move(op), y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :"
+ << commit_list_name << ". ret = " << ret << dendl;
+ return ret;
+ }
+ }
+
+ librados::ObjectWriteOperation op;
+ op.omap_rm_keys({commit_list_name});
+ auto ret = rgw_rados_operate(this, rados_ioctx, COMMIT_LIST_OBJECT_NAME,
+ std::move(op), y);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to remove entry " << commit_list_name
+ << " from the global bucket logging list : "
+ << COMMIT_LIST_OBJECT_NAME << ". ret = " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(this, 20) << "INFO: removed entry " << commit_list_name
+ << " from the global bucket logging list : "
+ << COMMIT_LIST_OBJECT_NAME << dendl;
+
+ return 0;
+ }
+
+ // processing of a specific target list
+ void process_commit_list(librados::IoCtx& rados_ioctx,
+ const std::string& list_obj_name,
+ boost::asio::yield_context yield) {
+ auto is_idle = false;
+ auto bucket_deleted = false;
+ auto skip_bucket_deletion = false;
+ const std::string start_marker;
+
+ std::string tenant_name, bucket_name, bucket_id, prefix;
+ int ret = parse_list_name(list_obj_name, tenant_name, bucket_name,
+ bucket_id, prefix);
+ if (ret) {
+ ldpp_dout(this, 1) << "ERROR: commit list name: " << list_obj_name
+ << " is invalid. Processing will stop." << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "INFO: parse commit list name: " << list_obj_name
+ << ", bucket_name: " << bucket_name
+ << ", bucket_id: " << bucket_id
+ << ", tenant_name: " << tenant_name
+ << ", prefix: " << prefix << dendl;
+
+ while (!m_shutdown) {
+ // if the list was empty the last time, sleep for idle timeout
+ if (is_idle) {
+ async_sleep(yield, m_log_commits_idle_sleep);
+ }
+
+ // Get the entries in the list after locking it
+ is_idle = true;
+ skip_bucket_deletion = false;
+ std::map<std::string, bufferlist> entries;
+ rgw_pool temp_data_pool;
+ auto total_entries = 0U;
+ {
+ librados::ObjectReadOperation op;
+ op.assert_exists();
+ bufferlist obl;
+ int rval;
+ bufferlist pool_obl;
+ int pool_rval;
+ constexpr auto max_chunk = 1024U;
+ std::string start_after;
+ bool more = true;
+ rados::cls::lock::assert_locked(&op, m_lock_name,
+ ClsLockType::EXCLUSIVE,
+ m_lock_cookie,
+ "" /*no tag*/);
+ op.omap_get_vals2(start_after, max_chunk, &entries, &more, &rval);
+ op.getxattr(TEMP_POOL_ATTR.c_str(), &pool_obl, &pool_rval);
+ // check ownership and list entries in one batch
+ auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name,
+ std::move(op), nullptr, yield);
+ if (ret == -ENOENT) {
+ // list object was deleted
+ ldpp_dout(this, 20) << "INFO: object: " << list_obj_name
+ << " was removed. Processing will stop" << dendl;
+ return;
+ }
+ if (ret == -EBUSY) {
+ ldpp_dout(this, 10) << "WARNING: commit list : " << list_obj_name
+ << " ownership moved to another daemon. Processing will stop"
+ << dendl;
+ return;
+ }
+ if (ret < 0) {
+ ldpp_dout(this, 10) << "WARNING: failed to get list of entries in "
+ << "and/or lock object: " << list_obj_name
+ << ". error: " << ret << " (will retry)" << dendl;
+ continue;
+ }
+ if (pool_rval < 0) {
+ ldpp_dout(this, 10) << "WARNING: failed to get pool information"
+ << " from commit list: " << list_obj_name << dendl;
+ return;
+ }
+ // Get temp logging object pool information - this is required for
+ // deleting the objects when the log bucket is deleted.
+ try {
+ auto p = pool_obl.cbegin();
+ decode(temp_data_pool, p);
+ } catch (buffer::error& err) {
+ ldpp_dout(this, 1) << "ERROR: failed to get pool information from object: "
+ << list_obj_name << dendl;
+ return;
+ }
+ }
+ total_entries = entries.size();
+ if (total_entries == 0) {
+ // nothing in the list object
+ // Delete the list object if the bucket has been deleted
+ std::unique_ptr<rgw::sal::Bucket> log_bucket;
+ rgw_bucket bucket = rgw_bucket{tenant_name, bucket_name, bucket_id};
+ int ret = m_rados_store->load_bucket(this, bucket, &log_bucket, yield);
+ if (ret == -ENOENT) {
+ ldpp_dout(this, 20) << "ERROR: failed to load bucket : "
+ << bucket_name << ", id : " << bucket_id
+ << ", error: " << ret << dendl;
+ bucket_deleted = true;
+ }
+ } else {
+ // log when not idle
+ ldpp_dout(this, 20) << "INFO: found: " << total_entries
+ << " entries in commit list: " << list_obj_name
+ << dendl;
+ }
+ auto stop_processing = false;
+ std::set<std::string> remove_entries;
+ tokens_waiter tw(this);
+ for (auto const& [key, val] : entries) {
+ if (stop_processing) {
+ break;
+ }
+ std::string temp_obj_name = val.to_str();
+
+ ldpp_dout(this, 20) << "INFO: processing entry: " << key
+ << ", value : " << temp_obj_name << dendl;
+
+ tokens_waiter::token token(&tw);
+ boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
+ [this, &is_idle, &list_obj_name, &remove_entries, &stop_processing,
+ &tenant_name, &bucket_name, &bucket_id, &prefix, token = std::move(token),
+ key, temp_obj_name, temp_data_pool, &bucket_deleted , &skip_bucket_deletion]
+ (boost::asio::yield_context yield) {
+ auto result =
+ process_entry(this->get_cct()->_conf, key, temp_obj_name, tenant_name,
+ bucket_name, bucket_id, prefix, temp_data_pool,
+ bucket_deleted, yield);
+ if (result == 0) {
+ ldpp_dout(this, 20) << "INFO: processing of entry: " << key
+ << " from: " << list_obj_name
+ << " was successful." << dendl;
+ remove_entries.insert(key);
+ is_idle = false;
+ return;
+ } else {
+ is_idle = true;
+ // Don't delete the bucket if we couldn't process the entry
+ // and the bucket has been deleted
+ skip_bucket_deletion = true;
+ ldpp_dout(this, 20) << "INFO: failed to process entry: " << key
+ << " from: " << list_obj_name << dendl;
+ }
+ stop_processing = true;
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
+ }
+
+ if (!entries.empty()) {
+ // wait for all pending work to finish
+ tw.async_wait(yield);
+ }
+
+ if (bucket_deleted && !skip_bucket_deletion) {
+ ret = remove_commit_list(rados_ioctx, list_obj_name, yield);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to remove bucket logging commit list :"
+ << list_obj_name << ". ret = " << ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 10) << "INFO: bucket :" << bucket_name
+ << ". was removed. processing will stop" << dendl;
+ return;
+ }
+ // delete all processed entries from list
+ if (!remove_entries.empty()) {
+ // Delete the entries even if the lock no longer belongs
+ // to this instance as we have processed the entries.
+ librados::ObjectWriteOperation op;
+ op.omap_rm_keys(remove_entries);
+ auto ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield);
+ if (ret == -ENOENT) {
+ // list was deleted
+ ldpp_dout(this, 10) << "INFO: commit list " << list_obj_name
+ << ". was removed. processing will stop" << dendl;
+ return;
+ }
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to remove entries from commit list: "
+ << list_obj_name << ". error: " << ret
+ << ". This could cause some log records to be lost."
+ << dendl;
+ return;
+ } else {
+ ldpp_dout(this, 20) << "INFO: removed entries from commit list: "
+ << list_obj_name << dendl;
+ }
+ }
+ }
+ ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped processing : "
+ << list_obj_name << dendl;
+ }
+
+ // process all commit log objects
+ // find which of the commit log objects is owned by this daemon and process it
+ void process_global_logging_list(boost::asio::yield_context yield) {
+ auto has_error = false;
+ std::unordered_set<std::string> owned_commit_logs;
+ std::atomic<size_t> processed_list_count = 0;
+
+ std::vector<std::string> commit_list_gc;
+ std::mutex commit_list_gc_lock;
+ auto& rados_ioctx = m_rados_store->getRados()->get_logging_pool_ctx();
+ auto next_check_time = ceph::coarse_real_clock::zero();
+ while (!m_shutdown) {
+ // check if log commit list needs to be refreshed
+ if (ceph::coarse_real_clock::now() > next_check_time) {
+ next_check_time = ceph::coarse_real_clock::now() + m_log_commits_update_period;
+ const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+ ldpp_dout(this, 20) << "INFO: processing global logging commit list. next "
+ << "processing will happen at: " << std::ctime(&tp)
+ << dendl;
+ } else {
+ // short sleep duration to prevent busy wait when refreshing list
+ // or retrying after error
+ ldpp_dout(this, 20) << "INFO: processing global logging commit list. Sleep now." << dendl;
+ async_sleep(yield, m_log_commits_update_retry);
+ if (!has_error) {
+ // in case of error we will retry
+ continue;
+ }
+ }
+
+ std::set<std::string> commit_lists;
+ auto ret = read_global_logging_list(commit_lists, yield);
+ if (ret < 0) {
+ has_error = true;
+ ldpp_dout(this, 1) << "ERROR: failed to read global logging list. Retry. ret:"
+ << ret << dendl;
+ continue;
+ }
+
+ for (const auto& list_obj_name : commit_lists) {
+ // try to lock the object to check if it is owned by this rgw
+ // or if ownership needs to be taken
+ ldpp_dout(this, 20) << "INFO: processing commit list: " << list_obj_name
+ << dendl;
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ rados::cls::lock::lock(&op, m_lock_name, ClsLockType::EXCLUSIVE,
+ m_lock_cookie, "" /*no tag*/,
+ "" /*no description*/, m_failover_time,
+ LOCK_FLAG_MAY_RENEW);
+
+ ret = rgw_rados_operate(this, rados_ioctx, list_obj_name, std::move(op), yield);
+ if (ret == -EBUSY) {
+ // lock is already taken by another RGW
+ ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+ << " owned (locked) by another daemon" << dendl;
+ // if this RGW previously owned the list, its processing coroutine will stop and the list will later be removed from the owned set
+ continue;
+ }
+ if (ret == -ENOENT) {
+ // commit list is deleted - processing will stop the next time we try to read from it
+ ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+ << " should not be locked - already deleted" << dendl;
+ continue;
+ }
+ if (ret < 0) {
+ // failed to lock for another reason, continue to process other lists
+ ldpp_dout(this, 10) << "ERROR: failed to lock commit list: "
+ << list_obj_name << ". error: " << ret << dendl;
+ has_error = true;
+ continue;
+ }
+ // add the commit list to the list of owned commit_logs
+ if (owned_commit_logs.insert(list_obj_name).second) {
+ ldpp_dout(this, 20) << "INFO: " << list_obj_name << " now owned (locked) by this daemon" << dendl;
+ // start processing this list
+ boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(),
+ [this, &commit_list_gc, &commit_list_gc_lock, &rados_ioctx,
+ list_obj_name, &processed_list_count](boost::asio::yield_context yield) {
+ ++processed_list_count;
+ process_commit_list(rados_ioctx, list_obj_name, yield);
+ // if list processing ended, it means that the list object was removed or not owned anymore
+ const auto ret = unlock_list_object(rados_ioctx, list_obj_name, yield);
+ if (ret < 0) {
+ ldpp_dout(this, 15) << "WARNING: failed to unlock commit list: "
+ << list_obj_name << " with error: " << ret
+ << " (ownership would still move if not renewed)"
+ << dendl;
+ } else {
+ ldpp_dout(this, 20) << "INFO: commit list: " << list_obj_name
+ << " not locked (ownership can move)" << dendl;
+ }
+ // mark it for deletion
+ std::lock_guard lock_guard(commit_list_gc_lock);
+ commit_list_gc.push_back(list_obj_name);
+ --processed_list_count;
+ ldpp_dout(this, 20) << "INFO: " << list_obj_name << " marked for removal" << dendl;
+ }, [this, list_obj_name] (std::exception_ptr eptr) {
+ if (eptr) {
+ ldpp_dout(this, 10) << "ERROR: " << list_obj_name << " processing failed" << dendl;
+ std::rethrow_exception(eptr);
+ }
+ });
+ } else {
+ ldpp_dout(this, 20) << "INFO: " << list_obj_name << " ownership (lock) renewed" << dendl;
+ }
+ }
+ // erase all list objects that were deleted
+ {
+ std::lock_guard lock_guard(commit_list_gc_lock);
+ std::for_each(commit_list_gc.begin(), commit_list_gc.end(), [this, &owned_commit_logs](const std::string& list_obj_name) {
+ owned_commit_logs.erase(list_obj_name);
+ ldpp_dout(this, 20) << "INFO: commit list object: " << list_obj_name << " was removed" << dendl;
+ });
+ commit_list_gc.clear();
+ }
+ }
+ while (processed_list_count > 0) {
+ ldpp_dout(this, 20) << "INFO: BucketLogging manager stopped. "
+ << processed_list_count
+ << " commit lists are still being processed" << dendl;
+ async_sleep(yield, m_log_commits_update_retry);
+ }
+ ldpp_dout(this, 5) << "INFO: BucketLogging manager stopped. Done processing all commit lists" << dendl;
+ }
+
+public:
+
+ ~BucketLoggingManager() {
+ }
+
+ void stop() {
+ ldpp_dout(this, 5) << "INFO: BucketLogging manager received stop signal. shutting down..." << dendl;
+ m_shutdown = true;
+ m_work_guard.reset();
+ if (m_worker.joinable()) {
+ // try graceful shutdown first
+ auto future = std::async(std::launch::async, [this]() {m_worker.join();});
+ if (future.wait_for(m_log_commits_update_retry*2) == std::future_status::timeout) {
+ // force stop if graceful shutdown takes too long
+ if (!m_io_context.stopped()) {
+ ldpp_dout(this, 5) << "INFO: force shutdown of BucketLogging Manager" << dendl;
+ m_io_context.stop();
+ }
+ m_worker.join();
+ }
+ }
+ ldpp_dout(this, 5) << "INFO: BucketLogging manager shutdown complete" << dendl;
+ }
+
+ void init() {
+ boost::asio::spawn(make_strand(m_io_context), std::allocator_arg, make_stack_allocator(),
+ [this](boost::asio::yield_context yield) {
+ process_global_logging_list(yield);
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
+ // start the worker threads to do the actual list processing
+ m_worker = std::thread([this]() {
+ ceph_pthread_setname("bucket-logging-worker");
+ try {
+ ldpp_dout(this, 20) << "INFO: bucket logging worker started" << dendl;
+ m_io_context.run();
+ ldpp_dout(this, 20) << "INFO: bucket logging worker ended" << dendl;
+ } catch (const std::exception& err) {
+ ldpp_dout(this, 10) << "ERROR: bucket logging worker failed with error: "
+ << err.what() << dendl;
+ throw err;
+ }
+ });
+ ldpp_dout(this, 10) << "INFO: started bucket logging manager" << dendl;
+ }
+
+ BucketLoggingManager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) :
+ m_cct(_cct),
+ m_lock_cookie(gen_rand_alphanumeric(m_cct, COOKIE_LEN)),
+ m_work_guard(boost::asio::make_work_guard(m_io_context)),
+ m_site(site),
+ m_rados_store(store)
+ {
+ ldpp_dout(this, 5) << "INFO: BucketLoggingManager() constructor" << dendl;
+ }
+};
+
+std::unique_ptr<BucketLoggingManager> s_manager;
+
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+ const SiteConfig& site) {
+ if (s_manager) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to init BucketLogging manager: already exists" << dendl;
+ return false;
+ }
+ // TODO: take conf from CephContext
+ ldpp_dout(dpp, 1) << "INFO: initialising BucketLogging manager" << dendl;
+ s_manager = std::make_unique<BucketLoggingManager>(dpp->get_cct(), store, site);
+ s_manager->init();
+ return true;
+}
+
+void shutdown() {
+ if (!s_manager) return;
+ s_manager->stop();
+ s_manager.reset();
+}
+
+} // namespace rgw::bucketlogging
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <set>
+#include <string>
+
+// forward declarations
+namespace rgw { class SiteConfig; }
+namespace rgw::sal {
+ class Bucket;
+ class RadosStore;
+}
+class DoutPrefixProvider;
+class optional_yield;
+struct rgw_pool;
+
+namespace rgw::bucketlogging {
+
+ // initialize the bucket logging commit manager
+ bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+ const rgw::SiteConfig& site);
+
+ // shutdown the bucket logging commit manager
+ void shutdown();
+
+ int add_commit_target_entry(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* store,
+ const rgw::sal::Bucket* log_bucket,
+ const std::string& prefix,
+ const std::string& obj_name,
+ const std::string& tail_obj_name,
+ const rgw_pool& temp_data_pool,
+ optional_yield y);
+
+int list_pending_commit_objects(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* store,
+ const rgw::sal::Bucket* log_bucket,
+ const std::string& prefix,
+ std::set<std::string>& entries,
+ optional_yield y);
+
+}
+
#include "rgw_etag_verifier.h"
#include "rgw_worker.h"
#include "rgw_notify.h"
+#include "rgw_bl_rados.h"
#include "rgw_http_errors.h"
#include "rgw_multipart_meta_filter.h"
v1_topic_migration.stop();
}
+ if (run_bucket_logging_thread) {
+ rgw::bucketlogging::shutdown();
+ }
+
if (use_restore_thread) {
restore->stop_processor();
}
if (ret < 0)
return ret;
+ ret = open_logging_pool_ctx(dpp);
+ if (ret < 0)
+ return ret;
+
pools_initialized = true;
if (use_gc) {
index_completion_manager = new RGWIndexCompletionManager(this);
+ if (run_bucket_logging_thread) {
+ if (!rgw::bucketlogging::init(dpp, this->driver, *svc.site)) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket logging manager" << dendl;
+ } else {
+ ldpp_dout(dpp, 0) << "INFO: initialized bucket logging manager" << dendl;
+ }
+ }
if (run_notification_thread) {
if (!rgw::notify::init(dpp, driver, *svc.site)) {
ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
v1_topic_migration.start(1);
}
}
-
return ret;
}
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
}
+int RGWRados::open_logging_pool_ctx(const DoutPrefixProvider *dpp)
+{
+ return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().bucket_logging_pool, logging_pool_ctx, true, true);
+}
+
int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk)
{
int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
+ int open_logging_pool_ctx(const DoutPrefixProvider *dpp);
int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk);
bool run_sync_thread{false};
bool run_reshard_thread{false};
bool run_notification_thread{false};
+ bool run_bucket_logging_thread{false};
RGWMetaNotifier* meta_notifier{nullptr};
RGWDataNotifier* data_notifier{nullptr};
librados::IoCtx objexp_pool_ctx;
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
+ librados::IoCtx logging_pool_ctx; // .rgw.logging
bool pools_initialized{false};
return *this;
}
+ RGWRados& set_run_bucket_logging_thread(bool _run_bucket_logging_thread) {
+ run_bucket_logging_thread = _run_bucket_logging_thread;
+ return *this;
+ }
+
librados::IoCtx* get_lc_pool_ctx() {
return &lc_pool_ctx;
}
librados::IoCtx& get_notif_pool_ctx() {
return notif_pool_ctx;
}
+
+ librados::IoCtx& get_logging_pool_ctx() {
+ return logging_pool_ctx;
+ }
void set_context(CephContext *_cct) {
cct = _cct;
#include "rgw_aio_throttle.h"
#include "rgw_bucket.h"
#include "rgw_bucket_logging.h"
+#include "rgw_bl_rados.h"
#include "rgw_lc.h"
#include "rgw_lc_tier.h"
#include "rgw_lc_tier.h"
#define RGW_ATTR_COMMITTED_LOGGING_OBJ RGW_ATTR_PREFIX "committed-logging-obj"
+#define RGW_ATTR_LOGGING_EC_POOL RGW_ATTR_PREFIX "logging-ec-pool"
int get_committed_logging_object(RadosStore* store,
const std::string& obj_name_oid,
return rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y);
}
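+// Determine the pool that should hold the temporary logging object of the
+// given log bucket. Whether the bucket's data pool is erasure coded is
+// detected once (using 'osd pool get <pool> erasure_code_profile') and cached
+// in an xattr on the logging object-name object, so that it is not synced in
+// a multi-site setup. If the data pool is erasure coded, the temporary object
+// is placed in the zone's bucket_logging_pool instead, where the append
+// operations used to build it are supported.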
+int get_logging_temp_object_pool (const DoutPrefixProvider *dpp,
+ RadosBucket* bucket,
+ RadosStore* rados_store,
+ const std::string& obj_name_oid,
+ rgw_pool& data_pool, bool& is_ec_pool, optional_yield y) {
+
+ int ret;
+ rgw_obj obj {bucket->get_key(), "dummy"};
+ if (!rados_store->getRados()->get_obj_data_pool(bucket->get_placement_rule(), obj, &data_pool)) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << bucket->get_key() <<
+ "' when writing logging object" << dendl;
+ return -EIO;
+ }
+
+ // Find out if the log bucket is on an EC pool
+ // Use an attribute on the logging name object instead of the bucket so
+ // it is not synced in a multi-site setup
+
+ librados::Rados* rados = rados_store->getRados()->get_rados_handle();
+ librados::IoCtx io_ctx;
+ if (ret = rgw_init_ioctx(dpp, rados, data_pool, io_ctx); ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: get data pool ioctx for bucket '" << bucket->get_key() <<
+ "' when writing logging object. ret = " << ret << dendl;
+ return ret;
+ }
+ if (!io_ctx.is_valid()) {
+ ldpp_dout(dpp, 1) << "ERROR: invalid data pool ioctx for bucket '" << bucket->get_key() <<
+ "' when writing logging object" << dendl;
+ return -EIO;
+ }
+
+ is_ec_pool = false;
+ bufferlist bl;
+ librados::ObjectReadOperation op;
+ int rval;
+ op.getxattr(RGW_ATTR_LOGGING_EC_POOL, &bl, &rval);
+ ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), nullptr, y);
+ if (ret == 0) {
+ try {
+ ceph::decode(is_ec_pool, bl);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to decode " << RGW_ATTR_LOGGING_EC_POOL
+ << " attr for bucket '" << bucket->get_key()
+ << "'. error = " << err.what() << dendl;
+ return -EINVAL;
+ }
+ } else if (ret < 0){
+ // Try to find out if it is an EC pool
+ ret = rados->mon_command(
+ "{\"prefix\": \"osd pool get\", \"pool\": \"" +
+ data_pool.name + "\", \"var\": \"erasure_code_profile\"}",
+ {}, NULL, NULL);
+ if (ret == -EACCES) {
+ is_ec_pool = false;
+ } else if (ret == 0){
+ is_ec_pool = true;
+ } else {
+ ldpp_dout(dpp, 10) << __func__ << " ERROR: failed to find out if pool "
+ << data_pool.name << " is erasure coded. ret = "
+ << ret << dendl;
+ return ret;
+ }
+ bufferlist bl;
+ ceph::encode(is_ec_pool, bl);
+ librados::ObjectWriteOperation op;
+ // if object does not exist, we should not create the attribute
+ op.assert_exists();
+ op.setxattr(RGW_ATTR_LOGGING_EC_POOL, std::move(bl));
+ if (const int ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y); ret < 0){
+ // Don't return an error as we know if it is an ec pool
+ ldpp_dout(dpp, 10) << "ERROR: failed to set is_ec_pool attr on :" << obj_name_oid
+ << " ret: " << ret << dendl;
+ }
+ }
+ // If the target log bucket is on an EC pool, the temp
+ // logging object will be created in the default.rgw.log pool
+ if (is_ec_pool) {
+ data_pool = rados_store->svc()->zone->get_zone_params().bucket_logging_pool;
+ }
+ return 0;
+}
+
int RadosBucket::get_logging_object_name(std::string& obj_name,
const std::string& prefix,
optional_yield y,
obj_name);
}
-int RadosBucket::remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) {
+int RadosBucket::remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) {
rgw_pool data_pool;
- const rgw_obj head_obj{get_key(), obj_name};
- const auto placement_rule = get_placement_rule();
-
- if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) {
- ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
- "' when deleting logging object" << dendl;
+ const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+ bool is_ec_pool;
+ auto ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get temp data pool for bucket '" << get_key() <<
+ "' when deleting logging object: " << obj_name << ". ret: " << ret << dendl;
return -EIO;
}
optional_yield y,
const DoutPrefixProvider *dpp,
const std::string& prefix,
- std::string* last_committed) {
+ std::string* last_committed, bool async) {
rgw_pool data_pool;
const rgw_obj head_obj{get_key(), obj_name};
const auto placement_rule = get_placement_rule();
"' when committing logging object" << dendl;
return -EIO;
}
+ int ret;
const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
if (last_committed) {
- if (const int ret = get_committed_logging_object(store,
+ if (ret = get_committed_logging_object(store,
obj_name_oid,
data_pool,
y,
}
const auto temp_obj_name = to_temp_object_name(this, obj_name);
+ rgw_pool temp_data_pool;
+ bool is_ec_pool = false;
+
+ ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, temp_data_pool, is_ec_pool, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '"
+ << get_key() << "' when committing logging object: "
+ << obj_name << ". ret: " << ret << dendl;
+ return -EIO;
+ }
+ if (async) {
+ ret = rgw::bucketlogging::add_commit_target_entry(dpp, store, this, prefix,
+ obj_name, temp_obj_name,
+ temp_data_pool, y);
+ if (ret < 0){
+ return ret;
+ }
+
+ ldpp_dout(dpp, 20) << "INFO: added logging object entry " << obj_name
+ << " to commit list object." << dendl;
+ return 0;
+ }
+
std::map<string, bufferlist> obj_attrs;
ceph::real_time mtime;
bufferlist bl_data;
- if (auto ret = rgw_get_system_obj(store->svc()->sysobj,
- data_pool,
+ if (ret = rgw_get_system_obj(store->svc()->sysobj,
+ temp_data_pool,
temp_obj_name,
bl_data,
nullptr,
}
mtime = ceph::real_time::clock::now();
ldpp_dout(dpp, 20) << "INFO: temporary logging object '" << temp_obj_name << "' does not exist. committing it empty" << dendl;
+ } else if (is_ec_pool) {
+ if (ret = rgw_put_system_obj(dpp, store->svc()->sysobj,
+ data_pool,
+ temp_obj_name,
+ bl_data,
+ true,
+ nullptr,
+ mtime,
+ y,
+ &obj_attrs); ret < 0){
+
+ if (ret == -EEXIST) {
+ ldpp_dout(dpp, 5) << "WARNING: race detected in committing logging object '" << temp_obj_name << dendl;
+ } else {
+ ldpp_dout(dpp, 1) << "ERROR: failed to write logging data when committing object '" << temp_obj_name
+ << ". error: " << ret << dendl;
+ }
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "INFO: wrote logging data when committing object '" << temp_obj_name << dendl;
+
+ if (ret = rgw_delete_system_obj(dpp, store->svc()->sysobj,
+ temp_data_pool,
+ temp_obj_name,
+ nullptr,
+ y); ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to delete temp logging object when "
+ << "committing object '" << temp_obj_name
+ << ". error: " << ret << dendl;
+ }
}
uint64_t size = bl_data.length();
int RadosBucket::write_logging_object(const std::string& obj_name,
const std::string& record,
+ const std::string& prefix,
optional_yield y,
const DoutPrefixProvider *dpp,
bool async_completion) {
const auto temp_obj_name = to_temp_object_name(this, obj_name);
rgw_pool data_pool;
- rgw_obj obj{get_key(), obj_name};
- if (!store->getRados()->get_obj_data_pool(get_placement_rule(), obj, &data_pool)) {
- ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
- "' when writing logging object" << dendl;
+
+ bool is_ec_pool = false;
+ const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+ int ret = get_logging_temp_object_pool (dpp, this, store, obj_name_oid, data_pool, is_ec_pool, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '"
+ << get_key() << "' when writing logging object "
+ << temp_obj_name << ", ret: " << ret << dendl;
return -EIO;
}
+
librados::IoCtx io_ctx;
- if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) {
+ if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx, true); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to get IO context for logging object from data pool:" << data_pool.to_str() << dendl;
return -EIO;
}
+
bufferlist bl;
bl.append(record);
bl.append("\n");
optional_yield y,
const DoutPrefixProvider *dpp,
RGWObjVersionTracker* objv_tracker) override;
- int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override;
- int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override;
- int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
+ int commit_logging_object(const std::string& obj_name, optional_yield y,
+ const DoutPrefixProvider *dpp, const std::string& prefix,
+ std::string* last_committed, bool async) override;
+ int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override;
+ int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
private:
int link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
#include "driver/rados/rgw_bucket.h"
#include "driver/rados/rgw_sal_rados.h"
+#include "driver/rados/rgw_bl_rados.h"
#include <iomanip>
cout << " bucket radoslist list rados objects backing bucket's objects\n";
cout << " bucket logging flush flush pending log records object of source bucket to the log bucket\n";
cout << " bucket logging info get info on bucket logging configuration on source bucket or list of sources in log bucket\n";
+ cout << " bucket logging list list the log objects pending commit for the source bucket\n";
cout << " bi get retrieve bucket index object entries\n";
cout << " bi put store bucket index object entries\n";
cout << " bi list list raw bucket index entries\n";
BUCKET_RESYNC_ENCRYPTED_MULTIPART,
BUCKET_LOGGING_FLUSH,
BUCKET_LOGGING_INFO,
+ BUCKET_LOGGING_LIST,
POLICY,
LOG_LIST,
LOG_SHOW,
{ "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART },
{ "bucket logging flush", OPT::BUCKET_LOGGING_FLUSH },
{ "bucket logging info", OPT::BUCKET_LOGGING_INFO },
+ { "bucket logging list", OPT::BUCKET_LOGGING_LIST },
{ "policy", OPT::POLICY },
{ "log list", OPT::LOG_LIST },
{ "log show", OPT::LOG_SHOW },
false,
false,
false,
+ false,
false, // No background tasks!
null_yield,
cfgstore.get(),
}
std::string old_obj;
const auto region = driver->get_zone()->get_zonegroup().get_api_name();
- ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, &old_obj);
+ ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, false, &old_obj);
if (ret < 0) {
cerr << "ERROR: failed to flush pending logging object '" << obj_name << "' to target bucket '" << configuration.target_bucket
<< "'. error: " << cpp_strerror(-ret) << std::endl;
return 0;
}
+ if (opt_cmd == OPT::BUCKET_LOGGING_LIST) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket not specified" << std::endl;
+ return EINVAL;
+ }
+ if (driver->get_name() != "rados") {
+ cerr << "ERROR: this command is only available with the RADOS driver." << std::endl;
+ return EINVAL;
+ }
+
+ int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+ if (ret < 0) {
+ return -ret;
+ }
+
+ rgw::bucketlogging::configuration configuration;
+ std::unique_ptr<rgw::sal::Bucket> target_bucket;
+ ret = rgw::bucketlogging::get_target_and_conf_from_source(dpp(),
+ driver, bucket.get(), tenant, configuration, target_bucket, null_yield);
+ if (ret < 0 && ret != -ENODATA) {
+ cerr << "ERROR: failed to get target bucket and logging conf from source bucket '"
+ << bucket_name << "': " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ } else if (ret == -ENODATA) {
+ cerr << "ERROR: bucket '" << bucket_name << "' does not have logging enabled" << std::endl;
+ return 0;
+ }
+ std::string target_prefix = configuration.target_prefix;
+ std::set<std::string> entries;
+
+ ret = rgw::bucketlogging::list_pending_commit_objects(dpp(),
+ static_cast<rgw::sal::RadosStore*>(driver), target_bucket.get(),
+ target_prefix, entries, null_yield);
+
+ if (ret < 0) {
+ cerr << "ERROR: failed to get pending log entries for bucket '" << bucket_name
+ << "': " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ formatter->open_array_section("pending_logs");
+ for (auto &entry: entries) {
+ formatter->dump_string("log", entry);
+ }
+ formatter->close_section(); // pending_logs
+ formatter->flush(cout);
+ return 0;
+ }
+
if (opt_cmd == OPT::LOG_LIST) {
// filter by date?
if (date.size() && date.size() != 10) {
run_sync,
g_conf().get_val<bool>("rgw_dynamic_resharding"),
true, // run notification thread
+ true, // run bucket-logging thread
true, null_yield, env.cfgstore,
g_conf()->rgw_cache_enabled);
if (!env.driver) {
target_bucket->get_key() << "'. ret = " << ret << dendl;
return ret;
}
- if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) {
+ if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed, false); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of logging bucket '" <<
target_bucket->get_key() << "'. ret = " << ret << dendl;
return ret;
optional_yield y,
bool must_commit,
RGWObjVersionTracker* objv_tracker,
+ bool async,
std::string* last_committed,
std::string* err_message) {
std::string target_bucket_name;
return ret;
}
}
- if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) {
+ if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed, async); ret < 0) {
if (must_commit) {
if (err_message) {
*err_message = fmt::format("Failed to commit logging object of logging bucket '{}'", target_bucket->get_name());
if (ceph::coarse_real_time::clock::now() > time_to_commit) {
ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to logging bucket '" <<
target_bucket_id << "'" << dendl;
- if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
+ if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
set_journal_err(err_message);
return ret;
}
if (ret = target_bucket->write_logging_object(obj_name,
record,
+ conf.target_prefix,
y,
dpp,
async_completion); ret < 0 && ret != -EFBIG) {
if (ret == -EFBIG) {
ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to logging bucket '" <<
target_bucket->get_key() << "'" << dendl;
- if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
+ if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, true, nullptr, &err_message); ret < 0 && ret != -ECANCELED) {
set_journal_err(err_message);
return ret;
}
if (ret = target_bucket->write_logging_object(obj_name,
record,
+ conf.target_prefix,
y,
dpp,
async_completion); ret < 0) {
}
ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted logging bucket '" <<
bucket->get_key() << "'" << dendl;
- if (const int ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) {
+ if (const int ret = bucket->remove_logging_object(obj_name, conf.target_prefix, y, dpp); ret < 0) {
ldpp_dout(dpp, 5) << "WARNING: failed to delete pending logging object '" << obj_name << "' for logging bucket '" <<
bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
continue;
ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name for bucket '" <<
bucket->get_key() << "' during source cleanup" << dendl;
// since the object holding the name was deleted, we cannot fetch the last commited name from it
- if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr); ret < 0) {
+ if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf->target_prefix, nullptr, false); ret < 0) {
ldpp_dout(dpp, 5) << "WARNING: could not commit pending logging object of bucket '" <<
bucket->get_key() << "' during source cleanup. ret = " << ret << dendl;
return 0;
optional_yield y,
bool must_commit,
RGWObjVersionTracker* objv_tracker,
+ bool async,
std::string* last_committed,
std::string* err_message = nullptr);
exit(1);
}
- driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield, cfgstore.get());
+ driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, false, true, null_yield, cfgstore.get());
if (!driver) {
std::cerr << "couldn't init storage provider" << std::endl;
return EIO;
cct->_conf->rgw_enable_quota_threads,
cct->_conf->rgw_run_sync_thread,
cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
- true, true, null_yield, env.cfgstore, // run notification thread
+ true,
+ true, // run notification thread
+ true, // run bucket logging thread
+ null_yield, env.cfgstore,
cct->_conf->rgw_cache_enabled);
}
y,
false, // rollover should happen even if commit failed
&objv_tracker,
+ false,
&old_obj); ret < 0) {
ldpp_dout(this, 1) << "WARNING: failed to flush pending logging object '" << obj_name << "'"
<< " to target bucket '" << target_bucket_id << "'. "
ldpp_dout(this, 5) << "INFO: no pending logging object in logging bucket '" << target_bucket_id << "'. new object should be created" << dendl;
}
const auto region = driver->get_zone()->get_zonegroup().get_api_name();
- op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, &old_obj);
+ op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, false, &old_obj);
if (op_ret < 0) {
ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << obj_name << "'"
<< " to logging bucket '" << target_bucket_id << "'. "
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
- bool run_notification_thread,
+ bool run_notification_thread,
+ bool run_bucket_logging_thread,
bool use_cache,
bool use_gc,
bool background_tasks,
.set_run_sync_thread(run_sync_thread)
.set_run_reshard_thread(run_reshard_thread)
.set_run_notification_thread(run_notification_thread)
+ .set_run_bucket_logging_thread(run_bucket_logging_thread)
.init_begin(cct, dpp, background_tasks, site_config, cfgstore) < 0) {
delete driver;
return nullptr;
.set_run_sync_thread(run_sync_thread)
.set_run_reshard_thread(run_reshard_thread)
.set_run_notification_thread(run_notification_thread)
+ .set_run_bucket_logging_thread(run_bucket_logging_thread)
.init_begin(cct, dpp, background_tasks, site_config, cfgstore) < 0) {
delete driver;
return nullptr;
RGWObjVersionTracker* objv_tracker) = 0;
/** Move the pending bucket logging object into the bucket
if "last_committed" is not null, it will be set to the name of the last committed object
+ if async is true, write the entry to the commit lists to be processed by the BucketLoggingManager
* */
- virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) = 0;
+ virtual int commit_logging_object(const std::string& obj_name, optional_yield y,
+ const DoutPrefixProvider *dpp, const std::string& prefix,
+ std::string* last_committed, bool async) = 0;
//** Remove the pending bucket logging object */
- virtual int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0;
+ virtual int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) = 0;
/** Write a record to the pending bucket logging object */
- virtual int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0;
+ virtual int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0;
/* dang - This is temporary, until the API is completed */
virtual rgw_bucket& get_key() = 0;
bool run_sync_thread,
bool run_reshard_thread,
bool run_notification_thread,
+ bool run_bucket_logging_thread,
bool background_tasks,
optional_yield y,
rgw::sal::ConfigStore* cfgstore,
run_sync_thread,
run_reshard_thread,
run_notification_thread,
+ run_bucket_logging_thread,
use_cache, use_gc,
background_tasks, y, cfgstore, admin);
return driver;
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
- bool run_notification_thread,
+ bool run_notification_thread,
+ bool run_bucket_logging_thread,
bool use_metadata_cache,
bool use_gc, bool background_tasks,
optional_yield y, rgw::sal::ConfigStore* cfgstore, bool admin);
RGWObjVersionTracker* objv_tracker) override {
return next->remove_logging_object_name(prefix, y, dpp, objv_tracker);
}
- int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override {
- return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed);
+ int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override {
+ return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed, async);
}
- int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override {
- return next->remove_logging_object(obj_name, y, dpp);
+ int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override {
+ return next->remove_logging_object(obj_name, prefix, y, dpp);
}
- int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
- return next->write_logging_object(obj_name, record, y, dpp, async_completion);
+ int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
+ return next->write_logging_object(obj_name, record, prefix, y, dpp, async_completion);
}
virtual rgw_bucket& get_key() override { return next->get_key(); }
optional_yield y,
const DoutPrefixProvider *dpp,
RGWObjVersionTracker* objv_tracker) override { return 0; }
- int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override { return 0; }
- int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
- int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
+ int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed, bool async) override { return 0; }
+ int remove_logging_object(const std::string& obj_name, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
+ int write_logging_object(const std::string& obj_name, const std::string& record, const std::string& prefix, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
return 0;
}
JSONDecoder::decode_json("topics_pool", topics_pool, obj);
JSONDecoder::decode_json("account_pool", account_pool, obj);
JSONDecoder::decode_json("group_pool", group_pool, obj);
+ JSONDecoder::decode_json("bucket_logging_pool", bucket_logging_pool, obj);
JSONDecoder::decode_json("system_key", system_key, obj);
JSONDecoder::decode_json("placement_pools", placement_pools, obj);
JSONDecoder::decode_json("tier_config", tier_config, obj);
encode_json("topics_pool", topics_pool, f);
encode_json("account_pool", account_pool, f);
encode_json("group_pool", group_pool, f);
+ encode_json("bucket_logging_pool", bucket_logging_pool, f);
encode_json_plain("system_key", system_key, f);
encode_json("placement_pools", placement_pools, f);
encode_json("tier_config", tier_config, f);
pools.insert(info.dedup_pool);
pools.insert(info.gc_pool);
pools.insert(info.log_pool);
+ pools.insert(info.bucket_logging_pool);
pools.insert(info.intent_log_pool);
pools.insert(info.usage_log_pool);
pools.insert(info.user_keys_pool);
info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool);
info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool);
info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool);
+ info.bucket_logging_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:logging", info.bucket_logging_pool);
info.topics_pool =
fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool);
info.account_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:accounts", info.account_pool);
rgw_pool account_pool;
rgw_pool group_pool;
rgw_pool dedup_pool;
+ rgw_pool bucket_logging_pool;
RGWAccessKey system_key;
const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
void encode(bufferlist& bl) const {
- ENCODE_START(17, 1, bl);
+ ENCODE_START(18, 1, bl);
encode(domain_root, bl);
encode(control_pool, bl);
encode(gc_pool, bl);
encode(group_pool, bl);
encode(restore_pool, bl);
encode(dedup_pool, bl);
+ encode(bucket_logging_pool, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(17, bl);
+ DECODE_START(18, bl);
decode(domain_root, bl);
decode(control_pool, bl);
decode(gc_pool, bl);
} else {
dedup_pool = name + ".rgw.dedup";
}
+ if (struct_v >= 18) {
+ decode(bucket_logging_pool, bl);
+ } else {
+ bucket_logging_pool = log_pool.name + ":logging";
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
bucket radoslist list rados objects backing bucket's objects
bucket logging flush flush pending log records object of source bucket to the log bucket
bucket logging info get info on bucket logging configuration on source bucket or list of sources in log bucket
+ bucket logging list list the log objects pending commit for the source bucket
bi get retrieve bucket index object entries
bi put store bucket index object entries
bi list list raw bucket index entries
false,
false,
false,
- true, true, null_yield, cfgstore.get(),
+ true, true, true, null_yield, cfgstore.get(),
false));
if (!store) {
std::cerr << "couldn't init storage provider" << std::endl;