DECODE_FINISH(bl);
}
};
+inline bool operator ==(const cls_log_header& lhs, const cls_log_header& rhs) {
+ return (lhs.max_marker == rhs.max_marker &&
+ lhs.max_time == rhs.max_time);
+}
WRITE_CLASS_ENCODER(cls_log_header)
.set_description("mclock limit for metadata requests")
.add_see_also("rgw_dmclock_metadata_res")
.add_see_also("rgw_dmclock_metadata_wgt"),
+
+ Option("rgw_data_log_backing", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+ .set_default("auto")
+ .set_enum_allowed({"auto", "fifo", "omap"})
+ .set_description("Backing store for the RGW data sync log")
+ .set_long_description(
+ "Whether to use the older OMAP backing store or the higher-performance "
+ "FIFO-based backing store. With 'auto', whatever backing already exists "
+ "is used, defaulting to FIFO if there is no existing log. Either "
+ "explicit option causes startup to fail if a log of the other type "
+ "is still present."),
});
}
rgw_kms.cc
rgw_url.cc
rgw_oidc_provider.cc
+ rgw_datalog.cc
cls_fifo_legacy.cc)
if(WITH_RADOSGW_AMQP_ENDPOINT)
add_executable(radosgwd radosgw.cc)
target_link_libraries(radosgwd radosgw librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client
+ cls_log_client cls_timeindex_client neorados_cls_fifo
cls_version_client cls_user_client
global
${FCGI_LIBRARY} ${LIB_RESOLV}
add_executable(radosgw-admin ${radosgw_admin_srcs})
target_link_libraries(radosgw-admin ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client
+ cls_log_client cls_timeindex_client neorados_cls_fifo
cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
add_executable(radosgw-es ${radosgw_es_srcs})
target_link_libraries(radosgw-es ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client
+ cls_log_client cls_timeindex_client neorados_cls_fifo
cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
add_executable(radosgw-object-expirer ${radosgw_object_expirer_srcs})
target_link_libraries(radosgw-object-expirer ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client
+ cls_log_client cls_timeindex_client neorados_cls_fifo
cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES})
cls_refcount_client
cls_log_client
cls_timeindex_client
+ neorados_cls_fifo
cls_version_client
cls_user_client
global
#include "rgw_rados.h"
#include "rgw_acl.h"
#include "rgw_acl_s3.h"
+#include "rgw_datalog.h"
#include "rgw_lc.h"
#include "rgw_log.h"
#include "rgw_formats.h"
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
-#include <errno.h>
-
-#include <string>
+#include <cerrno>
#include <map>
#include <sstream>
+#include <string>
#include <string_view>
#include <boost/format.hpp>
-#undef FMT_HEADER_ONLY
-#define FMT_HEADER_ONLY 1
-#include "fmt/format.h"
-
#include "common/errno.h"
#include "common/ceph_json.h"
#include "include/scope_guard.h"
+#include "rgw_datalog.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_acl.h"
return fix_bucket_obj_expiry(store, admin_bucket.get_bucket_info(), flusher, dry_run);
}
-void rgw_data_change::dump(Formatter *f) const
-{
- string type;
- switch (entity_type) {
- case ENTITY_TYPE_BUCKET:
- type = "bucket";
- break;
- default:
- type = "unknown";
- }
- encode_json("entity_type", type, f);
- encode_json("key", key, f);
- utime_t ut(timestamp);
- encode_json("timestamp", ut, f);
-}
-
-void rgw_data_change::decode_json(JSONObj *obj) {
- string s;
- JSONDecoder::decode_json("entity_type", s, obj);
- if (s == "bucket") {
- entity_type = ENTITY_TYPE_BUCKET;
- } else {
- entity_type = ENTITY_TYPE_UNKNOWN;
- }
- JSONDecoder::decode_json("key", key, obj);
- utime_t ut;
- JSONDecoder::decode_json("timestamp", ut, obj);
- timestamp = ut.to_real_time();
-}
-
-void rgw_data_change_log_entry::dump(Formatter *f) const
-{
- encode_json("log_id", log_id, f);
- utime_t ut(log_timestamp);
- encode_json("log_timestamp", ut, f);
- encode_json("entry", entry, f);
-}
-
-void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("log_id", log_id, obj);
- utime_t ut;
- JSONDecoder::decode_json("log_timestamp", ut, obj);
- log_timestamp = ut.to_real_time();
- JSONDecoder::decode_json("entry", entry, obj);
-}
-
-
-RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
- : cct(cct),
- num_shards(cct->_conf->rgw_data_log_num_shards),
- changes(cct->_conf->rgw_data_log_changes_size) {}
-
-void RGWDataChangesLog::init(RGWSI_Cls *cls_svc)
-{
- svc.cls = cls_svc;
- assert(svc.cls);
-
- oids = new string[num_shards];
- for (int i = 0; i < num_shards; i++) {
- oids[i] = get_oid(i);
- }
-}
-
-int RGWDataChangesLog::start(const RGWZone* _zone)
-{
- zone = _zone;
- assert(zone);
- renew_thread = make_named_thread("rgw_dt_lg_renew",
- &RGWDataChangesLog::renew_run, this);
- return 0;
-}
-
-int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
- const string& name = bs.bucket.name;
- int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
- uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
-
- return (int)r;
-}
-
-int RGWDataChangesLog::renew_entries()
-{
- if (!zone->log_data)
- return 0;
-
- /* we can't keep the bucket name as part of the cls_log_entry, and we need
- * it later, so we keep two lists under the map */
- map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
-
- lock.lock();
- map<rgw_bucket_shard, bool> entries;
- entries.swap(cur_cycle);
- lock.unlock();
-
- map<rgw_bucket_shard, bool>::iterator iter;
- string section;
- real_time ut = real_clock::now();
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- const rgw_bucket_shard& bs = iter->first;
-
- int index = choose_oid(bs);
-
- cls_log_entry entry;
-
- rgw_data_change change;
- bufferlist bl;
- change.entity_type = ENTITY_TYPE_BUCKET;
- change.key = bs.get_key();
- change.timestamp = ut;
- encode(change, bl);
-
- svc.cls->timelog.prepare_entry(entry, ut, section, change.key, bl);
-
- m[index].first.push_back(bs);
- m[index].second.emplace_back(std::move(entry));
- }
-
- map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
- for (miter = m.begin(); miter != m.end(); ++miter) {
- list<cls_log_entry>& entries = miter->second.second;
-
- real_time now = real_clock::now();
-
- int ret = svc.cls->timelog.add(oids[miter->first], entries, nullptr, true, null_yield);
- if (ret < 0) {
- /* we don't really need to have a special handling for failed cases here,
- * as this is just an optimization. */
- lderr(cct) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
- return ret;
- }
-
- real_time expiration = now;
- expiration += make_timespan(cct->_conf->rgw_data_log_window);
-
- list<rgw_bucket_shard>& buckets = miter->second.first;
- list<rgw_bucket_shard>::iterator liter;
- for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
- update_renewed(*liter, expiration);
- }
- }
-
- return 0;
-}
-
-void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
-{
- ceph_assert(ceph_mutex_is_locked(lock));
- if (!changes.find(bs, status)) {
- status = ChangeStatusPtr(new ChangeStatus);
- changes.add(bs, status);
- }
-}
-
-void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
-{
- std::lock_guard l{lock};
- cur_cycle[bs] = true;
-}
-
-void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
-{
- std::lock_guard l{lock};
- ChangeStatusPtr status;
- _get_change(bs, status);
-
- ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
- status->cur_expiration = expiration;
-}
-
-int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
- rgw_bucket_shard bs(bucket, shard_id);
-
- return choose_oid(bs);
-}
-
-bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket, optional_yield y) const
-{
- if (!bucket_filter) {
- return true;
- }
-
- return bucket_filter(bucket, y);
-}
-
-std::string RGWDataChangesLog::get_oid(int i) const {
- std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
- if (prefix.empty()) {
- prefix = "data_log"sv;
- }
- return fmt::format("{}.{}", prefix, i);
-}
-
-
-int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
- auto& bucket = bucket_info.bucket;
-
- if (!filter_bucket(bucket, null_yield)) {
- return 0;
- }
-
- if (observer) {
- observer->on_bucket_changed(bucket.get_key());
- }
-
- rgw_bucket_shard bs(bucket, shard_id);
-
- int index = choose_oid(bs);
- mark_modified(index, bs);
-
- lock.lock();
-
- ChangeStatusPtr status;
- _get_change(bs, status);
-
- lock.unlock();
-
- real_time now = real_clock::now();
-
- status->lock.lock();
-
- ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
- << " shard_id=" << shard_id << " now=" << now
- << " cur_expiration=" << status->cur_expiration << dendl;
-
- if (now < status->cur_expiration) {
- /* no need to send, recently completed */
- status->lock.unlock();
-
- register_renew(bs);
- return 0;
- }
-
- RefCountedCond *cond;
-
- if (status->pending) {
- cond = status->cond;
-
- ceph_assert(cond);
-
- status->cond->get();
- status->lock.unlock();
-
- int ret = cond->wait();
- cond->put();
- if (!ret) {
- register_renew(bs);
- }
- return ret;
- }
-
- status->cond = new RefCountedCond;
- status->pending = true;
-
- string& oid = oids[index];
- real_time expiration;
-
- int ret;
-
- do {
- status->cur_sent = now;
-
- expiration = now;
- expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
-
- status->lock.unlock();
-
- bufferlist bl;
- rgw_data_change change;
- change.entity_type = ENTITY_TYPE_BUCKET;
- change.key = bs.get_key();
- change.timestamp = now;
- encode(change, bl);
- string section;
-
- ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
-
- ret = svc.cls->timelog.add(oid, now, section, change.key, bl, null_yield);
-
- now = real_clock::now();
-
- status->lock.lock();
-
- } while (!ret && real_clock::now() > expiration);
-
- cond = status->cond;
-
- status->pending = false;
- status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
- status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
- status->cond = NULL;
- status->lock.unlock();
-
- cond->done(ret);
- cond->put();
-
- return ret;
-}
-
-int RGWDataChangesLog::list_entries(int shard, int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- std::optional<std::string_view> marker,
- std::string* out_marker, bool* truncated)
-{
- assert(shard < num_shards);
- std::list<cls_log_entry> log_entries;
-
- int ret = svc.cls->timelog.list(oids[shard], {}, {},
- max_entries, log_entries,
- std::string(marker.value_or("")),
- out_marker, truncated, null_yield);
- if (ret < 0)
- return ret;
-
- for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
- rgw_data_change_log_entry log_entry;
- log_entry.log_id = iter->id;
- real_time rt = iter->timestamp.to_real_time();
- log_entry.log_timestamp = rt;
- auto liter = iter->data.cbegin();
- try {
- decode(log_entry.entry, liter);
- } catch (buffer::error& err) {
- lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
- return -EIO;
- }
- entries.push_back(log_entry);
- }
-
- return 0;
-}
-
-int RGWDataChangesLog::list_entries(int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- LogMarker& marker, bool *ptruncated)
-{
- bool truncated;
- entries.clear();
-
- for (; marker.shard < num_shards && (int)entries.size() < max_entries;
- marker.shard++, marker.marker.reset()) {
- int ret = list_entries(marker.shard, max_entries - entries.size(),
- entries, marker.marker, NULL, &truncated);
- if (ret == -ENOENT) {
- continue;
- }
- if (ret < 0) {
- return ret;
- }
- if (truncated) {
- *ptruncated = true;
- return 0;
- }
- }
-
- *ptruncated = (marker.shard < num_shards);
-
- return 0;
-}
-
-int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
-{
- assert(shard_id < num_shards);
- string oid = oids[shard_id];
-
- cls_log_header header;
-
- int ret = svc.cls->timelog.info(oid, &header, null_yield);
- if ((ret < 0) && (ret != -ENOENT))
- return ret;
-
- info->marker = header.max_marker;
- info->last_update = header.max_time.to_real_time();
-
- return 0;
-}
-
-int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
-{
- assert(shard_id < num_shards);
- return svc.cls->timelog.trim(oids[shard_id], {}, {},
- {}, std::string(marker), nullptr, null_yield);
-}
-
-int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
- librados::AioCompletion* c)
-{
- assert(shard_id < num_shards);
- return svc.cls->timelog.trim(oids[shard_id], {}, {},
- {}, std::string(marker), c, null_yield);
-}
-
-bool RGWDataChangesLog::going_down() const
-{
- return down_flag;
-}
-
-RGWDataChangesLog::~RGWDataChangesLog() {
- down_flag = true;
- if (renew_thread.joinable()) {
- renew_stop();
- renew_thread.join();
- }
- delete[] oids;
-}
-
-void RGWDataChangesLog::renew_run() {
- for (;;) {
- dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
- int r = renew_entries();
- if (r < 0) {
- dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
- }
-
- if (going_down())
- break;
-
- int interval = cct->_conf->rgw_data_log_window * 3 / 4;
- std::unique_lock locker{renew_lock};
- renew_cond.wait_for(locker, std::chrono::seconds(interval));
- }
-}
-
-void RGWDataChangesLog::renew_stop()
-{
- std::lock_guard l{renew_lock};
- renew_cond.notify_all();
-}
-
-void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
-{
- auto key = bs.get_key();
- {
- std::shared_lock rl{modified_lock}; // read lock to check for existence
- auto shard = modified_shards.find(shard_id);
- if (shard != modified_shards.end() && shard->second.count(key)) {
- return;
- }
- }
-
- std::unique_lock wl{modified_lock}; // write lock for insertion
- modified_shards[shard_id].insert(key);
-}
-
-void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
-{
- std::unique_lock wl{modified_lock};
- modified.swap(modified_shards);
- modified_shards.clear();
-}
-
-std::string_view RGWDataChangesLog::max_marker() const {
- return "99999999"sv;
-}
-
void RGWBucketCompleteInfo::dump(Formatter *f) const {
encode_json("bucket_info", info, f);
encode_json("attrs", attrs, f);
#include <string>
#include <memory>
+#include <variant>
+
+#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
#include "include/types.h"
#include "rgw_common.h"
#include "services/svc_bucket_types.h"
#include "services/svc_bucket_sync.h"
-
// define as static when RGWBucket implementation completes
extern void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id);
class RGWUserCtl;
class RGWBucketCtl;
class RGWZone;
-
+struct RGWZoneParams;
namespace rgw { namespace sal {
class RGWRadosStore;
class RGWBucketList;
static int sync_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, string *err_msg = NULL);
};
-
-enum DataLogEntityType {
- ENTITY_TYPE_UNKNOWN = 0,
- ENTITY_TYPE_BUCKET = 1,
-};
-
-struct rgw_data_change {
- DataLogEntityType entity_type;
- std::string key;
- ceph::real_time timestamp;
-
- void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
- auto t = std::uint8_t(entity_type);
- encode(t, bl);
- encode(key, bl);
- encode(timestamp, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- std::uint8_t t;
- decode(t, bl);
- entity_type = DataLogEntityType(t);
- decode(key, bl);
- decode(timestamp, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter* f) const;
- void decode_json(JSONObj* obj);
-};
-WRITE_CLASS_ENCODER(rgw_data_change)
-
-struct rgw_data_change_log_entry {
- std::string log_id;
- ceph::real_time log_timestamp;
- rgw_data_change entry;
-
- void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
- encode(log_id, bl);
- encode(log_timestamp, bl);
- encode(entry, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(log_id, bl);
- decode(log_timestamp, bl);
- decode(entry, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter* f) const;
- void decode_json(JSONObj* obj);
-};
-WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
-
-struct RGWDataChangesLogInfo {
- std::string marker;
- ceph::real_time last_update;
-
- void dump(ceph::Formatter* f) const;
- void decode_json(JSONObj* obj);
-};
-
-namespace rgw {
-struct BucketChangeObserver;
-}
-
-struct RGWDataChangesLogMarker {
- int shard = 0;
- std::optional<std::string> marker;
-
- RGWDataChangesLogMarker() = default;
-};
-
-class RGWDataChangesLog {
- CephContext *cct;
- rgw::BucketChangeObserver *observer = nullptr;
- const RGWZone* zone;
-
- struct Svc {
- RGWSI_Cls *cls{nullptr};
- } svc;
-
- int num_shards;
- std::string* oids;
-
- ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
- ceph::shared_mutex modified_lock =
- ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
- std::map<int, set<string> > modified_shards;
-
- std::atomic<bool> down_flag = { false };
-
- struct ChangeStatus {
- std::shared_ptr<const rgw_sync_policy_info> sync_policy;
- ceph::real_time cur_expiration;
- ceph::real_time cur_sent;
- bool pending = false;
- RefCountedCond *cond = nullptr;
- ceph::mutex lock =
- ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
- };
-
- using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
-
- lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
-
- std::map<rgw_bucket_shard, bool> cur_cycle;
-
- void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
- void register_renew(rgw_bucket_shard& bs);
- void update_renewed(rgw_bucket_shard& bs, ceph::real_time& expiration);
-
- ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
- ceph::condition_variable renew_cond;
- void renew_run();
- void renew_stop();
- std::thread renew_thread;
-
- std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
- int choose_oid(const rgw_bucket_shard& bs);
- bool going_down() const;
- bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
- int renew_entries();
-
-public:
-
- RGWDataChangesLog(CephContext* cct);
- ~RGWDataChangesLog();
-
- void init(RGWSI_Cls *cls_svc);
- int start(const RGWZone* _zone);
-
- int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
- int get_log_shard_id(rgw_bucket& bucket, int shard_id);
- int list_entries(int shard, int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- std::optional<std::string_view> marker,
- std::string* out_marker, bool* truncated);
- int trim_entries(int shard_id, std::string_view marker);
- int trim_entries(int shard_id, std::string_view marker,
- librados::AioCompletion* c); // :(
- int get_info(int shard_id, RGWDataChangesLogInfo *info);
-
- using LogMarker = RGWDataChangesLogMarker;
-
- int list_entries(int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- LogMarker& marker, bool* ptruncated);
-
- void mark_modified(int shard_id, const rgw_bucket_shard& bs);
- void read_clear_modified(map<int, set<string> > &modified);
-
- void set_observer(rgw::BucketChangeObserver *observer) {
- this->observer = observer;
- }
-
- void set_bucket_filter(decltype(bucket_filter)&& f) {
- bucket_filter = std::move(f);
- }
- // a marker that compares greater than any other
- std::string_view max_marker() const;
- std::string get_oid(int shard_id) const;
-};
-
struct rgw_ep_info {
RGWBucketEntryPoint &ep;
map<std::string, buffer::list>& attrs;
#include "rgw_bucket.h"
#include "rgw_bucket_sync.h"
#include "rgw_bucket_sync_cache.h"
+#include "rgw_datalog.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_error_repo.h"
#include "rgw_http_client.h"
#include "rgw_sal.h"
+#include "rgw_datalog.h"
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"
#include "rgw_sync_policy.h"
};
class RGWRados;
-class RGWDataChangesLogInfo;
class RGWRemoteDataLog : public RGWCoroutinesManager {
const DoutPrefixProvider *dpp;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <vector>
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/error_code.h"
+
+#include "common/async/blocked_completion.h"
+#include "common/async/librados_completion.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+
+#include "cls_fifo_legacy.h"
+#include "rgw_datalog.h"
+#include "rgw_tools.h"
+
+#define dout_context g_ceph_context
+static constexpr auto dout_subsys = ceph_subsys_rgw;
+
+namespace bs = boost::system;
+
+void rgw_data_change::dump(ceph::Formatter *f) const
+{
+ std::string type;
+ switch (entity_type) {
+ case ENTITY_TYPE_BUCKET:
+ type = "bucket";
+ break;
+ default:
+ type = "unknown";
+ }
+ encode_json("entity_type", type, f);
+ encode_json("key", key, f);
+ utime_t ut(timestamp);
+ encode_json("timestamp", ut, f);
+}
+
+void rgw_data_change::decode_json(JSONObj *obj) {
+ std::string s;
+ JSONDecoder::decode_json("entity_type", s, obj);
+ if (s == "bucket") {
+ entity_type = ENTITY_TYPE_BUCKET;
+ } else {
+ entity_type = ENTITY_TYPE_UNKNOWN;
+ }
+ JSONDecoder::decode_json("key", key, obj);
+ utime_t ut;
+ JSONDecoder::decode_json("timestamp", ut, obj);
+ timestamp = ut.to_real_time();
+}
+
+void rgw_data_change_log_entry::dump(Formatter *f) const
+{
+ encode_json("log_id", log_id, f);
+ utime_t ut(log_timestamp);
+ encode_json("log_timestamp", ut, f);
+ encode_json("entry", entry, f);
+}
+
+void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("log_id", log_id, obj);
+ utime_t ut;
+ JSONDecoder::decode_json("log_timestamp", ut, obj);
+ log_timestamp = ut.to_real_time();
+ JSONDecoder::decode_json("entry", entry, obj);
+}
+
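+// Best-effort removal of the per-shard log objects, used when an empty
+// leftover log of the wrong type must be cleared before the requested
+// backend is created. Failures on individual shards are logged but not
+// propagated.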
+int RGWDataChangesBE::remove(CephContext* cct, librados::Rados* rados,
+ const rgw_pool& log_pool)
+{
+ auto num_shards = cct->_conf->rgw_data_log_num_shards;
+ librados::IoCtx ioctx;
+ auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
+ false, false);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ return 0;
+ } else {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": rgw_init_ioctx failed: " << log_pool.name
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ }
+ for (auto i = 0; i < num_shards; ++i) {
+ auto oid = get_oid(cct, i);
+ librados::ObjectWriteOperation op;
+ op.remove();
+ auto r = rgw_rados_operate(ioctx, oid, &op, null_yield);
+ if (r < 0 && r != -ENOENT) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": remove failed: " << log_pool.name << "/" << oid
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ }
+ return 0;
+}
+
+
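+// The original backend: one cls_log (OMAP) object per shard, driven
+// through the existing RGWSI_Cls timelog interface.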
+class RGWDataChangesOmap final : public RGWDataChangesBE {
+ using centries = std::list<cls_log_entry>;
+ RGWSI_Cls& cls;
+ std::vector<std::string> oids;
+public:
+ RGWDataChangesOmap(CephContext* cct, RGWSI_Cls& cls)
+ : RGWDataChangesBE(cct), cls(cls) {
+ auto num_shards = cct->_conf->rgw_data_log_num_shards;
+ oids.reserve(num_shards);
+ for (auto i = 0; i < num_shards; ++i) {
+ oids.push_back(get_oid(i));
+ }
+ }
+ ~RGWDataChangesOmap() override = default;
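+ // Probe for a pre-existing OMAP log. A shard exists if its header
+ // differs from a default-constructed one (using the operator==
+ // added to cls_log_header above); it has entries if listing returns
+ // at least one record.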
+ static int exists(CephContext* cct, RGWSI_Cls& cls, bool* exists,
+ bool* has_entries) {
+ auto num_shards = cct->_conf->rgw_data_log_num_shards;
+ std::string out_marker;
+ bool truncated = false;
+ std::list<cls_log_entry> log_entries;
+ const cls_log_header empty_info;
+ *exists = false;
+ *has_entries = false;
+ for (auto i = 0; i < num_shards; ++i) {
+ cls_log_header info;
+ auto oid = get_oid(cct, i);
+ auto r = cls.timelog.info(oid, &info, null_yield);
+ if (r < 0 && r != -ENOENT) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to get info " << oid << ": " << cpp_strerror(-r)
+ << dendl;
+ return r;
+ } else if ((r == -ENOENT) || (info == empty_info)) {
+ continue;
+ }
+ *exists = true;
+ r = cls.timelog.list(oid, {}, {}, 100, log_entries, "", &out_marker,
+ &truncated, null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to list " << oid << ": " << cpp_strerror(-r)
+ << dendl;
+ return r;
+ } else if (!log_entries.empty()) {
+ *has_entries = true;
+ break; // no reason to continue once we have both existence and non-emptiness
+ }
+ }
+ return 0;
+ }
+
+ void prepare(ceph::real_time ut, const std::string& key,
+ ceph::buffer::list&& entry, entries& out) override {
+ if (!std::holds_alternative<centries>(out)) {
+ ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
+ out = centries();
+ }
+
+ cls_log_entry e;
+ cls.timelog.prepare_entry(e, ut, {}, key, entry);
+ std::get<centries>(out).push_back(std::move(e));
+ }
+ int push(int index, entries&& items) override {
+ auto r = cls.timelog.add(oids[index], std::get<centries>(items),
+ nullptr, true, null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to push to " << oids[index] << cpp_strerror(-r)
+ << dendl;
+ }
+ return r;
+ }
+ int push(int index, ceph::real_time now,
+ const std::string& key,
+ ceph::buffer::list&& bl) override {
+ auto r = cls.timelog.add(oids[index], now, {}, key, bl, null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to push to " << oids[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+ }
+ int list(int index, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated) override {
+ std::list<cls_log_entry> log_entries;
+ auto r = cls.timelog.list(oids[index], {}, {},
+ max_entries, log_entries,
+ std::string(marker.value_or("")),
+ out_marker, truncated, null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to list " << oids[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+ rgw_data_change_log_entry log_entry;
+ log_entry.log_id = iter->id;
+ auto rt = iter->timestamp.to_real_time();
+ log_entry.log_timestamp = rt;
+ auto liter = iter->data.cbegin();
+ try {
+ decode(log_entry.entry, liter);
+ } catch (ceph::buffer::error& err) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to decode data changes log entry: "
+ << err.what() << dendl;
+ return -EIO;
+ }
+ entries.push_back(log_entry);
+ }
+ return 0;
+ }
+ int get_info(int index, RGWDataChangesLogInfo *info) override {
+ cls_log_header header;
+ auto r = cls.timelog.info(oids[index], &header, null_yield);
+ if (r == -ENOENT) r = 0;
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to get info from " << oids[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ } else {
+ info->marker = header.max_marker;
+ info->last_update = header.max_time.to_real_time();
+ }
+ return r;
+ }
+ int trim(int index, std::string_view marker) override {
+ auto r = cls.timelog.trim(oids[index], {}, {},
+ {}, std::string(marker), nullptr,
+ null_yield);
+
+ if (r == -ENOENT) r = 0;
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to trim " << oids[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+ }
+ int trim(int index, std::string_view marker,
+ librados::AioCompletion* c) override {
+ auto r = cls.timelog.trim(oids[index], {}, {},
+ {}, std::string(marker), c, null_yield);
+
+ if (r == -ENOENT) r = 0;
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to trim " << oids[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+ }
+ std::string_view max_marker() const override {
+ return "99999999"sv;
+ }
+};
+
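+// The new backend: one cls_fifo queue per shard. Markers are FIFO
+// part/offset pairs rather than the timestamp-shaped markers used by
+// the OMAP backend.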
+class RGWDataChangesFIFO final : public RGWDataChangesBE {
+ using centries = std::vector<ceph::buffer::list>;
+ std::vector<std::unique_ptr<rgw::cls::fifo::FIFO>> fifos;
+public:
+ RGWDataChangesFIFO(CephContext* cct, librados::Rados* rados,
+ const rgw_pool& log_pool)
+ : RGWDataChangesBE(cct) {
+ librados::IoCtx ioctx;
+ auto shards = cct->_conf->rgw_data_log_num_shards;
+ auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
+ true, false);
+ if (r < 0) {
+ throw bs::system_error(ceph::to_error_code(r));
+ }
+ fifos.resize(shards);
+ for (auto i = 0; i < shards; ++i) {
+ r = rgw::cls::fifo::FIFO::create(ioctx, get_oid(i),
+ &fifos[i], null_yield);
+ if (r < 0) {
+ throw bs::system_error(ceph::to_error_code(r));
+ }
+ }
+ ceph_assert(fifos.size() == unsigned(shards));
+ ceph_assert(std::none_of(fifos.cbegin(), fifos.cend(),
+ [](const auto& p) {
+ return p == nullptr;
+ }));
+ }
+ ~RGWDataChangesFIFO() override = default;
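+ // Probe for a pre-existing FIFO log by trying to open each shard's
+ // FIFO and listing a single entry.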
+ static int exists(CephContext* cct, librados::Rados* rados,
+ const rgw_pool& log_pool, bool* exists, bool* has_entries) {
+ auto num_shards = cct->_conf->rgw_data_log_num_shards;
+ librados::IoCtx ioctx;
+ auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
+ false, false);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ return 0;
+ } else {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": rgw_init_ioctx failed: " << log_pool.name
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ }
+ *exists = false;
+ *has_entries = false;
+ for (auto i = 0; i < num_shards; ++i) {
+ std::unique_ptr<rgw::cls::fifo::FIFO> fifo;
+ auto oid = get_oid(cct, i);
+ std::vector<rgw::cls::fifo::list_entry> log_entries;
+ bool more = false;
+ auto r = rgw::cls::fifo::FIFO::open(ioctx, oid,
+ &fifo, null_yield);
+ if (r == -ENOENT || r == -ENODATA) {
+ continue;
+ } else if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to open FIFO: " << log_pool << "/" << oid
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ *exists = true;
+ r = fifo->list(1, std::nullopt, &log_entries, &more,
+ null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to list entries: " << log_pool << "/" << oid
+ << ": " << cpp_strerror(-r) << dendl;
+ } else if (!log_entries.empty()) {
+ *has_entries = true;
+ break;
+ }
+ }
+ return 0;
+ }
+ void prepare(ceph::real_time, const std::string&,
+ ceph::buffer::list&& entry, entries& out) override {
+ if (!std::holds_alternative<centries>(out)) {
+ ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
+ out = centries();
+ }
+ std::get<centries>(out).push_back(std::move(entry));
+ }
+ int push(int index, entries&& items) override {
+ auto r = fifos[index]->push(std::get<centries>(items), null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to push to FIFO: " << get_oid(index)
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+ }
+ int push(int index, ceph::real_time,
+ const std::string&,
+ ceph::buffer::list&& bl) override {
+ auto r = fifos[index]->push(std::move(bl), null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to push to FIFO: " << get_oid(index)
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+ }
+ int list(int index, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated) override {
+ std::vector<rgw::cls::fifo::list_entry> log_entries;
+ bool more = false;
+ auto r = fifos[index]->list(max_entries, marker, &log_entries, &more,
+ null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to list FIFO: " << get_oid(index)
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ for (const auto& entry : log_entries) {
+ rgw_data_change_log_entry log_entry;
+ log_entry.log_id = entry.marker;
+ log_entry.log_timestamp = entry.mtime;
+ auto liter = entry.data.cbegin();
+ try {
+ decode(log_entry.entry, liter);
+ } catch (const buffer::error& err) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": failed to decode data changes log entry: "
+ << err.what() << dendl;
+ return -EIO;
+ }
+ entries.push_back(std::move(log_entry));
+ }
+ if (truncated)
+ *truncated = more;
+ if (out_marker && !log_entries.empty()) {
+ *out_marker = log_entries.back().marker;
+ }
+ return 0;
+ }
+ int get_info(int index, RGWDataChangesLogInfo *info) override {
+ auto& fifo = fifos[index];
+ auto r = fifo->read_meta(null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to get FIFO metadata: " << get_oid(index)
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ auto m = fifo->meta();
+ auto p = m.head_part_num;
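+ // A negative head part number means nothing has ever been pushed;
+ // report a default (empty) marker and a zero timestamp.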
+ if (p < 0) {
+ info->marker = rgw::cls::fifo::marker{}.to_string();
+ info->last_update = ceph::real_clock::zero();
+ return 0;
+ }
+ rgw::cls::fifo::part_info h;
+ r = fifo->get_part_info(p, &h, null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to get part info: " << get_oid(index) << "/" << p
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
+ info->last_update = h.max_time;
+ return 0;
+ }
+ int trim(int index, std::string_view marker) override {
+ auto r = fifos[index]->trim(marker, null_yield);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to trim FIFO: " << get_oid(index)
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+ }
+ int trim(int index, std::string_view marker,
+ librados::AioCompletion* c) override {
+ int r = 0;
+ if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
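+ // Trimming to the zero marker is a no-op, but the caller still
+ // expects its AioCompletion to fire; complete it by hand instead
+ // of submitting an actual operation.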
+ auto pc = c->pc;
+ pc->get();
+ pc->lock.lock();
+ pc->rval = 0;
+ pc->complete = true;
+ pc->lock.unlock();
+ auto cb_complete = pc->callback_complete;
+ auto cb_complete_arg = pc->callback_complete_arg;
+ if (cb_complete)
+ cb_complete(pc, cb_complete_arg);
+
+ auto cb_safe = pc->callback_safe;
+ auto cb_safe_arg = pc->callback_safe_arg;
+ if (cb_safe)
+ cb_safe(pc, cb_safe_arg);
+
+ pc->lock.lock();
+ pc->callback_complete = NULL;
+ pc->callback_safe = NULL;
+ pc->cond.notify_all();
+ pc->put_unlock();
+ } else {
+ r = fifos[index]->trim(marker, c);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": unable to trim FIFO: " << get_oid(index)
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ }
+ return r;
+ }
+ std::string_view max_marker() const override {
+ static const std::string mm =
+ rgw::cls::fifo::marker::max().to_string();
+ return std::string_view(mm);
+ }
+};
+
+RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
+ : cct(cct),
+ num_shards(cct->_conf->rgw_data_log_num_shards),
+ changes(cct->_conf->rgw_data_log_changes_size) {}
+
+int RGWDataChangesLog::start(const RGWZone* _zone,
+ const RGWZoneParams& zoneparams,
+ RGWSI_Cls *cls, librados::Rados* lr)
+{
+ zone = _zone;
+ assert(zone);
+ auto backing = cct->_conf.get_val<std::string>("rgw_data_log_backing");
+ // Should be guaranteed by `set_enum_allowed`
+ ceph_assert(backing == "auto" || backing == "fifo" || backing == "omap");
+ auto log_pool = zoneparams.log_pool;
+ bool omapexists = false, omaphasentries = false;
+ auto r = RGWDataChangesOmap::exists(cct, *cls, &omapexists, &omaphasentries);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": Error when checking for existing Omap datalog backend: "
+ << cpp_strerror(-r) << dendl;
+ }
+ bool fifoexists = false, fifohasentries = false;
+ r = RGWDataChangesFIFO::exists(cct, lr, log_pool, &fifoexists, &fifohasentries);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": Error when checking for existing FIFO datalog backend: "
+ << cpp_strerror(-r) << dendl;
+ }
+ bool has_entries = omaphasentries || fifohasentries;
+ bool remove = false;
+
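+ // An existing, non-empty log of the wrong type is a hard error; an
+ // empty leftover log is removed so the requested (or default)
+ // backend can be created cleanly.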
+ if (omapexists && fifoexists) {
+ if (has_entries) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": Both Omap and FIFO backends exist, cannot continue."
+ << dendl;
+ return -EINVAL;
+ }
+ ldout(cct, 0)
+ << __PRETTY_FUNCTION__
+ << ": Both Omap and FIFO backends exist, but are empty. Will remove."
+ << dendl;
+ remove = true;
+ }
+ if (backing == "omap" && fifoexists) {
+ if (has_entries) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": Omap requested, but FIFO backend exists, cannot continue."
+ << dendl;
+ return -EINVAL;
+ }
+ ldout(cct, 0) << __PRETTY_FUNCTION__
+ << ": Omap requested, FIFO exists, but is empty. Deleting."
+ << dendl;
+ remove = true;
+ }
+ if (backing == "fifo" && omapexists) {
+ if (has_entries) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": FIFO requested, but Omap backend exists, cannot continue."
+ << dendl;
+ return -EINVAL;
+ }
+ ldout(cct, 0) << __PRETTY_FUNCTION__
+ << ": FIFO requested, Omap exists, but is empty. Deleting."
+ << dendl;
+ remove = true;
+ }
+
+ if (remove) {
+ r = RGWDataChangesBE::remove(cct, lr, log_pool);
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": remove failed, cannot continue."
+ << dendl;
+ return r;
+ }
+ omapexists = false;
+ fifoexists = false;
+ }
+
+ try {
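+ // "auto" keeps an existing OMAP log; otherwise (explicit fifo, or
+ // auto with no prior OMAP log) the FIFO backend is constructed.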
+ if (backing == "omap" || (backing == "auto" && omapexists)) {
+ be = std::make_unique<RGWDataChangesOmap>(cct, *cls);
+ } else if (backing != "omap") {
+ be = std::make_unique<RGWDataChangesFIFO>(cct, lr, log_pool);
+ }
+ } catch (bs::system_error& e) {
+ lderr(cct) << __PRETTY_FUNCTION__
+ << ": Error when starting backend: "
+ << e.what() << dendl;
+ return ceph::from_error_code(e.code());
+ }
+
+ ceph_assert(be);
+ renew_thread = make_named_thread("rgw_dt_lg_renew",
+ &RGWDataChangesLog::renew_run, this);
+ return 0;
+}
+
+int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
+ const auto& name = bs.bucket.name;
+ auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
+ auto r = (ceph_str_hash_linux(name.data(), name.size()) +
+ shard_shift) % num_shards;
+ return static_cast<int>(r);
+}
+
+int RGWDataChangesLog::renew_entries()
+{
+ if (!zone->log_data)
+ return 0;
+
+ /* we can't keep the bucket name as part of the cls_log_entry, and we need
+ * it later, so we keep two lists under the map */
+ bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
+ RGWDataChangesBE::entries>> m;
+
+ std::unique_lock l(lock);
+ decltype(cur_cycle) entries;
+ entries.swap(cur_cycle);
+ l.unlock();
+
+ auto ut = real_clock::now();
+ for (const auto& bs : entries) {
+ auto index = choose_oid(bs);
+
+ rgw_data_change change;
+ bufferlist bl;
+ change.entity_type = ENTITY_TYPE_BUCKET;
+ change.key = bs.get_key();
+ change.timestamp = ut;
+ encode(change, bl);
+
+ m[index].first.push_back(bs);
+ be->prepare(ut, change.key, std::move(bl), m[index].second);
+ }
+
+ for (auto& [index, p] : m) {
+ auto& [buckets, entries] = p;
+
+ auto now = real_clock::now();
+
+ auto ret = be->push(index, std::move(entries));
+ if (ret < 0) {
+ /* we don't really need to have a special handling for failed cases here,
+ * as this is just an optimization. */
+ lderr(cct) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
+ return ret;
+ }
+
+ auto expiration = now;
+ expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
+ for (auto& bs : buckets) {
+ update_renewed(bs, expiration);
+ }
+ }
+
+ return 0;
+}
+
+void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
+ ChangeStatusPtr& status)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (!changes.find(bs, status)) {
+ status = ChangeStatusPtr(new ChangeStatus);
+ changes.add(bs, status);
+ }
+}
+
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+{
+ std::scoped_lock l{lock};
+ cur_cycle.insert(bs);
+}
+
+void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
+ real_time expiration)
+{
+ std::scoped_lock l{lock};
+ ChangeStatusPtr status;
+ _get_change(bs, status);
+
+ ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
+ << bs.bucket.name << " shard_id=" << bs.shard_id
+ << " expiration=" << expiration << dendl;
+ status->cur_expiration = expiration;
+}
+
+int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
+ rgw_bucket_shard bs(bucket, shard_id);
+ return choose_oid(bs);
+}
+
+bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket,
+ optional_yield y) const
+{
+ if (!bucket_filter) {
+ return true;
+ }
+
+ return bucket_filter(bucket, y);
+}
+
+std::string RGWDataChangesLog::get_oid(int i) const {
+ return be->get_oid(i);
+}
+
+int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
+ auto& bucket = bucket_info.bucket;
+
+ if (!filter_bucket(bucket, null_yield)) {
+ return 0;
+ }
+
+ if (observer) {
+ observer->on_bucket_changed(bucket.get_key());
+ }
+
+ rgw_bucket_shard bs(bucket, shard_id);
+
+ int index = choose_oid(bs);
+ mark_modified(index, bs);
+
+ std::unique_lock l(lock);
+
+ ChangeStatusPtr status;
+ _get_change(bs, status);
+ l.unlock();
+
+ auto now = real_clock::now();
+
+ std::unique_lock sl(status->lock);
+
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
+ << " shard_id=" << shard_id << " now=" << now
+ << " cur_expiration=" << status->cur_expiration << dendl;
+
+ if (now < status->cur_expiration) {
+ /* no need to send, recently completed */
+ sl.unlock();
+ register_renew(bs);
+ return 0;
+ }
+
+ RefCountedCond* cond;
+
+ if (status->pending) {
+ cond = status->cond;
+
+ ceph_assert(cond);
+
+ status->cond->get();
+ sl.unlock();
+
+ int ret = cond->wait();
+ cond->put();
+ if (!ret) {
+ register_renew(bs);
+ }
+ return ret;
+ }
+
+ status->cond = new RefCountedCond;
+ status->pending = true;
+
+ ceph::real_time expiration;
+
+ int ret;
+
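+ // Loop until a push lands before its own expiration: if the push
+ // completes after the window has already passed, the entry is stale
+ // and must be re-sent with a fresh timestamp.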
+ do {
+ status->cur_sent = now;
+
+ expiration = now;
+ expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
+
+ sl.unlock();
+
+ ceph::buffer::list bl;
+ rgw_data_change change;
+ change.entity_type = ENTITY_TYPE_BUCKET;
+ change.key = bs.get_key();
+ change.timestamp = now;
+ encode(change, bl);
+
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
+
+ ret = be->push(index, now, change.key, std::move(bl));
+
+ now = real_clock::now();
+
+ sl.lock();
+
+ } while (!ret && real_clock::now() > expiration);
+
+ cond = status->cond;
+
+ status->pending = false;
+ /* time of when operation started, not completed */
+ status->cur_expiration = status->cur_sent;
+ status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
+ status->cond = nullptr;
+ sl.unlock();
+
+ cond->done(ret);
+ cond->put();
+
+ return ret;
+}
+
+int RGWDataChangesLog::list_entries(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated)
+{
+ assert(shard < num_shards);
+ return be->list(shard, max_entries, entries, marker,
+ out_marker, truncated);
+}
+
+int RGWDataChangesLog::list_entries(int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ LogMarker& marker, bool *ptruncated)
+{
+ bool truncated;
+ entries.clear();
+ for (; marker.shard < num_shards && int(entries.size()) < max_entries;
+ marker.shard++, marker.marker.reset()) {
+ int ret = list_entries(marker.shard, max_entries - entries.size(),
+ entries, marker.marker, NULL, &truncated);
+ if (ret == -ENOENT) {
+ continue;
+ }
+ if (ret < 0) {
+ return ret;
+ }
+ if (truncated) {
+ *ptruncated = true;
+ return 0;
+ }
+ }
+ *ptruncated = (marker.shard < num_shards);
+ return 0;
+}
+
+int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
+{
+ assert(shard_id < num_shards);
+ return be->get_info(shard_id, info);
+}
+
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
+{
+ assert(shard_id < num_shards);
+ return be->trim(shard_id, marker);
+}
+
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
+ librados::AioCompletion* c)
+{
+ assert(shard_id < num_shards);
+ return be->trim(shard_id, marker, c);
+}
+
+bool RGWDataChangesLog::going_down() const
+{
+ return down_flag;
+}
+
+RGWDataChangesLog::~RGWDataChangesLog() {
+ down_flag = true;
+ if (renew_thread.joinable()) {
+ renew_stop();
+ renew_thread.join();
+ }
+}
+
+void RGWDataChangesLog::renew_run() {
+ for (;;) {
+ dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
+ int r = renew_entries();
+ if (r < 0) {
+ dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
+ }
+
+ if (going_down())
+ break;
+
+ int interval = cct->_conf->rgw_data_log_window * 3 / 4;
+ std::unique_lock locker{renew_lock};
+ renew_cond.wait_for(locker, std::chrono::seconds(interval));
+ }
+}
+
+void RGWDataChangesLog::renew_stop()
+{
+ std::lock_guard l{renew_lock};
+ renew_cond.notify_all();
+}
+
+void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
+{
+ auto key = bs.get_key();
+ {
+ std::shared_lock rl{modified_lock}; // read lock to check for existence
+ auto shard = modified_shards.find(shard_id);
+ if (shard != modified_shards.end() && shard->second.count(key)) {
+ return;
+ }
+ }
+
+ std::unique_lock wl{modified_lock}; // write lock for insertion
+ modified_shards[shard_id].insert(key);
+}
+
+std::string_view RGWDataChangesLog::max_marker() const {
+ return be->max_marker();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGW_DATALOG_H
+#define CEPH_RGW_DATALOG_H
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <variant>
+#include <vector>
+
+#include <boost/container/flat_map.hpp>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+
+#include "include/rados/librados.hpp"
+
+#include "common/ceph_context.h"
+#include "common/ceph_json.h"
+#include "common/ceph_time.h"
+#include "common/Formatter.h"
+#include "common/lru_map.h"
+#include "common/RefCountedObj.h"
+
+#include "cls/log/cls_log_types.h"
+
+#include "rgw_basic_types.h"
+#include "rgw_sync_policy.h"
+#include "rgw_zone.h"
+#include "rgw_trim_bilog.h"
+
+#include "services/svc_cls.h"
+
+namespace bc = boost::container;
+
+enum DataLogEntityType {
+ ENTITY_TYPE_UNKNOWN = 0,
+ ENTITY_TYPE_BUCKET = 1,
+};
+
+struct rgw_data_change {
+ DataLogEntityType entity_type;
+ std::string key;
+ ceph::real_time timestamp;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ auto t = std::uint8_t(entity_type);
+ encode(t, bl);
+ encode(key, bl);
+ encode(timestamp, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ std::uint8_t t;
+ decode(t, bl);
+ entity_type = DataLogEntityType(t);
+ decode(key, bl);
+ decode(timestamp, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+};
+WRITE_CLASS_ENCODER(rgw_data_change)
+
+struct rgw_data_change_log_entry {
+ std::string log_id;
+ ceph::real_time log_timestamp;
+ rgw_data_change entry;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(log_id, bl);
+ encode(log_timestamp, bl);
+ encode(entry, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(log_id, bl);
+ decode(log_timestamp, bl);
+ decode(entry, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+};
+WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
+
+struct RGWDataChangesLogInfo {
+ std::string marker;
+ ceph::real_time last_update;
+
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+};
+
+struct RGWDataChangesLogMarker {
+ int shard = 0;
+ std::optional<std::string> marker;
+
+ RGWDataChangesLogMarker() = default;
+};
+
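+// Abstracts the datalog's backing store. RGWDataChangesLog delegates
+// all per-shard I/O to this interface; RGWDataChangesOmap and
+// RGWDataChangesFIFO in rgw_datalog.cc implement it.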
+class RGWDataChangesBE {
+protected:
+ CephContext* const cct;
+private:
+ std::string prefix;
+ static std::string_view get_prefix(CephContext* cct) {
+ std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
+ if (prefix.empty()) {
+ prefix = "data_log"sv;
+ }
+ return prefix;
+ }
+public:
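+ // Each backend batches renew entries in its own native container;
+ // the variant lets prepare()/push() remain backend-agnostic.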
+ using entries = std::variant<std::list<cls_log_entry>,
+ std::vector<ceph::buffer::list>>;
+
+ RGWDataChangesBE(CephContext* const cct)
+ : cct(cct), prefix(get_prefix(cct)) {}
+ virtual ~RGWDataChangesBE() = default;
+
+ static std::string get_oid(CephContext* cct, int i) {
+ return fmt::format("{}.{}", get_prefix(cct), i);
+ }
+ std::string get_oid(int i) {
+ return fmt::format("{}.{}", prefix, i);
+ }
+ static int remove(CephContext* cct, librados::Rados* rados,
+ const rgw_pool& log_pool);
+
+
+ virtual void prepare(ceph::real_time now,
+ const std::string& key,
+ ceph::buffer::list&& entry,
+ entries& out) = 0;
+ virtual int push(int index, entries&& items) = 0;
+ virtual int push(int index, ceph::real_time now,
+ const std::string& key,
+ ceph::buffer::list&& bl) = 0;
+ virtual int list(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated) = 0;
+ virtual int get_info(int index, RGWDataChangesLogInfo *info) = 0;
+ virtual int trim(int index, std::string_view marker) = 0;
+ virtual int trim(int index, std::string_view marker,
+ librados::AioCompletion* c) = 0;
+ virtual std::string_view max_marker() const = 0;
+};
+
+class RGWDataChangesLog {
+ CephContext *cct;
+ rgw::BucketChangeObserver *observer = nullptr;
+ const RGWZone* zone;
+ std::unique_ptr<RGWDataChangesBE> be;
+
+ const int num_shards;
+
+ ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
+ ceph::shared_mutex modified_lock =
+ ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
+ bc::flat_map<int, bc::flat_set<std::string>> modified_shards;
+
+ std::atomic<bool> down_flag = { false };
+
+ struct ChangeStatus {
+ std::shared_ptr<const rgw_sync_policy_info> sync_policy;
+ ceph::real_time cur_expiration;
+ ceph::real_time cur_sent;
+ bool pending = false;
+ RefCountedCond* cond = nullptr;
+ ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
+ };
+
+ using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
+
+ lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
+
+ bc::flat_set<rgw_bucket_shard> cur_cycle;
+
+ void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+ void register_renew(const rgw_bucket_shard& bs);
+ void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
+
+ ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
+ ceph::condition_variable renew_cond;
+ void renew_run();
+ void renew_stop();
+ std::thread renew_thread;
+
+ std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
+ int choose_oid(const rgw_bucket_shard& bs);
+ bool going_down() const;
+ bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
+ int renew_entries();
+
+public:
+
+ RGWDataChangesLog(CephContext* cct);
+ ~RGWDataChangesLog();
+
+ int start(const RGWZone* _zone, const RGWZoneParams& zoneparams,
+ RGWSI_Cls *cls_svc, librados::Rados* lr);
+
+ int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
+ int get_log_shard_id(rgw_bucket& bucket, int shard_id);
+ int list_entries(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated);
+ int trim_entries(int shard_id, std::string_view marker);
+ int trim_entries(int shard_id, std::string_view marker,
+ librados::AioCompletion* c); // :(
+ int get_info(int shard_id, RGWDataChangesLogInfo *info);
+
+ using LogMarker = RGWDataChangesLogMarker;
+
+ int list_entries(int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ LogMarker& marker, bool* ptruncated);
+
+ void mark_modified(int shard_id, const rgw_bucket_shard& bs);
+ auto read_clear_modified() {
+ std::unique_lock wl{modified_lock};
+ decltype(modified_shards) modified;
+ modified.swap(modified_shards); // leaves modified_shards empty
+ return modified;
+ }
+
+ void set_observer(rgw::BucketChangeObserver *observer) {
+ this->observer = observer;
+ }
+
+ void set_bucket_filter(decltype(bucket_filter)&& f) {
+ bucket_filter = std::move(f);
+ }
+ // a marker that compares greater than any other
+ std::string_view max_marker() const;
+ std::string get_oid(int shard_id) const;
+};
+
+#endif
#include "rgw_acl_s3.h"
#include "rgw_cache.h"
#include "rgw_bucket.h"
+#include "rgw_datalog.h"
#include "rgw_keystone.h"
#include "rgw_basic_types.h"
#include "rgw_op.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
+#include "rgw_datalog.h"
#include "rgw_putobj_processor.h"
#include "cls/rgw/cls_rgw_ops.h"
http_manager.start();
}
- int notify_all(map<rgw_zone_id, RGWRESTConn *>& conn_map, map<int, set<string> >& shards) {
+ int notify_all(map<rgw_zone_id, RGWRESTConn *>& conn_map,
+ bc::flat_map<int, bc::flat_set<string> >& shards) {
rgw_http_param_pair pairs[] = { { "type", "data" },
{ "notify", NULL },
{ "source-zone", store->svc.zone->get_zone_params().get_id().c_str() },
for (auto iter = conn_map.begin(); iter != conn_map.end(); ++iter) {
RGWRESTConn *conn = iter->second;
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- stack->call(new RGWPostRESTResourceCR<map<int, set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
+ stack->call(new RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
stacks.push_back(stack);
}
return 0;
}
- map<int, set<string> > shards;
-
- data_log->read_clear_modified(shards);
+ auto shards = data_log->read_clear_modified();
if (shards.empty()) {
return 0;
}
- for (map<int, set<string> >::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
- ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id=" << iter->first << ": " << iter->second << dendl;
+ for (const auto& [shard_id, keys] : shards) {
+ ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id="
+ << shard_id << ": " << keys << dendl;
}
notify_mgr.notify_all(store->svc.zone->get_zone_data_notify_to_map(), shards);
#pragma once
+#include "rgw_datalog.h"
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
#include "rgw_metadata.h"
#include "common/errno.h"
-#include "rgw_metadata.h"
-#include "rgw_user.h"
#include "rgw_bucket.h"
+#include "rgw_datalog.h"
+#include "rgw_metadata.h"
#include "rgw_otp.h"
+#include "rgw_user.h"
#define dout_subsys ceph_subsys_rgw
bucket_sobj.get());
cls->init(zone.get(), rados.get());
config_key_rados->init(rados.get());
- datalog_rados->init(cls.get());
mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get());
meta->init(sysobj.get(), mdlog.get(), meta_bes);
meta_be_sobj->init(sysobj.get(), mdlog.get());
return r;
}
- r = datalog_rados->start(&zone->get_zone());
+ r = datalog_rados->start(&zone->get_zone(),
+ zone->get_zone_params(), cls.get(),
+ rados->get_rados_handle());
if (r < 0) {
- ldout(cct, 0) << "ERROR: failed to start datalog service (" << cpp_strerror(-r) << dendl;
+ ldout(cct, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
return r;
}
#pragma once
#include "rgw_basic_types.h"
+#include "rgw_tag.h"
struct rgw_sync_symmetric_group {
#include <vector>
#include <string>
+#include "common/errno.h"
+
#include "rgw_trim_datalog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
+#include "rgw_datalog.h"
#include "rgw_data_sync.h"
#include "rgw_zone.h"
#include "rgw_bucket.h"
}
int request_complete() override {
int r = cn->completion()->get_return_value();
+ if (r < 0) {
+ ldout(cct, 0) << __PRETTY_FUNCTION__ << "(): trim of shard=" << shard
+ << " marker=" << marker << " failed with r=" << r
+ << ", " << cpp_strerror(r) << dendl;
+ }
+
set_status() << "request complete; ret=" << r;
if (r != -ENODATA) {
return r;
#include "rgw/rgw_bucket.h"
#include "rgw/rgw_zone.h"
+#include "rgw/rgw_datalog.h"
#include "cls/rgw/cls_rgw_client.h"
#pragma once
+#include "rgw/rgw_datalog.h"
#include "rgw/rgw_service.h"
#include "rgw/rgw_tools.h"
struct rgw_bucket_dir_header;
class RGWSI_BILog_RADOS;
-class RGWDataChangesLog;
#define RGW_NO_SHARD -1
};
private:
- librados::Rados* get_rados_handle();
int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx,
const OpenParams& params = {});
int pool_iterate(librados::IoCtx& ioctx,
public:
RGWSI_RADOS(CephContext *cct);
~RGWSI_RADOS();
+ librados::Rados* get_rados_handle();
void init() {}
void shutdown() override;
#include "common/Finisher.h"
#include "global/global_init.h"
#include "rgw/rgw_common.h"
+#include "rgw/rgw_datalog.h"
#include "rgw/rgw_mdlog.h"
#include "rgw/rgw_bucket.h"
#include "rgw/rgw_rados.h"