From 15723d5b1594040e799a063df011655de93a0e47 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 7 Jun 2019 15:26:32 -0700 Subject: [PATCH] rgw: move bucket overwrite code to common path In order to do that needed to make sure we have the old_info and identify overwrites. Moved the bilog and data log related overwrite handlers into svc.bucket, and also the bucket meta handler now implements put_checked() and not using the generic sobj call. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket.cc | 107 ++++++++++++++++++++++++--------- src/rgw/rgw_bucket.h | 15 +++++ src/rgw/services/svc_bucket.cc | 74 ++++++++++++++++++++++- src/rgw/services/svc_bucket.h | 6 ++ 4 files changed, 169 insertions(+), 33 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index fb0bc668724..fdab9cfd2b0 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -2622,7 +2622,8 @@ public: ret = ctl.bucket->store_bucket_instance_info(be.bucket, new_bi, y, RGWBucketCtl::BucketInstance::PutParams() .set_exclusive(false) .set_mtime(orig_mtime) - .set_attrs(&attrs_m)); + .set_attrs(&attrs_m) + .set_orig_info(&old_bi)); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to put new bucket instance info for bucket=" << new_bi.bucket << " ret=" << ret << dendl; return ret; @@ -2830,6 +2831,7 @@ public: } int put_check() override; + int put_checked() override; int put_post() override; }; @@ -2882,33 +2884,6 @@ int RGWMetadataHandlerPut_BucketInstance::put_check() bci.info.placement_rule = old_bci->info.placement_rule; } - if (exists && old_bci->info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) { - int shards_num = bci.info.num_shards? bci.info.num_shards : 1; - int shard_id = bci.info.num_shards? 0 : -1; - - if (!bci.info.datasync_flag_enabled()) { - ret = store->stop_bi_log_entries(bci.info, -1); - if (ret < 0) { - lderr(cct) << "ERROR: failed writing bilog" << dendl; - return ret; - } - } else { - ret = store->resync_bi_log_entries(bci.info, -1); - if (ret < 0) { - lderr(cct) << "ERROR: failed writing bilog" << dendl; - return ret; - } - } - - for (int i = 0; i < shards_num; ++i, ++shard_id) { - ret = store->data_log->add_entry(bci.info.bucket, shard_id); - if (ret < 0) { - lderr(cct) << "ERROR: failed writing data log" << dendl; - return ret; - } - } - } - /* record the read version (if any), store the new version */ bci.info.objv_tracker.read_version = objv_tracker.read_version; bci.info.objv_tracker.write_version = objv_tracker.write_version; @@ -2916,6 +2891,27 @@ int RGWMetadataHandlerPut_BucketInstance::put_check() return 0; } +int RGWMetadataHandlerPut_BucketInstance::put_checked() +{ + RGWBucketInstanceMetadataObject *orig_obj = static_cast(old_obj); + + RGWBucketInfo *orig_info = (orig_obj ? &orig_obj->get_bucket_info() : nullptr); + + auto& info = obj->get_bucket_info(); + auto mtime = obj->get_mtime(); + auto pattrs = obj->get_pattrs(); + + RGWSI_Bucket_BI_Ctx ctx(op->ctx()); + + return handler->svc.bucket->store_bucket_instance_info(ctx, + entry, + info, + orig_info, + false, + mtime, + pattrs); +} + int RGWMetadataHandlerPut_BucketInstance::put_post() { RGWBucketCompleteInfo& bci = obj->get_bci(); @@ -3061,6 +3057,7 @@ int RGWBucketCtl::do_store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, return svc.bucket->store_bucket_instance_info(ctx, RGWSI_Bucket::get_bi_meta_key(bucket), info, + params.orig_info, params.exclusive, params.mtime, params.attrs, @@ -3095,6 +3092,55 @@ int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket& bucket, }); } +int RGWBucketCtl::do_store_linked_bucket_info(RGWSI_Bucket_X_Ctx& ctx, + RGWBucketInfo& info, + RGWBucketInfo *orig_info, + bool exclusive, real_time mtime, + obj_version *pep_objv, + map *pattrs, + bool create_entry_point) +{ + bool create_head = !info.has_instance_obj || create_entry_point; + + int ret = svc.bucket->store_bucket_instance_info(ctx.bi, + RGWSI_Bucket::get_bi_meta_key(info.bucket), + info, + orig_info, + exclusive, + mtime, pattrs); + if (ret < 0) { + return ret; + } + + if (!create_head) + return 0; /* done! */ + + RGWBucketEntryPoint entry_point; + entry_point.bucket = info.bucket; + entry_point.owner = info.owner; + entry_point.creation_time = info.creation_time; + entry_point.linked = true; + RGWObjVersionTracker ot; + if (pep_objv && !pep_objv->tag.empty()) { + ot.write_version = *pep_objv; + } else { + ot.generate_new_write_ver(cct); + if (pep_objv) { + *pep_objv = ot.write_version; + } + } + ret = svc.bucket->store_bucket_entrypoint_info(ctx.ep, + RGWSI_Bucket::get_entrypoint_meta_key(info.bucket), + entry_point, + exclusive, + mtime, + pattrs, + &ot); + if (ret < 0) + return ret; + + return 0; +} int RGWBucketCtl::convert_old_bucket_info(RGWSI_Bucket_X_Ctx& ctx, const rgw_bucket& bucket, optional_yield y) @@ -3127,7 +3173,7 @@ int RGWBucketCtl::convert_old_bucket_info(RGWSI_Bucket_X_Ctx& ctx, ot.generate_new_write_ver(cct); - ret = svc.bucket->store_linked_bucket_info(ctx.bi, info, false, ep_mtime, &ot.write_version, &attrs, true, y); + ret = do_store_linked_bucket_info(ctx, info, nullptr, false, ep_mtime, &ot.write_version, &attrs, true, y); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to put_linked_bucket_info(): " << ret << dendl; return ret; @@ -3158,7 +3204,8 @@ int RGWBucketCtl::set_bucket_instance_attrs(RGWBucketInfo& bucket_info, bucket_info, y, BucketInstance::PutParams().set_attrs(&attrs) - .set_objv_tracker(objv_tracker)); + .set_objv_tracker(objv_tracker) + .set_orig_info(&bucket_info)); }); } diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index fa7689e2f2e..d6938961176 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -705,11 +705,18 @@ public: }; struct PutParams { + std::optional orig_info; /* nullopt: orig_info was not fetched, + nullptr: orig_info was not found (new bucket instance */ ceph::real_time mtime; bool exclusive{false}; map *attrs{nullptr}; RGWObjVersionTracker *objv_tracker{nullptr}; + PutParams& set_orig_info(RGWBucketInfo *pinfo) { + orig_info = pinfo; + return *this; + } + PutParams& set_mtime(const ceph::real_time& _mtime) { mtime = _mtime; return *this; @@ -803,6 +810,14 @@ private: optional_yield y, ceph::optional_ref_default params); + int do_store_linked_bucket_info(RGWSI_Bucket_X_Ctx& ctx, + RGWBucketInfo& info, + RGWBucketInfo *orig_info, + bool exclusive, real_time mtime, + obj_version *pep_objv, + map *pattrs, + bool create_entry_point); + int do_link_bucket(RGWSI_Bucket_EP_Ctx& ctx, const rgw_user& user, const rgw_bucket& bucket, diff --git a/src/rgw/services/svc_bucket.cc b/src/rgw/services/svc_bucket.cc index 893750706b7..210263657b4 100644 --- a/src/rgw/services/svc_bucket.cc +++ b/src/rgw/services/svc_bucket.cc @@ -260,7 +260,7 @@ int RGWSI_Bucket::store_bucket_entrypoint_info(RGWSI_Bucket_EP_Ctx& ctx, RGWSI_MBSObj_PutParams params(bl, pattrs, mtime, exclusive); - int ret = svc.meta_be->put_entry(ctx.get(), key, params, objv_tracker); + int ret = svc.meta_be->put(ctx.get(), key, params, objv_tracker); if (ret == -EEXIST) { /* well, if it's exclusive we shouldn't overwrite it, because we might race with another * bucket operation on this specific bucket (e.g., being synced from the master), but @@ -285,7 +285,7 @@ int RGWSI_Bucket::remove_bucket_entrypoint_info(RGWSI_Bucket_EP_Ctx& ctx, optional_yield y) { RGWSI_MBSObj_RemoveParams params; - return svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y); + return svc.meta_be->remove(ctx.get(), key, params, objv_tracker, y); } int RGWSI_Bucket::read_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, @@ -470,6 +470,7 @@ int RGWSI_Bucket::read_bucket_info(RGWSI_Bucket_X_Ctx& ctx, int RGWSI_Bucket::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, const string& key, RGWBucketInfo& info, + std::optional orig_info, bool exclusive, real_time mtime, map *pattrs, @@ -478,9 +479,44 @@ int RGWSI_Bucket::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, bufferlist bl; encode(info, bl); + /* + * we might need some special handling if overwriting + */ + + if (!orig_info && !exclusive) { /* if exclusive, we're going to fail when try + to overwrite, so the whole check here is moot */ + /* we're here because orig_info wasn't passed in */ + RGWBucketInfo _orig_info; + + /* + * we don't have info about what was there before, so need to fetch first + */ + int r = read_bucket_instance_info(ctx, + key, + &_orig_info, + nullptr, nullptr, + nullptr, boost::none); + if (r < 0) { + if (r != -ENOENT) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): read_bucket_instance_info() of key=" << key << " returned r=" << r << dendl; + return r; + } + } else { + *orig_info = &_orig_info; + } + } + + if (orig_info && *orig_info && !exclusive) { + int r = handle_bucket_overwrite(ctx, key, info, *(orig_info.value())); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): handle_bucket_overwrite() of key=" << key << " returned r=" << r << dendl; + return r; + } + } + RGWSI_MBSObj_PutParams params(bl, pattrs, mtime, exclusive); - int ret = svc.meta_be->put_entry(ctx.get(), key, params, &info.objv_tracker, y); + int ret = svc.meta_be->put(ctx.get(), key, params, &info.objv_tracker, y); if (ret == -EEXIST) { /* well, if it's exclusive we shouldn't overwrite it, because we might race with another * bucket operation on this specific bucket (e.g., being synced from the master), but @@ -500,6 +536,38 @@ int RGWSI_Bucket::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, return ret; } +int RGWSI_Bucket::handle_bucket_overwrite(RGWSI_Bucket_BI_Ctx& ctx, + const string& key, + const RGWBucketInfo& info, + const RGWBucketInfo& orig_info) +{ + if (orig_info.datasync_flag_enabled() != info.datasync_flag_enabled()) { + int shards_num = info.num_shards? info.num_shards : 1; + int shard_id = info.num_shards? 0 : -1; + + int ret; + if (!info.datasync_flag_enabled()) { + ret = store->stop_bi_log_entries(info, -1); + } else { + ret = store->resync_bi_log_entries(info, -1); + } + if (ret < 0) { + lderr(cct) << "ERROR: failed writing bilog (key=" << key << "); ret=" << ret << dendl; + return ret; + } + + for (int i = 0; i < shards_num; ++i, ++shard_id) { + ret = store->data_log->add_entry(info.bucket, shard_id); + if (ret < 0) { + lderr(cct) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl; + return ret; + } + } + } + + return 0; +} + int RGWSI_Bucket::remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, const string& key, RGWObjVersionTracker *objv_tracker, diff --git a/src/rgw/services/svc_bucket.h b/src/rgw/services/svc_bucket.h index 5f32c08be44..1aa6fe2579a 100644 --- a/src/rgw/services/svc_bucket.h +++ b/src/rgw/services/svc_bucket.h @@ -65,6 +65,10 @@ class RGWSI_Bucket : public RGWServiceInstance RGWBucketEnt *ent, optional_yield y); + int handle_bucket_overwrite(RGWSI_Bucket_BI_Ctx& ctx, + const string& key, + const RGWBucketInfo& info, + const RGWBucketInfo& orig_info); public: struct Svc { RGWSI_Bucket *bucket{nullptr}; @@ -144,6 +148,8 @@ public: int store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, const string& key, RGWBucketInfo& info, + std::optional orig_info, /* nullopt: orig_info was not fetched, + nullptr: orig_info was not found (new bucket instance */ bool exclusive, real_time mtime, map *pattrs, -- 2.39.5