From 557b519881c54ee7195d84968d1e2632827b7713 Mon Sep 17 00:00:00 2001 From: Soumya Koduri Date: Sun, 16 Aug 2020 14:31:50 +0530 Subject: [PATCH] rgw/CloudTransition: Verify if the object is already tiered Add class to fetch headers from remote endpoint and verify if the object is already tiered. & Few other fixes stated below - * Erase data in the head of cloud transitioned object * 'placement rm' command should erase tier_config details * A new option added in the object manifest to denote if the object is tiered in multiparts Signed-off-by: Soumya Koduri --- src/rgw/rgw_admin.cc | 5 + src/rgw/rgw_json_enc.cc | 1 + src/rgw/rgw_lc.cc | 34 ++++- src/rgw/rgw_lc_tier.cc | 267 +++++++++++++++++++++++++++++++++++++ src/rgw/rgw_lc_tier.h | 16 +++ src/rgw/rgw_obj_manifest.h | 7 +- 6 files changed, 326 insertions(+), 4 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 2378499fa94ef..0552da0246268 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -5109,6 +5109,11 @@ int main(int argc, const char **argv) if (iter != zonegroup.placement_targets.end()) { RGWZoneGroupPlacementTarget& info = zonegroup.placement_targets[placement_id]; info.storage_classes.erase(*opt_storage_class); + + auto ptiter = info.tier_targets.find(*opt_storage_class); + if (ptiter != info.tier_targets.end()) { + info.tier_targets.erase(ptiter); + } } } } else if (opt_cmd == OPT::ZONEGROUP_PLACEMENT_DEFAULT) { diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index c054c12bb3c9e..df2f37c5e3a51 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -103,6 +103,7 @@ void RGWObjTier::dump(Formatter *f) const { f->dump_string("name", name); f->dump_object("tier_placement", tier_placement); + f->dump_bool("is_multipart_upload", is_multipart_upload); } void rgw_bucket_placement::dump(Formatter *f) const diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 578e445335433..225b62667b29a 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -1291,29 +1291,40 @@ public: RGWRados::Object op_target(tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj); + real_time read_mtime; RGWRados::Object::Read read_op(&op_target); read_op.params.attrs = &attrs; + read_op.params.lastmod = &read_mtime; int r = read_op.prepare(null_yield); if (r < 0) { return r; } + if (read_mtime != tier_ctx.o.meta.mtime) { + /* raced */ + return -ECANCELED; + } + tier_ctx.rctx.set_atomic(tier_ctx.obj); RGWRados::Object::Write obj_op(&op_target); RGWObjState *s = tier_ctx.rctx.get_state(tier_ctx.obj); obj_op.meta.modify_tail = true; + obj_op.meta.flags = PUT_OBJ_CREATE; obj_op.meta.category = RGWObjCategory::CloudTiered; obj_op.meta.delete_at = real_time(); - obj_op.meta.data = NULL; + bufferlist blo; + blo.append(""); + obj_op.meta.data = &blo; obj_op.meta.if_match = NULL; obj_op.meta.if_nomatch = NULL; obj_op.meta.user_data = NULL; obj_op.meta.zones_trace = NULL; + obj_op.meta.delete_at = real_time(); RGWObjManifest *pmanifest; @@ -1321,6 +1332,7 @@ public: RGWObjTier tier_config; tier_config.name = oc.tier.storage_class; tier_config.tier_placement = oc.tier; + tier_config.is_multipart_upload = tier_ctx.is_multipart_upload; pmanifest->set_tier_type("cloud"); pmanifest->set_tier_config(tier_config); @@ -1336,7 +1348,9 @@ public: /* should the obj_size also be set to '0' or is it needed * to keep track of original size before transition. * But unless obj_size is set to '0', obj_iters cannot - * be reset I guess + * be reset I guess. For regular transitioned objects + * obj_size remains the same even when object is moved to other + * storage class. So maybe better to keep it the same way. */ //pmanifest->set_obj_size(0); @@ -1347,6 +1361,8 @@ public: bl.append(oc.tier.storage_class); attrs[RGW_ATTR_STORAGE_CLASS] = bl; + attrs.erase(RGW_ATTR_ID_TAG); + attrs.erase(RGW_ATTR_TAIL_TAG); obj_op.write_meta(tier_ctx.o.meta.size, 0, attrs, null_yield); if (r < 0) { @@ -1391,7 +1407,19 @@ public: tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold; tier_ctx.storage_class = oc.tier.storage_class; - ret = crs.run(new RGWLCCloudTierCR(tier_ctx)); + bool al_tiered = false; + ret = crs.run(new RGWLCCloudCheckCR(tier_ctx, &al_tiered)); + + if (ret < 0) { + ldpp_dout(oc.dpp, 0) << "XXXXXXXXXXXXXX failed in RGWCloudCheckCR() ret=" << ret << dendl; + } + + if (!al_tiered) { + ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered false" << dendl; + ret = crs.run(new RGWLCCloudTierCR(tier_ctx)); + } else { + ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered true" << dendl; + } http_manager.stop(); if (ret < 0) { diff --git a/src/rgw/rgw_lc_tier.cc b/src/rgw/rgw_lc_tier.cc index bb9c64516654a..b8af3e6ab2e68 100644 --- a/src/rgw/rgw_lc_tier.cc +++ b/src/rgw/rgw_lc_tier.cc @@ -96,6 +96,207 @@ static void init_headers(map& attrs, } } +static int do_decode_rest_obj(CephContext *cct, map& attrs, map& headers, rgw_rest_obj *info) +{ + for (auto header : headers) { + const string& val = header.second; + if (header.first == "RGWX_OBJECT_SIZE") { + info->content_len = atoi(val.c_str()); + } else { + info->attrs[header.first] = val; + } + } + + info->acls.set_ctx(cct); + auto aiter = attrs.find(RGW_ATTR_ACL); + if (aiter != attrs.end()) { + bufferlist& bl = aiter->second; + auto bliter = bl.cbegin(); + try { + info->acls.decode(bliter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl; + return -EIO; + } + } else { + ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl; + } + + return 0; +} + +class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF +{ + RGWRESTConn::get_obj_params req_params; + + CephContext *cct; + RGWHTTPManager *http_manager; + rgw_lc_obj_properties obj_properties; + std::shared_ptr conn; + rgw::sal::RGWObject* dest_obj; + string etag; + RGWRESTStreamRWRequest *in_req; + map headers; + +public: + RGWLCStreamGetCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWHTTPManager *_http_manager, + const rgw_lc_obj_properties& _obj_properties, + std::shared_ptr _conn, + rgw::sal::RGWObject* _dest_obj) : + RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()), + cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) { + } + + + + int init() override { + /* init input connection */ + + req_params.get_op = false; /* Need only headers */ +// req_params.skip_decrypt = false; + req_params.prepend_metadata = true; + req_params.rgwx_stat = true; + req_params.sync_manifest = true; + req_params.skip_decrypt = true; + +// req_params.unmod_ptr = &src_properties.mtime; +// req_params.etag = src_properties.etag; +// req_params.mod_zone_id = src_properties.zone_short_id; +// req_params.mod_pg_ver = src_properties.pg_ver; + +// if (range.is_set) { +// req_params.range_is_set = true; +// req_params.range_start = range.ofs; +// req_params.range_end = range.ofs + range.size - 1; +// } + + int ret = conn->get_obj(dest_obj, req_params, false /* send */, &in_req); + if (ret < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; + return ret; + } + + set_req(in_req); + + return RGWStreamReadHTTPResourceCRF::init(); + } + + int init2() { + /* init input connection */ + + req_params.get_op = false; /* Need only headers */ +// req_params.skip_decrypt = false; + req_params.prepend_metadata = true; + req_params.rgwx_stat = true; + req_params.sync_manifest = true; + req_params.skip_decrypt = true; + +// req_params.unmod_ptr = &src_properties.mtime; +// req_params.etag = src_properties.etag; +// req_params.mod_zone_id = src_properties.zone_short_id; +// req_params.mod_pg_ver = src_properties.pg_ver; + +// if (range.is_set) { +// req_params.range_is_set = true; +// req_params.range_start = range.ofs; +// req_params.range_end = range.ofs + range.size - 1; +// } + + string etag; + real_time set_mtime; + + int ret = conn->get_obj(dest_obj, req_params, true /* send */, &in_req); + if (ret < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; + return ret; + } + + ret = conn->complete_request(in_req, nullptr, nullptr, + nullptr, nullptr, &headers); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): XXXXXXXXXXXX conn->complete_request() returned ret=" << ret << dendl; + return ret; + } + // set_req(in_req); + + // return RGWStreamReadHTTPResourceCRF::init(); + return 0; + } + + int decode_rest_obj(map& headers, bufferlist& extra_data) override { + map src_attrs; + + ldout(cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; + + if (extra_data.length() > 0) { + JSONParser jp; + if (!jp.parse(extra_data.c_str(), extra_data.length())) { + ldout(cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; + return -EIO; + } + + JSONDecoder::decode_json("attrs", src_attrs, &jp); + } + return do_decode_rest_obj(cct, src_attrs, headers, &rest_obj); + } + + void handle_headers(const map& _headers) { + headers = _headers; + } + + int is_already_tiered() { + char buf[32]; + /*rgw_rest_obj rest_obj; + rest_obj.init(dest_obj->get_key()); + + + if (do_decode_rest_obj(cct, attrs, headers, &rest_obj)) { + ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl; + return set_cr_error(-EINVAL); + } + + for (auto header : headers) { + const string& val = header.second; + if (header.first == "RGWX_OBJECT_SIZE") { + info->content_len = atoi(val.c_str()); + } else { + info->attrs[header.first] = val; + } + }*/ + + map attrs = headers; +// req->get_out_headers(&attrs); + // get_attrs(&attrs); + + for (auto a : attrs) { + ldout(cct, 0) << "XXXXXXXXXXXXXX GetCrf attr[" << a.first << "] = " << a.second <init failed, ret = " << ret << dendl; + return set_cr_error(ret); + } +//reenter(this) { + bl.clear(); +/* do { +// yield { + ret = get_crf->get_headers(&need_retry); + if (ret < 0) { + ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->read failed, ret = " << ret << dendl; + return set_cr_error(ret); +// } + } + if (retcode < 0) { + ldout(cct, 20) << __func__ << ": in_crf->read() retcode=" << retcode << dendl; + return set_cr_error(ret); + } + } while (need_retry); */ + + if ((static_cast(get_crf.get()))->is_already_tiered()) { + *already_tiered = true; + ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered true" << dendl; + return set_cr_done(); + } + + ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered false..going with out_crf writing" << dendl; + + return set_cr_done(); +// } //reenter + + return 0; +} + map , utime_t> target_buckets; int RGWLCCloudTierCR::operate() { @@ -1035,6 +1301,7 @@ int RGWLCCloudTierCR::operate() { if (size < multipart_sync_threshold) { call (new RGWLCStreamObjToCloudPlainCR(tier_ctx)); } else { + tier_ctx.is_multipart_upload = true; call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx)); } diff --git a/src/rgw/rgw_lc_tier.h b/src/rgw/rgw_lc_tier.h index 070920c7c8299..0e6477f9512ab 100644 --- a/src/rgw/rgw_lc_tier.h +++ b/src/rgw/rgw_lc_tier.h @@ -37,6 +37,8 @@ struct RGWLCCloudTierCtx { uint64_t multipart_min_part_size; uint64_t multipart_sync_threshold; + bool is_multipart_upload{false}; + RGWLCCloudTierCtx(CephContext* _cct, rgw_bucket_dir_entry& _o, rgw::sal::RGWRadosStore* _store, RGWBucketInfo &_binfo, rgw_obj _obj, RGWObjectCtx& _rctx, std::shared_ptr _conn, string _bucket, @@ -65,6 +67,20 @@ class RGWLCCloudTierCR : public RGWCoroutine { int operate() override; }; +class RGWLCCloudCheckCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + bufferlist bl; + bool need_retry{false}; + int retcode; + bool *already_tiered; + + public: + RGWLCCloudCheckCR(RGWLCCloudTierCtx& _tier_ctx, bool *_al_ti) : + RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), already_tiered(_al_ti) {} + + int operate() override; +}; + struct rgw_lc_multipart_part_info { int part_num{0}; uint64_t ofs{0}; diff --git a/src/rgw/rgw_obj_manifest.h b/src/rgw/rgw_obj_manifest.h index 9cb7fec8216b7..b5a31b5f7be9b 100644 --- a/src/rgw/rgw_obj_manifest.h +++ b/src/rgw/rgw_obj_manifest.h @@ -151,7 +151,8 @@ WRITE_CLASS_ENCODER(RGWObjManifestRule) struct RGWObjTier { string name; RGWZoneGroupPlacementTier tier_placement; - /* XXX: Add multipart upload details */ + bool is_multipart_upload{false}; + /* XXX: Add any multipart upload details */ RGWObjTier(): name("none") {} @@ -159,6 +160,7 @@ struct RGWObjTier { ENCODE_START(2, 2, bl); encode(name, bl); encode(tier_placement, bl); + encode(is_multipart_upload, bl); ENCODE_FINISH(bl); } @@ -166,6 +168,7 @@ struct RGWObjTier { DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); decode(name, bl); decode(tier_placement, bl); + decode(is_multipart_upload, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -466,6 +469,7 @@ public: tier_config.name = t.name; tier_config.tier_placement = t.tier_placement; + tier_config.is_multipart_upload = t.is_multipart_upload; } void get_tier_config(RGWObjTier* t) { @@ -474,6 +478,7 @@ public: t->name = tier_config.name; t->tier_placement = tier_config.tier_placement; + t->is_multipart_upload = tier_config.is_multipart_upload; } class obj_iterator { -- 2.39.5