services/svc_finisher.cc
services/svc_bucket.cc
services/svc_cls.cc
+ services/svc_mdlog.cc
+ services/svc_meta.cc
+ services/svc_meta_be.cc
+ services/svc_meta_be_sobj.cc
services/svc_notify.cc
services/svc_quota.cc
services/svc_sync_modules.cc
buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
}
+/*
+ * The tenant_name is always returned on purpose. May be empty, of course.
+ */
+static void parse_bucket(const string& bucket,
+ string *tenant_name,
+ string *bucket_name,
+ string *bucket_instance = nullptr /* optional */)
+{
+ int pos = bucket.find('/');
+ if (pos >= 0) {
+ *tenant_name = bucket.substr(0, pos);
+ } else {
+ tenant_name->clear();
+ }
+ string bn = bucket.substr(pos + 1);
+ pos = bn.find (':');
+ if (pos < 0) {
+ *bucket_name = std::move(bn);
+ return;
+ }
+ *bucket_name = bn.substr(0, pos);
+ if (bucket_instance) {
+ *bucket_instance = bn.substr(pos + 1);
+ }
+}
+
/*
* Note that this is not a reversal of parse_bucket(). That one deals
* with the syntax we need in metadata and such. This one deals with
return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
}
-// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
-// with the existing instance[:shard] format. once we parse the shard, the / is
-// replaced with a : to match the [tenant:]instance format
-void rgw_bucket_instance_key_to_oid(string& key)
-{
- // replace tenant/ with tenant:
- auto c = key.find('/');
- if (c != string::npos) {
- key[c] = ':';
- }
-}
-
-// convert bucket instance oids back to the tenant/ format for metadata keys.
-// it's safe to parse 'tenant:' only for oids, because they won't contain the
-// optional :shard at the end
-void rgw_bucket_instance_oid_to_key(string& oid)
-{
- // find first : (could be tenant:bucket or bucket:instance)
- auto c = oid.find(':');
- if (c != string::npos) {
- // if we find another :, the first one was for tenant
- if (oid.find(':', c + 1) != string::npos) {
- oid[c] = '/';
- }
- }
-}
-
int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
{
ssize_t pos = bucket_instance.rfind(':');
return ret;
}
-int rgw_bucket_delete_bucket_obj(RGWRados *store,
- const string& tenant_name,
- const string& bucket_name,
- RGWObjVersionTracker& objv_tracker)
-{
- string key;
-
- rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
- return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
-}
-
static void set_err_msg(std::string *sink, std::string msg)
{
if (sink && !msg.empty())
JSONDecoder::decode_json("attrs", attrs, obj);
}
+class RGW_MB_Handler_Module_Bucket : public RGWSI_MBSObj_Handler_Module {
+ RGWSI_Zone *zone_svc;
+pubic:
+ RGW_MB_Handler_Module_Bucket(RGWSI_Zone *_zone_svc) : zone_svc {}
+
+ void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
+ oid = key;
+ pool = zone_svc->get_zone_params().domain_root;
+ }
+};
+
class RGWBucketMetadataHandler : public RGWMetadataHandler {
public:
string get_type() override { return "bucket"; }
+ RGWSI_MetaBackend::ModuleRef get_backend_module(RGWSI_MetaBackend::Type be_type) override {
+ return RGWSI_MetaBackend::ModuleRef(new RGW_MB_Handler_Module_Bucket(store->svc.zone));
+ }
+
+ int read_bucket_entrypoint_info(RGWSI_MetaBackend *ctx,
+ string& entry,
+ RGWBucketEntrypointInfo *be,
+ RGWObjVersionTracker *objv_tracker,
+ ceph::real_time *pmtime,
+ map<string, bufferlist> *pattrs) {
+ bufferlist bl;
+ int ret = meta_be->get_entry(ctx, &bl,
+ objv_tracker, pmtime, pattrs,
+ nullptr, nullopt);
+ if (ret < 0) {
+ return ret;
+ }
+
+ try {
+ auto iter = bl.cbegin();
+ ceph::decode(*be, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ return 0;
+ }
+
+ int store_bucket_entrypoint_info(RGWSI_MetaBackend *ctx,
+ string& entry,
+ const RGWBucketEntrypointInfo& be,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime,
+ map<string, bufferlist> *pattrs) {
+ bufferlist bl;
+ ceph::encode(be, bl);
+ int ret = meta_be->put(ctx, bl,
+ false, objv_tracker, mtime, pattrs,
+ APPLY_ALWAYS);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+ }
+
+ int remove_bucket_entrypoint_info(RGWSI_MetaBackend *ctx,
+ string& entry,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime)
+
+ bufferlist bl;
+ ceph::encode(be, bl);
+ int ret = meta_be->remove(ctx, bl,
+ objv_tracker, mtime,
+ APPLY_ALWAYS);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+ }
+
int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
RGWObjVersionTracker ot;
RGWBucketEntryPoint be;
return 0;
}
- int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
+ int put(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker,
real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
RGWBucketEntryPoint be, old_be;
try {
}
real_time orig_mtime;
- map<string, bufferlist> attrs;
RGWObjVersionTracker old_ot;
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
- string tenant_name, bucket_name;
- parse_bucket(entry, &tenant_name, &bucket_name);
- int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
+ map<string, bufferlist> attrs;
+ int ret = read_bucket_entrypoint_info(ctx, entry, &old_be, &old_ot, &orig_mtime, &attrs);
if (ret < 0 && ret != -ENOENT)
return ret;
objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
- ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
+ ret = store_bucket_entrypoint_info(entry, be, false, objv_tracker, mtime, &attrs);
if (ret < 0)
return ret;
RGWListRawObjsCtx ctx;
};
- int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
+ int remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) override {
RGWBucketEntryPoint be;
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
- string tenant_name, bucket_name;
- parse_bucket(entry, &tenant_name, &bucket_name);
- int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
+ real_time orig_mtime;
+
+ int ret = read_bucket_entrypoint_info(ctx, entry, be, &objv_tracker, &orig_mtime, nullptr);
if (ret < 0)
return ret;
lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
}
- ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
+ ret = remove_bucket_entrypoint_info(ctx, entry, objv_tracker, orig_mtime);
if (ret < 0) {
lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
}
return 0;
}
- void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
- oid = key;
- pool = store->svc.zone->get_zone_params().domain_root;
- }
-
int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
auto info = std::make_unique<list_keys_info>();
};
+class RGW_MB_Handler_Module_BI : public RGWSI_MBSObj_Handler_Module {
+ RGWSI_Zone *zone_svc;
+pubic:
+ RGW_MB_Handler_Module_BI(RGWSI_Zone *_zone_svc) : zone_svc {}
+
+ /*
+ * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
+ * point, so that the log entries end up at the same log shard, so that we process them
+ * in order
+ */
+ void get_hash_key(const string& section, const string& key, string& hash_key) override {
+ string k;
+ int pos = key.find(':');
+ if (pos < 0)
+ k = key;
+ else
+ k = key.substr(0, pos);
+ hash_key = "bucket:" + k;
+ }
+
+ void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
+ oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
+ rgw_bucket_instance_key_to_oid(oid);
+ pool = store->svc.zone->get_zone_params().domain_root;
+ }
+
+// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
+// with the existing instance[:shard] format. once we parse the shard, the / is
+// replaced with a : to match the [tenant:]instance format
+ void key_to_oid(string& key) override {
+ // replace tenant/ with tenant:
+ auto c = key.find('/');
+ if (c != string::npos) {
+ key[c] = ':';
+ }
+ }
+
+ // convert bucket instance oids back to the tenant/ format for metadata keys.
+ // it's safe to parse 'tenant:' only for oids, because they won't contain the
+ // optional :shard at the end
+ void oid_to_key(string& oid) override {
+ // find first : (could be tenant:bucket or bucket:instance)
+ auto c = oid.find(':');
+ if (c != string::npos) {
+ // if we find another :, the first one was for tenant
+ if (oid.find(':', c + 1) != string::npos) {
+ oid[c] = '/';
+ }
+ }
+ }
+};
+
class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
public:
&info.objv_tracker);
}
- void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
- oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
- rgw_bucket_instance_key_to_oid(oid);
- pool = store->svc.zone->get_zone_params().domain_root;
- }
-
int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
auto info = std::make_unique<list_keys_info>();
list_keys_info *info = static_cast<list_keys_info *>(handle);
return info->store->list_raw_objs_get_cursor(info->ctx);
}
-
- /*
- * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
- * point, so that the log entries end up at the same log shard, so that we process them
- * in order
- */
- void get_hash_key(const string& section, const string& key, string& hash_key) override {
- string k;
- int pos = key.find(':');
- if (pos < 0)
- k = key;
- else
- k = key.substr(0, pos);
- hash_key = "bucket:" + k;
- }
};
class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler {
bucket_meta_handler = RGWBucketMetaHandlerAllocator::alloc();
bucket_instance_meta_handler = RGWBucketInstanceMetaHandlerAllocator::alloc();
}
- mm->register_handler(bucket_meta_handler);
- mm->register_handler(bucket_instance_meta_handler);
+#warning handle failures
+ bucket_meta_handler->init(mm);
+ bucket_instance_meta_handler->init(mm);
}
extern void rgw_bucket_instance_key_to_oid(string& key);
extern void rgw_bucket_instance_oid_to_key(string& oid);
-extern int rgw_bucket_delete_bucket_obj(RGWRados *store,
- const string& tenant_name,
- const string& bucket_name,
- RGWObjVersionTracker& objv_tracker);
-
extern int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info);
extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name);
encode_json("otp_pool", otp_pool, f);
encode_json_plain("system_key", system_key, f);
encode_json("placement_pools", placement_pools, f);
- encode_json("metadata_heap", metadata_heap, f);
encode_json("tier_config", tier_config, f);
encode_json("realm_id", realm_id, f);
}
JSONDecoder::decode_json("otp_pool", otp_pool, obj);
JSONDecoder::decode_json("system_key", system_key, obj);
JSONDecoder::decode_json("placement_pools", placement_pools, obj);
- JSONDecoder::decode_json("metadata_heap", metadata_heap, obj);
JSONDecoder::decode_json("tier_config", tier_config, obj);
JSONDecoder::decode_json("realm_id", realm_id, obj);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw_metadata.h"
+#include "rgw_mdlog_types.h"
+
+#define META_LOG_OBJ_PREFIX "meta.log."
+
+struct RGWMetadataLogInfo {
+ string marker;
+ real_time last_update;
+
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
+};
+
+class RGWCompletionManager;
+
+class RGWMetadataLogInfoCompletion : public RefCountedObject {
+ public:
+ using info_callback_t = std::function<void(int, const cls_log_header&)>;
+ private:
+ cls_log_header header;
+ librados::IoCtx io_ctx;
+ librados::AioCompletion *completion;
+ std::mutex mutex; //< protects callback between cancel/complete
+ boost::optional<info_callback_t> callback; //< cleared on cancel
+ public:
+ explicit RGWMetadataLogInfoCompletion(info_callback_t callback);
+ ~RGWMetadataLogInfoCompletion() override;
+
+ librados::IoCtx& get_io_ctx() { return io_ctx; }
+ cls_log_header& get_header() { return header; }
+ librados::AioCompletion* get_completion() { return completion; }
+
+ void finish(librados::completion_t cb) {
+ std::lock_guard<std::mutex> lock(mutex);
+ if (callback) {
+ (*callback)(completion->get_return_value(), header);
+ }
+ }
+ void cancel() {
+ std::lock_guard<std::mutex> lock(mutex);
+ callback = boost::none;
+ }
+};
+
+class RGWMetadataLog {
+ CephContext *cct;
+ RGWRados *store;
+ const string prefix;
+
+ static std::string make_prefix(const std::string& period) {
+ if (period.empty())
+ return META_LOG_OBJ_PREFIX;
+ return META_LOG_OBJ_PREFIX + period + ".";
+ }
+
+ RWLock lock;
+ set<int> modified_shards;
+
+ void mark_modified(int shard_id);
+public:
+ RGWMetadataLog(CephContext *_cct, RGWRados *_store, const std::string& period)
+ : cct(_cct), store(_store),
+ prefix(make_prefix(period)),
+ lock("RGWMetaLog::lock") {}
+
+ void get_shard_oid(int id, string& oid) const {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", id);
+ oid = prefix + buf;
+ }
+
+ int add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl);
+ int store_entries_in_shard(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);
+
+ struct LogListCtx {
+ int cur_shard;
+ string marker;
+ real_time from_time;
+ real_time end_time;
+
+ string cur_oid;
+
+ bool done;
+
+ LogListCtx() : cur_shard(0), done(false) {}
+ };
+
+ void init_list_entries(int shard_id, const real_time& from_time, const real_time& end_time, string& marker, void **handle);
+ void complete_list_entries(void *handle);
+ int list_entries(void *handle,
+ int max_entries,
+ list<cls_log_entry>& entries,
+ string *out_marker,
+ bool *truncated);
+
+ int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker);
+ int get_info(int shard_id, RGWMetadataLogInfo *info);
+ int get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion);
+ int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id);
+ int unlock(int shard_id, string& zone_id, string& owner_id);
+
+ int update_shards(list<int>& shards);
+
+ void read_clear_modified(set<int> &modified);
+};
+
+struct LogStatusDump {
+ RGWMDLogStatus status;
+
+ explicit LogStatusDump(RGWMDLogStatus _status) : status(_status) {}
+ void dump(Formatter *f) const;
+};
+
+struct RGWMetadataLogData {
+ obj_version read_version;
+ obj_version write_version;
+ RGWMDLogStatus status;
+
+ RGWMetadataLogData() : status(MDLOG_STATUS_UNKNOWN) {}
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& bl);
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(RGWMetadataLogData)
+
+struct RGWMetadataLogHistory {
+ epoch_t oldest_realm_epoch;
+ std::string oldest_period_id;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(oldest_realm_epoch, bl);
+ encode(oldest_period_id, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& p) {
+ DECODE_START(1, p);
+ decode(oldest_realm_epoch, p);
+ decode(oldest_period_id, p);
+ DECODE_FINISH(p);
+ }
+
+ static const std::string oid;
+};
+WRITE_CLASS_ENCODER(RGWMetadataLogHistory)
+
#define dout_subsys ceph_subsys_rgw
+const std::string RGWMetadataLogHistory::oid = "meta.history";
+
void LogStatusDump::dump(Formatter *f) const {
string s;
switch (status) {
}
-int RGWMetadataLog::add_entry(RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl) {
+int RGWMetadataLog::add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl) {
if (!store->svc.zone->need_to_log_metadata())
return 0;
string oid;
string hash_key;
- handler->get_hash_key(section, key, hash_key);
+ module->get_hash_key(section, key, hash_key);
int shard_id;
store->shard_name(prefix, cct->_conf->rgw_md_log_max_shards, hash_key, oid, &shard_id);
handlers.clear();
}
-const std::string RGWMetadataLogHistory::oid = "meta.history";
-
-namespace {
-
-int read_history(RGWRados *store, RGWMetadataLogHistory *state,
- RGWObjVersionTracker *objv_tracker)
-{
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
- auto& pool = store->svc.zone->get_zone_params().log_pool;
- const auto& oid = RGWMetadataLogHistory::oid;
- bufferlist bl;
- int ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, objv_tracker, nullptr, null_yield);
- if (ret < 0) {
- return ret;
- }
- if (bl.length() == 0) {
- /* bad history object, remove it */
- rgw_raw_obj obj(pool, oid);
- auto sysobj = obj_ctx.get_obj(obj);
- ret = sysobj.wop().remove(null_yield);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: meta history is empty, but cannot remove it (" << cpp_strerror(-ret) << ")" << dendl;
- return ret;
- }
- return -ENOENT;
- }
- try {
- auto p = bl.cbegin();
- state->decode(p);
- } catch (buffer::error& e) {
- ldout(store->ctx(), 1) << "failed to decode the mdlog history: "
- << e.what() << dendl;
- return -EIO;
- }
- return 0;
-}
-
-int write_history(RGWRados *store, const RGWMetadataLogHistory& state,
- RGWObjVersionTracker *objv_tracker, bool exclusive = false)
-{
- bufferlist bl;
- state.encode(bl);
-
- auto& pool = store->svc.zone->get_zone_params().log_pool;
- const auto& oid = RGWMetadataLogHistory::oid;
- return rgw_put_system_obj(store, pool, oid, bl,
- exclusive, objv_tracker, real_time{});
-}
-
-using Cursor = RGWPeriodHistory::Cursor;
-
-/// read the mdlog history and use it to initialize the given cursor
-class ReadHistoryCR : public RGWCoroutine {
- RGWRados *store;
- Cursor *cursor;
- RGWObjVersionTracker *objv_tracker;
- RGWMetadataLogHistory state;
- public:
- ReadHistoryCR(RGWRados *store, Cursor *cursor,
- RGWObjVersionTracker *objv_tracker)
- : RGWCoroutine(store->ctx()), store(store), cursor(cursor),
- objv_tracker(objv_tracker)
- {}
-
- int operate() {
- reenter(this) {
- yield {
- rgw_raw_obj obj{store->svc.zone->get_zone_params().log_pool,
- RGWMetadataLogHistory::oid};
- constexpr bool empty_on_enoent = false;
-
- using ReadCR = RGWSimpleRadosReadCR<RGWMetadataLogHistory>;
- call(new ReadCR(store->get_async_rados(), store->svc.sysobj, obj,
- &state, empty_on_enoent, objv_tracker));
- }
- if (retcode < 0) {
- ldout(cct, 1) << "failed to read mdlog history: "
- << cpp_strerror(retcode) << dendl;
- return set_cr_error(retcode);
- }
- *cursor = store->period_history->lookup(state.oldest_realm_epoch);
- if (!*cursor) {
- return set_cr_error(cursor->get_error());
- }
-
- ldout(cct, 10) << "read mdlog history with oldest period id="
- << state.oldest_period_id << " realm_epoch="
- << state.oldest_realm_epoch << dendl;
- return set_cr_done();
- }
- return 0;
- }
-};
-
-/// write the given cursor to the mdlog history
-class WriteHistoryCR : public RGWCoroutine {
- RGWRados *store;
- Cursor cursor;
- RGWObjVersionTracker *objv;
- RGWMetadataLogHistory state;
- public:
- WriteHistoryCR(RGWRados *store, const Cursor& cursor,
- RGWObjVersionTracker *objv)
- : RGWCoroutine(store->ctx()), store(store), cursor(cursor), objv(objv)
- {}
-
- int operate() {
- reenter(this) {
- state.oldest_period_id = cursor.get_period().get_id();
- state.oldest_realm_epoch = cursor.get_epoch();
-
- yield {
- rgw_raw_obj obj{store->svc.zone->get_zone_params().log_pool,
- RGWMetadataLogHistory::oid};
-
- using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
- call(new WriteCR(store->get_async_rados(), store->svc.sysobj, obj, state, objv));
- }
- if (retcode < 0) {
- ldout(cct, 1) << "failed to write mdlog history: "
- << cpp_strerror(retcode) << dendl;
- return set_cr_error(retcode);
- }
-
- ldout(cct, 10) << "wrote mdlog history with oldest period id="
- << state.oldest_period_id << " realm_epoch="
- << state.oldest_realm_epoch << dendl;
- return set_cr_done();
- }
- return 0;
- }
-};
-
-/// update the mdlog history to reflect trimmed logs
-class TrimHistoryCR : public RGWCoroutine {
- RGWRados *store;
- const Cursor cursor; //< cursor to trimmed period
- RGWObjVersionTracker *objv; //< to prevent racing updates
- Cursor next; //< target cursor for oldest log period
- Cursor existing; //< existing cursor read from disk
-
- public:
- TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv)
- : RGWCoroutine(store->ctx()),
- store(store), cursor(cursor), objv(objv), next(cursor)
- {
- next.next(); // advance past cursor
- }
-
- int operate() {
- reenter(this) {
- // read an existing history, and write the new history if it's newer
- yield call(new ReadHistoryCR(store, &existing, objv));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- // reject older trims with ECANCELED
- if (cursor.get_epoch() < existing.get_epoch()) {
- ldout(cct, 4) << "found oldest log epoch=" << existing.get_epoch()
- << ", rejecting trim at epoch=" << cursor.get_epoch() << dendl;
- return set_cr_error(-ECANCELED);
- }
- // overwrite with updated history
- yield call(new WriteHistoryCR(store, next, objv));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- return set_cr_done();
- }
- return 0;
- }
-};
-
-// traverse all the way back to the beginning of the period history, and
-// return a cursor to the first period in a fully attached history
-Cursor find_oldest_period(RGWRados *store)
+int RGWMetadataHandler::init(RGWMetadataManager *manager)
{
- auto cct = store->ctx();
- auto cursor = store->period_history->get_current();
-
- while (cursor) {
- // advance to the period's predecessor
- if (!cursor.has_prev()) {
- auto& predecessor = cursor.get_period().get_predecessor();
- if (predecessor.empty()) {
- // this is the first period, so our logs must start here
- ldout(cct, 10) << "find_oldest_period returning first "
- "period " << cursor.get_period().get_id() << dendl;
- return cursor;
- }
- // pull the predecessor and add it to our history
- RGWPeriod period;
- int r = store->period_puller->pull(predecessor, period);
- if (r < 0) {
- return Cursor{r};
- }
- auto prev = store->period_history->insert(std::move(period));
- if (!prev) {
- return prev;
- }
- ldout(cct, 20) << "find_oldest_period advancing to "
- "predecessor period " << predecessor << dendl;
- ceph_assert(cursor.has_prev());
- }
- cursor.prev();
- }
- ldout(cct, 10) << "find_oldest_period returning empty cursor" << dendl;
- return cursor;
+ return register_handler(this, &be_handle);
}
-} // anonymous namespace
-
-Cursor RGWMetadataManager::init_oldest_log_period()
-{
- // read the mdlog history
- RGWMetadataLogHistory state;
- RGWObjVersionTracker objv;
- int ret = read_history(store, &state, &objv);
-
- if (ret == -ENOENT) {
- // initialize the mdlog history and write it
- ldout(cct, 10) << "initializing mdlog history" << dendl;
- auto cursor = find_oldest_period(store);
- if (!cursor) {
- return cursor;
- }
-
- // write the initial history
- state.oldest_realm_epoch = cursor.get_epoch();
- state.oldest_period_id = cursor.get_period().get_id();
-
- constexpr bool exclusive = true; // don't overwrite
- int ret = write_history(store, state, &objv, exclusive);
- if (ret < 0 && ret != -EEXIST) {
- ldout(cct, 1) << "failed to write mdlog history: "
- << cpp_strerror(ret) << dendl;
- return Cursor{ret};
- }
- return cursor;
- } else if (ret < 0) {
- ldout(cct, 1) << "failed to read mdlog history: "
- << cpp_strerror(ret) << dendl;
- return Cursor{ret};
- }
-
- // if it's already in the history, return it
- auto cursor = store->period_history->lookup(state.oldest_realm_epoch);
- if (cursor) {
- return cursor;
- }
- // pull the oldest period by id
- RGWPeriod period;
- ret = store->period_puller->pull(state.oldest_period_id, period);
- if (ret < 0) {
- ldout(cct, 1) << "failed to read period id=" << state.oldest_period_id
- << " for mdlog history: " << cpp_strerror(ret) << dendl;
- return Cursor{ret};
- }
- // verify its realm_epoch
- if (period.get_realm_epoch() != state.oldest_realm_epoch) {
- ldout(cct, 1) << "inconsistent mdlog history: read period id="
- << period.get_id() << " with realm_epoch=" << period.get_realm_epoch()
- << ", expected realm_epoch=" << state.oldest_realm_epoch << dendl;
- return Cursor{-EINVAL};
- }
- // attach the period to our history
- return store->period_history->attach(std::move(period));
-}
-
-Cursor RGWMetadataManager::read_oldest_log_period() const
-{
- RGWMetadataLogHistory state;
- int ret = read_history(store, &state, nullptr);
- if (ret < 0) {
- ldout(store->ctx(), 1) << "failed to read mdlog history: "
- << cpp_strerror(ret) << dendl;
- return Cursor{ret};
- }
-
- ldout(store->ctx(), 10) << "read mdlog history with oldest period id="
- << state.oldest_period_id << " realm_epoch="
- << state.oldest_realm_epoch << dendl;
-
- return store->period_history->lookup(state.oldest_realm_epoch);
-}
-
-RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period,
- RGWObjVersionTracker *objv) const
-{
- return new ReadHistoryCR(store, period, objv);
-}
-
-RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period,
- RGWObjVersionTracker *objv) const
-{
- return new TrimHistoryCR(store, period, objv);
-}
-
-int RGWMetadataManager::init(const std::string& current_period)
-{
- // open a log for the current period
- current_log = get_log(current_period);
- return 0;
-}
-
-RGWMetadataLog* RGWMetadataManager::get_log(const std::string& period)
-{
- // construct the period's log in place if it doesn't exist
- auto insert = md_logs.emplace(std::piecewise_construct,
- std::forward_as_tuple(period),
- std::forward_as_tuple(cct, store, period));
- return &insert.first->second;
-}
-
-int RGWMetadataManager::register_handler(RGWMetadataHandler *handler)
+int RGWMetadataManager::register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend::Handle *phandle)
{
string type = handler->get_type();
if (handlers.find(type) != handlers.end())
return -EINVAL;
+ int ret = store->svc.meta->init_handler(handler, phandle);
+ if (ret < 0) {
+ return ret;
+ }
+
handlers[type] = handler;
return 0;
return ret;
}
-int RGWMetadataManager::prepare_mutate(RGWRados *store,
- rgw_pool& pool, const string& oid,
- const real_time& mtime,
- RGWObjVersionTracker *objv_tracker,
- RGWMetadataHandler::sync_type_t sync_mode)
-{
- bufferlist bl;
- real_time orig_mtime;
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
- int ret = rgw_get_system_obj(obj_ctx, pool, oid,
- bl, objv_tracker, &orig_mtime,
- null_yield, nullptr, nullptr);
- if (ret < 0 && ret != -ENOENT) {
- return ret;
- }
- if (ret != -ENOENT &&
- !RGWMetadataHandler::check_versions(objv_tracker->read_version, orig_mtime,
- objv_tracker->write_version, mtime, sync_mode)) {
- return STATUS_NO_APPLY;
- }
-
- if (objv_tracker->write_version.tag.empty()) {
- if (objv_tracker->read_version.tag.empty()) {
- objv_tracker->generate_new_write_ver(store->ctx());
- } else {
- objv_tracker->write_version = objv_tracker->read_version;
- objv_tracker->write_version.ver++;
- }
- }
- return 0;
-}
-
int RGWMetadataManager::remove(string& metadata_key)
{
RGWMetadataHandler *handler;
return handler->remove(store, entry, objv_tracker);
}
-int RGWMetadataManager::lock_exclusive(string& metadata_key, timespan duration, string& owner_id) {
- RGWMetadataHandler *handler;
- string entry;
- string zone_id;
-
- int ret = find_handler(metadata_key, &handler, entry);
- if (ret < 0)
- return ret;
-
- rgw_pool pool;
- string oid;
-
- handler->get_pool_and_oid(store, entry, pool, oid);
-
- return store->lock_exclusive(pool, oid, duration, zone_id, owner_id);
-}
-
-int RGWMetadataManager::unlock(string& metadata_key, string& owner_id) {
- librados::IoCtx io_ctx;
- RGWMetadataHandler *handler;
- string entry;
- string zone_id;
-
- int ret = find_handler(metadata_key, &handler, entry);
- if (ret < 0)
- return ret;
-
- rgw_pool pool;
- string oid;
-
- handler->get_pool_and_oid(store, entry, pool, oid);
-
- return store->unlock(pool, oid, zone_id, owner_id);
-}
-
struct list_keys_handle {
void *handle;
RGWMetadataHandler *handler;
}
}
-int RGWMetadataManager::pre_modify(RGWMetadataHandler *handler, string& section, const string& key,
- RGWMetadataLogData& log_data, RGWObjVersionTracker *objv_tracker,
- RGWMDLogStatus op_type)
-{
- section = handler->get_type();
-
- /* if write version has not been set, and there's a read version, set it so that we can
- * log it
- */
- if (objv_tracker) {
- if (objv_tracker->read_version.ver && !objv_tracker->write_version.ver) {
- objv_tracker->write_version = objv_tracker->read_version;
- objv_tracker->write_version.ver++;
- }
- log_data.read_version = objv_tracker->read_version;
- log_data.write_version = objv_tracker->write_version;
- }
-
- log_data.status = op_type;
-
- bufferlist logbl;
- encode(log_data, logbl);
-
- ceph_assert(current_log); // must have called init()
- int ret = current_log->add_entry(handler, section, key, logbl);
- if (ret < 0)
- return ret;
-
- return 0;
-}
-
-int RGWMetadataManager::post_modify(RGWMetadataHandler *handler, const string& section, const string& key, RGWMetadataLogData& log_data,
- RGWObjVersionTracker *objv_tracker, int ret)
-{
- if (ret >= 0)
- log_data.status = MDLOG_STATUS_COMPLETE;
- else
- log_data.status = MDLOG_STATUS_ABORT;
-
- bufferlist logbl;
- encode(log_data, logbl);
-
- ceph_assert(current_log); // must have called init()
- int r = current_log->add_entry(handler, section, key, logbl);
- if (ret < 0)
- return ret;
-
- if (r < 0)
- return r;
-
- return 0;
-}
-
-string RGWMetadataManager::heap_oid(RGWMetadataHandler *handler, const string& key, const obj_version& objv)
-{
- char buf[objv.tag.size() + 32];
- snprintf(buf, sizeof(buf), "%s:%lld", objv.tag.c_str(), (long long)objv.ver);
- return string(".meta:") + handler->get_type() + ":" + key + ":" + buf;
-}
-
-int RGWMetadataManager::store_in_heap(RGWMetadataHandler *handler, const string& key, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker, real_time mtime,
- map<string, bufferlist> *pattrs)
-{
- if (!objv_tracker) {
- return -EINVAL;
- }
-
- rgw_pool heap_pool(store->svc.zone->get_zone_params().metadata_heap);
-
- if (heap_pool.empty()) {
- return 0;
- }
-
- RGWObjVersionTracker otracker;
- otracker.write_version = objv_tracker->write_version;
- string oid = heap_oid(handler, key, objv_tracker->write_version);
- int ret = rgw_put_system_obj(store, heap_pool, oid,
- bl, false, &otracker, mtime, pattrs);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: rgw_put_system_obj() oid=" << oid << " returned ret=" << ret << dendl;
- return ret;
- }
-
- return 0;
-}
-
-int RGWMetadataManager::remove_from_heap(RGWMetadataHandler *handler, const string& key, RGWObjVersionTracker *objv_tracker)
-{
- if (!objv_tracker) {
- return -EINVAL;
- }
-
- rgw_pool heap_pool(store->svc.zone->get_zone_params().metadata_heap);
-
- if (heap_pool.empty()) {
- return 0;
- }
-
- string oid = heap_oid(handler, key, objv_tracker->write_version);
- rgw_raw_obj obj(heap_pool, oid);
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
- auto sysobj = obj_ctx.get_obj(obj);
- int ret = sysobj.wop().remove(null_yield);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: sysobj.wop().remove() oid=" << oid << " returned ret=" << ret << dendl;
- return ret;
- }
-
- return 0;
-}
-
-int RGWMetadataManager::put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive,
- RGWObjVersionTracker *objv_tracker, real_time mtime, map<string, bufferlist> *pattrs)
-{
- string section;
- RGWMetadataLogData log_data;
- int ret = pre_modify(handler, section, key, log_data, objv_tracker, MDLOG_STATUS_WRITE);
- if (ret < 0)
- return ret;
-
- string oid;
- rgw_pool pool;
-
- handler->get_pool_and_oid(store, key, pool, oid);
-
- ret = store_in_heap(handler, key, bl, objv_tracker, mtime, pattrs);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": store_in_heap() key=" << key << " returned ret=" << ret << dendl;
- goto done;
- }
-
- ret = rgw_put_system_obj(store, pool, oid, bl, exclusive,
- objv_tracker, mtime, pattrs);
-
- if (ret < 0) {
- int r = remove_from_heap(handler, key, objv_tracker);
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": remove_from_heap() key=" << key << " returned ret=" << r << dendl;
- }
- }
-done:
- /* cascading ret into post_modify() */
-
- ret = post_modify(handler, section, key, log_data, objv_tracker, ret);
- if (ret < 0)
- return ret;
-
- return 0;
-}
-
-int RGWMetadataManager::remove_entry(RGWMetadataHandler *handler,
- const string& key,
- RGWObjVersionTracker *objv_tracker)
-{
- string section;
- RGWMetadataLogData log_data;
- int ret = pre_modify(handler, section, key, log_data, objv_tracker, MDLOG_STATUS_REMOVE);
- if (ret < 0) {
- return ret;
- }
-
- string oid;
- rgw_pool pool;
-
- handler->get_pool_and_oid(store, key, pool, oid);
-
- rgw_raw_obj obj(pool, oid);
-
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
- auto sysobj = obj_ctx.get_obj(obj);
- ret = sysobj.wop()
- .set_objv_tracker(objv_tracker)
- .remove(null_yield);
- /* cascading ret into post_modify() */
-
- ret = post_modify(handler, section, key, log_data, objv_tracker, ret);
- if (ret < 0) {
- return ret;
- }
-
- return 0;
-}
-
int RGWMetadataManager::get_log_shard_id(const string& section,
const string& key, int *shard_id)
{
#include "include/types.h"
#include "rgw_common.h"
#include "rgw_period_history.h"
+#include "rgw_mdlog_types.h"
#include "cls/version/cls_version_types.h"
#include "cls/log/cls_log_types.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/ceph_time.h"
+#include "services/svc_meta_be.h"
+
class RGWRados;
class RGWCoroutine;
struct obj_version;
-enum RGWMDLogStatus {
- MDLOG_STATUS_UNKNOWN,
- MDLOG_STATUS_WRITE,
- MDLOG_STATUS_SETATTRS,
- MDLOG_STATUS_REMOVE,
- MDLOG_STATUS_COMPLETE,
- MDLOG_STATUS_ABORT,
-};
-
class RGWMetadataObject {
protected:
obj_version objv;
class RGWMetadataManager;
class RGWMetadataHandler {
+ friend class RGWSI_MetaBackend;
friend class RGWMetadataManager;
-public:
- enum sync_type_t {
- APPLY_ALWAYS,
- APPLY_UPDATES,
- APPLY_NEWER
- };
- static bool string_to_sync_type(const string& sync_string,
- sync_type_t& type) {
- if (sync_string.compare("update-by-version") == 0)
- type = APPLY_UPDATES;
- else if (sync_string.compare("update-by-timestamp") == 0)
- type = APPLY_NEWER;
- else if (sync_string.compare("always") == 0)
- type = APPLY_ALWAYS;
- else
- return false;
- return true;
- }
+ RGWSI_MetaBackend_Handle be_handle{0};
+public:
virtual ~RGWMetadataHandler() {}
virtual string get_type() = 0;
+ virtual RGWSI_MetaBackend::ModuleRef get_backend_module(RGWSI_MetaBackend::Type be_type) = 0;
+
virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) = 0;
virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t type) = 0;
+ real_time mtime, JSONObj *obj, RGWMDLogSyncType type) = 0;
virtual int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) = 0;
virtual int list_keys_init(RGWRados *store, const string& marker, void **phandle) = 0;
virtual string get_marker(void *handle) = 0;
- /* key to use for hashing entries for log shard placement */
- virtual void get_hash_key(const string& section, const string& key, string& hash_key) {
- hash_key = section + ":" + key;
- }
+ int init(RGWMetadataManager *manager);
+
protected:
- virtual void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) = 0;
/**
* Compare an incoming versus on-disk tag/version+mtime combo against
* the sync mode to see if the new one should replace the on-disk one.
*/
static bool check_versions(const obj_version& ondisk, const real_time& ondisk_time,
const obj_version& incoming, const real_time& incoming_time,
- sync_type_t sync_mode) {
+ RGWMDLogSyncType sync_mode) {
switch (sync_mode) {
case APPLY_UPDATES:
if ((ondisk.tag != incoming.tag) ||
}
return true;
}
-
- /*
- * The tenant_name is always returned on purpose. May be empty, of course.
- */
- static void parse_bucket(const string& bucket,
- string *tenant_name,
- string *bucket_name,
- string *bucket_instance = nullptr /* optional */)
- {
- int pos = bucket.find('/');
- if (pos >= 0) {
- *tenant_name = bucket.substr(0, pos);
- } else {
- tenant_name->clear();
- }
- string bn = bucket.substr(pos + 1);
- pos = bn.find (':');
- if (pos < 0) {
- *bucket_name = std::move(bn);
- return;
- }
- *bucket_name = bn.substr(0, pos);
- if (bucket_instance) {
- *bucket_instance = bn.substr(pos + 1);
- }
- }
-};
-
-#define META_LOG_OBJ_PREFIX "meta.log."
-
-struct RGWMetadataLogInfo {
- string marker;
- real_time last_update;
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-
-class RGWCompletionManager;
-
-class RGWMetadataLogInfoCompletion : public RefCountedObject {
- public:
- using info_callback_t = std::function<void(int, const cls_log_header&)>;
- private:
- cls_log_header header;
- librados::IoCtx io_ctx;
- librados::AioCompletion *completion;
- std::mutex mutex; //< protects callback between cancel/complete
- boost::optional<info_callback_t> callback; //< cleared on cancel
- public:
- explicit RGWMetadataLogInfoCompletion(info_callback_t callback);
- ~RGWMetadataLogInfoCompletion() override;
-
- librados::IoCtx& get_io_ctx() { return io_ctx; }
- cls_log_header& get_header() { return header; }
- librados::AioCompletion* get_completion() { return completion; }
-
- void finish(librados::completion_t cb) {
- std::lock_guard<std::mutex> lock(mutex);
- if (callback) {
- (*callback)(completion->get_return_value(), header);
- }
- }
- void cancel() {
- std::lock_guard<std::mutex> lock(mutex);
- callback = boost::none;
- }
};
-class RGWMetadataLog {
- CephContext *cct;
- RGWRados *store;
- const string prefix;
-
- static std::string make_prefix(const std::string& period) {
- if (period.empty())
- return META_LOG_OBJ_PREFIX;
- return META_LOG_OBJ_PREFIX + period + ".";
- }
-
- RWLock lock;
- set<int> modified_shards;
-
- void mark_modified(int shard_id);
-public:
- RGWMetadataLog(CephContext *_cct, RGWRados *_store, const std::string& period)
- : cct(_cct), store(_store),
- prefix(make_prefix(period)),
- lock("RGWMetaLog::lock") {}
-
- void get_shard_oid(int id, string& oid) const {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", id);
- oid = prefix + buf;
- }
-
- int add_entry(RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl);
- int store_entries_in_shard(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);
-
- struct LogListCtx {
- int cur_shard;
- string marker;
- real_time from_time;
- real_time end_time;
-
- string cur_oid;
-
- bool done;
-
- LogListCtx() : cur_shard(0), done(false) {}
- };
-
- void init_list_entries(int shard_id, const real_time& from_time, const real_time& end_time, string& marker, void **handle);
- void complete_list_entries(void *handle);
- int list_entries(void *handle,
- int max_entries,
- list<cls_log_entry>& entries,
- string *out_marker,
- bool *truncated);
-
- int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker);
- int get_info(int shard_id, RGWMetadataLogInfo *info);
- int get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion);
- int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id);
- int unlock(int shard_id, string& zone_id, string& owner_id);
-
- int update_shards(list<int>& shards);
-
- void read_clear_modified(set<int> &modified);
-};
-
-struct LogStatusDump {
- RGWMDLogStatus status;
-
- explicit LogStatusDump(RGWMDLogStatus _status) : status(_status) {}
- void dump(Formatter *f) const;
-};
-
-struct RGWMetadataLogData {
- obj_version read_version;
- obj_version write_version;
- RGWMDLogStatus status;
-
- RGWMetadataLogData() : status(MDLOG_STATUS_UNKNOWN) {}
-
- void encode(bufferlist& bl) const;
- void decode(bufferlist::const_iterator& bl);
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWMetadataLogData)
-
-struct RGWMetadataLogHistory {
- epoch_t oldest_realm_epoch;
- std::string oldest_period_id;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(oldest_realm_epoch, bl);
- encode(oldest_period_id, bl);
- ENCODE_FINISH(bl);
- }
- void decode(bufferlist::const_iterator& p) {
- DECODE_START(1, p);
- decode(oldest_realm_epoch, p);
- decode(oldest_period_id, p);
- DECODE_FINISH(p);
- }
-
- static const std::string oid;
-};
-WRITE_CLASS_ENCODER(RGWMetadataLogHistory)
-
class RGWMetadataManager {
map<string, RGWMetadataHandler *> handlers;
CephContext *cct;
- RGWRados *store;
-
- // maintain a separate metadata log for each period
- std::map<std::string, RGWMetadataLog> md_logs;
- // use the current period's log for mutating operations
- RGWMetadataLog* current_log = nullptr;
int find_handler(const string& metadata_key, RGWMetadataHandler **handler, string& entry);
- int pre_modify(RGWMetadataHandler *handler, string& section, const string& key,
- RGWMetadataLogData& log_data, RGWObjVersionTracker *objv_tracker,
- RGWMDLogStatus op_type);
- int post_modify(RGWMetadataHandler *handler, const string& section, const string& key, RGWMetadataLogData& log_data,
- RGWObjVersionTracker *objv_tracker, int ret);
-
- string heap_oid(RGWMetadataHandler *handler, const string& key, const obj_version& objv);
- int store_in_heap(RGWMetadataHandler *handler, const string& key, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker, real_time mtime,
- map<string, bufferlist> *pattrs);
- int remove_from_heap(RGWMetadataHandler *handler, const string& key, RGWObjVersionTracker *objv_tracker);
- int prepare_mutate(RGWRados *store, rgw_pool& pool, const string& oid,
- const real_time& mtime,
- RGWObjVersionTracker *objv_tracker,
- RGWMetadataHandler::sync_type_t sync_mode);
+
+protected:
+ int register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle);
public:
- RGWMetadataManager(CephContext *_cct, RGWRados *_store);
+ RGWMetadataManager(CephContext *_cct);
~RGWMetadataManager();
- RGWRados* get_store() { return store; }
-
- int init(const std::string& current_period);
-
- /// initialize the oldest log period if it doesn't exist, and attach it to
- /// our current history
- RGWPeriodHistory::Cursor init_oldest_log_period();
-
- /// read the oldest log period, and return a cursor to it in our existing
- /// period history
- RGWPeriodHistory::Cursor read_oldest_log_period() const;
-
- /// read the oldest log period asynchronously and write its result to the
- /// given cursor pointer
- RGWCoroutine* read_oldest_log_period_cr(RGWPeriodHistory::Cursor *period,
- RGWObjVersionTracker *objv) const;
-
- /// try to advance the oldest log period when the given period is trimmed,
- /// using a rados lock to provide atomicity
- RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period,
- RGWObjVersionTracker *objv) const;
-
- /// find or create the metadata log for the given period
- RGWMetadataLog* get_log(const std::string& period);
-
- int register_handler(RGWMetadataHandler *handler);
-
- template <typename F>
- int mutate(RGWMetadataHandler *handler, const string& key,
- const ceph::real_time& mtime, RGWObjVersionTracker *objv_tracker,
- RGWMDLogStatus op_type,
- RGWMetadataHandler::sync_type_t sync_mode,
- F&& f);
-
RGWMetadataHandler *get_handler(const string& type);
- int put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive,
- RGWObjVersionTracker *objv_tracker, real_time mtime, map<string, bufferlist> *pattrs = NULL);
- int remove_entry(RGWMetadataHandler *handler,
- const string& key,
- RGWObjVersionTracker *objv_tracker);
int get(string& metadata_key, Formatter *f);
int put(string& metadata_key, bufferlist& bl,
- RGWMetadataHandler::sync_type_t sync_mode,
+ RGWMDLogSyncType sync_mode,
obj_version *existing_version = NULL);
int remove(string& metadata_key);
void dump_log_entry(cls_log_entry& entry, Formatter *f);
void get_sections(list<string>& sections);
- int lock_exclusive(string& metadata_key, timespan duration, string& owner_id);
- int unlock(string& metadata_key, string& owner_id);
int get_log_shard_id(const string& section, const string& key, int *shard_id);
void parse_metadata_key(const string& metadata_key, string& type, string& entry);
};
-template <typename F>
-int RGWMetadataManager::mutate(RGWMetadataHandler *handler, const string& key,
- const ceph::real_time& mtime, RGWObjVersionTracker *objv_tracker,
- RGWMDLogStatus op_type,
- RGWMetadataHandler::sync_type_t sync_mode,
- F&& f)
-{
- string oid;
- rgw_pool pool;
-
- handler->get_pool_and_oid(store, key, pool, oid);
-
- int ret = prepare_mutate(store, pool, oid, mtime, objv_tracker, sync_mode);
- if (ret < 0 ||
- ret == STATUS_NO_APPLY) {
- return ret;
- }
-
- string section;
- RGWMetadataLogData log_data;
- ret = pre_modify(handler, section, key, log_data, objv_tracker, MDLOG_STATUS_WRITE);
- if (ret < 0) {
- return ret;
- }
-
- ret = std::forward<F>(f)();
-
- /* cascading ret into post_modify() */
-
- ret = post_modify(handler, section, key, log_data, objv_tracker, ret);
- if (ret < 0)
- return ret;
-
- return 0;
-}
#endif
svc.shutdown();
- delete meta_mgr;
delete binfo_cache;
delete obj_tombstone_cache;
return ret;
}
- meta_mgr = new RGWMetadataManager(cct, this);
data_log = new RGWDataChangesLog(cct, this);
cr_registry = crs.release();
return ret;
return ret;
}
- period_puller.reset(new RGWPeriodPuller(this));
- period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
- svc.zone->get_current_period()));
-
ret = open_root_pool_ctx();
if (ret < 0)
return ret;
rgw_cache_entry_info *cache_info,
boost::optional<obj_version> refresh_version)
{
+#warning FIXME
+#if 0
bufferlist bl;
string bucket_entry;
return -EIO;
}
return 0;
+#endif
}
int RGWRados::get_bucket_info(RGWSysObjectCtx& obj_ctx,
.set_mtime(pmtime)
.set_attrs(pattrs)
.set_pinfo(&info)
- .set_refresh_version(info.objv_tracker.read_version)
+ .set_refresh_version(rv)
.exec();
if (r < 0) {
return r;
encode(entry_point, epbl);
string bucket_entry;
rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
+#warning FIXME
+#if 0
return rgw_bucket_store_info(this, bucket_entry, epbl, exclusive, pattrs, &objv_tracker, mtime);
+#endif
}
int RGWRados::put_bucket_instance_info(RGWBucketInfo& info, bool exclusive,
pools_initialized(false),
quota_handler(NULL),
cr_registry(NULL),
- meta_mgr(NULL), data_log(NULL), reshard(NULL) {}
+ data_log(NULL), reshard(NULL) {}
RGWRados& set_use_cache(bool status) {
use_cache = status;
*/
string host_id;
- // pulls missing periods for period_history
- std::unique_ptr<RGWPeriodPuller> period_puller;
- // maintains a connected history of periods
- std::unique_ptr<RGWPeriodHistory> period_history;
-
RGWAsyncRadosProcessor* get_async_rados() const { return async_rados; };
- RGWMetadataManager *meta_mgr;
-
RGWDataChangesLog *data_log;
RGWReshard *reshard;
return 0;
}
+static bool string_to_sync_type(const string& sync_string,
+ sync_type_t& type) {
+ if (sync_string.compare("update-by-version") == 0)
+ type = APPLY_UPDATES;
+ else if (sync_string.compare("update-by-timestamp") == 0)
+ type = APPLY_NEWER;
+ else if (sync_string.compare("always") == 0)
+ type = APPLY_ALWAYS;
+ else
+ return false;
+ return true;
+}
+
void RGWOp_Metadata_Put::execute() {
bufferlist bl;
string metadata_key;
class RGWSI_Finisher;
class RGWSI_Bucket;
class RGWSI_Cls;
+class RGWSI_MDLog;
+class RGWSI_Meta;
+class RGWSI_MetaBackend_SObj;
class RGWSI_Notify;
class RGWSI_RADOS;
class RGWSI_Zone;
std::unique_ptr<RGWSI_Finisher> finisher;
std::unique_ptr<RGWSI_Bucket> bucket;
std::unique_ptr<RGWSI_Cls> cls;
+ std::unique_ptr<RGWSI_MDLog> mdlog;
+ std::unique_ptr<RGWSI_Meta> meta;
+ std::unique_ptr<RGWSI_MetaBackend_SObj> meta_be_sobj;
std::unique_ptr<RGWSI_Notify> notify;
std::unique_ptr<RGWSI_RADOS> rados;
std::unique_ptr<RGWSI_Zone> zone;
RGWSI_Finisher *finisher{nullptr};
RGWSI_Bucket *bucket{nullptr};
RGWSI_Cls *cls{nullptr};
+ RGWSI_Meta *mdlog{nullptr};
+ RGWSI_Meta *meta{nullptr};
+ RGWSI_MetaBackend_SObj *meta_be_sobj{nullptr};
RGWSI_Notify *notify{nullptr};
RGWSI_RADOS *rados{nullptr};
RGWSI_Zone *zone{nullptr};
return 0;
}
-int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
{
map<string,bufferlist> no_attrs;
rgw_raw_obj obj(pool, oid);
- auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
auto sysobj = obj_ctx.get_obj(obj);
int ret = sysobj.wop()
.set_objv_tracker(objv_tracker)
.set_attrs(*pattrs)
.write(data, null_yield);
- if (ret == -ENOENT) {
- ret = rgwstore->create_pool(pool);
- if (ret >= 0) {
- ret = sysobj.wop()
- .set_objv_tracker(objv_tracker)
- .set_exclusive(exclusive)
- .set_mtime(set_mtime)
- .set_attrs(*pattrs)
- .write(data, null_yield);
- }
- }
-
return ret;
}
bool create = false,
bool mostly_omap = false);
-int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs = NULL);
int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map<string, bufferlist> *pattrs = NULL,
}
if (zone.get_id() != my_zone_id) {
pool_names.insert(zone.domain_root);
- pool_names.insert(zone.metadata_heap);
pool_names.insert(zone.control_pool);
pool_names.insert(zone.gc_pool);
pool_names.insert(zone.log_pool);
}
domain_root = fix_zone_pool_dup(pools, name, ".rgw.meta:root", domain_root);
- if (!metadata_heap.name.empty()) {
- metadata_heap = fix_zone_pool_dup(pools, name, ".rgw.meta:heap", metadata_heap);
- }
control_pool = fix_zone_pool_dup(pools, name, ".rgw.control", control_pool);
gc_pool = fix_zone_pool_dup(pools, name ,".rgw.log:gc", gc_pool);
lc_pool = fix_zone_pool_dup(pools, name ,".rgw.log:lc", lc_pool);
struct RGWZoneParams : RGWSystemMetaObj {
rgw_pool domain_root;
- rgw_pool metadata_heap;
rgw_pool control_pool;
rgw_pool gc_pool;
rgw_pool lc_pool;
RGWSystemMetaObj::encode(bl);
encode(system_key, bl);
encode(placement_pools, bl);
- encode(metadata_heap, bl);
+ rgw_pool unused_metadata_heap;
+ encode(unused_metadata_heap, bl);
encode(realm_id, bl);
encode(lc_pool, bl);
map<std::string, std::string, ltstr_nocase> old_tier_config;
decode(system_key, bl);
if (struct_v >= 4)
decode(placement_pools, bl);
- if (struct_v >= 5)
- decode(metadata_heap, bl);
+ if (struct_v >= 5) {
+ rgw_pool unused_metadata_heap;
+ decode(unused_metadata_heap, bl);
+ }
if (struct_v >= 6) {
decode(realm_id, bl);
}
--- /dev/null
+#include "svc_mdlog.h"
+#include "svc_zone.h"
+
+#include "rgw/rgw_tools.h"
+#include "rgw/rgw_mdlog.h"
+#include "rgw/rgw_coroutine.h"
+
+int RGWSI_MDLog::read_history(RGWRados *store, RGWMetadataLogHistory *state,
+ RGWObjVersionTracker *objv_tracker)
+{
+ auto obj_ctx = sysobj_svc->init_obj_ctx();
+ auto& pool = zone_svc->get_zone_params().log_pool;
+ const auto& oid = RGWMetadataLogHistory::oid;
+ bufferlist bl;
+ int ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, objv_tracker, nullptr, null_yield);
+ if (ret < 0) {
+ return ret;
+ }
+ if (bl.length() == 0) {
+ /* bad history object, remove it */
+ rgw_raw_obj obj(pool, oid);
+ auto sysobj = obj_ctx.get_obj(obj);
+ ret = sysobj.wop().remove(null_yield);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: meta history is empty, but cannot remove it (" << cpp_strerror(-ret) << ")" << dendl;
+ return ret;
+ }
+ return -ENOENT;
+ }
+ try {
+ auto p = bl.cbegin();
+ state->decode(p);
+ } catch (buffer::error& e) {
+ ldout(cct, 1) << "failed to decode the mdlog history: "
+ << e.what() << dendl;
+ return -EIO;
+ }
+ return 0;
+}
+
+int RGWSI_MDLog::write_history(const RGWMetadataLogHistory& state,
+ RGWObjVersionTracker *objv_tracker,
+ bool exclusive)
+{
+ bufferlist bl;
+ state.encode(bl);
+
+ auto& pool = zone_svc->get_zone_params().log_pool;
+ const auto& oid = RGWMetadataLogHistory::oid;
+ return rgw_put_system_obj(sysobj_svc, pool, oid, bl,
+ exclusive, objv_tracker, real_time{});
+}
+
+namespace {
+
+using Cursor = RGWPeriodHistory::Cursor;
+
+/// read the mdlog history and use it to initialize the given cursor
+class ReadHistoryCR : public RGWCoroutine {
+ RGWSI_Zone *zone_svc;
+ RGWSI_SysObj *sysobj_svc;
+ Cursor *cursor;
+ RGWObjVersionTracker *objv_tracker;
+ RGWMetadataLogHistory state;
+ public:
+ ReadHistoryCR(RGWSI_Zone *zone_svc,
+ RGWSI_SysObj *sysobj_svc,
+ Cursor *cursor,
+ RGWObjVersionTracker *objv_tracker)
+ : RGWCoroutine(zone_svc->ctx()), zone_svc(zone_svc),
+ sysobj_svc(sysobj_svc),
+ cursor(cursor),
+ objv_tracker(objv_tracker)
+ {}
+
+ int operate() {
+ reenter(this) {
+ yield {
+ rgw_raw_obj obj{zone_svc->get_zone_params().log_pool,
+ RGWMetadataLogHistory::oid};
+ constexpr bool empty_on_enoent = false;
+
+ using ReadCR = RGWSimpleRadosReadCR<RGWMetadataLogHistory>;
+ call(new ReadCR(store->get_async_rados(), sysobj_svc, obj,
+ &state, empty_on_enoent, objv_tracker));
+ }
+ if (retcode < 0) {
+ ldout(cct, 1) << "failed to read mdlog history: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ *cursor = store->period_history->lookup(state.oldest_realm_epoch);
+ if (!*cursor) {
+ return set_cr_error(cursor->get_error());
+ }
+
+ ldout(cct, 10) << "read mdlog history with oldest period id="
+ << state.oldest_period_id << " realm_epoch="
+ << state.oldest_realm_epoch << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+/// write the given cursor to the mdlog history
+class WriteHistoryCR : public RGWCoroutine {
+ RGWSI_Zone *zone_svc;
+ RGWSI_SysObj *sysobj_svc;
+ Cursor cursor;
+ RGWObjVersionTracker *objv;
+ RGWMetadataLogHistory state;
+ public:
+ WriteHistoryCR(RGWSI_Zone *zone_svc, RGWSI_SysObj *sysobj_svc,
+ const Cursor& cursor,
+ RGWObjVersionTracker *objv)
+ : RGWCoroutine(zone_svc->ctx()), zone_svc(zone_svc), cursor(cursor), objv(objv)
+ {}
+
+ int operate() {
+ reenter(this) {
+ state.oldest_period_id = cursor.get_period().get_id();
+ state.oldest_realm_epoch = cursor.get_epoch();
+
+ yield {
+ rgw_raw_obj obj{zone_svc->get_zone_params().log_pool,
+ RGWMetadataLogHistory::oid};
+
+ using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
+ call(new WriteCR(store->get_async_rados(), sysobj_svc, obj, state, objv));
+ }
+ if (retcode < 0) {
+ ldout(cct, 1) << "failed to write mdlog history: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ ldout(cct, 10) << "wrote mdlog history with oldest period id="
+ << state.oldest_period_id << " realm_epoch="
+ << state.oldest_realm_epoch << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+/// update the mdlog history to reflect trimmed logs
+class TrimHistoryCR : public RGWCoroutine {
+ RGWRados *store;
+ const Cursor cursor; //< cursor to trimmed period
+ RGWObjVersionTracker *objv; //< to prevent racing updates
+ Cursor next; //< target cursor for oldest log period
+ Cursor existing; //< existing cursor read from disk
+
+ public:
+ TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv)
+ : RGWCoroutine(store->ctx()),
+ store(store), cursor(cursor), objv(objv), next(cursor)
+ {
+ next.next(); // advance past cursor
+ }
+
+ int operate() {
+ reenter(this) {
+ // read an existing history, and write the new history if it's newer
+ yield call(new ReadHistoryCR(store, &existing, objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ // reject older trims with ECANCELED
+ if (cursor.get_epoch() < existing.get_epoch()) {
+ ldout(cct, 4) << "found oldest log epoch=" << existing.get_epoch()
+ << ", rejecting trim at epoch=" << cursor.get_epoch() << dendl;
+ return set_cr_error(-ECANCELED);
+ }
+ // overwrite with updated history
+ yield call(new WriteHistoryCR(store, next, objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+// traverse all the way back to the beginning of the period history, and
+// return a cursor to the first period in a fully attached history
+Cursor find_oldest_period(RGWRados *store)
+{
+ auto cct = store->ctx();
+ auto cursor = store->period_history->get_current();
+
+ while (cursor) {
+ // advance to the period's predecessor
+ if (!cursor.has_prev()) {
+ auto& predecessor = cursor.get_period().get_predecessor();
+ if (predecessor.empty()) {
+ // this is the first period, so our logs must start here
+ ldout(cct, 10) << "find_oldest_period returning first "
+ "period " << cursor.get_period().get_id() << dendl;
+ return cursor;
+ }
+ // pull the predecessor and add it to our history
+ RGWPeriod period;
+ int r = store->period_puller->pull(predecessor, period);
+ if (r < 0) {
+ return Cursor{r};
+ }
+ auto prev = store->period_history->insert(std::move(period));
+ if (!prev) {
+ return prev;
+ }
+ ldout(cct, 20) << "find_oldest_period advancing to "
+ "predecessor period " << predecessor << dendl;
+ ceph_assert(cursor.has_prev());
+ }
+ cursor.prev();
+ }
+ ldout(cct, 10) << "find_oldest_period returning empty cursor" << dendl;
+ return cursor;
+}
+
+} // anonymous namespace
+
+Cursor RGWMetadataManager::init_oldest_log_period()
+{
+ // read the mdlog history
+ RGWMetadataLogHistory state;
+ RGWObjVersionTracker objv;
+ int ret = read_history(store, &state, &objv);
+
+ if (ret == -ENOENT) {
+ // initialize the mdlog history and write it
+ ldout(cct, 10) << "initializing mdlog history" << dendl;
+ auto cursor = find_oldest_period(store);
+ if (!cursor) {
+ return cursor;
+ }
+
+ // write the initial history
+ state.oldest_realm_epoch = cursor.get_epoch();
+ state.oldest_period_id = cursor.get_period().get_id();
+
+ constexpr bool exclusive = true; // don't overwrite
+ int ret = write_history(store, state, &objv, exclusive);
+ if (ret < 0 && ret != -EEXIST) {
+ ldout(cct, 1) << "failed to write mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+ return cursor;
+ } else if (ret < 0) {
+ ldout(cct, 1) << "failed to read mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+
+ // if it's already in the history, return it
+ auto cursor = store->period_history->lookup(state.oldest_realm_epoch);
+ if (cursor) {
+ return cursor;
+ }
+ // pull the oldest period by id
+ RGWPeriod period;
+ ret = store->period_puller->pull(state.oldest_period_id, period);
+ if (ret < 0) {
+ ldout(cct, 1) << "failed to read period id=" << state.oldest_period_id
+ << " for mdlog history: " << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+ // verify its realm_epoch
+ if (period.get_realm_epoch() != state.oldest_realm_epoch) {
+ ldout(cct, 1) << "inconsistent mdlog history: read period id="
+ << period.get_id() << " with realm_epoch=" << period.get_realm_epoch()
+ << ", expected realm_epoch=" << state.oldest_realm_epoch << dendl;
+ return Cursor{-EINVAL};
+ }
+ // attach the period to our history
+ return store->period_history->attach(std::move(period));
+}
+
+Cursor RGWMetadataManager::read_oldest_log_period() const
+{
+ RGWMetadataLogHistory state;
+ int ret = read_history(store, &state, nullptr);
+ if (ret < 0) {
+ ldout(store->ctx(), 1) << "failed to read mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+
+ ldout(store->ctx(), 10) << "read mdlog history with oldest period id="
+ << state.oldest_period_id << " realm_epoch="
+ << state.oldest_realm_epoch << dendl;
+
+ return store->period_history->lookup(state.oldest_realm_epoch);
+}
+
+RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period,
+ RGWObjVersionTracker *objv) const
+{
+ return new ReadHistoryCR(store, period, objv);
+}
+
+RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period,
+ RGWObjVersionTracker *objv) const
+{
+ return new TrimHistoryCR(store, period, objv);
+}
+
+RGWMetadataLog* RGWMetadataManager::get_log(const std::string& period)
+{
+ // construct the period's log in place if it doesn't exist
+ auto insert = md_logs.emplace(std::piecewise_construct,
+ std::forward_as_tuple(period),
+ std::forward_as_tuple(cct, store, period));
+ return &insert.first->second;
+}
+
+int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
+ const std::string& current_period)
+{
+ zone_svc = _zone_svc;
+ sysobj_svc = _sysobj_svc;
+ current_log = get_log(current_period);
+
+ period_puller.reset(new RGWPeriodPuller(this));
+ period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
+ zone_svc->get_current_period()));
+
+ return 0;
+}
+}
+
+int RGWSI_MDLog::add_entry(RGWSI_MetaBacked::Module *module, const string& section, const string& key, bufferlist& bl)
+{
+ ceph_assert(current_log); // must have called init()
+ return current_log->add_entry(module, section, key, logbl);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "common/static_ptr.h"
+
+#include "rgw/rgw_service.h"
+#include "rgw/rgw_period_history.h"
+#include "rgw/rgw_period_puller.h"
+
+#include "svc_meta_be.h"
+
+
+class RGWMetadataLog;
+class RGWMetadataLogHistory;
+class RGWCoroutine;
+
+class RGWSI_Zone;
+class RGWSI_SysObj;
+
+
+class RGWSI_MDLog : public RGWServiceInstance
+{
+ RGWSI_Zone *zone_svc{nullptr};
+ RGWSI_SysObj *sysobj_svc{nullptr};
+
+ // maintain a separate metadata log for each period
+ std::map<std::string, RGWMetadataLog> md_logs;
+
+ // use the current period's log for mutating operations
+ RGWMetadataLog* current_log{nullptr};
+
+ /// find or create the metadata log for the given period
+ RGWMetadataLog* get_log(const std::string& period);
+
+ // pulls missing periods for period_history
+ std::unique_ptr<RGWPeriodPuller> period_puller;
+ // maintains a connected history of periods
+ std::unique_ptr<RGWPeriodHistory> period_history;
+
+public:
+ RGWSI_MDLog(CephContext *cct) : RGWServiceInstance(cct) {}
+ virtual ~RGWSI_MDLog() {}
+
+ /// initialize the oldest log period if it doesn't exist, and attach it to
+ /// our current history
+ RGWPeriodHistory::Cursor init_oldest_log_period();
+
+ /// read the oldest log period, and return a cursor to it in our existing
+ /// period history
+ RGWPeriodHistory::Cursor read_oldest_log_period() const;
+
+ /// read the oldest log period asynchronously and write its result to the
+ /// given cursor pointer
+ RGWCoroutine* read_oldest_log_period_cr(RGWPeriodHistory::Cursor *period,
+ RGWObjVersionTracker *objv) const;
+
+ /// try to advance the oldest log period when the given period is trimmed,
+ /// using a rados lock to provide atomicity
+ RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period,
+ RGWObjVersionTracker *objv) const;
+
+ int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
+ const std::string& current_period);
+
+ int read_history(RGWMetadataLogHistory *state, RGWObjVersionTracker *objv_tracker);
+ int write_history(const RGWMetadataLogHistory& state,
+ RGWObjVersionTracker *objv_tracker,
+ bool exclusive = false);
+
+ int add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl);
+};
+
--- /dev/null
+
+
+#include "svc_meta_be.h"
+#include "svc_mdlog.h"
+
+#include "rgw/rgw_mdlog.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+
+RGWSI_MetaBackend::Context::~Context() {} // needed, even though destructor is pure virtual
+RGWSI_MetaBackend::Module::~Module() {} // needed, even though destructor is pure virtual
+
+int RGWSI_MetaBackend::pre_modify(RGWSI_MetaBackend::Context *ctx,
+ RGWMetadataLogData& log_data,
+ RGWObjVersionTracker *objv_tracker,
+ RGWMDLogStatus op_type)
+{
+ /* if write version has not been set, and there's a read version, set it so that we can
+ * log it
+ */
+ if (objv_tracker) {
+ if (objv_tracker->read_version.ver && !objv_tracker->write_version.ver) {
+ objv_tracker->write_version = objv_tracker->read_version;
+ objv_tracker->write_version.ver++;
+ }
+ log_data.read_version = objv_tracker->read_version;
+ log_data.write_version = objv_tracker->write_version;
+ }
+
+ log_data.status = op_type;
+
+ bufferlist logbl;
+ encode(log_data, logbl);
+
+ int ret = mdlog_svc->add_entry(ctx->module, ctx->section, ctx->key, logbl);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWSI_MetaBackend::post_modify(RGWSI_MetaBackend::Context *ctx,
+ RGWMetadataLogData& log_data,
+ RGWObjVersionTracker *objv_tracker, int ret)
+{
+ if (ret >= 0)
+ log_data.status = MDLOG_STATUS_COMPLETE;
+ else
+ log_data.status = MDLOG_STATUS_ABORT;
+
+ bufferlist logbl;
+ encode(log_data, logbl);
+
+ int r = mdlog_svc->add_entry(ctx->module, ctx->section, ctx->key, logbl);
+ if (ret < 0)
+ return ret;
+
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int RGWSI_MetaBackend::prepare_mutate(RGWSI_MetaBackend::Context *ctx,
+ const real_time& mtime,
+ RGWObjVersionTracker *objv_tracker,
+ RGWMDLogSyncType sync_mode)
+{
+ bufferlist bl;
+ real_time orig_mtime;
+ int ret = get_entry(ctx, &bl, objv_tracker, &orig_mtime,
+ nullptr, nullptr, boost::none);
+ if (ret < 0 && ret != -ENOENT) {
+ return ret;
+ }
+ if (ret != -ENOENT &&
+ !RGWMetadataHandler::check_versions(objv_tracker->read_version, orig_mtime,
+ objv_tracker->write_version, mtime, sync_mode)) {
+ return STATUS_NO_APPLY;
+ }
+
+ if (objv_tracker->write_version.tag.empty()) {
+ if (objv_tracker->read_version.tag.empty()) {
+ objv_tracker->generate_new_write_ver(cct);
+ } else {
+ objv_tracker->write_version = objv_tracker->read_version;
+ objv_tracker->write_version.ver++;
+ }
+ }
+ return 0;
+}
+
+int RGWSI_MetaBackend::mutate(RGWSI_MetaBackend::Context *ctx,
+ const ceph::real_time& mtime,
+ RGWObjVersionTracker *objv_tracker,
+ RGWMDLogStatus op_type,
+ RGWMDLogSyncType sync_mode,
+ std::function<int()> f,
+ bool generic_prepare)
+{
+ int ret;
+
+ if (generic_prepare) {
+ ret = prepare_mutate(ctx, mtime, objv_tracker, sync_mode);
+ if (ret < 0 ||
+ ret == STATUS_NO_APPLY) {
+ return ret;
+ }
+ }
+
+ RGWMetadataLogData log_data;
+ ret = pre_modify(ctx, log_data, objv_tracker, op_type);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = f();
+
+ /* cascading ret into post_modify() */
+
+ ret = post_modify(ctx, log_data, objv_tracker, ret);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWSI_MetaBackend::get(Context *ctx,
+ bufferlist *pbl,
+ RGWObjVersionTracker *objv_tracker,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs,
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
+{
+ return get_entry(ctx, pbl,
+ objv_tracker, pmtime,
+ pattrs,
+ cache_info,
+ refresh_version);
+}
+
+int RGWSI_MetaBackend::put(Context *ctx,
+ bufferlist& bl,
+ bool exclusive,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime,
+ map<string, bufferlist> *pattrs,
+ RGWMDLogSyncType sync_mode)
+{
+ std::function<int()> f = [&]() {
+ return put_entry(ctx, bl,
+ exclusive, objv_tracker,
+ mtime, pattrs);
+ };
+
+ return mutate(ctx, mtime, objv_tracker,
+ MDLOG_STATUS_WRITE, sync_mode,
+ f,
+ false);
+}
+
+int RGWSI_MetaBackend::remove(Context *ctx,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime,
+ RGWMDLogSyncType sync_mode)
+{
+ std::function<int()> f = [&]() {
+ return remove_entry(ctx, objv_tracker);
+ };
+
+ return mutate(ctx, mtime, objv_tracker,
+ MDLOG_STATUS_REMOVE, sync_mode,
+ f,
+ false);
+}
--- /dev/null
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+#include "rgw/rgw_mdlog_types.h"
+
+class RGWMetadataHandler;
+class RGWMetadataLogData;
+
+class RGWSI_MDLog;
+class RGWSI_Meta;
+
+typedef void *RGWSI_MetaBackend_Handle;
+
+class RGWSI_MetaBackend : public RGWServiceInstance
+{
+ friend class RGWSI_Meta;
+public:
+ class Module;
+ class Context;
+protected:
+ map<string, RGWMetadataHandler *> handlers;
+
+ RGWSI_MDLog *mdlog_svc{nullptr};
+
+ int find_handler(const string& metadata_key, RGWMetadataHandler **handler, string& entry);
+
+ void base_init(RGWSI_MDLog *_mdlog_svc) {
+ mdlog_svc = _mdlog_svc;
+ }
+
+ virtual int init_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle) { return 0; }
+
+ int prepare_mutate(RGWSI_MetaBackend::Context *ctx,
+ const real_time& mtime,
+ RGWObjVersionTracker *objv_tracker,
+ RGWMDLogSyncType sync_mode);
+
+ virtual int mutate(Context *ctx,
+ const ceph::real_time& mtime, RGWObjVersionTracker *objv_tracker,
+ RGWMDLogStatus op_type,
+ RGWMDLogSyncType sync_mode,
+ std::function<int()> f,
+ bool generic_prepare);
+
+ virtual int pre_modify(Context *ctx,
+ RGWMetadataLogData& log_data,
+ RGWObjVersionTracker *objv_tracker,
+ RGWMDLogStatus op_type);
+ virtual int post_modify(Context *ctx,
+ RGWMetadataLogData& log_data,
+ RGWObjVersionTracker *objv_tracker, int ret);
+public:
+ class Module {
+ /*
+ * Backend specialization module
+ */
+ public:
+ virtual ~Module() = 0;
+ /* key to use for hashing entries for log shard placement */
+ virtual void get_hash_key(const string& section, const string& key, string& hash_key) {
+ hash_key = section + ":" + key;
+ }
+ };
+
+ using ModuleRef = std::shared_ptr<Module>;
+
+ struct Context { /*
+ * A single metadata operation context. Will be holding info about
+ * backend and operation itself; operation might span multiple backend
+ * calls.
+ */
+ virtual ~Context() = 0;
+
+ RGWSI_MetaBackend_Handle handle;
+ Module *module{nullptr};
+ std::string section;
+ std::string key;
+ };
+
+ enum Type {
+ MDBE_SOBJ = 0,
+ };
+
+ RGWSI_MetaBackend(CephContext *cct) : RGWServiceInstance(cct) {}
+ virtual ~RGWSI_MetaBackend() {}
+
+ virtual Type get_type() = 0;
+
+ virtual void init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, Context *ctx) = 0;
+
+ /* these should be implemented by backends */
+ virtual int get_entry(Context *ctx,
+ bufferlist *pbl,
+ RGWObjVersionTracker *objv_tracker,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs = nullptr,
+ rgw_cache_entry_info *cache_info = nullptr,
+ boost::optional<obj_version> refresh_version = boost::none) = 0;
+ virtual int put_entry(Context *ctx,
+ bufferlist& bl,
+ bool exclusive,
+ RGWObjVersionTracker *objv_tracker,
+ real_time mtime, map<string, bufferlist> *pattrs = nullptr) = 0;
+ virtual int remove_entry(Context *ctx,
+ RGWObjVersionTracker *objv_tracker) = 0;
+
+ /* these should be called by handlers */
+ virtual int get(Context *ctx,
+ bufferlist *pbl,
+ RGWObjVersionTracker *objv_tracker,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs = nullptr,
+ rgw_cache_entry_info *cache_info = nullptr,
+ boost::optional<obj_version> refresh_version = boost::none) = 0;
+
+ virtual int put(Context *ctx,
+ bufferlist& bl,
+ bool exclusive,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime,
+ map<string, bufferlist> *pattrs,
+ RGWMDLogSyncType sync_mode);
+
+ virtual int remove(Context *ctx,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime,
+ RGWMDLogSyncType sync_mode);
+};
+
--- /dev/null
+
+#include "svc_meta_be_sobj.h"
+
+#include "rgw/rgw_tools.h"
+#include "rgw/rgw_metadata.h"
+#include "rgw/rgw_mdlog.h"
+
+struct rgwsi_meta_be_sobj_handler_info {
+ RGWSI_MetaBackend::ModuleRef _module;
+ RGWSI_MBSObj_Handler_Module *module;
+ string section;
+};
+
+RGWSI_MetaBackend_SObj::RGWSI_MetaBackend_SObj(CephContext *cct) : RGWSI_MetaBackend(cct) {
+}
+
+RGWSI_MetaBackend_SObj::~RGWSI_MetaBackend_SObj() {
+}
+
+int RGWSI_MetaBackend_SObj::init_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle)
+{
+ const auto& section = handler->get_type();
+
+ auto& info = handlers[handler->get_type()];
+ info.section = section;
+
+ info._module = handler->get_backend_module(get_type());
+ info.module = static_cast<RGWSI_MBSObj_Handler_Module *>(info._module.get());
+
+ *phandle = (RGWSI_MetaBackend_Handle)(&info);
+
+ return 0;
+}
+
+void RGWSI_MetaBackend_SObj::init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, RGWSI_MetaBackend::Context *_ctx)
+{
+ RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast<RGWSI_MetaBackend_SObj::Context_SObj *>(_ctx);
+ rgwsi_meta_be_sobj_handler_info *h = static_cast<rgwsi_meta_be_sobj_handler_info *>(ctx->handle);
+
+ ctx->handle = handle;
+ ctx->module = h->module;
+ ctx->section = h->section;
+ ctx->key = key;
+ ctx->obj_ctx.emplace(sysobj_svc->init_obj_ctx());
+ static_cast<RGWSI_MBSObj_Handler_Module *>(ctx->module)->get_pool_and_oid(key, ctx->pool, ctx->oid);
+}
+
+int RGWSI_MetaBackend_SObj::get_entry(RGWSI_MetaBackend::Context *_ctx,
+ bufferlist *pbl,
+ RGWObjVersionTracker *objv_tracker,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs,
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
+{
+ RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast<RGWSI_MetaBackend_SObj::Context_SObj *>(_ctx);
+
+ return rgw_get_system_obj(*ctx->obj_ctx, ctx->pool, ctx->oid, *pbl,
+ objv_tracker, pmtime,
+ null_yield,
+ pattrs, cache_info,
+ refresh_version);
+}
+
+int RGWSI_MetaBackend_SObj::put_entry(RGWSI_MetaBackend::Context *_ctx,
+ bufferlist& bl, bool exclusive,
+ RGWObjVersionTracker *objv_tracker,
+ real_time mtime, map<string, bufferlist> *pattrs)
+{
+ RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast<RGWSI_MetaBackend_SObj::Context_SObj *>(_ctx);
+
+ return rgw_put_system_obj(*ctx->obj_ctx, ctx->pool, ctx->oid, bl, exclusive,
+ objv_tracker, mtime, pattrs);
+}
+
+int RGWSI_MetaBackend_SObj::remove_entry(RGWSI_MetaBackend::Context *_ctx,
+ RGWObjVersionTracker *objv_tracker)
+{
+ RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast<RGWSI_MetaBackend_SObj::Context_SObj *>(_ctx);
+
+ rgw_raw_obj k(ctx->pool, ctx->oid);
+
+ auto sysobj = ctx->obj_ctx->get_obj(k);
+ return sysobj.wop()
+ .set_objv_tracker(objv_tracker)
+ .remove(null_yield);
+}
+
--- /dev/null
+
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+
+#include "svc_meta_be.h"
+#include "svc_sys_obj.h"
+
+
+struct rgwsi_meta_be_sobj_handler_info;
+
+class RGWSI_MBSObj_Handler_Module : public RGWSI_MetaBackend::Module {
+public:
+ virtual void get_pool_and_oid(const string& key, rgw_pool& pool, string& oid) = 0;
+ virtual void key_to_oid(string& key) {}
+ virtual void oid_to_key(string& oid) {}
+};
+
+
+class RGWSI_MetaBackend_SObj : public RGWSI_MetaBackend
+{
+ RGWSI_SysObj *sysobj_svc{nullptr};
+
+ map<string, rgwsi_meta_be_sobj_handler_info> handlers;
+
+protected:
+ int init_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle) override;
+
+public:
+ struct Context_SObj : public RGWSI_MetaBackend::Context {
+ std::optional<RGWSysObjectCtx> obj_ctx;
+ rgw_pool pool;
+ string oid;
+ };
+
+ RGWSI_MetaBackend_SObj(CephContext *cct);
+ virtual ~RGWSI_MetaBackend_SObj();
+
+ RGWSI_MetaBackend::Type get_type() {
+ return MDBE_SOBJ;
+ }
+
+ void init(RGWSI_SysObj *_sysobj_svc,
+ RGWSI_MDLog *_mdlog_svc) {
+ base_init(mdlog_svc);
+ sysobj_svc = _sysobj_svc;
+ }
+
+ void init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, RGWSI_MetaBackend::Context *ctx) override;
+
+ virtual int get_entry(RGWSI_MetaBackend::Context *ctx,
+ bufferlist *pbl,
+ RGWObjVersionTracker *objv_tracker,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs = nullptr,
+ rgw_cache_entry_info *cache_info = nullptr,
+ boost::optional<obj_version> refresh_version = boost::none) = 0;
+ virtual int put_entry(RGWSI_MetaBackend::Context *ctx, bufferlist& bl, bool exclusive,
+ RGWObjVersionTracker *objv_tracker, real_time mtime, map<string, bufferlist> *pattrs = nullptr) = 0;
+ virtual int remove_entry(RGWSI_MetaBackend::Context *ctx,
+ RGWObjVersionTracker *objv_tracker) = 0;
+};
+
+