From: Yehuda Sadeh Date: Fri, 8 Mar 2019 04:42:14 +0000 (-0800) Subject: rgw: initial meta manager refactoring X-Git-Tag: v15.1.0~1898^2^2~100 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=830cc7380134226847a4231f58d476def4910376;p=ceph-ci.git rgw: initial meta manager refactoring I didn't intend for it to be such a big commit, and it's not even compiling yet. This changes the structure of how the metadata manager and handlers work. The idea is to be able to relatively easily hook in different meta backends (or same backends with different handling -- such as the otp). Added new services for meta, meta backend, and meta backend sysobj implementation. The meta backend service is responsible for the final data storage, and updating the meta log (log might be split later on, but at the moment it keeping it together for simplicity). The handlers themselves are the ones responsible for reading or modifying the metadata. This means that they need to call the meta backend service instead of calling the utility functions. The utility functions need to call the handlers, and not the other way around. Handlers can have utility methods to assist. Left to do: get everything actually compiling and implemented. The structure is there, now need to fill in the gaps. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index eefb1873522..8aa90368781 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -23,6 +23,10 @@ set(librgw_common_srcs services/svc_finisher.cc services/svc_bucket.cc services/svc_cls.cc + services/svc_mdlog.cc + services/svc_meta.cc + services/svc_meta_be.cc + services/svc_meta_be_sobj.cc services/svc_notify.cc services/svc_quota.cc services/svc_sync_modules.cc diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 40092cab172..8cef3b6bfcc 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -53,6 +53,32 @@ void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id) buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX; } +/* + * The tenant_name is always returned on purpose. May be empty, of course. + */ +static void parse_bucket(const string& bucket, + string *tenant_name, + string *bucket_name, + string *bucket_instance = nullptr /* optional */) +{ + int pos = bucket.find('/'); + if (pos >= 0) { + *tenant_name = bucket.substr(0, pos); + } else { + tenant_name->clear(); + } + string bn = bucket.substr(pos + 1); + pos = bn.find (':'); + if (pos < 0) { + *bucket_name = std::move(bn); + return; + } + *bucket_name = bn.substr(0, pos); + if (bucket_instance) { + *bucket_instance = bn.substr(pos + 1); + } +} + /* * Note that this is not a reversal of parse_bucket(). That one deals * with the syntax we need in metadata and such. This one deals with @@ -309,33 +335,6 @@ int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry, return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker); } -// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity -// with the existing instance[:shard] format. once we parse the shard, the / is -// replaced with a : to match the [tenant:]instance format -void rgw_bucket_instance_key_to_oid(string& key) -{ - // replace tenant/ with tenant: - auto c = key.find('/'); - if (c != string::npos) { - key[c] = ':'; - } -} - -// convert bucket instance oids back to the tenant/ format for metadata keys. -// it's safe to parse 'tenant:' only for oids, because they won't contain the -// optional :shard at the end -void rgw_bucket_instance_oid_to_key(string& oid) -{ - // find first : (could be tenant:bucket or bucket:instance) - auto c = oid.find(':'); - if (c != string::npos) { - // if we find another :, the first one was for tenant - if (oid.find(':', c + 1) != string::npos) { - oid[c] = '/'; - } - } -} - int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id) { ssize_t pos = bucket_instance.rfind(':'); @@ -810,17 +809,6 @@ int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket, return ret; } -int rgw_bucket_delete_bucket_obj(RGWRados *store, - const string& tenant_name, - const string& bucket_name, - RGWObjVersionTracker& objv_tracker) -{ - string key; - - rgw_make_bucket_entry_name(tenant_name, bucket_name, key); - return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker); -} - static void set_err_msg(std::string *sink, std::string msg) { if (sink && !msg.empty()) @@ -2565,11 +2553,84 @@ void RGWBucketCompleteInfo::decode_json(JSONObj *obj) { JSONDecoder::decode_json("attrs", attrs, obj); } +class RGW_MB_Handler_Module_Bucket : public RGWSI_MBSObj_Handler_Module { + RGWSI_Zone *zone_svc; +pubic: + RGW_MB_Handler_Module_Bucket(RGWSI_Zone *_zone_svc) : zone_svc {} + + void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override { + oid = key; + pool = zone_svc->get_zone_params().domain_root; + } +}; + class RGWBucketMetadataHandler : public RGWMetadataHandler { public: string get_type() override { return "bucket"; } + RGWSI_MetaBackend::ModuleRef get_backend_module(RGWSI_MetaBackend::Type be_type) override { + return RGWSI_MetaBackend::ModuleRef(new RGW_MB_Handler_Module_Bucket(store->svc.zone)); + } + + int read_bucket_entrypoint_info(RGWSI_MetaBackend *ctx, + string& entry, + RGWBucketEntrypointInfo *be, + RGWObjVersionTracker *objv_tracker, + ceph::real_time *pmtime, + map *pattrs) { + bufferlist bl; + int ret = meta_be->get_entry(ctx, &bl, + objv_tracker, pmtime, pattrs, + nullptr, nullopt); + if (ret < 0) { + return ret; + } + + try { + auto iter = bl.cbegin(); + ceph::decode(*be, iter); + } catch (buffer::error& err) { + return -EIO; + } + return 0; + } + + int store_bucket_entrypoint_info(RGWSI_MetaBackend *ctx, + string& entry, + const RGWBucketEntrypointInfo& be, + RGWObjVersionTracker *objv_tracker, + const ceph::real_time& mtime, + map *pattrs) { + bufferlist bl; + ceph::encode(be, bl); + int ret = meta_be->put(ctx, bl, + false, objv_tracker, mtime, pattrs, + APPLY_ALWAYS); + if (ret < 0) { + return ret; + } + + return 0; + } + + int remove_bucket_entrypoint_info(RGWSI_MetaBackend *ctx, + string& entry, + RGWObjVersionTracker *objv_tracker, + const ceph::real_time& mtime) + + bufferlist bl; + ceph::encode(be, bl); + int ret = meta_be->remove(ctx, bl, + objv_tracker, mtime, + APPLY_ALWAYS); + if (ret < 0) { + return ret; + } + + return 0; + } + int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override { RGWObjVersionTracker ot; RGWBucketEntryPoint be; @@ -2591,7 +2652,7 @@ public: return 0; } - int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, + int put(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker, real_time mtime, JSONObj *obj, sync_type_t sync_type) override { RGWBucketEntryPoint be, old_be; try { @@ -2601,14 +2662,11 @@ public: } real_time orig_mtime; - map attrs; RGWObjVersionTracker old_ot; - auto obj_ctx = store->svc.sysobj->init_obj_ctx(); - string tenant_name, bucket_name; - parse_bucket(entry, &tenant_name, &bucket_name); - int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs); + map attrs; + int ret = read_bucket_entrypoint_info(ctx, entry, &old_be, &old_ot, &orig_mtime, &attrs); if (ret < 0 && ret != -ENOENT) return ret; @@ -2621,7 +2679,7 @@ public: objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */ - ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs); + ret = store_bucket_entrypoint_info(entry, be, false, objv_tracker, mtime, &attrs); if (ret < 0) return ret; @@ -2641,13 +2699,12 @@ public: RGWListRawObjsCtx ctx; }; - int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override { + int remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) override { RGWBucketEntryPoint be; - auto obj_ctx = store->svc.sysobj->init_obj_ctx(); - string tenant_name, bucket_name; - parse_bucket(entry, &tenant_name, &bucket_name); - int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL); + real_time orig_mtime; + + int ret = read_bucket_entrypoint_info(ctx, entry, be, &objv_tracker, &orig_mtime, nullptr); if (ret < 0) return ret; @@ -2661,7 +2718,7 @@ public: lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl; } - ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker); + ret = remove_bucket_entrypoint_info(ctx, entry, objv_tracker, orig_mtime); if (ret < 0) { lderr(store->ctx()) << "could not delete bucket=" << entry << dendl; } @@ -2669,11 +2726,6 @@ public: return 0; } - void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override { - oid = key; - pool = store->svc.zone->get_zone_params().domain_root; - } - int list_keys_init(RGWRados *store, const string& marker, void **phandle) override { auto info = std::make_unique(); @@ -2932,6 +2984,58 @@ public: }; +class RGW_MB_Handler_Module_BI : public RGWSI_MBSObj_Handler_Module { + RGWSI_Zone *zone_svc; +pubic: + RGW_MB_Handler_Module_BI(RGWSI_Zone *_zone_svc) : zone_svc {} + + /* + * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry + * point, so that the log entries end up at the same log shard, so that we process them + * in order + */ + void get_hash_key(const string& section, const string& key, string& hash_key) override { + string k; + int pos = key.find(':'); + if (pos < 0) + k = key; + else + k = key.substr(0, pos); + hash_key = "bucket:" + k; + } + + void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override { + oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key; + rgw_bucket_instance_key_to_oid(oid); + pool = store->svc.zone->get_zone_params().domain_root; + } + +// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity +// with the existing instance[:shard] format. once we parse the shard, the / is +// replaced with a : to match the [tenant:]instance format + void key_to_oid(string& key) override { + // replace tenant/ with tenant: + auto c = key.find('/'); + if (c != string::npos) { + key[c] = ':'; + } + } + + // convert bucket instance oids back to the tenant/ format for metadata keys. + // it's safe to parse 'tenant:' only for oids, because they won't contain the + // optional :shard at the end + void oid_to_key(string& oid) override { + // find first : (could be tenant:bucket or bucket:instance) + auto c = oid.find(':'); + if (c != string::npos) { + // if we find another :, the first one was for tenant + if (oid.find(':', c + 1) != string::npos) { + oid[c] = '/'; + } + } + } +}; + class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler { public: @@ -3067,12 +3171,6 @@ public: &info.objv_tracker); } - void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override { - oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key; - rgw_bucket_instance_key_to_oid(oid); - pool = store->svc.zone->get_zone_params().domain_root; - } - int list_keys_init(RGWRados *store, const string& marker, void **phandle) override { auto info = std::make_unique(); @@ -3134,21 +3232,6 @@ public: list_keys_info *info = static_cast(handle); return info->store->list_raw_objs_get_cursor(info->ctx); } - - /* - * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry - * point, so that the log entries end up at the same log shard, so that we process them - * in order - */ - void get_hash_key(const string& section, const string& key, string& hash_key) override { - string k; - int pos = key.find(':'); - if (pos < 0) - k = key; - else - k = key.substr(0, pos); - hash_key = "bucket:" + k; - } }; class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler { @@ -3186,6 +3269,7 @@ void rgw_bucket_init(RGWMetadataManager *mm) bucket_meta_handler = RGWBucketMetaHandlerAllocator::alloc(); bucket_instance_meta_handler = RGWBucketInstanceMetaHandlerAllocator::alloc(); } - mm->register_handler(bucket_meta_handler); - mm->register_handler(bucket_instance_meta_handler); +#warning handle failures + bucket_meta_handler->init(mm); + bucket_instance_meta_handler->init(mm); } diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index f8f42891f97..48b2e9d95d7 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -40,11 +40,6 @@ extern int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry extern void rgw_bucket_instance_key_to_oid(string& key); extern void rgw_bucket_instance_oid_to_key(string& oid); -extern int rgw_bucket_delete_bucket_obj(RGWRados *store, - const string& tenant_name, - const string& bucket_name, - RGWObjVersionTracker& objv_tracker); - extern int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info); extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name); diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 6b4756cda90..6543406cdc5 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -960,7 +960,6 @@ void RGWZoneParams::dump(Formatter *f) const encode_json("otp_pool", otp_pool, f); encode_json_plain("system_key", system_key, f); encode_json("placement_pools", placement_pools, f); - encode_json("metadata_heap", metadata_heap, f); encode_json("tier_config", tier_config, f); encode_json("realm_id", realm_id, f); } @@ -1057,7 +1056,6 @@ void RGWZoneParams::decode_json(JSONObj *obj) JSONDecoder::decode_json("otp_pool", otp_pool, obj); JSONDecoder::decode_json("system_key", system_key, obj); JSONDecoder::decode_json("placement_pools", placement_pools, obj); - JSONDecoder::decode_json("metadata_heap", metadata_heap, obj); JSONDecoder::decode_json("tier_config", tier_config, obj); JSONDecoder::decode_json("realm_id", realm_id, obj); diff --git a/src/rgw/rgw_mdlog.h b/src/rgw/rgw_mdlog.h new file mode 100644 index 00000000000..aeb068aff68 --- /dev/null +++ b/src/rgw/rgw_mdlog.h @@ -0,0 +1,166 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include "rgw_metadata.h" +#include "rgw_mdlog_types.h" + +#define META_LOG_OBJ_PREFIX "meta.log." + +struct RGWMetadataLogInfo { + string marker; + real_time last_update; + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; + +class RGWCompletionManager; + +class RGWMetadataLogInfoCompletion : public RefCountedObject { + public: + using info_callback_t = std::function; + private: + cls_log_header header; + librados::IoCtx io_ctx; + librados::AioCompletion *completion; + std::mutex mutex; //< protects callback between cancel/complete + boost::optional callback; //< cleared on cancel + public: + explicit RGWMetadataLogInfoCompletion(info_callback_t callback); + ~RGWMetadataLogInfoCompletion() override; + + librados::IoCtx& get_io_ctx() { return io_ctx; } + cls_log_header& get_header() { return header; } + librados::AioCompletion* get_completion() { return completion; } + + void finish(librados::completion_t cb) { + std::lock_guard lock(mutex); + if (callback) { + (*callback)(completion->get_return_value(), header); + } + } + void cancel() { + std::lock_guard lock(mutex); + callback = boost::none; + } +}; + +class RGWMetadataLog { + CephContext *cct; + RGWRados *store; + const string prefix; + + static std::string make_prefix(const std::string& period) { + if (period.empty()) + return META_LOG_OBJ_PREFIX; + return META_LOG_OBJ_PREFIX + period + "."; + } + + RWLock lock; + set modified_shards; + + void mark_modified(int shard_id); +public: + RGWMetadataLog(CephContext *_cct, RGWRados *_store, const std::string& period) + : cct(_cct), store(_store), + prefix(make_prefix(period)), + lock("RGWMetaLog::lock") {} + + void get_shard_oid(int id, string& oid) const { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", id); + oid = prefix + buf; + } + + int add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl); + int store_entries_in_shard(list& entries, int shard_id, librados::AioCompletion *completion); + + struct LogListCtx { + int cur_shard; + string marker; + real_time from_time; + real_time end_time; + + string cur_oid; + + bool done; + + LogListCtx() : cur_shard(0), done(false) {} + }; + + void init_list_entries(int shard_id, const real_time& from_time, const real_time& end_time, string& marker, void **handle); + void complete_list_entries(void *handle); + int list_entries(void *handle, + int max_entries, + list& entries, + string *out_marker, + bool *truncated); + + int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker); + int get_info(int shard_id, RGWMetadataLogInfo *info); + int get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion); + int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id); + int unlock(int shard_id, string& zone_id, string& owner_id); + + int update_shards(list& shards); + + void read_clear_modified(set &modified); +}; + +struct LogStatusDump { + RGWMDLogStatus status; + + explicit LogStatusDump(RGWMDLogStatus _status) : status(_status) {} + void dump(Formatter *f) const; +}; + +struct RGWMetadataLogData { + obj_version read_version; + obj_version write_version; + RGWMDLogStatus status; + + RGWMetadataLogData() : status(MDLOG_STATUS_UNKNOWN) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& bl); + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(RGWMetadataLogData) + +struct RGWMetadataLogHistory { + epoch_t oldest_realm_epoch; + std::string oldest_period_id; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(oldest_realm_epoch, bl); + encode(oldest_period_id, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator& p) { + DECODE_START(1, p); + decode(oldest_realm_epoch, p); + decode(oldest_period_id, p); + DECODE_FINISH(p); + } + + static const std::string oid; +}; +WRITE_CLASS_ENCODER(RGWMetadataLogHistory) + diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 06aae393f03..c8d50cf272a 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -22,6 +22,8 @@ #define dout_subsys ceph_subsys_rgw +const std::string RGWMetadataLogHistory::oid = "meta.history"; + void LogStatusDump::dump(Formatter *f) const { string s; switch (status) { @@ -97,14 +99,14 @@ void RGWMetadataLogData::decode_json(JSONObj *obj) { } -int RGWMetadataLog::add_entry(RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl) { +int RGWMetadataLog::add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl) { if (!store->svc.zone->need_to_log_metadata()) return 0; string oid; string hash_key; - handler->get_hash_key(section, key, hash_key); + module->get_hash_key(section, key, hash_key); int shard_id; store->shard_name(prefix, cct->_conf->rgw_md_log_max_shards, hash_key, oid, &shard_id); @@ -353,327 +355,23 @@ RGWMetadataManager::~RGWMetadataManager() handlers.clear(); } -const std::string RGWMetadataLogHistory::oid = "meta.history"; - -namespace { - -int read_history(RGWRados *store, RGWMetadataLogHistory *state, - RGWObjVersionTracker *objv_tracker) -{ - auto obj_ctx = store->svc.sysobj->init_obj_ctx(); - auto& pool = store->svc.zone->get_zone_params().log_pool; - const auto& oid = RGWMetadataLogHistory::oid; - bufferlist bl; - int ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, objv_tracker, nullptr, null_yield); - if (ret < 0) { - return ret; - } - if (bl.length() == 0) { - /* bad history object, remove it */ - rgw_raw_obj obj(pool, oid); - auto sysobj = obj_ctx.get_obj(obj); - ret = sysobj.wop().remove(null_yield); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: meta history is empty, but cannot remove it (" << cpp_strerror(-ret) << ")" << dendl; - return ret; - } - return -ENOENT; - } - try { - auto p = bl.cbegin(); - state->decode(p); - } catch (buffer::error& e) { - ldout(store->ctx(), 1) << "failed to decode the mdlog history: " - << e.what() << dendl; - return -EIO; - } - return 0; -} - -int write_history(RGWRados *store, const RGWMetadataLogHistory& state, - RGWObjVersionTracker *objv_tracker, bool exclusive = false) -{ - bufferlist bl; - state.encode(bl); - - auto& pool = store->svc.zone->get_zone_params().log_pool; - const auto& oid = RGWMetadataLogHistory::oid; - return rgw_put_system_obj(store, pool, oid, bl, - exclusive, objv_tracker, real_time{}); -} - -using Cursor = RGWPeriodHistory::Cursor; - -/// read the mdlog history and use it to initialize the given cursor -class ReadHistoryCR : public RGWCoroutine { - RGWRados *store; - Cursor *cursor; - RGWObjVersionTracker *objv_tracker; - RGWMetadataLogHistory state; - public: - ReadHistoryCR(RGWRados *store, Cursor *cursor, - RGWObjVersionTracker *objv_tracker) - : RGWCoroutine(store->ctx()), store(store), cursor(cursor), - objv_tracker(objv_tracker) - {} - - int operate() { - reenter(this) { - yield { - rgw_raw_obj obj{store->svc.zone->get_zone_params().log_pool, - RGWMetadataLogHistory::oid}; - constexpr bool empty_on_enoent = false; - - using ReadCR = RGWSimpleRadosReadCR; - call(new ReadCR(store->get_async_rados(), store->svc.sysobj, obj, - &state, empty_on_enoent, objv_tracker)); - } - if (retcode < 0) { - ldout(cct, 1) << "failed to read mdlog history: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - *cursor = store->period_history->lookup(state.oldest_realm_epoch); - if (!*cursor) { - return set_cr_error(cursor->get_error()); - } - - ldout(cct, 10) << "read mdlog history with oldest period id=" - << state.oldest_period_id << " realm_epoch=" - << state.oldest_realm_epoch << dendl; - return set_cr_done(); - } - return 0; - } -}; - -/// write the given cursor to the mdlog history -class WriteHistoryCR : public RGWCoroutine { - RGWRados *store; - Cursor cursor; - RGWObjVersionTracker *objv; - RGWMetadataLogHistory state; - public: - WriteHistoryCR(RGWRados *store, const Cursor& cursor, - RGWObjVersionTracker *objv) - : RGWCoroutine(store->ctx()), store(store), cursor(cursor), objv(objv) - {} - - int operate() { - reenter(this) { - state.oldest_period_id = cursor.get_period().get_id(); - state.oldest_realm_epoch = cursor.get_epoch(); - - yield { - rgw_raw_obj obj{store->svc.zone->get_zone_params().log_pool, - RGWMetadataLogHistory::oid}; - - using WriteCR = RGWSimpleRadosWriteCR; - call(new WriteCR(store->get_async_rados(), store->svc.sysobj, obj, state, objv)); - } - if (retcode < 0) { - ldout(cct, 1) << "failed to write mdlog history: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - - ldout(cct, 10) << "wrote mdlog history with oldest period id=" - << state.oldest_period_id << " realm_epoch=" - << state.oldest_realm_epoch << dendl; - return set_cr_done(); - } - return 0; - } -}; - -/// update the mdlog history to reflect trimmed logs -class TrimHistoryCR : public RGWCoroutine { - RGWRados *store; - const Cursor cursor; //< cursor to trimmed period - RGWObjVersionTracker *objv; //< to prevent racing updates - Cursor next; //< target cursor for oldest log period - Cursor existing; //< existing cursor read from disk - - public: - TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv) - : RGWCoroutine(store->ctx()), - store(store), cursor(cursor), objv(objv), next(cursor) - { - next.next(); // advance past cursor - } - - int operate() { - reenter(this) { - // read an existing history, and write the new history if it's newer - yield call(new ReadHistoryCR(store, &existing, objv)); - if (retcode < 0) { - return set_cr_error(retcode); - } - // reject older trims with ECANCELED - if (cursor.get_epoch() < existing.get_epoch()) { - ldout(cct, 4) << "found oldest log epoch=" << existing.get_epoch() - << ", rejecting trim at epoch=" << cursor.get_epoch() << dendl; - return set_cr_error(-ECANCELED); - } - // overwrite with updated history - yield call(new WriteHistoryCR(store, next, objv)); - if (retcode < 0) { - return set_cr_error(retcode); - } - return set_cr_done(); - } - return 0; - } -}; - -// traverse all the way back to the beginning of the period history, and -// return a cursor to the first period in a fully attached history -Cursor find_oldest_period(RGWRados *store) +int RGWMetadataHandler::init(RGWMetadataManager *manager) { - auto cct = store->ctx(); - auto cursor = store->period_history->get_current(); - - while (cursor) { - // advance to the period's predecessor - if (!cursor.has_prev()) { - auto& predecessor = cursor.get_period().get_predecessor(); - if (predecessor.empty()) { - // this is the first period, so our logs must start here - ldout(cct, 10) << "find_oldest_period returning first " - "period " << cursor.get_period().get_id() << dendl; - return cursor; - } - // pull the predecessor and add it to our history - RGWPeriod period; - int r = store->period_puller->pull(predecessor, period); - if (r < 0) { - return Cursor{r}; - } - auto prev = store->period_history->insert(std::move(period)); - if (!prev) { - return prev; - } - ldout(cct, 20) << "find_oldest_period advancing to " - "predecessor period " << predecessor << dendl; - ceph_assert(cursor.has_prev()); - } - cursor.prev(); - } - ldout(cct, 10) << "find_oldest_period returning empty cursor" << dendl; - return cursor; + return register_handler(this, &be_handle); } -} // anonymous namespace - -Cursor RGWMetadataManager::init_oldest_log_period() -{ - // read the mdlog history - RGWMetadataLogHistory state; - RGWObjVersionTracker objv; - int ret = read_history(store, &state, &objv); - - if (ret == -ENOENT) { - // initialize the mdlog history and write it - ldout(cct, 10) << "initializing mdlog history" << dendl; - auto cursor = find_oldest_period(store); - if (!cursor) { - return cursor; - } - - // write the initial history - state.oldest_realm_epoch = cursor.get_epoch(); - state.oldest_period_id = cursor.get_period().get_id(); - - constexpr bool exclusive = true; // don't overwrite - int ret = write_history(store, state, &objv, exclusive); - if (ret < 0 && ret != -EEXIST) { - ldout(cct, 1) << "failed to write mdlog history: " - << cpp_strerror(ret) << dendl; - return Cursor{ret}; - } - return cursor; - } else if (ret < 0) { - ldout(cct, 1) << "failed to read mdlog history: " - << cpp_strerror(ret) << dendl; - return Cursor{ret}; - } - - // if it's already in the history, return it - auto cursor = store->period_history->lookup(state.oldest_realm_epoch); - if (cursor) { - return cursor; - } - // pull the oldest period by id - RGWPeriod period; - ret = store->period_puller->pull(state.oldest_period_id, period); - if (ret < 0) { - ldout(cct, 1) << "failed to read period id=" << state.oldest_period_id - << " for mdlog history: " << cpp_strerror(ret) << dendl; - return Cursor{ret}; - } - // verify its realm_epoch - if (period.get_realm_epoch() != state.oldest_realm_epoch) { - ldout(cct, 1) << "inconsistent mdlog history: read period id=" - << period.get_id() << " with realm_epoch=" << period.get_realm_epoch() - << ", expected realm_epoch=" << state.oldest_realm_epoch << dendl; - return Cursor{-EINVAL}; - } - // attach the period to our history - return store->period_history->attach(std::move(period)); -} - -Cursor RGWMetadataManager::read_oldest_log_period() const -{ - RGWMetadataLogHistory state; - int ret = read_history(store, &state, nullptr); - if (ret < 0) { - ldout(store->ctx(), 1) << "failed to read mdlog history: " - << cpp_strerror(ret) << dendl; - return Cursor{ret}; - } - - ldout(store->ctx(), 10) << "read mdlog history with oldest period id=" - << state.oldest_period_id << " realm_epoch=" - << state.oldest_realm_epoch << dendl; - - return store->period_history->lookup(state.oldest_realm_epoch); -} - -RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period, - RGWObjVersionTracker *objv) const -{ - return new ReadHistoryCR(store, period, objv); -} - -RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period, - RGWObjVersionTracker *objv) const -{ - return new TrimHistoryCR(store, period, objv); -} - -int RGWMetadataManager::init(const std::string& current_period) -{ - // open a log for the current period - current_log = get_log(current_period); - return 0; -} - -RGWMetadataLog* RGWMetadataManager::get_log(const std::string& period) -{ - // construct the period's log in place if it doesn't exist - auto insert = md_logs.emplace(std::piecewise_construct, - std::forward_as_tuple(period), - std::forward_as_tuple(cct, store, period)); - return &insert.first->second; -} - -int RGWMetadataManager::register_handler(RGWMetadataHandler *handler) +int RGWMetadataManager::register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend::Handle *phandle) { string type = handler->get_type(); if (handlers.find(type) != handlers.end()) return -EINVAL; + int ret = store->svc.meta->init_handler(handler, phandle); + if (ret < 0) { + return ret; + } + handlers[type] = handler; return 0; @@ -795,38 +493,6 @@ int RGWMetadataManager::put(string& metadata_key, bufferlist& bl, return ret; } -int RGWMetadataManager::prepare_mutate(RGWRados *store, - rgw_pool& pool, const string& oid, - const real_time& mtime, - RGWObjVersionTracker *objv_tracker, - RGWMetadataHandler::sync_type_t sync_mode) -{ - bufferlist bl; - real_time orig_mtime; - auto obj_ctx = store->svc.sysobj->init_obj_ctx(); - int ret = rgw_get_system_obj(obj_ctx, pool, oid, - bl, objv_tracker, &orig_mtime, - null_yield, nullptr, nullptr); - if (ret < 0 && ret != -ENOENT) { - return ret; - } - if (ret != -ENOENT && - !RGWMetadataHandler::check_versions(objv_tracker->read_version, orig_mtime, - objv_tracker->write_version, mtime, sync_mode)) { - return STATUS_NO_APPLY; - } - - if (objv_tracker->write_version.tag.empty()) { - if (objv_tracker->read_version.tag.empty()) { - objv_tracker->generate_new_write_ver(store->ctx()); - } else { - objv_tracker->write_version = objv_tracker->read_version; - objv_tracker->write_version.ver++; - } - } - return 0; -} - int RGWMetadataManager::remove(string& metadata_key) { RGWMetadataHandler *handler; @@ -849,41 +515,6 @@ int RGWMetadataManager::remove(string& metadata_key) return handler->remove(store, entry, objv_tracker); } -int RGWMetadataManager::lock_exclusive(string& metadata_key, timespan duration, string& owner_id) { - RGWMetadataHandler *handler; - string entry; - string zone_id; - - int ret = find_handler(metadata_key, &handler, entry); - if (ret < 0) - return ret; - - rgw_pool pool; - string oid; - - handler->get_pool_and_oid(store, entry, pool, oid); - - return store->lock_exclusive(pool, oid, duration, zone_id, owner_id); -} - -int RGWMetadataManager::unlock(string& metadata_key, string& owner_id) { - librados::IoCtx io_ctx; - RGWMetadataHandler *handler; - string entry; - string zone_id; - - int ret = find_handler(metadata_key, &handler, entry); - if (ret < 0) - return ret; - - rgw_pool pool; - string oid; - - handler->get_pool_and_oid(store, entry, pool, oid); - - return store->unlock(pool, oid, zone_id, owner_id); -} - struct list_keys_handle { void *handle; RGWMetadataHandler *handler; @@ -973,190 +604,6 @@ void RGWMetadataManager::get_sections(list& sections) } } -int RGWMetadataManager::pre_modify(RGWMetadataHandler *handler, string& section, const string& key, - RGWMetadataLogData& log_data, RGWObjVersionTracker *objv_tracker, - RGWMDLogStatus op_type) -{ - section = handler->get_type(); - - /* if write version has not been set, and there's a read version, set it so that we can - * log it - */ - if (objv_tracker) { - if (objv_tracker->read_version.ver && !objv_tracker->write_version.ver) { - objv_tracker->write_version = objv_tracker->read_version; - objv_tracker->write_version.ver++; - } - log_data.read_version = objv_tracker->read_version; - log_data.write_version = objv_tracker->write_version; - } - - log_data.status = op_type; - - bufferlist logbl; - encode(log_data, logbl); - - ceph_assert(current_log); // must have called init() - int ret = current_log->add_entry(handler, section, key, logbl); - if (ret < 0) - return ret; - - return 0; -} - -int RGWMetadataManager::post_modify(RGWMetadataHandler *handler, const string& section, const string& key, RGWMetadataLogData& log_data, - RGWObjVersionTracker *objv_tracker, int ret) -{ - if (ret >= 0) - log_data.status = MDLOG_STATUS_COMPLETE; - else - log_data.status = MDLOG_STATUS_ABORT; - - bufferlist logbl; - encode(log_data, logbl); - - ceph_assert(current_log); // must have called init() - int r = current_log->add_entry(handler, section, key, logbl); - if (ret < 0) - return ret; - - if (r < 0) - return r; - - return 0; -} - -string RGWMetadataManager::heap_oid(RGWMetadataHandler *handler, const string& key, const obj_version& objv) -{ - char buf[objv.tag.size() + 32]; - snprintf(buf, sizeof(buf), "%s:%lld", objv.tag.c_str(), (long long)objv.ver); - return string(".meta:") + handler->get_type() + ":" + key + ":" + buf; -} - -int RGWMetadataManager::store_in_heap(RGWMetadataHandler *handler, const string& key, bufferlist& bl, - RGWObjVersionTracker *objv_tracker, real_time mtime, - map *pattrs) -{ - if (!objv_tracker) { - return -EINVAL; - } - - rgw_pool heap_pool(store->svc.zone->get_zone_params().metadata_heap); - - if (heap_pool.empty()) { - return 0; - } - - RGWObjVersionTracker otracker; - otracker.write_version = objv_tracker->write_version; - string oid = heap_oid(handler, key, objv_tracker->write_version); - int ret = rgw_put_system_obj(store, heap_pool, oid, - bl, false, &otracker, mtime, pattrs); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: rgw_put_system_obj() oid=" << oid << " returned ret=" << ret << dendl; - return ret; - } - - return 0; -} - -int RGWMetadataManager::remove_from_heap(RGWMetadataHandler *handler, const string& key, RGWObjVersionTracker *objv_tracker) -{ - if (!objv_tracker) { - return -EINVAL; - } - - rgw_pool heap_pool(store->svc.zone->get_zone_params().metadata_heap); - - if (heap_pool.empty()) { - return 0; - } - - string oid = heap_oid(handler, key, objv_tracker->write_version); - rgw_raw_obj obj(heap_pool, oid); - auto obj_ctx = store->svc.sysobj->init_obj_ctx(); - auto sysobj = obj_ctx.get_obj(obj); - int ret = sysobj.wop().remove(null_yield); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: sysobj.wop().remove() oid=" << oid << " returned ret=" << ret << dendl; - return ret; - } - - return 0; -} - -int RGWMetadataManager::put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive, - RGWObjVersionTracker *objv_tracker, real_time mtime, map *pattrs) -{ - string section; - RGWMetadataLogData log_data; - int ret = pre_modify(handler, section, key, log_data, objv_tracker, MDLOG_STATUS_WRITE); - if (ret < 0) - return ret; - - string oid; - rgw_pool pool; - - handler->get_pool_and_oid(store, key, pool, oid); - - ret = store_in_heap(handler, key, bl, objv_tracker, mtime, pattrs); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": store_in_heap() key=" << key << " returned ret=" << ret << dendl; - goto done; - } - - ret = rgw_put_system_obj(store, pool, oid, bl, exclusive, - objv_tracker, mtime, pattrs); - - if (ret < 0) { - int r = remove_from_heap(handler, key, objv_tracker); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": remove_from_heap() key=" << key << " returned ret=" << r << dendl; - } - } -done: - /* cascading ret into post_modify() */ - - ret = post_modify(handler, section, key, log_data, objv_tracker, ret); - if (ret < 0) - return ret; - - return 0; -} - -int RGWMetadataManager::remove_entry(RGWMetadataHandler *handler, - const string& key, - RGWObjVersionTracker *objv_tracker) -{ - string section; - RGWMetadataLogData log_data; - int ret = pre_modify(handler, section, key, log_data, objv_tracker, MDLOG_STATUS_REMOVE); - if (ret < 0) { - return ret; - } - - string oid; - rgw_pool pool; - - handler->get_pool_and_oid(store, key, pool, oid); - - rgw_raw_obj obj(pool, oid); - - auto obj_ctx = store->svc.sysobj->init_obj_ctx(); - auto sysobj = obj_ctx.get_obj(obj); - ret = sysobj.wop() - .set_objv_tracker(objv_tracker) - .remove(null_yield); - /* cascading ret into post_modify() */ - - ret = post_modify(handler, section, key, log_data, objv_tracker, ret); - if (ret < 0) { - return ret; - } - - return 0; -} - int RGWMetadataManager::get_log_shard_id(const string& section, const string& key, int *shard_id) { diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index e410767704b..546355c2f06 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -11,12 +11,15 @@ #include "include/types.h" #include "rgw_common.h" #include "rgw_period_history.h" +#include "rgw_mdlog_types.h" #include "cls/version/cls_version_types.h" #include "cls/log/cls_log_types.h" #include "common/RWLock.h" #include "common/RefCountedObj.h" #include "common/ceph_time.h" +#include "services/svc_meta_be.h" + class RGWRados; class RGWCoroutine; @@ -26,15 +29,6 @@ struct RGWObjVersionTracker; struct obj_version; -enum RGWMDLogStatus { - MDLOG_STATUS_UNKNOWN, - MDLOG_STATUS_WRITE, - MDLOG_STATUS_SETATTRS, - MDLOG_STATUS_REMOVE, - MDLOG_STATUS_COMPLETE, - MDLOG_STATUS_ABORT, -}; - class RGWMetadataObject { protected: obj_version objv; @@ -52,33 +46,20 @@ public: class RGWMetadataManager; class RGWMetadataHandler { + friend class RGWSI_MetaBackend; friend class RGWMetadataManager; -public: - enum sync_type_t { - APPLY_ALWAYS, - APPLY_UPDATES, - APPLY_NEWER - }; - static bool string_to_sync_type(const string& sync_string, - sync_type_t& type) { - if (sync_string.compare("update-by-version") == 0) - type = APPLY_UPDATES; - else if (sync_string.compare("update-by-timestamp") == 0) - type = APPLY_NEWER; - else if (sync_string.compare("always") == 0) - type = APPLY_ALWAYS; - else - return false; - return true; - } + RGWSI_MetaBackend_Handle be_handle{0}; +public: virtual ~RGWMetadataHandler() {} virtual string get_type() = 0; + virtual RGWSI_MetaBackend::ModuleRef get_backend_module(RGWSI_MetaBackend::Type be_type) = 0; + virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) = 0; virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, - real_time mtime, JSONObj *obj, sync_type_t type) = 0; + real_time mtime, JSONObj *obj, RGWMDLogSyncType type) = 0; virtual int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) = 0; virtual int list_keys_init(RGWRados *store, const string& marker, void **phandle) = 0; @@ -87,13 +68,10 @@ public: virtual string get_marker(void *handle) = 0; - /* key to use for hashing entries for log shard placement */ - virtual void get_hash_key(const string& section, const string& key, string& hash_key) { - hash_key = section + ":" + key; - } + int init(RGWMetadataManager *manager); + protected: - virtual void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) = 0; /** * Compare an incoming versus on-disk tag/version+mtime combo against * the sync mode to see if the new one should replace the on-disk one. @@ -102,7 +80,7 @@ protected: */ static bool check_versions(const obj_version& ondisk, const real_time& ondisk_time, const obj_version& incoming, const real_time& incoming_time, - sync_type_t sync_mode) { + RGWMDLogSyncType sync_mode) { switch (sync_mode) { case APPLY_UPDATES: if ((ondisk.tag != incoming.tag) || @@ -118,254 +96,26 @@ protected: } return true; } - - /* - * The tenant_name is always returned on purpose. May be empty, of course. - */ - static void parse_bucket(const string& bucket, - string *tenant_name, - string *bucket_name, - string *bucket_instance = nullptr /* optional */) - { - int pos = bucket.find('/'); - if (pos >= 0) { - *tenant_name = bucket.substr(0, pos); - } else { - tenant_name->clear(); - } - string bn = bucket.substr(pos + 1); - pos = bn.find (':'); - if (pos < 0) { - *bucket_name = std::move(bn); - return; - } - *bucket_name = bn.substr(0, pos); - if (bucket_instance) { - *bucket_instance = bn.substr(pos + 1); - } - } -}; - -#define META_LOG_OBJ_PREFIX "meta.log." - -struct RGWMetadataLogInfo { - string marker; - real_time last_update; - - void dump(Formatter *f) const; - void decode_json(JSONObj *obj); -}; - -class RGWCompletionManager; - -class RGWMetadataLogInfoCompletion : public RefCountedObject { - public: - using info_callback_t = std::function; - private: - cls_log_header header; - librados::IoCtx io_ctx; - librados::AioCompletion *completion; - std::mutex mutex; //< protects callback between cancel/complete - boost::optional callback; //< cleared on cancel - public: - explicit RGWMetadataLogInfoCompletion(info_callback_t callback); - ~RGWMetadataLogInfoCompletion() override; - - librados::IoCtx& get_io_ctx() { return io_ctx; } - cls_log_header& get_header() { return header; } - librados::AioCompletion* get_completion() { return completion; } - - void finish(librados::completion_t cb) { - std::lock_guard lock(mutex); - if (callback) { - (*callback)(completion->get_return_value(), header); - } - } - void cancel() { - std::lock_guard lock(mutex); - callback = boost::none; - } }; -class RGWMetadataLog { - CephContext *cct; - RGWRados *store; - const string prefix; - - static std::string make_prefix(const std::string& period) { - if (period.empty()) - return META_LOG_OBJ_PREFIX; - return META_LOG_OBJ_PREFIX + period + "."; - } - - RWLock lock; - set modified_shards; - - void mark_modified(int shard_id); -public: - RGWMetadataLog(CephContext *_cct, RGWRados *_store, const std::string& period) - : cct(_cct), store(_store), - prefix(make_prefix(period)), - lock("RGWMetaLog::lock") {} - - void get_shard_oid(int id, string& oid) const { - char buf[16]; - snprintf(buf, sizeof(buf), "%d", id); - oid = prefix + buf; - } - - int add_entry(RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl); - int store_entries_in_shard(list& entries, int shard_id, librados::AioCompletion *completion); - - struct LogListCtx { - int cur_shard; - string marker; - real_time from_time; - real_time end_time; - - string cur_oid; - - bool done; - - LogListCtx() : cur_shard(0), done(false) {} - }; - - void init_list_entries(int shard_id, const real_time& from_time, const real_time& end_time, string& marker, void **handle); - void complete_list_entries(void *handle); - int list_entries(void *handle, - int max_entries, - list& entries, - string *out_marker, - bool *truncated); - - int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker); - int get_info(int shard_id, RGWMetadataLogInfo *info); - int get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion); - int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id); - int unlock(int shard_id, string& zone_id, string& owner_id); - - int update_shards(list& shards); - - void read_clear_modified(set &modified); -}; - -struct LogStatusDump { - RGWMDLogStatus status; - - explicit LogStatusDump(RGWMDLogStatus _status) : status(_status) {} - void dump(Formatter *f) const; -}; - -struct RGWMetadataLogData { - obj_version read_version; - obj_version write_version; - RGWMDLogStatus status; - - RGWMetadataLogData() : status(MDLOG_STATUS_UNKNOWN) {} - - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& bl); - void dump(Formatter *f) const; - void decode_json(JSONObj *obj); -}; -WRITE_CLASS_ENCODER(RGWMetadataLogData) - -struct RGWMetadataLogHistory { - epoch_t oldest_realm_epoch; - std::string oldest_period_id; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(oldest_realm_epoch, bl); - encode(oldest_period_id, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator& p) { - DECODE_START(1, p); - decode(oldest_realm_epoch, p); - decode(oldest_period_id, p); - DECODE_FINISH(p); - } - - static const std::string oid; -}; -WRITE_CLASS_ENCODER(RGWMetadataLogHistory) - class RGWMetadataManager { map handlers; CephContext *cct; - RGWRados *store; - - // maintain a separate metadata log for each period - std::map md_logs; - // use the current period's log for mutating operations - RGWMetadataLog* current_log = nullptr; int find_handler(const string& metadata_key, RGWMetadataHandler **handler, string& entry); - int pre_modify(RGWMetadataHandler *handler, string& section, const string& key, - RGWMetadataLogData& log_data, RGWObjVersionTracker *objv_tracker, - RGWMDLogStatus op_type); - int post_modify(RGWMetadataHandler *handler, const string& section, const string& key, RGWMetadataLogData& log_data, - RGWObjVersionTracker *objv_tracker, int ret); - - string heap_oid(RGWMetadataHandler *handler, const string& key, const obj_version& objv); - int store_in_heap(RGWMetadataHandler *handler, const string& key, bufferlist& bl, - RGWObjVersionTracker *objv_tracker, real_time mtime, - map *pattrs); - int remove_from_heap(RGWMetadataHandler *handler, const string& key, RGWObjVersionTracker *objv_tracker); - int prepare_mutate(RGWRados *store, rgw_pool& pool, const string& oid, - const real_time& mtime, - RGWObjVersionTracker *objv_tracker, - RGWMetadataHandler::sync_type_t sync_mode); + +protected: + int register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle); public: - RGWMetadataManager(CephContext *_cct, RGWRados *_store); + RGWMetadataManager(CephContext *_cct); ~RGWMetadataManager(); - RGWRados* get_store() { return store; } - - int init(const std::string& current_period); - - /// initialize the oldest log period if it doesn't exist, and attach it to - /// our current history - RGWPeriodHistory::Cursor init_oldest_log_period(); - - /// read the oldest log period, and return a cursor to it in our existing - /// period history - RGWPeriodHistory::Cursor read_oldest_log_period() const; - - /// read the oldest log period asynchronously and write its result to the - /// given cursor pointer - RGWCoroutine* read_oldest_log_period_cr(RGWPeriodHistory::Cursor *period, - RGWObjVersionTracker *objv) const; - - /// try to advance the oldest log period when the given period is trimmed, - /// using a rados lock to provide atomicity - RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period, - RGWObjVersionTracker *objv) const; - - /// find or create the metadata log for the given period - RGWMetadataLog* get_log(const std::string& period); - - int register_handler(RGWMetadataHandler *handler); - - template - int mutate(RGWMetadataHandler *handler, const string& key, - const ceph::real_time& mtime, RGWObjVersionTracker *objv_tracker, - RGWMDLogStatus op_type, - RGWMetadataHandler::sync_type_t sync_mode, - F&& f); - RGWMetadataHandler *get_handler(const string& type); - int put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive, - RGWObjVersionTracker *objv_tracker, real_time mtime, map *pattrs = NULL); - int remove_entry(RGWMetadataHandler *handler, - const string& key, - RGWObjVersionTracker *objv_tracker); int get(string& metadata_key, Formatter *f); int put(string& metadata_key, bufferlist& bl, - RGWMetadataHandler::sync_type_t sync_mode, + RGWMDLogSyncType sync_mode, obj_version *existing_version = NULL); int remove(string& metadata_key); @@ -379,48 +129,11 @@ public: void dump_log_entry(cls_log_entry& entry, Formatter *f); void get_sections(list& sections); - int lock_exclusive(string& metadata_key, timespan duration, string& owner_id); - int unlock(string& metadata_key, string& owner_id); int get_log_shard_id(const string& section, const string& key, int *shard_id); void parse_metadata_key(const string& metadata_key, string& type, string& entry); }; -template -int RGWMetadataManager::mutate(RGWMetadataHandler *handler, const string& key, - const ceph::real_time& mtime, RGWObjVersionTracker *objv_tracker, - RGWMDLogStatus op_type, - RGWMetadataHandler::sync_type_t sync_mode, - F&& f) -{ - string oid; - rgw_pool pool; - - handler->get_pool_and_oid(store, key, pool, oid); - - int ret = prepare_mutate(store, pool, oid, mtime, objv_tracker, sync_mode); - if (ret < 0 || - ret == STATUS_NO_APPLY) { - return ret; - } - - string section; - RGWMetadataLogData log_data; - ret = pre_modify(handler, section, key, log_data, objv_tracker, MDLOG_STATUS_WRITE); - if (ret < 0) { - return ret; - } - - ret = std::forward(f)(); - - /* cascading ret into post_modify() */ - - ret = post_modify(handler, section, key, log_data, objv_tracker, ret); - if (ret < 0) - return ret; - - return 0; -} #endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0a372da9576..27ae1443b03 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1073,7 +1073,6 @@ void RGWRados::finalize() svc.shutdown(); - delete meta_mgr; delete binfo_cache; delete obj_tombstone_cache; @@ -1113,7 +1112,6 @@ int RGWRados::init_rados() return ret; } - meta_mgr = new RGWMetadataManager(cct, this); data_log = new RGWDataChangesLog(cct, this); cr_registry = crs.release(); return ret; @@ -1175,10 +1173,6 @@ int RGWRados::init_complete() return ret; } - period_puller.reset(new RGWPeriodPuller(this)); - period_history.reset(new RGWPeriodHistory(cct, period_puller.get(), - svc.zone->get_current_period())); - ret = open_root_pool_ctx(); if (ret < 0) return ret; @@ -7817,6 +7811,8 @@ int RGWRados::get_bucket_entrypoint_info(RGWSysObjectCtx& obj_ctx, rgw_cache_entry_info *cache_info, boost::optional refresh_version) { +#warning FIXME +#if 0 bufferlist bl; string bucket_entry; @@ -7836,6 +7832,7 @@ int RGWRados::get_bucket_entrypoint_info(RGWSysObjectCtx& obj_ctx, return -EIO; } return 0; +#endif } int RGWRados::get_bucket_info(RGWSysObjectCtx& obj_ctx, @@ -7871,7 +7868,7 @@ int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info, .set_mtime(pmtime) .set_attrs(pattrs) .set_pinfo(&info) - .set_refresh_version(info.objv_tracker.read_version) + .set_refresh_version(rv) .exec(); if (r < 0) { return r; @@ -7890,7 +7887,10 @@ int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string encode(entry_point, epbl); string bucket_entry; rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry); +#warning FIXME +#if 0 return rgw_bucket_store_info(this, bucket_entry, epbl, exclusive, pattrs, &objv_tracker, mtime); +#endif } int RGWRados::put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index e6cb4245837..0200d547df0 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -543,7 +543,7 @@ public: pools_initialized(false), quota_handler(NULL), cr_registry(NULL), - meta_mgr(NULL), data_log(NULL), reshard(NULL) {} + data_log(NULL), reshard(NULL) {} RGWRados& set_use_cache(bool status) { use_cache = status; @@ -598,15 +598,8 @@ public: */ string host_id; - // pulls missing periods for period_history - std::unique_ptr period_puller; - // maintains a connected history of periods - std::unique_ptr period_history; - RGWAsyncRadosProcessor* get_async_rados() const { return async_rados; }; - RGWMetadataManager *meta_mgr; - RGWDataChangesLog *data_log; RGWReshard *reshard; diff --git a/src/rgw/rgw_rest_metadata.cc b/src/rgw/rgw_rest_metadata.cc index 0f81d54c9b8..a7b185e5a25 100644 --- a/src/rgw/rgw_rest_metadata.cc +++ b/src/rgw/rgw_rest_metadata.cc @@ -215,6 +215,19 @@ int RGWOp_Metadata_Put::get_data(bufferlist& bl) { return 0; } +static bool string_to_sync_type(const string& sync_string, + sync_type_t& type) { + if (sync_string.compare("update-by-version") == 0) + type = APPLY_UPDATES; + else if (sync_string.compare("update-by-timestamp") == 0) + type = APPLY_NEWER; + else if (sync_string.compare("always") == 0) + type = APPLY_ALWAYS; + else + return false; + return true; +} + void RGWOp_Metadata_Put::execute() { bufferlist bl; string metadata_key; diff --git a/src/rgw/rgw_service.h b/src/rgw/rgw_service.h index cc7ff46b346..f1c4845006d 100644 --- a/src/rgw/rgw_service.h +++ b/src/rgw/rgw_service.h @@ -47,6 +47,9 @@ public: class RGWSI_Finisher; class RGWSI_Bucket; class RGWSI_Cls; +class RGWSI_MDLog; +class RGWSI_Meta; +class RGWSI_MetaBackend_SObj; class RGWSI_Notify; class RGWSI_RADOS; class RGWSI_Zone; @@ -65,6 +68,9 @@ struct RGWServices_Def std::unique_ptr finisher; std::unique_ptr bucket; std::unique_ptr cls; + std::unique_ptr mdlog; + std::unique_ptr meta; + std::unique_ptr meta_be_sobj; std::unique_ptr notify; std::unique_ptr rados; std::unique_ptr zone; @@ -90,6 +96,9 @@ struct RGWServices RGWSI_Finisher *finisher{nullptr}; RGWSI_Bucket *bucket{nullptr}; RGWSI_Cls *cls{nullptr}; + RGWSI_Meta *mdlog{nullptr}; + RGWSI_Meta *meta{nullptr}; + RGWSI_MetaBackend_SObj *meta_be_sobj{nullptr}; RGWSI_Notify *notify{nullptr}; RGWSI_RADOS *rados{nullptr}; RGWSI_Zone *zone{nullptr}; diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 9a6c5eb196d..703025897a1 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -96,7 +96,7 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, return 0; } -int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, +int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, RGWObjVersionTracker *objv_tracker, real_time set_mtime, map *pattrs) { map no_attrs; @@ -106,7 +106,6 @@ int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& o rgw_raw_obj obj(pool, oid); - auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx(); auto sysobj = obj_ctx.get_obj(obj); int ret = sysobj.wop() .set_objv_tracker(objv_tracker) @@ -115,18 +114,6 @@ int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& o .set_attrs(*pattrs) .write(data, null_yield); - if (ret == -ENOENT) { - ret = rgwstore->create_pool(pool); - if (ret >= 0) { - ret = sysobj.wop() - .set_objv_tracker(objv_tracker) - .set_exclusive(exclusive) - .set_mtime(set_mtime) - .set_attrs(*pattrs) - .write(data, null_yield); - } - } - return ret; } diff --git a/src/rgw/rgw_tools.h b/src/rgw/rgw_tools.h index d3a27f9d1ca..ea28f64aceb 100644 --- a/src/rgw/rgw_tools.h +++ b/src/rgw/rgw_tools.h @@ -22,7 +22,7 @@ int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, bool create = false, bool mostly_omap = false); -int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, +int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive, RGWObjVersionTracker *objv_tracker, real_time set_mtime, map *pattrs = NULL); int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl, RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map *pattrs = NULL, diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index 27a2ef5d620..ac3b30d2b79 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -1532,7 +1532,6 @@ int get_zones_pool_set(CephContext* cct, } if (zone.get_id() != my_zone_id) { pool_names.insert(zone.domain_root); - pool_names.insert(zone.metadata_heap); pool_names.insert(zone.control_pool); pool_names.insert(zone.gc_pool); pool_names.insert(zone.log_pool); @@ -1605,9 +1604,6 @@ int RGWZoneParams::fix_pool_names() } domain_root = fix_zone_pool_dup(pools, name, ".rgw.meta:root", domain_root); - if (!metadata_heap.name.empty()) { - metadata_heap = fix_zone_pool_dup(pools, name, ".rgw.meta:heap", metadata_heap); - } control_pool = fix_zone_pool_dup(pools, name, ".rgw.control", control_pool); gc_pool = fix_zone_pool_dup(pools, name ,".rgw.log:gc", gc_pool); lc_pool = fix_zone_pool_dup(pools, name ,".rgw.log:lc", lc_pool); diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 14bd53b2e6e..d825c889eee 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -350,7 +350,6 @@ WRITE_CLASS_ENCODER(RGWZonePlacementInfo) struct RGWZoneParams : RGWSystemMetaObj { rgw_pool domain_root; - rgw_pool metadata_heap; rgw_pool control_pool; rgw_pool gc_pool; rgw_pool lc_pool; @@ -412,7 +411,8 @@ struct RGWZoneParams : RGWSystemMetaObj { RGWSystemMetaObj::encode(bl); encode(system_key, bl); encode(placement_pools, bl); - encode(metadata_heap, bl); + rgw_pool unused_metadata_heap; + encode(unused_metadata_heap, bl); encode(realm_id, bl); encode(lc_pool, bl); map old_tier_config; @@ -446,8 +446,10 @@ struct RGWZoneParams : RGWSystemMetaObj { decode(system_key, bl); if (struct_v >= 4) decode(placement_pools, bl); - if (struct_v >= 5) - decode(metadata_heap, bl); + if (struct_v >= 5) { + rgw_pool unused_metadata_heap; + decode(unused_metadata_heap, bl); + } if (struct_v >= 6) { decode(realm_id, bl); } diff --git a/src/rgw/services/svc_mdlog.cc b/src/rgw/services/svc_mdlog.cc new file mode 100644 index 00000000000..98b88a98f5e --- /dev/null +++ b/src/rgw/services/svc_mdlog.cc @@ -0,0 +1,340 @@ +#include "svc_mdlog.h" +#include "svc_zone.h" + +#include "rgw/rgw_tools.h" +#include "rgw/rgw_mdlog.h" +#include "rgw/rgw_coroutine.h" + +int RGWSI_MDLog::read_history(RGWRados *store, RGWMetadataLogHistory *state, + RGWObjVersionTracker *objv_tracker) +{ + auto obj_ctx = sysobj_svc->init_obj_ctx(); + auto& pool = zone_svc->get_zone_params().log_pool; + const auto& oid = RGWMetadataLogHistory::oid; + bufferlist bl; + int ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, objv_tracker, nullptr, null_yield); + if (ret < 0) { + return ret; + } + if (bl.length() == 0) { + /* bad history object, remove it */ + rgw_raw_obj obj(pool, oid); + auto sysobj = obj_ctx.get_obj(obj); + ret = sysobj.wop().remove(null_yield); + if (ret < 0) { + ldout(cct, 0) << "ERROR: meta history is empty, but cannot remove it (" << cpp_strerror(-ret) << ")" << dendl; + return ret; + } + return -ENOENT; + } + try { + auto p = bl.cbegin(); + state->decode(p); + } catch (buffer::error& e) { + ldout(cct, 1) << "failed to decode the mdlog history: " + << e.what() << dendl; + return -EIO; + } + return 0; +} + +int RGWSI_MDLog::write_history(const RGWMetadataLogHistory& state, + RGWObjVersionTracker *objv_tracker, + bool exclusive) +{ + bufferlist bl; + state.encode(bl); + + auto& pool = zone_svc->get_zone_params().log_pool; + const auto& oid = RGWMetadataLogHistory::oid; + return rgw_put_system_obj(sysobj_svc, pool, oid, bl, + exclusive, objv_tracker, real_time{}); +} + +namespace { + +using Cursor = RGWPeriodHistory::Cursor; + +/// read the mdlog history and use it to initialize the given cursor +class ReadHistoryCR : public RGWCoroutine { + RGWSI_Zone *zone_svc; + RGWSI_SysObj *sysobj_svc; + Cursor *cursor; + RGWObjVersionTracker *objv_tracker; + RGWMetadataLogHistory state; + public: + ReadHistoryCR(RGWSI_Zone *zone_svc, + RGWSI_SysObj *sysobj_svc, + Cursor *cursor, + RGWObjVersionTracker *objv_tracker) + : RGWCoroutine(zone_svc->ctx()), zone_svc(zone_svc), + sysobj_svc(sysobj_svc), + cursor(cursor), + objv_tracker(objv_tracker) + {} + + int operate() { + reenter(this) { + yield { + rgw_raw_obj obj{zone_svc->get_zone_params().log_pool, + RGWMetadataLogHistory::oid}; + constexpr bool empty_on_enoent = false; + + using ReadCR = RGWSimpleRadosReadCR; + call(new ReadCR(store->get_async_rados(), sysobj_svc, obj, + &state, empty_on_enoent, objv_tracker)); + } + if (retcode < 0) { + ldout(cct, 1) << "failed to read mdlog history: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + *cursor = store->period_history->lookup(state.oldest_realm_epoch); + if (!*cursor) { + return set_cr_error(cursor->get_error()); + } + + ldout(cct, 10) << "read mdlog history with oldest period id=" + << state.oldest_period_id << " realm_epoch=" + << state.oldest_realm_epoch << dendl; + return set_cr_done(); + } + return 0; + } +}; + +/// write the given cursor to the mdlog history +class WriteHistoryCR : public RGWCoroutine { + RGWSI_Zone *zone_svc; + RGWSI_SysObj *sysobj_svc; + Cursor cursor; + RGWObjVersionTracker *objv; + RGWMetadataLogHistory state; + public: + WriteHistoryCR(RGWSI_Zone *zone_svc, RGWSI_SysObj *sysobj_svc, + const Cursor& cursor, + RGWObjVersionTracker *objv) + : RGWCoroutine(zone_svc->ctx()), zone_svc(zone_svc), cursor(cursor), objv(objv) + {} + + int operate() { + reenter(this) { + state.oldest_period_id = cursor.get_period().get_id(); + state.oldest_realm_epoch = cursor.get_epoch(); + + yield { + rgw_raw_obj obj{zone_svc->get_zone_params().log_pool, + RGWMetadataLogHistory::oid}; + + using WriteCR = RGWSimpleRadosWriteCR; + call(new WriteCR(store->get_async_rados(), sysobj_svc, obj, state, objv)); + } + if (retcode < 0) { + ldout(cct, 1) << "failed to write mdlog history: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + ldout(cct, 10) << "wrote mdlog history with oldest period id=" + << state.oldest_period_id << " realm_epoch=" + << state.oldest_realm_epoch << dendl; + return set_cr_done(); + } + return 0; + } +}; + +/// update the mdlog history to reflect trimmed logs +class TrimHistoryCR : public RGWCoroutine { + RGWRados *store; + const Cursor cursor; //< cursor to trimmed period + RGWObjVersionTracker *objv; //< to prevent racing updates + Cursor next; //< target cursor for oldest log period + Cursor existing; //< existing cursor read from disk + + public: + TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv) + : RGWCoroutine(store->ctx()), + store(store), cursor(cursor), objv(objv), next(cursor) + { + next.next(); // advance past cursor + } + + int operate() { + reenter(this) { + // read an existing history, and write the new history if it's newer + yield call(new ReadHistoryCR(store, &existing, objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + // reject older trims with ECANCELED + if (cursor.get_epoch() < existing.get_epoch()) { + ldout(cct, 4) << "found oldest log epoch=" << existing.get_epoch() + << ", rejecting trim at epoch=" << cursor.get_epoch() << dendl; + return set_cr_error(-ECANCELED); + } + // overwrite with updated history + yield call(new WriteHistoryCR(store, next, objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; + } +}; + +// traverse all the way back to the beginning of the period history, and +// return a cursor to the first period in a fully attached history +Cursor find_oldest_period(RGWRados *store) +{ + auto cct = store->ctx(); + auto cursor = store->period_history->get_current(); + + while (cursor) { + // advance to the period's predecessor + if (!cursor.has_prev()) { + auto& predecessor = cursor.get_period().get_predecessor(); + if (predecessor.empty()) { + // this is the first period, so our logs must start here + ldout(cct, 10) << "find_oldest_period returning first " + "period " << cursor.get_period().get_id() << dendl; + return cursor; + } + // pull the predecessor and add it to our history + RGWPeriod period; + int r = store->period_puller->pull(predecessor, period); + if (r < 0) { + return Cursor{r}; + } + auto prev = store->period_history->insert(std::move(period)); + if (!prev) { + return prev; + } + ldout(cct, 20) << "find_oldest_period advancing to " + "predecessor period " << predecessor << dendl; + ceph_assert(cursor.has_prev()); + } + cursor.prev(); + } + ldout(cct, 10) << "find_oldest_period returning empty cursor" << dendl; + return cursor; +} + +} // anonymous namespace + +Cursor RGWMetadataManager::init_oldest_log_period() +{ + // read the mdlog history + RGWMetadataLogHistory state; + RGWObjVersionTracker objv; + int ret = read_history(store, &state, &objv); + + if (ret == -ENOENT) { + // initialize the mdlog history and write it + ldout(cct, 10) << "initializing mdlog history" << dendl; + auto cursor = find_oldest_period(store); + if (!cursor) { + return cursor; + } + + // write the initial history + state.oldest_realm_epoch = cursor.get_epoch(); + state.oldest_period_id = cursor.get_period().get_id(); + + constexpr bool exclusive = true; // don't overwrite + int ret = write_history(store, state, &objv, exclusive); + if (ret < 0 && ret != -EEXIST) { + ldout(cct, 1) << "failed to write mdlog history: " + << cpp_strerror(ret) << dendl; + return Cursor{ret}; + } + return cursor; + } else if (ret < 0) { + ldout(cct, 1) << "failed to read mdlog history: " + << cpp_strerror(ret) << dendl; + return Cursor{ret}; + } + + // if it's already in the history, return it + auto cursor = store->period_history->lookup(state.oldest_realm_epoch); + if (cursor) { + return cursor; + } + // pull the oldest period by id + RGWPeriod period; + ret = store->period_puller->pull(state.oldest_period_id, period); + if (ret < 0) { + ldout(cct, 1) << "failed to read period id=" << state.oldest_period_id + << " for mdlog history: " << cpp_strerror(ret) << dendl; + return Cursor{ret}; + } + // verify its realm_epoch + if (period.get_realm_epoch() != state.oldest_realm_epoch) { + ldout(cct, 1) << "inconsistent mdlog history: read period id=" + << period.get_id() << " with realm_epoch=" << period.get_realm_epoch() + << ", expected realm_epoch=" << state.oldest_realm_epoch << dendl; + return Cursor{-EINVAL}; + } + // attach the period to our history + return store->period_history->attach(std::move(period)); +} + +Cursor RGWMetadataManager::read_oldest_log_period() const +{ + RGWMetadataLogHistory state; + int ret = read_history(store, &state, nullptr); + if (ret < 0) { + ldout(store->ctx(), 1) << "failed to read mdlog history: " + << cpp_strerror(ret) << dendl; + return Cursor{ret}; + } + + ldout(store->ctx(), 10) << "read mdlog history with oldest period id=" + << state.oldest_period_id << " realm_epoch=" + << state.oldest_realm_epoch << dendl; + + return store->period_history->lookup(state.oldest_realm_epoch); +} + +RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period, + RGWObjVersionTracker *objv) const +{ + return new ReadHistoryCR(store, period, objv); +} + +RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period, + RGWObjVersionTracker *objv) const +{ + return new TrimHistoryCR(store, period, objv); +} + +RGWMetadataLog* RGWMetadataManager::get_log(const std::string& period) +{ + // construct the period's log in place if it doesn't exist + auto insert = md_logs.emplace(std::piecewise_construct, + std::forward_as_tuple(period), + std::forward_as_tuple(cct, store, period)); + return &insert.first->second; +} + +int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, + const std::string& current_period) +{ + zone_svc = _zone_svc; + sysobj_svc = _sysobj_svc; + current_log = get_log(current_period); + + period_puller.reset(new RGWPeriodPuller(this)); + period_history.reset(new RGWPeriodHistory(cct, period_puller.get(), + zone_svc->get_current_period())); + + return 0; +} +} + +int RGWSI_MDLog::add_entry(RGWSI_MetaBacked::Module *module, const string& section, const string& key, bufferlist& bl) +{ + ceph_assert(current_log); // must have called init() + return current_log->add_entry(module, section, key, logbl); +} diff --git a/src/rgw/services/svc_mdlog.h b/src/rgw/services/svc_mdlog.h new file mode 100644 index 00000000000..7cbbf5c9be9 --- /dev/null +++ b/src/rgw/services/svc_mdlog.h @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include "common/static_ptr.h" + +#include "rgw/rgw_service.h" +#include "rgw/rgw_period_history.h" +#include "rgw/rgw_period_puller.h" + +#include "svc_meta_be.h" + + +class RGWMetadataLog; +class RGWMetadataLogHistory; +class RGWCoroutine; + +class RGWSI_Zone; +class RGWSI_SysObj; + + +class RGWSI_MDLog : public RGWServiceInstance +{ + RGWSI_Zone *zone_svc{nullptr}; + RGWSI_SysObj *sysobj_svc{nullptr}; + + // maintain a separate metadata log for each period + std::map md_logs; + + // use the current period's log for mutating operations + RGWMetadataLog* current_log{nullptr}; + + /// find or create the metadata log for the given period + RGWMetadataLog* get_log(const std::string& period); + + // pulls missing periods for period_history + std::unique_ptr period_puller; + // maintains a connected history of periods + std::unique_ptr period_history; + +public: + RGWSI_MDLog(CephContext *cct) : RGWServiceInstance(cct) {} + virtual ~RGWSI_MDLog() {} + + /// initialize the oldest log period if it doesn't exist, and attach it to + /// our current history + RGWPeriodHistory::Cursor init_oldest_log_period(); + + /// read the oldest log period, and return a cursor to it in our existing + /// period history + RGWPeriodHistory::Cursor read_oldest_log_period() const; + + /// read the oldest log period asynchronously and write its result to the + /// given cursor pointer + RGWCoroutine* read_oldest_log_period_cr(RGWPeriodHistory::Cursor *period, + RGWObjVersionTracker *objv) const; + + /// try to advance the oldest log period when the given period is trimmed, + /// using a rados lock to provide atomicity + RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period, + RGWObjVersionTracker *objv) const; + + int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, + const std::string& current_period); + + int read_history(RGWMetadataLogHistory *state, RGWObjVersionTracker *objv_tracker); + int write_history(const RGWMetadataLogHistory& state, + RGWObjVersionTracker *objv_tracker, + bool exclusive = false); + + int add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl); +}; + diff --git a/src/rgw/services/svc_meta_be.cc b/src/rgw/services/svc_meta_be.cc new file mode 100644 index 00000000000..c4d40136cb3 --- /dev/null +++ b/src/rgw/services/svc_meta_be.cc @@ -0,0 +1,177 @@ + + +#include "svc_meta_be.h" +#include "svc_mdlog.h" + +#include "rgw/rgw_mdlog.h" + +#define dout_subsys ceph_subsys_rgw + + +RGWSI_MetaBackend::Context::~Context() {} // needed, even though destructor is pure virtual +RGWSI_MetaBackend::Module::~Module() {} // needed, even though destructor is pure virtual + +int RGWSI_MetaBackend::pre_modify(RGWSI_MetaBackend::Context *ctx, + RGWMetadataLogData& log_data, + RGWObjVersionTracker *objv_tracker, + RGWMDLogStatus op_type) +{ + /* if write version has not been set, and there's a read version, set it so that we can + * log it + */ + if (objv_tracker) { + if (objv_tracker->read_version.ver && !objv_tracker->write_version.ver) { + objv_tracker->write_version = objv_tracker->read_version; + objv_tracker->write_version.ver++; + } + log_data.read_version = objv_tracker->read_version; + log_data.write_version = objv_tracker->write_version; + } + + log_data.status = op_type; + + bufferlist logbl; + encode(log_data, logbl); + + int ret = mdlog_svc->add_entry(ctx->module, ctx->section, ctx->key, logbl); + if (ret < 0) + return ret; + + return 0; +} + +int RGWSI_MetaBackend::post_modify(RGWSI_MetaBackend::Context *ctx, + RGWMetadataLogData& log_data, + RGWObjVersionTracker *objv_tracker, int ret) +{ + if (ret >= 0) + log_data.status = MDLOG_STATUS_COMPLETE; + else + log_data.status = MDLOG_STATUS_ABORT; + + bufferlist logbl; + encode(log_data, logbl); + + int r = mdlog_svc->add_entry(ctx->module, ctx->section, ctx->key, logbl); + if (ret < 0) + return ret; + + if (r < 0) + return r; + + return 0; +} + +int RGWSI_MetaBackend::prepare_mutate(RGWSI_MetaBackend::Context *ctx, + const real_time& mtime, + RGWObjVersionTracker *objv_tracker, + RGWMDLogSyncType sync_mode) +{ + bufferlist bl; + real_time orig_mtime; + int ret = get_entry(ctx, &bl, objv_tracker, &orig_mtime, + nullptr, nullptr, boost::none); + if (ret < 0 && ret != -ENOENT) { + return ret; + } + if (ret != -ENOENT && + !RGWMetadataHandler::check_versions(objv_tracker->read_version, orig_mtime, + objv_tracker->write_version, mtime, sync_mode)) { + return STATUS_NO_APPLY; + } + + if (objv_tracker->write_version.tag.empty()) { + if (objv_tracker->read_version.tag.empty()) { + objv_tracker->generate_new_write_ver(cct); + } else { + objv_tracker->write_version = objv_tracker->read_version; + objv_tracker->write_version.ver++; + } + } + return 0; +} + +int RGWSI_MetaBackend::mutate(RGWSI_MetaBackend::Context *ctx, + const ceph::real_time& mtime, + RGWObjVersionTracker *objv_tracker, + RGWMDLogStatus op_type, + RGWMDLogSyncType sync_mode, + std::function f, + bool generic_prepare) +{ + int ret; + + if (generic_prepare) { + ret = prepare_mutate(ctx, mtime, objv_tracker, sync_mode); + if (ret < 0 || + ret == STATUS_NO_APPLY) { + return ret; + } + } + + RGWMetadataLogData log_data; + ret = pre_modify(ctx, log_data, objv_tracker, op_type); + if (ret < 0) { + return ret; + } + + ret = f(); + + /* cascading ret into post_modify() */ + + ret = post_modify(ctx, log_data, objv_tracker, ret); + if (ret < 0) + return ret; + + return 0; +} + +int RGWSI_MetaBackend::get(Context *ctx, + bufferlist *pbl, + RGWObjVersionTracker *objv_tracker, + real_time *pmtime, + map *pattrs, + rgw_cache_entry_info *cache_info, + boost::optional refresh_version) +{ + return get_entry(ctx, pbl, + objv_tracker, pmtime, + pattrs, + cache_info, + refresh_version); +} + +int RGWSI_MetaBackend::put(Context *ctx, + bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker, + const ceph::real_time& mtime, + map *pattrs, + RGWMDLogSyncType sync_mode) +{ + std::function f = [&]() { + return put_entry(ctx, bl, + exclusive, objv_tracker, + mtime, pattrs); + }; + + return mutate(ctx, mtime, objv_tracker, + MDLOG_STATUS_WRITE, sync_mode, + f, + false); +} + +int RGWSI_MetaBackend::remove(Context *ctx, + RGWObjVersionTracker *objv_tracker, + const ceph::real_time& mtime, + RGWMDLogSyncType sync_mode) +{ + std::function f = [&]() { + return remove_entry(ctx, objv_tracker); + }; + + return mutate(ctx, mtime, objv_tracker, + MDLOG_STATUS_REMOVE, sync_mode, + f, + false); +} diff --git a/src/rgw/services/svc_meta_be.h b/src/rgw/services/svc_meta_be.h new file mode 100644 index 00000000000..c2f36c32f9b --- /dev/null +++ b/src/rgw/services/svc_meta_be.h @@ -0,0 +1,146 @@ + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include "rgw/rgw_service.h" +#include "rgw/rgw_mdlog_types.h" + +class RGWMetadataHandler; +class RGWMetadataLogData; + +class RGWSI_MDLog; +class RGWSI_Meta; + +typedef void *RGWSI_MetaBackend_Handle; + +class RGWSI_MetaBackend : public RGWServiceInstance +{ + friend class RGWSI_Meta; +public: + class Module; + class Context; +protected: + map handlers; + + RGWSI_MDLog *mdlog_svc{nullptr}; + + int find_handler(const string& metadata_key, RGWMetadataHandler **handler, string& entry); + + void base_init(RGWSI_MDLog *_mdlog_svc) { + mdlog_svc = _mdlog_svc; + } + + virtual int init_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle) { return 0; } + + int prepare_mutate(RGWSI_MetaBackend::Context *ctx, + const real_time& mtime, + RGWObjVersionTracker *objv_tracker, + RGWMDLogSyncType sync_mode); + + virtual int mutate(Context *ctx, + const ceph::real_time& mtime, RGWObjVersionTracker *objv_tracker, + RGWMDLogStatus op_type, + RGWMDLogSyncType sync_mode, + std::function f, + bool generic_prepare); + + virtual int pre_modify(Context *ctx, + RGWMetadataLogData& log_data, + RGWObjVersionTracker *objv_tracker, + RGWMDLogStatus op_type); + virtual int post_modify(Context *ctx, + RGWMetadataLogData& log_data, + RGWObjVersionTracker *objv_tracker, int ret); +public: + class Module { + /* + * Backend specialization module + */ + public: + virtual ~Module() = 0; + /* key to use for hashing entries for log shard placement */ + virtual void get_hash_key(const string& section, const string& key, string& hash_key) { + hash_key = section + ":" + key; + } + }; + + using ModuleRef = std::shared_ptr; + + struct Context { /* + * A single metadata operation context. Will be holding info about + * backend and operation itself; operation might span multiple backend + * calls. + */ + virtual ~Context() = 0; + + RGWSI_MetaBackend_Handle handle; + Module *module{nullptr}; + std::string section; + std::string key; + }; + + enum Type { + MDBE_SOBJ = 0, + }; + + RGWSI_MetaBackend(CephContext *cct) : RGWServiceInstance(cct) {} + virtual ~RGWSI_MetaBackend() {} + + virtual Type get_type() = 0; + + virtual void init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, Context *ctx) = 0; + + /* these should be implemented by backends */ + virtual int get_entry(Context *ctx, + bufferlist *pbl, + RGWObjVersionTracker *objv_tracker, + real_time *pmtime, + map *pattrs = nullptr, + rgw_cache_entry_info *cache_info = nullptr, + boost::optional refresh_version = boost::none) = 0; + virtual int put_entry(Context *ctx, + bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker, + real_time mtime, map *pattrs = nullptr) = 0; + virtual int remove_entry(Context *ctx, + RGWObjVersionTracker *objv_tracker) = 0; + + /* these should be called by handlers */ + virtual int get(Context *ctx, + bufferlist *pbl, + RGWObjVersionTracker *objv_tracker, + real_time *pmtime, + map *pattrs = nullptr, + rgw_cache_entry_info *cache_info = nullptr, + boost::optional refresh_version = boost::none) = 0; + + virtual int put(Context *ctx, + bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker, + const ceph::real_time& mtime, + map *pattrs, + RGWMDLogSyncType sync_mode); + + virtual int remove(Context *ctx, + RGWObjVersionTracker *objv_tracker, + const ceph::real_time& mtime, + RGWMDLogSyncType sync_mode); +}; + diff --git a/src/rgw/services/svc_meta_be_sobj.cc b/src/rgw/services/svc_meta_be_sobj.cc new file mode 100644 index 00000000000..e5df7fc621c --- /dev/null +++ b/src/rgw/services/svc_meta_be_sobj.cc @@ -0,0 +1,88 @@ + +#include "svc_meta_be_sobj.h" + +#include "rgw/rgw_tools.h" +#include "rgw/rgw_metadata.h" +#include "rgw/rgw_mdlog.h" + +struct rgwsi_meta_be_sobj_handler_info { + RGWSI_MetaBackend::ModuleRef _module; + RGWSI_MBSObj_Handler_Module *module; + string section; +}; + +RGWSI_MetaBackend_SObj::RGWSI_MetaBackend_SObj(CephContext *cct) : RGWSI_MetaBackend(cct) { +} + +RGWSI_MetaBackend_SObj::~RGWSI_MetaBackend_SObj() { +} + +int RGWSI_MetaBackend_SObj::init_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle) +{ + const auto& section = handler->get_type(); + + auto& info = handlers[handler->get_type()]; + info.section = section; + + info._module = handler->get_backend_module(get_type()); + info.module = static_cast(info._module.get()); + + *phandle = (RGWSI_MetaBackend_Handle)(&info); + + return 0; +} + +void RGWSI_MetaBackend_SObj::init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, RGWSI_MetaBackend::Context *_ctx) +{ + RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast(_ctx); + rgwsi_meta_be_sobj_handler_info *h = static_cast(ctx->handle); + + ctx->handle = handle; + ctx->module = h->module; + ctx->section = h->section; + ctx->key = key; + ctx->obj_ctx.emplace(sysobj_svc->init_obj_ctx()); + static_cast(ctx->module)->get_pool_and_oid(key, ctx->pool, ctx->oid); +} + +int RGWSI_MetaBackend_SObj::get_entry(RGWSI_MetaBackend::Context *_ctx, + bufferlist *pbl, + RGWObjVersionTracker *objv_tracker, + real_time *pmtime, + map *pattrs, + rgw_cache_entry_info *cache_info, + boost::optional refresh_version) +{ + RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast(_ctx); + + return rgw_get_system_obj(*ctx->obj_ctx, ctx->pool, ctx->oid, *pbl, + objv_tracker, pmtime, + null_yield, + pattrs, cache_info, + refresh_version); +} + +int RGWSI_MetaBackend_SObj::put_entry(RGWSI_MetaBackend::Context *_ctx, + bufferlist& bl, bool exclusive, + RGWObjVersionTracker *objv_tracker, + real_time mtime, map *pattrs) +{ + RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast(_ctx); + + return rgw_put_system_obj(*ctx->obj_ctx, ctx->pool, ctx->oid, bl, exclusive, + objv_tracker, mtime, pattrs); +} + +int RGWSI_MetaBackend_SObj::remove_entry(RGWSI_MetaBackend::Context *_ctx, + RGWObjVersionTracker *objv_tracker) +{ + RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast(_ctx); + + rgw_raw_obj k(ctx->pool, ctx->oid); + + auto sysobj = ctx->obj_ctx->get_obj(k); + return sysobj.wop() + .set_objv_tracker(objv_tracker) + .remove(null_yield); +} + diff --git a/src/rgw/services/svc_meta_be_sobj.h b/src/rgw/services/svc_meta_be_sobj.h new file mode 100644 index 00000000000..223b36f3450 --- /dev/null +++ b/src/rgw/services/svc_meta_be_sobj.h @@ -0,0 +1,81 @@ + + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include "rgw/rgw_service.h" + +#include "svc_meta_be.h" +#include "svc_sys_obj.h" + + +struct rgwsi_meta_be_sobj_handler_info; + +class RGWSI_MBSObj_Handler_Module : public RGWSI_MetaBackend::Module { +public: + virtual void get_pool_and_oid(const string& key, rgw_pool& pool, string& oid) = 0; + virtual void key_to_oid(string& key) {} + virtual void oid_to_key(string& oid) {} +}; + + +class RGWSI_MetaBackend_SObj : public RGWSI_MetaBackend +{ + RGWSI_SysObj *sysobj_svc{nullptr}; + + map handlers; + +protected: + int init_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle) override; + +public: + struct Context_SObj : public RGWSI_MetaBackend::Context { + std::optional obj_ctx; + rgw_pool pool; + string oid; + }; + + RGWSI_MetaBackend_SObj(CephContext *cct); + virtual ~RGWSI_MetaBackend_SObj(); + + RGWSI_MetaBackend::Type get_type() { + return MDBE_SOBJ; + } + + void init(RGWSI_SysObj *_sysobj_svc, + RGWSI_MDLog *_mdlog_svc) { + base_init(mdlog_svc); + sysobj_svc = _sysobj_svc; + } + + void init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, RGWSI_MetaBackend::Context *ctx) override; + + virtual int get_entry(RGWSI_MetaBackend::Context *ctx, + bufferlist *pbl, + RGWObjVersionTracker *objv_tracker, + real_time *pmtime, + map *pattrs = nullptr, + rgw_cache_entry_info *cache_info = nullptr, + boost::optional refresh_version = boost::none) = 0; + virtual int put_entry(RGWSI_MetaBackend::Context *ctx, bufferlist& bl, bool exclusive, + RGWObjVersionTracker *objv_tracker, real_time mtime, map *pattrs = nullptr) = 0; + virtual int remove_entry(RGWSI_MetaBackend::Context *ctx, + RGWObjVersionTracker *objv_tracker) = 0; +}; + +