From: Daniel Gryniewicz Date: Tue, 15 Sep 2020 11:59:39 +0000 (-0400) Subject: Zipper - Assorted cleanups X-Git-Tag: v16.1.0~522^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c590759da014dbcca7b73fe8c8765376a4e91185;p=ceph.git Zipper - Assorted cleanups - Move cluster stat into public header, allowing more zipper cleanup - Swift versioning - Implement a MPSerializer for Zipper. - Add Lifecycle APIs to Zipper. Signed-off-by: Daniel Gryniewicz --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index c85bf8564045..6829fe97befc 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -7224,7 +7224,7 @@ next: if (opt_cmd == OPT::LC_LIST) { formatter->open_array_section("lifecycle_list"); - vector bucket_lc_map; + vector bucket_lc_map; string marker; int index{0}; #define MAX_LC_LIST_ENTRIES 100 diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 9234a66630c6..a4ee514b75e6 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1819,7 +1819,8 @@ static int fix_single_bucket_lc(rgw::sal::RGWRadosStore *store, return ret; } - return rgw::lc::fix_lc_shard_entry(store, bucket_info, bucket_attrs); + return rgw::lc::fix_lc_shard_entry(store, store->get_rgwlc()->get_lc(), bucket_info, + bucket_attrs); } static void format_lc_status(Formatter* formatter, diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index c49e8c7e728d..26471a74642d 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -18,7 +18,6 @@ #include "common/containers.h" #include #include "include/random.h" -#include "cls/rgw/cls_rgw_client.h" #include "cls/lock/cls_lock_client.h" #include "rgw_perf_counters.h" #include "rgw_common.h" @@ -243,6 +242,7 @@ void *RGWLC::LCWorker::entry() { void RGWLC::initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store) { cct = _cct; store = _store; + sal_lc = std::move(store->get_lifecycle()); max_objs = cct->_conf->rgw_lc_max_objs; if (max_objs > HASH_PRIME) max_objs = HASH_PRIME; @@ -291,7 
+291,7 @@ bool RGWLC::if_already_run_today(time_t start_date) return false; } -static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) { +static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) { os << "getRados()->lc_pool_ctx, obj_names[index], - marker, MAX_LC_LIST_ENTRIES, entries); + int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries); if (ret < 0) return ret; for (auto& entry : entries) { entry.start_time = ceph_clock_now(); entry.status = lc_uninitial; // lc_uninitial? really? - ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, - obj_names[index], entry); + ret = sal_lc->set_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on " @@ -370,17 +368,13 @@ static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, return (timediff >= cmp); } -static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, - rgw_obj& obj, RGWObjectCtx& ctx) +static bool pass_object_lock_check(rgw::sal::RGWStore* store, rgw::sal::RGWObject* obj, RGWObjectCtx& ctx) { - if (!bucket_info.obj_lock_enabled()) { + if (!obj->get_bucket()->get_info().obj_lock_enabled()) { return true; } - RGWRados::Object op_target(store, bucket_info, ctx, obj); - RGWRados::Object::Read read_op(&op_target); - map attrs; - read_op.params.attrs = &attrs; - int ret = read_op.prepare(null_yield); + std::unique_ptr read_op = obj->get_read_op(&ctx); + int ret = read_op->prepare(null_yield); if (ret < 0) { if (ret == -ENOENT) { return true; @@ -388,8 +382,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, return false; } } else { - auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); - if (iter != attrs.end()) { + auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION); + if (iter != obj->get_attrs().end()) { RGWObjectRetention retention; try { decode(retention, 
iter->second); @@ -403,8 +397,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, return false; } } - iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD); - if (iter != attrs.end()) { + iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD); + if (iter != obj->get_attrs().end()) { RGWObjectLegalHold obj_legal_hold; try { decode(obj_legal_hold, iter->second); @@ -422,30 +416,26 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, } class LCObjsLister { - rgw::sal::RGWRadosStore *store; - RGWBucketInfo& bucket_info; - RGWRados::Bucket target; - RGWRados::Bucket::List list_op; - bool is_truncated{false}; - rgw_obj_key next_marker; + rgw::sal::RGWStore *store; + rgw::sal::RGWBucket* bucket; + rgw::sal::RGWBucket::ListParams list_params; + rgw::sal::RGWBucket::ListResults list_results; string prefix; - vector objs; vector::iterator obj_iter; rgw_bucket_dir_entry pre_obj; int64_t delay_ms; public: - LCObjsLister(rgw::sal::RGWRadosStore *_store, RGWBucketInfo& _bucket_info) : - store(_store), bucket_info(_bucket_info), - target(store->getRados(), bucket_info), list_op(&target) { - list_op.params.list_versions = bucket_info.versioned(); - list_op.params.allow_unordered = true; + LCObjsLister(rgw::sal::RGWStore *_store, rgw::sal::RGWBucket* _bucket) : + store(_store), bucket(_bucket) { + list_params.list_versions = bucket->versioned(); + list_params.allow_unordered = true; delay_ms = store->ctx()->_conf.get_val("rgw_lc_thread_delay"); } void set_prefix(const string& p) { prefix = p; - list_op.params.prefix = prefix; + list_params.prefix = prefix; } int init() { @@ -453,13 +443,12 @@ public: } int fetch() { - int ret = list_op.list_objects( - 1000, &objs, NULL, &is_truncated, null_yield); + int ret = bucket->list(list_params, 1000, list_results, null_yield); if (ret < 0) { return ret; } - obj_iter = objs.begin(); + obj_iter = list_results.objs.begin(); return 0; } @@ -471,13 +460,13 @@ public: bool 
get_obj(rgw_bucket_dir_entry **obj, std::function fetch_barrier = []() { /* nada */}) { - if (obj_iter == objs.end()) { - if (!is_truncated) { + if (obj_iter == list_results.objs.end()) { + if (!list_results.is_truncated) { delay(); return false; } else { fetch_barrier(); - list_op.params.marker = pre_obj.key; + list_params.marker = pre_obj.key; int ret = fetch(); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret @@ -489,7 +478,7 @@ public: } /* returning address of entry in objs */ *obj = &(*obj_iter); - return obj_iter != objs.end(); + return obj_iter != list_results.objs.end(); } rgw_bucket_dir_entry get_prev_obj() { @@ -502,8 +491,8 @@ public: } boost::optional next_key_name() { - if (obj_iter == objs.end() || - (obj_iter + 1) == objs.end()) { + if (obj_iter == list_results.objs.end() || + (obj_iter + 1) == list_results.objs.end()) { /* this should have been called after get_obj() was called, so this should * only happen if is_truncated is false */ return boost::none; @@ -521,12 +510,12 @@ struct op_env { lc_op op; rgw::sal::RGWRadosStore *store; LCWorker* worker; - RGWBucketInfo& bucket_info; + rgw::sal::RGWBucket* bucket; LCObjsLister& ol; op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker, - RGWBucketInfo& _bucket_info, LCObjsLister& _ol) - : op(_op), store(_store), worker(_worker), bucket_info(_bucket_info), + rgw::sal::RGWBucket* _bucket, LCObjsLister& _ol) + : op(_op), store(_store), worker(_worker), bucket(_bucket), ol(_ol) {} }; /* op_env */ @@ -541,11 +530,11 @@ struct lc_op_ctx { ceph::real_time effective_mtime; rgw::sal::RGWRadosStore *store; - RGWBucketInfo& bucket_info; + rgw::sal::RGWBucket* bucket; lc_op& op; // ok--refers to expanded env.op LCObjsLister& ol; - rgw_obj obj; + std::unique_ptr obj; RGWObjectCtx rctx; const DoutPrefixProvider *dpp; WorkQ* wq; @@ -556,9 +545,11 @@ struct lc_op_ctx { const DoutPrefixProvider *dpp, WorkQ* wq) : cct(env.store->ctx()), env(env), o(o), 
next_key_name(next_key_name), effective_mtime(effective_mtime), - store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol), - obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq) - {} + store(env.store), bucket(env.bucket), op(env.op), ol(env.ol), + rctx(env.store), dpp(dpp), wq(wq) + { + obj = bucket->get_object(o.key); + } bool next_has_same_name(const std::string& key_name) { return (next_key_name && key_name.compare( @@ -570,10 +561,12 @@ struct lc_op_ctx { static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) { auto& store = oc.store; - auto& bucket_info = oc.bucket_info; + auto& bucket_info = oc.bucket->get_info(); auto& o = oc.o; auto obj_key = o.key; auto& meta = o.meta; + int ret; + std::string version_id; if (!remove_indeed) { obj_key.instance.clear(); @@ -581,20 +574,24 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) obj_key.instance = "null"; } - rgw_obj obj(bucket_info.bucket, obj_key); + std::unique_ptr bucket; + std::unique_ptr obj; + + ret = store->get_bucket(nullptr, bucket_info, &bucket); + if (ret < 0) { + return ret; + } + + obj = bucket->get_object(obj_key); + ACLOwner obj_owner; obj_owner.set_id(rgw_user {meta.owner}); obj_owner.set_name(meta.owner_display_name); + ACLOwner bucket_owner; + bucket_owner.set_id(bucket_info.owner); - RGWRados::Object del_target(store->getRados(), bucket_info, oc.rctx, obj); - RGWRados::Object::Delete del_op(&del_target); - - del_op.params.bucket_owner = bucket_info.owner; - del_op.params.versioning_status = bucket_info.versioning_status(); - del_op.params.obj_owner = obj_owner; - del_op.params.unmod_since = meta.mtime; - - return del_op.delete_obj(null_yield); + return obj->delete_object(&oc.rctx, obj_owner, bucket_owner, meta.mtime, false, 0, + version_id, null_yield); } /* remove_expired_obj */ class LCOpAction { @@ -822,24 +819,23 @@ static inline bool worker_should_stop(time_t stop_at, bool once) return !once && stop_at < time(nullptr); } -int 
RGWLC::handle_multipart_expiration( - RGWRados::Bucket *target, const multimap& prefix_map, - LCWorker* worker, time_t stop_at, bool once) +int RGWLC::handle_multipart_expiration(rgw::sal::RGWBucket* target, + const multimap& prefix_map, + LCWorker* worker, time_t stop_at, bool once) { MultipartMetaFilter mp_filter; vector objs; - bool is_truncated; int ret; - RGWBucketInfo& bucket_info = target->get_bucket_info(); - RGWRados::Bucket::List list_op(target); + rgw::sal::RGWBucket::ListParams params; + rgw::sal::RGWBucket::ListResults results; auto delay_ms = cct->_conf.get_val("rgw_lc_thread_delay"); - list_op.params.list_versions = false; + params.list_versions = false; /* lifecycle processing does not depend on total order, so can * take advantage of unordered listing optimizations--such as * operating on one shard at a time */ - list_op.params.allow_unordered = true; - list_op.params.ns = RGW_OBJ_NS_MULTIPART; - list_op.params.filter = &mp_filter; + params.allow_unordered = true; + params.ns = RGW_OBJ_NS_MULTIPART; + params.filter = &mp_filter; auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = boost::get>(wi); @@ -851,7 +847,7 @@ int RGWLC::handle_multipart_expiration( return; } RGWObjectCtx rctx(store); - int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj); + int ret = abort_multipart_upload(store, cct, &rctx, target->get_info(), mp_obj); if (ret == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); @@ -889,11 +885,10 @@ int RGWLC::handle_multipart_expiration( if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { continue; } - list_op.params.prefix = prefix_iter->first; + params.prefix = prefix_iter->first; do { objs.clear(); - list_op.params.marker = list_op.get_next_marker(); - ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield); + ret = target->list(params, 1000, results, null_yield); if (ret < 0) { if (ret == (-ENOENT)) return 0; @@ -911,20 +906,18 @@ int 
RGWLC::handle_multipart_expiration( } /* for objs */ std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); - } while(is_truncated); + } while(results.is_truncated); } /* for prefix_map */ worker->workpool->drain(); return 0; } -static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, - rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl) +static int read_obj_tags(rgw::sal::RGWObject* obj, RGWObjectCtx& ctx, bufferlist& tags_bl) { - RGWRados::Object op_target(store, bucket_info, ctx, obj); - RGWRados::Object::Read read_op(&op_target); + std::unique_ptr rop = obj->get_read_op(&ctx); - return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield); + return rop->get_attr(RGW_ATTR_TAGS, tags_bl, null_yield); } static bool is_valid_op(const lc_op& op) @@ -968,8 +961,7 @@ static int check_tags(lc_op_ctx& oc, bool *skip) *skip = true; bufferlist tags_bl; - int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj, - oc.rctx, tags_bl); + int ret = read_obj_tags(oc.obj.get(), oc.rctx, tags_bl); if (ret < 0) { if (ret != -ENODATA) { ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" @@ -1084,20 +1076,20 @@ public: r = remove_expired_obj(oc, true); if (r < 0) { ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj " - << oc.bucket_info.bucket << ":" << o.key + << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } ldout(oc.cct, 2) << "DELETED: current is-dm " - << oc.bucket_info.bucket << ":" << o.key + << oc.bucket << ":" << o.key << " " << oc.wq->thr_name() << dendl; } else { /* ! 
o.is_delete_marker() */ - r = remove_expired_obj(oc, !oc.bucket_info.versioned()); + r = remove_expired_obj(oc, !oc.bucket->versioned()); if (r < 0) { ldout(oc.cct, 0) << "ERROR: remove_expired_obj " - << oc.bucket_info.bucket << ":" << o.key + << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; @@ -1105,7 +1097,7 @@ public: if (perfcounter) { perfcounter->inc(l_rgw_lc_expire_current, 1); } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key << " " << oc.wq->thr_name() << dendl; } return 0; @@ -1136,8 +1128,7 @@ public: << oc.wq->thr_name() << dendl; return is_expired && - pass_object_lock_check(oc.store->getRados(), - oc.bucket_info, oc.obj, oc.rctx); + pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx); } int process(lc_op_ctx& oc) { @@ -1145,7 +1136,7 @@ public: int r = remove_expired_obj(oc, true); if (r < 0) { ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) " - << oc.bucket_info.bucket << ":" << o.key + << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; @@ -1153,7 +1144,7 @@ public: if (perfcounter) { perfcounter->inc(l_rgw_lc_expire_noncurrent, 1); } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key << " (non-current expiration) " << oc.wq->thr_name() << dendl; return 0; @@ -1189,7 +1180,7 @@ public: int r = remove_expired_obj(oc, true); if (r < 0) { ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) " - << oc.bucket_info.bucket << ":" << o.key + << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; @@ -1198,7 +1189,7 @@ public: if (perfcounter) { perfcounter->inc(l_rgw_lc_expire_dm, 1); } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + ldout(oc.cct, 2) << "DELETED:" << 
oc.bucket << ":" << o.key << " (delete marker expiration) " << oc.wq->thr_name() << dendl; return 0; @@ -1262,33 +1253,30 @@ public: auto& o = oc.o; rgw_placement_rule target_placement; - target_placement.inherit_from(oc.bucket_info.placement_rule); + target_placement.inherit_from(oc.bucket->get_placement_rule()); target_placement.storage_class = transition.storage_class; if (!oc.store->svc()->zone->get_zone_params(). valid_placement(target_placement)) { ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement - << " bucket="<< oc.bucket_info.bucket + << " bucket="<< oc.bucket << " rule_id=" << oc.op.id << " " << oc.wq->thr_name() << dendl; return -EINVAL; } - rgw::sal::RGWRadosBucket bucket(oc.store, oc.bucket_info); - rgw::sal::RGWRadosObject obj(oc.store, oc.obj.key, &bucket); - int r = oc.store->getRados()->transition_obj( - oc.rctx, &bucket, obj, target_placement, o.meta.mtime, - o.versioned_epoch, oc.dpp, null_yield); + int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime, + o.versioned_epoch, oc.dpp, null_yield); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " - << oc.bucket_info.bucket << ":" << o.key + << oc.bucket << ":" << o.key << " -> " << transition.storage_class << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } - ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket + ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket << ":" << o.key << " -> " << transition.storage_class << " " << oc.wq->thr_name() << dendl; @@ -1427,12 +1415,12 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, int r = (*selected)->process(ctx); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " - << env.bucket_info.bucket << ":" << o.key + << env.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << wq->thr_name() << dendl; return r; } - ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" + ldpp_dout(dpp, 20) << "processed:" << env.bucket 
<< ":" << o.key << " " << wq->thr_name() << dendl; } @@ -1444,8 +1432,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, time_t stop_at, bool once) { RGWLifecycleConfiguration config(cct); - RGWBucketInfo bucket_info; - map bucket_attrs; + std::unique_ptr bucket; string no_ns, list_versions; vector objs; vector result; @@ -1453,9 +1440,14 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, string bucket_tenant = result[0]; string bucket_name = result[1]; string bucket_marker = result[2]; - int ret = store->getRados()->get_bucket_info( - store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield, - &bucket_attrs); + int ret = store->get_bucket(nullptr, bucket_tenant, bucket_name, &bucket, null_yield); + if (ret < 0) { + ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name + << " failed" << dendl; + return ret; + } + + ret = bucket->get_bucket_info(null_yield); if (ret < 0) { ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl; @@ -1469,18 +1461,16 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, } ); - if (bucket_info.bucket.marker != bucket_marker) { + if (bucket->get_marker() != bucket_marker) { ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant << ":" << bucket_name - << " cur_marker=" << bucket_info.bucket.marker + << " cur_marker=" << bucket->get_marker() << " orig_marker=" << bucket_marker << dendl; return -ENOENT; } - RGWRados::Bucket target(store->getRados(), bucket_info); - - map::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); - if (aiter == bucket_attrs.end()) + map::iterator aiter = bucket->get_attrs().find(RGW_ATTR_LC); + if (aiter == bucket->get_attrs().end()) return 0; bufferlist::const_iterator iter{&aiter->second}; @@ -1541,7 +1531,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, pre_marker = next_marker; } - LCObjsLister ol(store, bucket_info); + LCObjsLister ol(store, bucket.get()); 
ol.set_prefix(prefix_iter->first); ret = ol.init(); @@ -1552,7 +1542,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, return ret; } - op_env oenv(op, store, worker, bucket_info, ol); + op_env oenv(op, store, worker, bucket.get(), ol); LCOpRule orule(oenv); orule.build(); // why can't ctor do it? rgw_bucket_dir_entry* o{nullptr}; @@ -1564,27 +1554,26 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, worker->workpool->drain(); } - ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once); + ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once); return ret; } int RGWLC::bucket_lc_post(int index, int max_lock_sec, - cls_rgw_lc_entry& entry, int& result, + rgw::sal::Lifecycle::LCEntry& entry, int& result, LCWorker* worker) { utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); - rados::cls::lock::Lock l(lc_index_lock_name); - l.set_cookie(cookie); - l.set_duration(lock_duration); + rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name, + obj_names[index], + cookie); dout(5) << "RGWLC::bucket_lc_post(): POST " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; do { - int ret = l.lock_exclusive( - &store->getRados()->lc_pool_ctx, obj_names[index]); + int ret = lock->try_lock(lock_duration, null_yield); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on " @@ -1597,8 +1586,7 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl; if (result == -ENOENT) { - ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx, - obj_names[index], entry); + ret = sal_lc->rm_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl; @@ -1610,14 +1598,14 @@ int 
RGWLC::bucket_lc_post(int index, int max_lock_sec, entry.status = lc_complete; } - ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, - obj_names[index], entry); + ret = sal_lc->set_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " << obj_names[index] << dendl; } clean: - l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); + lock->unlock(); + delete lock; ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl; return 0; @@ -1625,15 +1613,13 @@ clean: } int RGWLC::list_lc_progress(string& marker, uint32_t max_entries, - vector& progress_map, + vector& progress_map, int& index) { progress_map.clear(); for(; index < max_objs; index++, marker="") { - vector entries; - int ret = - cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, - max_entries, entries); + vector entries; + int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries); if (ret < 0) { if (ret == -ENOENT) { ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object=" @@ -1718,19 +1704,19 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, << "index: " << index << " worker ix: " << worker->ix << dendl; - rados::cls::lock::Lock l(lc_index_lock_name); + rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name, + obj_names[index], + std::string()); do { utime_t now = ceph_clock_now(); //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS - cls_rgw_lc_entry entry; + rgw::sal::Lifecycle::LCEntry entry; if (max_lock_secs <= 0) return -EAGAIN; utime_t time(max_lock_secs, 0); - l.set_duration(time); - int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, - obj_names[index]); + int ret = lock->try_lock(time, null_yield); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " @@ -1741,9 +1727,8 @@ int 
RGWLC::process(int index, int max_lock_secs, LCWorker* worker, if (ret < 0) return 0; - cls_rgw_lc_obj_head head; - ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index], - head); + rgw::sal::Lifecycle::LCHead head; + ret = sal_lc->get_head(obj_names[index], head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ", ret=" << ret << dendl; @@ -1751,8 +1736,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, } if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) { - ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx, - obj_names[index], head.marker, entry); + ret = sal_lc->get_entry(obj_names[index], head.marker, entry); if (ret >= 0) { if (entry.status == lc_processing) { if (expired_session(entry.start_time)) { @@ -1784,8 +1768,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, } } - ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx, - obj_names[index], head.marker, entry); + ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl; @@ -1801,8 +1784,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, << dendl; entry.status = lc_processing; - ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, - obj_names[index], entry); + ret = sal_lc->set_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.bucket << entry.status << dendl; @@ -1810,8 +1792,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, } head.marker = entry.bucket; - ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, - obj_names[index], head); + ret = sal_lc->put_head(obj_names[index], head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] @@ -1823,7 +1804,8 @@ int 
RGWLC::process(int index, int max_lock_secs, LCWorker* worker, << " index: " << index << " worker ix: " << worker->ix << dendl; - l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); + lock->unlock(); + delete lock; ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once); bucket_lc_post(index, max_lock_secs, entry, ret, worker); } while(1 && !once); @@ -1831,7 +1813,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, return 0; exit: - l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); + lock->unlock(); + delete lock; return 0; } @@ -1967,6 +1950,7 @@ static std::string get_lc_shard_name(const rgw_bucket& bucket){ template static int guard_lc_modify(rgw::sal::RGWRadosStore* store, + rgw::sal::Lifecycle* sal_lc, const rgw_bucket& bucket, const string& cookie, const F& f) { CephContext *cct = store->ctx(); @@ -1977,21 +1961,20 @@ static int guard_lc_modify(rgw::sal::RGWRadosStore* store, get_lc_oid(cct, shard_id, &oid); /* XXX it makes sense to take shard_id for a bucket_id? 
*/ - cls_rgw_lc_entry entry; + rgw::sal::Lifecycle::LCEntry entry; entry.bucket = shard_id; entry.status = lc_uninitial; int max_lock_secs = cct->_conf->rgw_lc_lock_max_time; - rados::cls::lock::Lock l(lc_index_lock_name); + rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name, + oid, + cookie); utime_t time(max_lock_secs, 0); - l.set_duration(time); - l.set_cookie(cookie); - librados::IoCtx *ctx = store->getRados()->get_lc_pool_ctx(); int ret; do { - ret = l.lock_exclusive(ctx, oid); + ret = lock->try_lock(time, null_yield); if (ret == -EBUSY || ret == -EEXIST) { ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " << oid << ", sleep 5, try again" << dendl; @@ -2003,14 +1986,15 @@ static int guard_lc_modify(rgw::sal::RGWRadosStore* store, << oid << ", ret=" << ret << dendl; break; } - ret = f(ctx, oid, entry); + ret = f(sal_lc, oid, entry); if (ret < 0) { ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on " << oid << ", ret=" << ret << dendl; } break; } while(true); - l.unlock(ctx, oid); + lock->unlock(); + delete lock; return ret; } @@ -2033,10 +2017,10 @@ int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info, rgw_bucket& bucket = bucket_info.bucket; - ret = guard_lc_modify(store, bucket, cookie, - [&](librados::IoCtx *ctx, const string& oid, - const cls_rgw_lc_entry& entry) { - return cls_rgw_lc_set_entry(*ctx, oid, entry); + ret = guard_lc_modify(store, sal_lc.get(), bucket, cookie, + [&](rgw::sal::Lifecycle* sal_lc, const string& oid, + const rgw::sal::Lifecycle::LCEntry& entry) { + return sal_lc->set_entry(oid, entry); }); return ret; @@ -2060,10 +2044,10 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info, } - ret = guard_lc_modify(store, bucket, cookie, - [&](librados::IoCtx *ctx, const string& oid, - const cls_rgw_lc_entry& entry) { - return cls_rgw_lc_rm_entry(*ctx, oid, entry); + ret = guard_lc_modify(store, sal_lc.get(), bucket, cookie, + [&](rgw::sal::Lifecycle* sal_lc, const string& oid, + const 
rgw::sal::Lifecycle::LCEntry& entry) { + return sal_lc->rm_entry(oid, entry); }); return ret; @@ -2078,6 +2062,7 @@ RGWLC::~RGWLC() namespace rgw::lc { int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, + rgw::sal::Lifecycle* sal_lc, const RGWBucketInfo& bucket_info, const map& battrs) { @@ -2090,20 +2075,18 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, std::string lc_oid; get_lc_oid(store->ctx(), shard_name, &lc_oid); - cls_rgw_lc_entry entry; + rgw::sal::Lifecycle::LCEntry entry; // There are multiple cases we need to encounter here // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker) // We are not dropping the old marker here as that would be caught by the next LC process update - auto lc_pool_ctx = store->getRados()->get_lc_pool_ctx(); - int ret = cls_rgw_lc_get_entry(*lc_pool_ctx, - lc_oid, shard_name, entry); + int ret = sal_lc->get_entry(lc_oid, shard_name, entry); if (ret == 0) { ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl; return ret; // entry is already existing correctly set to marker } - ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl; + ldout(store->ctx(), 5) << "lc_get_entry errored ret code=" << ret << dendl; if (ret == -ENOENT) { ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name << " creating " << dendl; @@ -2113,11 +2096,11 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, std::string cookie = cookie_buf; ret = guard_lc_modify( - store, bucket_info.bucket, cookie, - [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx, + store, sal_lc, bucket_info.bucket, cookie, + [&sal_lc, &lc_oid](rgw::sal::Lifecycle* slc, const string& oid, - const 
cls_rgw_lc_entry& entry) { - return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry); + const rgw::sal::Lifecycle::LCEntry& entry) { + return slc->set_entry(lc_oid, entry); }); } diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index b0e87efba76c..8f231af6b614 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -462,6 +462,7 @@ WRITE_CLASS_ENCODER(RGWLifecycleConfiguration) class RGWLC : public DoutPrefixProvider { CephContext *cct; rgw::sal::RGWRadosStore *store; + std::unique_ptr sal_lc; int max_objs{0}; string *obj_names{nullptr}; std::atomic down_flag = { false }; @@ -516,12 +517,12 @@ public: bool expired_session(time_t started); time_t thread_stop_at(); int list_lc_progress(string& marker, uint32_t max_entries, - vector&, int& index); + vector&, int& index); int bucket_lc_prepare(int index, LCWorker* worker); int bucket_lc_process(string& shard_id, LCWorker* worker, time_t stop_at, bool once); int bucket_lc_post(int index, int max_lock_sec, - cls_rgw_lc_entry& entry, int& result, LCWorker* worker); + rgw::sal::Lifecycle::LCEntry& entry, int& result, LCWorker* worker); bool going_down(); void start_processor(); void stop_processor(); @@ -532,19 +533,22 @@ public: const map& bucket_attrs); CephContext *get_cct() const override { return cct; } + rgw::sal::Lifecycle *get_lc() const { return sal_lc.get(); } unsigned get_subsys() const; std::ostream& gen_prefix(std::ostream& out) const; private: - int handle_multipart_expiration(RGWRados::Bucket *target, + int handle_multipart_expiration(rgw::sal::RGWBucket* target, const multimap& prefix_map, LCWorker* worker, time_t stop_at, bool once); }; namespace rgw::lc { -int fix_lc_shard_entry(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, +int fix_lc_shard_entry(rgw::sal::RGWRadosStore *store, + rgw::sal::Lifecycle* sal_lc, + const RGWBucketInfo& bucket_info, const map& battrs); std::string s3_expiration_header( diff --git a/src/rgw/rgw_lc_s3.cc b/src/rgw/rgw_lc_s3.cc index 
cba2b00c0f86..57a996f0cf82 100644 --- a/src/rgw/rgw_lc_s3.cc +++ b/src/rgw/rgw_lc_s3.cc @@ -313,7 +313,7 @@ void LCRule_S3::dump_xml(Formatter *f) const { } } -int RGWLifecycleConfiguration_S3::rebuild(RGWRados *store, RGWLifecycleConfiguration& dest) +int RGWLifecycleConfiguration_S3::rebuild(RGWLifecycleConfiguration& dest) { int ret = 0; multimap::iterator iter; diff --git a/src/rgw/rgw_lc_s3.h b/src/rgw/rgw_lc_s3.h index 0d6ffa93c93d..5aa9c8e8c496 100644 --- a/src/rgw/rgw_lc_s3.h +++ b/src/rgw/rgw_lc_s3.h @@ -95,7 +95,7 @@ public: RGWLifecycleConfiguration_S3() : RGWLifecycleConfiguration(nullptr) {} void decode_xml(XMLObj *obj); - int rebuild(RGWRados *store, RGWLifecycleConfiguration& dest); + int rebuild(RGWLifecycleConfiguration& dest); void dump_xml(Formatter *f) const; }; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 519fd1109d0d..1b9e6e701ff0 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3371,7 +3371,6 @@ void RGWDeleteBucket::execute(optional_yield y) int RGWPutObj::init_processing(optional_yield y) { copy_source = url_decode(s->info.env->get("HTTP_X_AMZ_COPY_SOURCE", "")); copy_source_range = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE"); - map src_attrs; size_t pos; int ret; @@ -3413,15 +3412,20 @@ int RGWPutObj::init_processing(optional_yield y) { return ret; } } - ret = store->getRados()->get_bucket_info(store->svc(), - copy_source_tenant_name, - copy_source_bucket_name, - copy_source_bucket_info, - NULL, s->yield, &src_attrs); + std::unique_ptr bucket; + ret = store->get_bucket(s->user.get(), copy_source_tenant_name, copy_source_bucket_name, + &bucket, s->yield); + if (ret < 0) { + ldpp_dout(this, 5) << __func__ << "(): get_bucket() returned ret=" << ret << dendl; + return ret; + } + + ret = bucket->get_bucket_info(s->yield); if (ret < 0) { ldpp_dout(this, 5) << __func__ << "(): get_bucket_info() returned ret=" << ret << dendl; return ret; } + copy_source_bucket_info = bucket->get_info(); /* handle x-amz-copy-source-range 
*/ if (copy_source_range) { @@ -3790,12 +3794,7 @@ void RGWPutObj::execute(optional_yield y) /* Handle object versioning of Swift API. */ if (! multipart) { - op_ret = store->getRados()->swift_versioning_copy(obj_ctx, - s->bucket_owner.get_id(), - s->bucket.get(), - s->object.get(), - this, - s->yield); + op_ret = s->object->swift_versioning_copy(s->obj_ctx, this, s->yield); if (op_ret < 0) { return; } @@ -4836,10 +4835,7 @@ void RGWDeleteObj::execute(optional_yield y) s->object->set_atomic(s->obj_ctx); bool ver_restored = false; - op_ret = store->getRados()->swift_versioning_restore(*obj_ctx, s->bucket_owner.get_id(), - s->bucket.get(), - s->object.get(), - ver_restored, this); + op_ret = s->object->swift_versioning_restore(s->obj_ctx, ver_restored, this); if (op_ret < 0) { return; } @@ -5157,15 +5153,14 @@ void RGWCopyObj::execute(optional_yield y) return; } - RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); if ( ! version_id.empty()) { dest_object->set_instance(version_id); } else if (dest_bucket->versioning_enabled()) { dest_object->gen_rand_obj_instance_name(); } - src_object->set_atomic(&obj_ctx); - dest_object->set_atomic(&obj_ctx); + src_object->set_atomic(s->obj_ctx); + dest_object->set_atomic(s->obj_ctx); encode_delete_at_attr(delete_at, attrs); @@ -5189,16 +5184,12 @@ void RGWCopyObj::execute(optional_yield y) /* Handle object versioning of Swift API. In case of copying to remote this * should fail gently (op_ret == 0) as the dst_obj will not exist here. 
*/ - op_ret = store->getRados()->swift_versioning_copy(obj_ctx, - dest_bucket->get_info().owner, - dest_bucket.get(), - dest_object.get(), - this, - s->yield); + op_ret = dest_object->swift_versioning_copy(s->obj_ctx, this, s->yield); if (op_ret < 0) { return; } + RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); op_ret = src_object->copy_object(obj_ctx, s->user.get(), &s->info, @@ -5557,7 +5548,7 @@ void RGWPutLC::execute(optional_yield y) return; } - op_ret = config.rebuild(store->getRados(), new_config); + op_ret = config.rebuild(new_config); if (op_ret < 0) return; @@ -5575,7 +5566,7 @@ void RGWPutLC::execute(optional_yield y) return; } - op_ret = store->getRados()->get_lc()->set_bucket_config(s->bucket->get_info(), s->bucket_attrs, &new_config); + op_ret = store->get_rgwlc()->set_bucket_config(s->bucket->get_info(), s->bucket_attrs, &new_config); if (op_ret < 0) { return; } @@ -5591,7 +5582,7 @@ void RGWDeleteLC::execute(optional_yield y) return; } - op_ret = store->getRados()->get_lc()->remove_bucket_config(s->bucket->get_info(), s->bucket_attrs); + op_ret = store->get_rgwlc()->remove_bucket_config(s->bucket->get_info(), s->bucket_attrs); if (op_ret < 0) { return; } @@ -6027,18 +6018,12 @@ void RGWCompleteMultipart::execute(optional_yield y) /*take a cls lock on meta_obj to prevent racing completions (or retries) from deleting the parts*/ - rgw_pool meta_pool; - rgw_raw_obj raw_obj; int max_lock_secs_mp = s->cct->_conf.get_val("rgw_mp_lock_max_time"); utime_t dur(max_lock_secs_mp, 0); - store->getRados()->obj_to_raw((s->bucket->get_info()).placement_rule, meta_obj->get_obj(), &raw_obj); - store->getRados()->get_obj_data_pool((s->bucket->get_info()).placement_rule, - meta_obj->get_obj(), &meta_pool); - store->getRados()->open_pool_ctx(meta_pool, serializer.ioctx, true); - - op_ret = serializer.try_lock(raw_obj.oid, dur, y); + serializer = meta_obj->get_serializer("RGWCompleteMultipart"); + op_ret = serializer->try_lock(dur, y); if (op_ret < 0) { 
ldpp_dout(this, 0) << "failed to acquire lock" << dendl; op_ret = -ERR_INTERNAL_ERROR; @@ -6208,11 +6193,11 @@ void RGWCompleteMultipart::execute(optional_yield y) return; // remove the upload obj - int r = store->getRados()->delete_obj(*static_cast(s->obj_ctx), - s->bucket->get_info(), meta_obj->get_obj(), 0); + string version_id; + int r = meta_obj->delete_object(s->obj_ctx, ACLOwner(), ACLOwner(), ceph::real_time(), false, 0, version_id, null_yield); if (r >= 0) { /* serializer's exclusive lock is released */ - serializer.clear_locked(); + serializer->clear_locked(); } else { ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl; } @@ -6225,28 +6210,13 @@ void RGWCompleteMultipart::execute(optional_yield y) } } -int RGWCompleteMultipart::MPSerializer::try_lock( - const std::string& _oid, - utime_t dur, optional_yield y) -{ - oid = _oid; - op.assert_exists(); - lock.set_duration(dur); - lock.lock_exclusive(&op); - int ret = rgw_rados_operate(ioctx, oid, &op, y); - if (! 
ret) { - locked = true; - } - return ret; -} - void RGWCompleteMultipart::complete() { /* release exclusive lock iff not already */ - if (unlikely(serializer.locked)) { - int r = serializer.unlock(); + if (unlikely(serializer && serializer->locked)) { + int r = serializer->unlock(); if (r < 0) { - ldpp_dout(this, 0) << "WARNING: failed to unlock " << serializer.oid << dendl; + ldpp_dout(this, 0) << "WARNING: failed to unlock " << serializer->oid << dendl; } } send_response(); @@ -7941,7 +7911,7 @@ void RGWGetObjLegalHold::execute(optional_yield y) void RGWGetClusterStat::execute(optional_yield y) { - op_ret = this->store->getRados()->get_rados_handle()->cluster_stat(stats_op); + op_ret = store->cluster_stat(stats_op); } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 46e2c109b574..23cb45d786d2 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -48,7 +48,6 @@ #include "rgw_torrent.h" #include "rgw_tag.h" #include "rgw_object_lock.h" -#include "cls/lock/cls_lock_client.h" #include "cls/rgw/cls_rgw_client.h" #include "rgw_public_access.h" @@ -1741,31 +1740,11 @@ protected: string etag; string version_id; bufferlist data; - - struct MPSerializer { - librados::IoCtx ioctx; - rados::cls::lock::Lock lock; - librados::ObjectWriteOperation op; - std::string oid; - bool locked; - - MPSerializer() : lock("RGWCompleteMultipart"), locked(false) - {} - - int try_lock(const std::string& oid, utime_t dur, optional_yield y); - - int unlock() { - return lock.unlock(&ioctx, oid); - } - - void clear_locked() { - locked = false; - } - } serializer; + rgw::sal::MPSerializer* serializer; public: - RGWCompleteMultipart() {} - ~RGWCompleteMultipart() override {} + RGWCompleteMultipart() : serializer(nullptr) {} + ~RGWCompleteMultipart() override { delete serializer; } int verify_permission(optional_yield y) override; void pre_exec() override; @@ -2375,7 +2354,7 @@ public: class RGWGetClusterStat : public RGWOp { protected: - struct rados_cluster_stat_t stats_op; + 
RGWClusterStat stats_op; public: RGWGetClusterStat() {} diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 644ea9cc442f..6e7e6c45e4ef 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8135,7 +8135,7 @@ int RGWRados::process_gc(bool expired_only) } int RGWRados::list_lc_progress(string& marker, uint32_t max_entries, - vector& progress_map, + vector& progress_map, int& index) { return lc->list_lc_progress(marker, max_entries, progress_map, index); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 529c4b101161..53fdb9b55627 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -370,7 +370,11 @@ public: class RGWGetDirHeader_CB; class RGWGetUserHeader_CB; -namespace rgw { namespace sal { class RGWRadosStore; } } +namespace rgw { namespace sal { + class RGWRadosStore; + class MPRadosSerializer; + class LCRadosSerializer; +} } class RGWAsyncRadosProcessor; @@ -396,7 +400,6 @@ class RGWRados friend class RGWGC; friend class RGWMetaNotifier; friend class RGWDataNotifier; - friend class RGWLC; friend class RGWObjectExpirer; friend class RGWMetaSyncProcessorThread; friend class RGWDataSyncProcessorThread; @@ -404,7 +407,8 @@ class RGWRados friend class RGWBucketReshard; friend class RGWBucketReshardLock; friend class BucketIndexLockGuard; - friend class RGWCompleteMultipart; + friend class rgw::sal::MPRadosSerializer; + friend class rgw::sal::LCRadosSerializer; friend class rgw::sal::RGWRadosStore; /** Open the pool used as root for this gateway */ @@ -1446,7 +1450,7 @@ public: int process_lc(); int list_lc_progress(string& marker, uint32_t max_entries, - vector& progress_map, int& index); + vector& progress_map, int& index); int bucket_check_index(RGWBucketInfo& bucket_info, map *existing_stats, diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 45ed6203854c..cc74634dc86c 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -21,6 +21,7 @@ class RGWGetDataCB; struct RGWObjState; class RGWAccessListFilter; +class RGWLC; 
struct RGWUsageIter { string read_iter; @@ -29,6 +30,22 @@ struct RGWUsageIter { RGWUsageIter() : index(0) {} }; +/** + * @struct RGWClusterStat + * Cluster-wide usage information + */ +struct RGWClusterStat { + /// total device size + uint64_t kb; + /// total used + uint64_t kb_used; + /// total available/free + uint64_t kb_avail; + /// number of objects + uint64_t num_objects; +}; + + namespace rgw { namespace sal { #define RGW_SAL_VERSION 1 @@ -37,6 +54,8 @@ class RGWUser; class RGWBucket; class RGWObject; class RGWBucketList; +struct MPSerializer; +class Lifecycle; enum AttrsMod { ATTRSMOD_NONE = 0, @@ -55,7 +74,7 @@ class RGWStore : public DoutPrefixProvider { virtual std::unique_ptr get_object(const rgw_obj_key& k) = 0; virtual int get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr* bucket, optional_yield y) = 0; virtual int get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr* bucket) = 0; - virtual int get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr* bucket, optional_yield y) = 0; + virtual int get_bucket(RGWUser* u, const std::string& tenant, const std::string& name, std::unique_ptr* bucket, optional_yield y) = 0; virtual int create_bucket(RGWUser& u, const rgw_bucket& b, const std::string& zonegroup_id, rgw_placement_rule& placement_rule, @@ -80,6 +99,9 @@ class RGWStore : public DoutPrefixProvider { optional_yield y) = 0; virtual const RGWZoneGroup& get_zonegroup() = 0; virtual int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) = 0; + virtual int cluster_stat(RGWClusterStat& stats) = 0; + virtual std::unique_ptr get_lifecycle(void) = 0; + virtual RGWLC* get_rgwlc(void) = 0; virtual void finalize(void)=0; @@ -162,7 +184,7 @@ class RGWBucket { struct ListResults { vector objs; map common_prefixes; - bool is_truncated; + bool is_truncated{false}; rgw_obj_key next_marker; }; @@ -205,6 +227,7 @@ class RGWBucket { virtual int chown(RGWUser* new_user, RGWUser* old_user, optional_yield y) = 
0; virtual int put_instance_info(bool exclusive, ceph::real_time mtime) = 0; virtual bool is_owner(RGWUser* user) = 0; + virtual RGWUser* get_owner(void) { return owner; }; virtual int check_empty(optional_yield y) = 0; virtual int check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0; virtual int set_instance_attrs(RGWAttrs& attrs, optional_yield y) = 0; @@ -432,6 +455,7 @@ class RGWObject { virtual int delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y) = 0; virtual int copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket, RGWObject* dest_obj, uint16_t olh_epoch, std::string* petag, const DoutPrefixProvider *dpp, optional_yield y) = 0; virtual bool is_expired() = 0; + virtual MPSerializer* get_serializer(const std::string& lock_name) = 0; RGWAttrs& get_attrs(void) { return attrs; } ceph::real_time get_mtime(void) const { return mtime; } @@ -446,6 +470,14 @@ class RGWObject { void set_in_extra_data(bool i) { in_extra_data = i; } int range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end); + /* Swift versioning */ + virtual int swift_versioning_restore(RGWObjectCtx* obj_ctx, + bool& restored, /* out */ + const DoutPrefixProvider *dpp) = 0; + virtual int swift_versioning_copy(RGWObjectCtx* obj_ctx, + const DoutPrefixProvider *dpp, + optional_yield y) = 0; + /* OPs */ virtual std::unique_ptr get_read_op(RGWObjectCtx*) = 0; virtual std::unique_ptr get_write_op(RGWObjectCtx*) = 0; @@ -469,6 +501,14 @@ class RGWObject { } virtual void gen_rand_obj_instance_name() = 0; virtual void raw_obj_to_obj(const rgw_raw_obj& raw_obj) = 0; + virtual void get_raw_obj(rgw_raw_obj* raw_obj) = 0; + virtual int transition(RGWObjectCtx& rctx, + RGWBucket* bucket, + const rgw_placement_rule& placement_rule, + const real_time& mtime, + uint64_t olh_epoch, + const DoutPrefixProvider *dpp, + optional_yield y) = 0; /* dang - This is temporary, until the API is completed */ 
rgw_obj_key& get_key() { return key; } @@ -493,5 +533,63 @@ class RGWObject { } }; +struct Serializer { + Serializer() = default; + virtual ~Serializer() = default; + + virtual int try_lock(utime_t dur, optional_yield y) = 0; + virtual int unlock() = 0; +}; + +struct MPSerializer : Serializer { + bool locked; + std::string oid; + MPSerializer() : locked(false) {} + virtual ~MPSerializer() = default; + + void clear_locked() { + locked = false; + } +}; + +struct LCSerializer : Serializer { + LCSerializer() {} + virtual ~LCSerializer() = default; +}; + +class Lifecycle { +public: + struct LCHead { + time_t start_date{0}; + std::string marker; + + LCHead() = default; + LCHead(time_t _date, std::string& _marker) : start_date(_date), marker(_marker) {} + }; + + struct LCEntry { + std::string bucket; + uint64_t start_time{0}; + uint32_t status{0}; + + LCEntry() = default; + LCEntry(std::string& _bucket, uint64_t _time, uint32_t _status) : bucket(_bucket), start_time(_time), status(_status) {} + }; + + Lifecycle() = default; + virtual ~Lifecycle() = default; + + virtual int get_entry(const string& oid, const std::string& marker, LCEntry& entry) = 0; + virtual int get_next_entry(const string& oid, std::string& marker, LCEntry& entry) = 0; + virtual int set_entry(const string& oid, const LCEntry& entry) = 0; + virtual int list_entries(const string& oid, const string& marker, + uint32_t max_entries, vector& entries) = 0; + virtual int rm_entry(const string& oid, const LCEntry& entry) = 0; + virtual int get_head(const string& oid, LCHead& head) = 0; + virtual int put_head(const string& oid, const LCHead& head) = 0; + + virtual LCSerializer* get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie) = 0; +}; + } } // namespace rgw::sal diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index e3efa4383b83..b2602f40cec4 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -28,12 +28,12 @@ #include "rgw_multi.h" 
#include "rgw_acl_s3.h" -/* Stuff for RGWRadosStore. Move to separate file when store split out */ #include "rgw_zone.h" #include "rgw_rest_conn.h" #include "services/svc_sys_obj.h" #include "services/svc_zone.h" #include "services/svc_tier_rados.h" +#include "cls/rgw/cls_rgw_client.h" #define dout_subsys ceph_subsys_rgw @@ -538,6 +538,11 @@ void RGWRadosObject::raw_obj_to_obj(const rgw_raw_obj& raw_obj) set_key(tobj.key); } +void RGWRadosObject::get_raw_obj(rgw_raw_obj* raw_obj) +{ + store->getRados()->obj_to_raw((bucket->get_info()).placement_rule, get_obj(), raw_obj); +} + int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid, const std::set& keys, RGWAttrs *vals) @@ -556,6 +561,22 @@ int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid, return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals); } +MPSerializer* RGWRadosObject::get_serializer(const std::string& lock_name) +{ + return new MPRadosSerializer(store, this, lock_name); +} + +int RGWRadosObject::transition(RGWObjectCtx& rctx, + RGWBucket* bucket, + const rgw_placement_rule& placement_rule, + const real_time& mtime, + uint64_t olh_epoch, + const DoutPrefixProvider *dpp, + optional_yield y) +{ + return store->getRados()->transition_obj(rctx, bucket, *this, placement_rule, mtime, olh_epoch, dpp, y); +} + std::unique_ptr RGWRadosObject::get_read_op(RGWObjectCtx *ctx) { return std::unique_ptr(new RGWRadosObject::RadosReadOp(this, ctx)); @@ -752,6 +773,30 @@ int RGWRadosObject::RadosWriteOp::write_meta(uint64_t size, uint64_t accounted_s return ret; } +int RGWRadosObject::swift_versioning_restore(RGWObjectCtx* obj_ctx, + bool& restored, + const DoutPrefixProvider *dpp) +{ + return store->getRados()->swift_versioning_restore(*obj_ctx, + bucket->get_owner()->get_id(), + bucket, + this, + restored, + dpp); +} + +int RGWRadosObject::swift_versioning_copy(RGWObjectCtx* obj_ctx, + const DoutPrefixProvider *dpp, + optional_yield y) +{ + return store->getRados()->swift_versioning_copy(*obj_ctx, 
+ bucket->get_info().owner, + bucket, + this, + dpp, + y); +} + int RGWRadosStore::get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr* bucket, optional_yield y) { int ret; @@ -886,6 +931,23 @@ int RGWRadosStore::get_zonegroup(const string& id, RGWZoneGroup& zonegroup) return rados->svc.zone->get_zonegroup(id, zonegroup); } +int RGWRadosStore::cluster_stat(RGWClusterStat& stats) +{ + rados_cluster_stat_t rados_stats; + int ret; + + ret = rados->get_rados_handle()->cluster_stat(rados_stats); + if (ret < 0) + return ret; + + stats.kb = rados_stats.kb; + stats.kb_used = rados_stats.kb_used; + stats.kb_avail = rados_stats.kb_avail; + stats.num_objects = rados_stats.num_objects; + + return ret; +} + int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b, const string& zonegroup_id, rgw_placement_rule& placement_rule, @@ -1002,6 +1064,141 @@ int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b, return ret; } +std::unique_ptr RGWRadosStore::get_lifecycle(void) +{ + return std::unique_ptr(new RadosLifecycle(this)); +} + + +MPRadosSerializer::MPRadosSerializer(RGWRadosStore* store, RGWRadosObject* obj, const std::string& lock_name) : + lock(lock_name) +{ + rgw_pool meta_pool; + rgw_raw_obj raw_obj; + + obj->get_raw_obj(&raw_obj); + oid = raw_obj.oid; + store->getRados()->get_obj_data_pool(obj->get_bucket()->get_placement_rule(), + obj->get_obj(), &meta_pool); + store->getRados()->open_pool_ctx(meta_pool, ioctx, true); +} + +int MPRadosSerializer::try_lock(utime_t dur, optional_yield y) +{ + op.assert_exists(); + lock.set_duration(dur); + lock.lock_exclusive(&op); + int ret = rgw_rados_operate(ioctx, oid, &op, y); + if (! 
ret) { + locked = true; + } + return ret; +} + +LCRadosSerializer::LCRadosSerializer(RGWRadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) : + lock(lock_name), oid(_oid) +{ + ioctx = &store->getRados()->lc_pool_ctx; + lock.set_cookie(cookie); +} + +int LCRadosSerializer::try_lock(utime_t dur, optional_yield y) +{ + lock.set_duration(dur); + return lock.lock_exclusive(ioctx, oid); +} + +int RadosLifecycle::get_entry(const string& oid, const std::string& marker, + LCEntry& entry) +{ + cls_rgw_lc_entry cls_entry; + int ret = cls_rgw_lc_get_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker, cls_entry); + + entry.bucket = cls_entry.bucket; + entry.start_time = cls_entry.start_time; + entry.status = cls_entry.status; + + return ret; +} + +int RadosLifecycle::get_next_entry(const string& oid, std::string& marker, + LCEntry& entry) +{ + cls_rgw_lc_entry cls_entry; + int ret = cls_rgw_lc_get_next_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker, + cls_entry); + + entry.bucket = cls_entry.bucket; + entry.start_time = cls_entry.start_time; + entry.status = cls_entry.status; + + return ret; +} + +int RadosLifecycle::set_entry(const string& oid, const LCEntry& entry) +{ + cls_rgw_lc_entry cls_entry; + + cls_entry.bucket = entry.bucket; + cls_entry.start_time = entry.start_time; + cls_entry.status = entry.status; + + return cls_rgw_lc_set_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry); +} + +int RadosLifecycle::list_entries(const string& oid, const string& marker, + uint32_t max_entries, vector& entries) +{ + vector cls_entries; + int ret = cls_rgw_lc_list(*store->getRados()->get_lc_pool_ctx(), oid, marker, max_entries, cls_entries); + + if (ret < 0) + return ret; + + for (auto& entry : cls_entries) { + entries.push_back(LCEntry(entry.bucket, entry.start_time, entry.status)); + } + + return ret; +} + +int RadosLifecycle::rm_entry(const string& oid, const LCEntry& entry) +{ + cls_rgw_lc_entry 
cls_entry; + + cls_entry.bucket = entry.bucket; + cls_entry.start_time = entry.start_time; + cls_entry.status = entry.status; + + return cls_rgw_lc_rm_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry); +} + +int RadosLifecycle::get_head(const string& oid, LCHead& head) +{ + cls_rgw_lc_obj_head cls_head; + int ret = cls_rgw_lc_get_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head); + + head.marker = cls_head.marker; + head.start_date = cls_head.start_date; + + return ret; +} + +int RadosLifecycle::put_head(const string& oid, const LCHead& head) +{ + cls_rgw_lc_obj_head cls_head; + + cls_head.marker = head.marker; + cls_head.start_date = head.start_date; + + return cls_rgw_lc_put_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head); +} + +LCSerializer* RadosLifecycle::get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie) +{ + return new LCRadosSerializer(store, oid, lock_name, cookie); +} + } // namespace rgw::sal rgw::sal::RGWRadosStore *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache) diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h index 0937870de96a..b6bd89b181e7 100644 --- a/src/rgw/rgw_sal_rados.h +++ b/src/rgw/rgw_sal_rados.h @@ -17,6 +17,7 @@ #include "rgw_sal.h" #include "rgw_rados.h" +#include "cls/lock/cls_lock_client.h" namespace rgw { namespace sal { @@ -131,9 +132,26 @@ class RGWRadosObject : public RGWObject { virtual bool is_expired() override; virtual void gen_rand_obj_instance_name() override; virtual void raw_obj_to_obj(const rgw_raw_obj& raw_obj) override; + virtual void get_raw_obj(rgw_raw_obj* raw_obj) override; virtual std::unique_ptr clone() { return std::unique_ptr(new RGWRadosObject(*this)); } + virtual MPSerializer* get_serializer(const std::string& lock_name) override; + virtual int transition(RGWObjectCtx& rctx, + RGWBucket* 
bucket, + const rgw_placement_rule& placement_rule, + const real_time& mtime, + uint64_t olh_epoch, + const DoutPrefixProvider *dpp, + optional_yield y) override; + + /* Swift versioning */ + virtual int swift_versioning_restore(RGWObjectCtx* obj_ctx, + bool& restored, + const DoutPrefixProvider *dpp) override; + virtual int swift_versioning_copy(RGWObjectCtx* obj_ctx, + const DoutPrefixProvider *dpp, + optional_yield y) override; /* OPs */ virtual std::unique_ptr get_read_op(RGWObjectCtx *) override; @@ -276,6 +294,9 @@ class RGWRadosStore : public RGWStore { optional_yield y) override; virtual const RGWZoneGroup& get_zonegroup() override; virtual int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) override; + virtual int cluster_stat(RGWClusterStat& stats) override; + virtual std::unique_ptr get_lifecycle(void) override; + virtual RGWLC* get_rgwlc(void) { return rados->get_lc(); } void setRados(RGWRados * st) { rados = st; } RGWRados *getRados(void) { return rados; } @@ -302,6 +323,51 @@ class RGWRadosStore : public RGWStore { }; +class MPRadosSerializer : public MPSerializer { + librados::IoCtx ioctx; + rados::cls::lock::Lock lock; + librados::ObjectWriteOperation op; + +public: + MPRadosSerializer(RGWRadosStore* store, RGWRadosObject* obj, const std::string& lock_name); + + virtual int try_lock(utime_t dur, optional_yield y) override; + int unlock() { + return lock.unlock(&ioctx, oid); + } +}; + +class LCRadosSerializer : public LCSerializer { + librados::IoCtx* ioctx; + rados::cls::lock::Lock lock; + const std::string oid; + +public: + LCRadosSerializer(RGWRadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie); + + virtual int try_lock(utime_t dur, optional_yield y) override; + int unlock() { + return lock.unlock(ioctx, oid); + } +}; + +class RadosLifecycle : public Lifecycle { + RGWRadosStore* store; + +public: + RadosLifecycle(RGWRadosStore* _st) : store(_st) {} + + virtual int get_entry(const string& 
oid, const std::string& marker, LCEntry& entry) override; + virtual int get_next_entry(const string& oid, std::string& marker, LCEntry& entry) override; + virtual int set_entry(const string& oid, const LCEntry& entry) override; + virtual int list_entries(const string& oid, const string& marker, + uint32_t max_entries, vector& entries) override; + virtual int rm_entry(const string& oid, const LCEntry& entry) override; + virtual int get_head(const string& oid, LCHead& head) override; + virtual int put_head(const string& oid, const LCHead& head) override; + virtual LCSerializer* get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie) override; +}; + } } // namespace rgw::sal class RGWStoreManager {