From c687d01d1b79493ca8b9eede03be61f76e996c02 Mon Sep 17 00:00:00 2001 From: Soumya Koduri Date: Wed, 23 Dec 2020 11:14:53 +0530 Subject: [PATCH] rgw/CloudTransition: Tier objects to remote cloud If the storage class configured is of cloud, transition the objects to remote endpoint configured. In case the object size is >mulitpart size limit (say 5M), upload the object into multiparts. As part of transition, map rgw attributes to http attrs, including ACLs. A new attribute (x-amz-meta-source: rgw) is added to denote that the object is transitioned from RGW source. Added two new options to tier-config to configure multipart size - * multipart_sync_threshold - determines the limit of object size, when exceeded transitioned in multiparts * multipart_min_part_size - the minimum size of the multipart upload part Default values for both the options is 32M and minimum value supported is 5M. Signed-off-by: Soumya Koduri --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_cr_rest.cc | 102 ++++ src/rgw/rgw_cr_rest.h | 57 +++ src/rgw/rgw_json_enc.cc | 4 + src/rgw/rgw_lc.cc | 120 ++++- src/rgw/rgw_lc_tier.cc | 1046 +++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_lc_tier.h | 169 +++++++ src/rgw/rgw_zone.cc | 36 ++ src/rgw/rgw_zone.h | 8 + 9 files changed, 1525 insertions(+), 18 deletions(-) create mode 100644 src/rgw/rgw_lc_tier.cc create mode 100644 src/rgw/rgw_lc_tier.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 5acbd17a2dd07..0f1ddd9f5bcee 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -72,6 +72,7 @@ set(librgw_common_srcs rgw_ldap.cc rgw_lc.cc rgw_lc_s3.cc + rgw_lc_tier.cc rgw_metadata.cc rgw_multi.cc rgw_multi_del.cc diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 0bd169f99e7d3..2a20db0acba08 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -349,3 +349,105 @@ int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) { return 0; } +RGWStreamReadCRF::RGWStreamReadCRF(RGWRados* rados, RGWBucketInfo& bucket_info, + RGWObjectCtx& obj_ctx, rgw_obj& obj) : + op_target(rados, bucket_info, obj_ctx, obj), read_op(&op_target) {} +RGWStreamReadCRF::~RGWStreamReadCRF() {} + +RGWStreamWriteCR::RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr, + shared_ptr& _in_crf, + shared_ptr& _out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + in_crf(_in_crf), out_crf(_out_crf) {} +RGWStreamWriteCR::~RGWStreamWriteCR() { } + +int RGWStreamWriteCR::operate() { + off_t ofs; + off_t end; + int ret; + uint64_t read_len = 0; + rgw_rest_obj rest_obj; + + reenter(this) { + ret = in_crf->init(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl; + return set_cr_error(ret); + } + in_crf->get_range(ofs, end); + rest_obj = in_crf->get_rest_obj(); + + do { + bl.clear(); + ret = in_crf->read(ofs, end, bl); + if (ret < 0) { + ldout(cct, 0) << "ERROR: fail to read object data, ret = " << ret << dendl; + return set_cr_error(ret); + } + read_len = ret; + + if (retcode < 0) { + ldout(cct, 20) << __func__ << ": read_op.read() retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + if (bl.length() == 0) { + break; + } + + if (!sent_attrs) { + ret = out_crf->init(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl; + return set_cr_error(ret); + } + + out_crf->send_ready(rest_obj); + ret = out_crf->send(); + if (ret < 0) { + return set_cr_error(ret); + } + sent_attrs = true; + } + + total_read += bl.length(); + + do { + /* Cant do yield here as read_op doesnt work well with yield and results + * in deadlock. + */ + ret = out_crf->write(bl, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } + + if (retcode < 0) { + ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl; + return set_cr_error(ret); + } + } while (need_retry); + + ofs += read_len; + + } while (ofs <= end); + + do { + /* Ensure out_crf is initialized */ + if (!sent_attrs) { + break; + } + + /* This has to be under yield. Otherwise sometimes this loop + * never finishes, infinitely waiting for req to be done. + */ + yield { + int ret = out_crf->drain_writes(&need_retry); + if (ret < 0) { + return set_cr_error(ret); + } + } + } while (need_retry); + + return set_cr_done(); + } + return 0; +} diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index cb103aeb83455..46d050c5cd763 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -588,3 +588,60 @@ public: int operate(const DoutPrefixProvider *dpp) override; }; + +class RGWStreamReadCRF { +public: + RGWRados::Object op_target; + RGWRados::Object::Read read_op; + off_t ofs; + off_t end; + rgw_rest_obj rest_obj; + + RGWStreamReadCRF(RGWRados* rados, RGWBucketInfo& bucket_info, + RGWObjectCtx& obj_ctx, rgw_obj& obj); + virtual ~RGWStreamReadCRF(); + + virtual int init() {return 0; } + virtual int init_rest_obj() {return 0;} + + int set_range(off_t _ofs, off_t _end) { + ofs = _ofs; + end = _end; + + return 0; + } + + int get_range(off_t &_ofs, off_t &_end) { + _ofs = ofs; + _end = end; + + return 0; + } + + rgw_rest_obj get_rest_obj() { + return rest_obj; + } + + virtual int read(off_t ofs, off_t end, bufferlist &bl) {return 0;}; + +}; + +class RGWStreamWriteCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + string url; + std::shared_ptr in_crf; + std::shared_ptr out_crf; + bufferlist bl; + bool need_retry{false}; + bool sent_attrs{false}; + uint64_t total_read{0}; + int ret{0}; +public: + RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr, + std::shared_ptr& _in_crf, + std::shared_ptr& _out_crf); + ~RGWStreamWriteCR(); + + int operate() override; +}; diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 9f8ab1626c26c..879f0f9164505 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -1449,6 +1449,8 @@ void RGWZoneGroupPlacementTier::dump(Formatter *f) const encode_json("tier_storage_class", tier_storage_class, f); encode_json("target_path", target_path, f); encode_json("acl_mappings", acl_mappings, f); + encode_json("multipart_sync_threshold", multipart_sync_threshold, f); + encode_json("multipart_min_part_size", multipart_min_part_size, f); } void RGWZoneGroupPlacementTier::decode_json(JSONObj *obj) @@ -1468,6 +1470,8 @@ void RGWZoneGroupPlacementTier::decode_json(JSONObj *obj) JSONDecoder::decode_json("tier_storage_class", tier_storage_class, obj); JSONDecoder::decode_json("target_path", target_path, obj); JSONDecoder::decode_json("acl_mappings", acl_mappings, obj); + JSONDecoder::decode_json("multipart_sync_threshold", multipart_sync_threshold, obj); + JSONDecoder::decode_json("multipart_min_part_size", multipart_min_part_size, obj); } void RGWZoneGroupPlacementTarget::dump(Formatter *f) const diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index bd6d997ed9e1d..83f1ae4c65683 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -28,6 +28,7 @@ #include "rgw_multi.h" #include "rgw_sal.h" #include "rgw_rados.h" +#include "rgw_lc_tier.h" // this seems safe to use, at least for now--arguably, we should // prefer header-only fmt, in general @@ -541,6 +542,8 @@ struct lc_op_ctx { const DoutPrefixProvider *dpp; WorkQ* wq; + RGWZoneGroupPlacementTier tier = {}; + lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, boost::optional next_key_name, ceph::real_time effective_mtime, @@ -1248,32 +1251,113 @@ public: return need_to_process; } + /* find out if the the storage class is remote cloud */ + int get_tier_target(const RGWZoneGroup &zonegroup, rgw_placement_rule& rule, + string& storage_class, RGWZoneGroupPlacementTier &tier) { + std::map::const_iterator titer; + titer = zonegroup.placement_targets.find(rule.name); + if (titer == zonegroup.placement_targets.end()) { + return -1; + } + + if (storage_class.empty()) { + storage_class = rule.storage_class; + } + + const auto& target_rule = titer->second; + std::map::const_iterator ttier; + ttier = target_rule.tier_targets.find(storage_class); + if (ttier != target_rule.tier_targets.end()) { + tier = ttier->second; + } + + return 0; + } + int transition_obj_to_cloud(lc_op_ctx& oc) { + std::shared_ptr conn; + + /* init */ + string id = "cloudid"; + string endpoint=oc.tier.endpoint; + RGWAccessKey key = oc.tier.key; + HostStyle host_style = oc.tier.host_style; + string bucket_name = oc.tier.target_path; + + if (bucket_name.empty()) { + bucket_name = "cloud-bucket"; + } + + conn.reset(new S3RESTConn(oc.cct, oc.store->svc()->zone, + id, { endpoint }, key, host_style)); + + /* http_mngr */ + RGWCoroutinesManager crs(oc.store->ctx(), oc.store->getRados()->get_cr_registry()); + RGWHTTPManager http_manager(oc.store->ctx(), crs.get_completion_mgr()); + + int ret = http_manager.start(); + if (ret < 0) { + ldpp_dout(oc.dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; + return ret; + } + + RGWLCCloudTierCtx tier_ctx(oc.cct, oc.o, oc.store, oc.bucket_info, + oc.obj, oc.rctx, conn, bucket_name, oc.tier.tier_storage_class, + &http_manager); + tier_ctx.acl_mappings = oc.tier.acl_mappings; + tier_ctx.multipart_min_part_size = oc.tier.multipart_min_part_size; + tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold; + + ret = crs.run(new RGWLCCloudTierCR(tier_ctx)); + http_manager.stop(); + + if (ret < 0) { + ldpp_dout(oc.dpp, 0) << "failed in RGWCloudTierCR() ret=" << ret << dendl; + return ret; + } + + return 0; + } + int process(lc_op_ctx& oc) { auto& o = oc.o; + int r; + std::string tier_type = ""; + const RGWZoneGroup& zonegroup = oc.store->svc()->zone->get_zonegroup(); rgw_placement_rule target_placement; target_placement.inherit_from(oc.bucket->get_placement_rule()); target_placement.storage_class = transition.storage_class; - if (!oc.store->get_zone()->get_params(). - valid_placement(target_placement)) { - ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " - << target_placement - << " bucket="<< oc.bucket - << " rule_id=" << oc.op.id - << " " << oc.wq->thr_name() << dendl; - return -EINVAL; - } + r = get_tier_target(zonegroup, target_placement, target_placement.storage_class, oc.tier); - int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime, - o.versioned_epoch, oc.dpp, null_yield); - if (r < 0) { - ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " - << oc.bucket << ":" << o.key - << " -> " << transition.storage_class - << " " << cpp_strerror(r) - << " " << oc.wq->thr_name() << dendl; - return r; + if (!r && oc.tier.tier_type == "cloud") { + ldpp_dout(oc.dpp, 0) << "Found cloud tier: " << target_placement.storage_class << dendl; + r = transition_obj_to_cloud(oc); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj to cloud (r=" << r << ")" + << dendl; + } + } else { + if (!oc.store->get_zone()->get_params(). + valid_placement(target_placement)) { + ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " + << target_placement + << " bucket="<< oc.bucket + << " rule_id=" << oc.op.id + << " " << oc.wq->thr_name() << dendl; + return -EINVAL; + } + + int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime, + o.versioned_epoch, oc.dpp, null_yield); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " + << oc.bucket << ":" << o.key + << " -> " << transition.storage_class + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() << dendl; + return r; + } } ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket << ":" << o.key << " -> " diff --git a/src/rgw/rgw_lc_tier.cc b/src/rgw/rgw_lc_tier.cc new file mode 100644 index 0000000000000..e1582c4eaf283 --- /dev/null +++ b/src/rgw/rgw_lc_tier.cc @@ -0,0 +1,1046 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include +#include + +#include "common/Formatter.h" +#include +#include "rgw_lc.h" +#include "rgw_lc_tier.h" +#include "rgw_string.h" +#include "rgw_zone.h" +#include "rgw_common.h" +#include "rgw_rest.h" + +#include +#include +#include + +#include + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +using namespace std; + +static string get_key_oid(const rgw_obj_key& key) +{ + string oid = key.name; + if (!key.instance.empty() && + !key.have_null_instance()) { + oid += string(":") + key.instance; + } + return oid; +} + +static string obj_to_aws_path(const rgw_obj& obj) +{ + string path = obj.bucket.name + "/" + get_key_oid(obj.key); + return path; +} + +static std::set keep_headers = { "CONTENT_TYPE", + "CONTENT_ENCODING", + "CONTENT_DISPOSITION", + "CONTENT_LANGUAGE" }; + +/* + * mapping between rgw object attrs and output http fields + * + static const struct rgw_http_attr base_rgw_to_http_attrs[] = { + { RGW_ATTR_CONTENT_LANG, "Content-Language" }, + { RGW_ATTR_EXPIRES, "Expires" }, + { RGW_ATTR_CACHE_CONTROL, "Cache-Control" }, + { RGW_ATTR_CONTENT_DISP, "Content-Disposition" }, + { RGW_ATTR_CONTENT_ENC, "Content-Encoding" }, + { RGW_ATTR_USER_MANIFEST, "X-Object-Manifest" }, + { RGW_ATTR_X_ROBOTS_TAG , "X-Robots-Tag" }, + { RGW_ATTR_STORAGE_CLASS , "X-Amz-Storage-Class" }, +// RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION header depends on access mode: +// S3 endpoint: x-amz-website-redirect-location +// S3Website endpoint: Location +{ RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION, "x-amz-website-redirect-location" }, +}; */ + +static void init_headers(map& attrs, + map& headers) +{ + for (auto kv : attrs) { + const char * name = kv.first.c_str(); + const auto aiter = rgw_to_http_attrs.find(name); + + if (aiter != std::end(rgw_to_http_attrs)) { + headers[aiter->second] = rgw_bl_str(kv.second); + } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) { + // this attr has an extra length prefix from encode() in prior versions + headers["X-Object-Meta-Static-Large-Object"] = "True"; + } else if (strncmp(name, RGW_ATTR_META_PREFIX, + sizeof(RGW_ATTR_META_PREFIX)-1) == 0) { + name += sizeof(RGW_ATTR_META_PREFIX) - 1; + string sname(name); + string name_prefix = "X-Object-Meta-"; + char full_name_buf[name_prefix.size() + sname.size() + 1]; + snprintf(full_name_buf, sizeof(full_name_buf), "%.*s%.*s", + static_cast(name_prefix.length()), + name_prefix.data(), + static_cast(sname.length()), + sname.data()); + headers[full_name_buf] = rgw_bl_str(kv.second); + } else if (strcmp(name,RGW_ATTR_CONTENT_TYPE) == 0) { + /* Verify if its right way to copy this field */ + headers["CONTENT_TYPE"] = rgw_bl_str(kv.second); + } + } +} + +class RGWLCStreamReadCRF : public RGWStreamReadCRF +{ + CephContext *cct; + map attrs; + uint64_t obj_size; + rgw_obj& obj; + const real_time &mtime; + + bool multipart; + uint64_t m_part_size; + off_t m_part_off; + off_t m_part_end; + + public: + RGWLCStreamReadCRF(CephContext *_cct, RGWRados* rados, RGWBucketInfo& bucket_info, + RGWObjectCtx& obj_ctx, rgw_obj& _obj, const real_time &_mtime) : RGWStreamReadCRF(rados, bucket_info, obj_ctx, _obj), cct(_cct), obj(_obj), mtime(_mtime) {} + + ~RGWLCStreamReadCRF() {}; + + void set_multipart(uint64_t part_size, off_t part_off, off_t part_end) { + multipart = true; + m_part_size = part_size; + m_part_off = part_off; + m_part_end = part_end; + } + + int init() override { + optional_yield y = null_yield; + real_time read_mtime; + + read_op.params.attrs = &attrs; + read_op.params.lastmod = &read_mtime; + read_op.params.obj_size = &obj_size; + + int ret = read_op.prepare(y); + if (ret < 0) { + ldout(cct, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl; + return ret; + } + + if (read_mtime != mtime) { + /* raced */ + return -ECANCELED; + } + + ret = init_rest_obj(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl; + return ret; + } + + if (!multipart) { + set_range(0, obj_size - 1); + } else { + set_range(m_part_off, m_part_end); + } + return 0; + } + + int init_rest_obj() override { + /* Initialize rgw_rest_obj. + * Reference: do_decode_rest_obj + * Check how to copy headers content */ + rest_obj.init(obj.key); + + if (!multipart) { + rest_obj.content_len = obj_size; + } else { + rest_obj.content_len = m_part_size; + } + + /* For mulitpart attrs are sent as prt of InitMultipartCR itself */ + if (multipart) { + return 0; + } + /* + * XXX: verify if its right way to copy attrs into + * rest obj + */ + init_headers(attrs, rest_obj.attrs); + + rest_obj.acls.set_ctx(cct); + auto aiter = attrs.find(RGW_ATTR_ACL); + if (aiter != attrs.end()) { + bufferlist& bl = aiter->second; + auto bliter = bl.cbegin(); + try { + rest_obj.acls.decode(bliter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl; + return -EIO; + } + } else { + ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl; + } + return 0; + } + + int read(off_t ofs, off_t end, bufferlist &bl) { + optional_yield y = null_yield; + + return read_op.read(ofs, end, bl, y); + } +}; + + +class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF +{ + CephContext *cct; + RGWHTTPManager *http_manager; + rgw_lc_obj_properties obj_properties; + std::shared_ptr conn; + rgw::sal::RGWObject* dest_obj; + string etag; + + public: + RGWLCStreamPutCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWHTTPManager *_http_manager, + const rgw_lc_obj_properties& _obj_properties, + std::shared_ptr _conn, + rgw::sal::RGWObject* _dest_obj) : + RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager), + cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) { + } + + + int init() override { + /* init output connection */ + RGWRESTStreamS3PutObj *out_req{nullptr}; + + if (multipart.is_multipart) { + char buf[32]; + snprintf(buf, sizeof(buf), "%d", multipart.part_num); + rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, + { "partNumber", buf }, + { nullptr, nullptr } }; + conn->put_obj_send_init(dest_obj, params, &out_req); + } else { + conn->put_obj_send_init(dest_obj, nullptr, &out_req); + } + + set_req(out_req); + + return RGWStreamWriteHTTPResourceCRF::init(); + } + + static bool keep_attr(const string& h) { + return (keep_headers.find(h) != keep_headers.end() || + boost::algorithm::starts_with(h, "X_AMZ_")); + } + + static void init_send_attrs(CephContext *cct, + const rgw_rest_obj& rest_obj, + const rgw_lc_obj_properties& obj_properties, + map *attrs) { + + map& acl_mappings(obj_properties.target_acl_mappings); + string target_storage_class = obj_properties.target_storage_class; + + auto& new_attrs = *attrs; + + new_attrs.clear(); + + for (auto& hi : rest_obj.attrs) { + if (keep_attr(hi.first)) { + new_attrs.insert(hi); + } + } + + auto acl = rest_obj.acls.get_acl(); + + map > access_map; + + if (!acl_mappings.empty()) { + for (auto& grant : acl.get_grant_map()) { + auto& orig_grantee = grant.first; + auto& perm = grant.second; + + string grantee; + + const auto& am = acl_mappings; + + auto iter = am.find(orig_grantee); + if (iter == am.end()) { + ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; + continue; + } + + grantee = iter->second.dest_id; + + string type; + + switch (iter->second.type) { + case ACL_TYPE_CANON_USER: + type = "id"; + break; + case ACL_TYPE_EMAIL_USER: + type = "emailAddress"; + break; + case ACL_TYPE_GROUP: + type = "uri"; + break; + default: + continue; + } + + string tv = type + "=" + grantee; + + int flags = perm.get_permission().get_permissions(); + if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { + access_map[flags].push_back(tv); + continue; + } + + for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { + if (flags & i) { + access_map[i].push_back(tv); + } + } + } + } + + for (auto aiter : access_map) { + int grant_type = aiter.first; + + string header_str("x-amz-grant-"); + + switch (grant_type) { + case RGW_PERM_READ: + header_str.append("read"); + break; + case RGW_PERM_WRITE: + header_str.append("write"); + break; + case RGW_PERM_READ_ACP: + header_str.append("read-acp"); + break; + case RGW_PERM_WRITE_ACP: + header_str.append("write-acp"); + break; + case RGW_PERM_FULL_CONTROL: + header_str.append("full-control"); + break; + } + + string s; + + for (auto viter : aiter.second) { + if (!s.empty()) { + s.append(", "); + } + s.append(viter); + } + + ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; + + new_attrs[header_str] = s; + } + + /* Copy target storage class */ + if (!target_storage_class.empty()) { + new_attrs["x-amz-storage-class"] = target_storage_class; + } else { + new_attrs["x-amz-storage-class"] = "STANDARD"; + } + + /* New attribute to specify its transitioned from RGW */ + new_attrs["x-amz-meta-rgwx-source"] = "rgw"; + + char buf[32]; + snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch); + new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf; + + utime_t ut(obj_properties.mtime); + snprintf(buf, sizeof(buf), "%lld.%09lld", + (long long)ut.sec(), + (long long)ut.nsec()); + + new_attrs["x-amz-meta-rgwx-source-mtime"] = buf; + new_attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag; + new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; + if (!rest_obj.key.instance.empty()) { + new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; + } + } + + void send_ready(const rgw_rest_obj& rest_obj) override { + RGWRESTStreamS3PutObj *r = static_cast(req); + + map new_attrs; + if (!multipart.is_multipart) { + init_send_attrs(cct, rest_obj, obj_properties, &new_attrs); + } + + r->set_send_length(rest_obj.content_len); + + RGWAccessControlPolicy policy; + + r->send_ready(conn->get_key(), new_attrs, policy, false); + } + + void handle_headers(const map& headers) { + for (auto h : headers) { + if (h.first == "ETAG") { + etag = h.second; + } + } + } + + bool get_etag(string *petag) { + if (etag.empty()) { + return false; + } + *petag = etag; + return true; + } +}; + + + +class RGWLCStreamObjToCloudPlainCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + + std::shared_ptr in_crf; + std::shared_ptr out_crf; + + public: + RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx) + : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} + + int operate() override { + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, + tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, + tier_ctx.acl_mappings, + tier_ctx.target_storage_class); + + rgw_bucket target_bucket; + string target_obj_name; + + target_bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.obj.key.name; // cross check with aws module + + std::shared_ptr dest_bucket; + dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket)); + + std::shared_ptr dest_obj; + dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get()))); + + + reenter(this) { + /* Prepare Read from source */ + in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); + + out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this, + (RGWHTTPManager*)(tier_ctx.http_manager), + obj_properties, tier_ctx.conn, static_cast(dest_obj.get()))); + + /* actual Read & Write */ + yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + return set_cr_done(); + } + + return 0; + } +}; + +class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + + string upload_id; + + rgw_lc_multipart_part_info part_info; + + string *petag; + std::shared_ptr in_crf; + std::shared_ptr out_crf; + + public: + RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, + const string& _upload_id, + const rgw_lc_multipart_part_info& _part_info, + string *_petag) + : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), + upload_id(_upload_id), + part_info(_part_info), + petag(_petag) {} + + int operate() override { + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, + tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, + tier_ctx.acl_mappings, + tier_ctx.target_storage_class); + rgw_bucket target_bucket; + string target_obj_name; + off_t end; + + target_bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.obj.key.name; // cross check with aws module + + std::shared_ptr dest_bucket; + dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket)); + + std::shared_ptr dest_obj; + dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get()))); + + reenter(this) { + /* Prepare Read from source */ + in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); + + end = part_info.ofs + part_info.size - 1; + std::static_pointer_cast(in_crf)->set_multipart(part_info.size, part_info.ofs, end); + + /* Prepare write */ + out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this, + (RGWHTTPManager*)(tier_ctx.http_manager), + obj_properties, tier_ctx.conn, static_cast(dest_obj.get()))); + + out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); + + /* actual Read & Write */ + yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + if (!(static_cast(out_crf.get()))->get_etag(petag)) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to get etag from PUT request" << dendl; + return set_cr_error(-EIO); + } + + return set_cr_done(); + } + + return 0; + } +}; + +class RGWLCAbortMultipartCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + RGWRESTConn *dest_conn; + rgw_obj dest_obj; + + string upload_id; + + public: + RGWLCAbortMultipartCR(CephContext *_cct, + RGWHTTPManager *_http_manager, + RGWRESTConn *_dest_conn, + const rgw_obj& _dest_obj, + const string& _upload_id) : RGWCoroutine(_cct), + cct(_cct), http_manager(_http_manager), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + upload_id(_upload_id) {} + + int operate() override { + reenter(this) { + + yield { + rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; + bufferlist bl; + call(new RGWDeleteRESTResourceCR(cct, dest_conn, http_manager, + obj_to_aws_path(dest_obj), params)); + } + + if (retcode < 0) { + ldout(cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl; + return set_cr_error(retcode); + } + + return set_cr_done(); + } + + return 0; + } +}; + +class RGWLCInitMultipartCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + RGWRESTConn *dest_conn; + rgw_obj dest_obj; + + uint64_t obj_size; + map attrs; + + bufferlist out_bl; + + string *upload_id; + + struct InitMultipartResult { + string bucket; + string key; + string upload_id; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Bucket", bucket, obj); + RGWXMLDecoder::decode_xml("Key", key, obj); + RGWXMLDecoder::decode_xml("UploadId", upload_id, obj); + } + } result; + + public: + RGWLCInitMultipartCR(CephContext *_cct, + RGWHTTPManager *_http_manager, + RGWRESTConn *_dest_conn, + const rgw_obj& _dest_obj, + uint64_t _obj_size, + const map& _attrs, + string *_upload_id) : RGWCoroutine(_cct), + cct(_cct), + http_manager(_http_manager), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + obj_size(_obj_size), + attrs(_attrs), + upload_id(_upload_id) {} + + int operate() override { + reenter(this) { + + yield { + rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} }; + bufferlist bl; + call(new RGWPostRawRESTResourceCR (cct, dest_conn, http_manager, + obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl)); + } + + if (retcode < 0) { + ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; + return set_cr_error(retcode); + } + { + /* + * If one of the following fails we cannot abort upload, as we cannot + * extract the upload id. If one of these fail it's very likely that that's + * the least of our problem. + */ + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; + return set_cr_error(-EIO); + } + + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl; + return set_cr_error(-EIO); + } + + try { + RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl; + return set_cr_error(-EIO); + } + } + + ldout(cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl; + + *upload_id = result.upload_id; + + return set_cr_done(); + } + + return 0; + } +}; + +class RGWLCCompleteMultipartCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + RGWRESTConn *dest_conn; + rgw_obj dest_obj; + + bufferlist out_bl; + + string upload_id; + + struct CompleteMultipartReq { + map parts; + + explicit CompleteMultipartReq(const map& _parts) : parts(_parts) {} + + void dump_xml(Formatter *f) const { + for (auto p : parts) { + f->open_object_section("Part"); + encode_xml("PartNumber", p.first, f); + encode_xml("ETag", p.second.etag, f); + f->close_section(); + }; + } + } req_enc; + + struct CompleteMultipartResult { + string location; + string bucket; + string key; + string etag; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Location", bucket, obj); + RGWXMLDecoder::decode_xml("Bucket", bucket, obj); + RGWXMLDecoder::decode_xml("Key", key, obj); + RGWXMLDecoder::decode_xml("ETag", etag, obj); + } + } result; + + public: + RGWLCCompleteMultipartCR(CephContext *_cct, + RGWHTTPManager *_http_manager, + RGWRESTConn *_dest_conn, + const rgw_obj& _dest_obj, + string _upload_id, + const map& _parts) : RGWCoroutine(_cct), + cct(_cct), http_manager(_http_manager), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + upload_id(_upload_id), + req_enc(_parts) {} + + int operate() override { + reenter(this) { + + yield { + rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; + stringstream ss; + XMLFormatter formatter; + + encode_xml("CompleteMultipartUpload", req_enc, &formatter); + + formatter.flush(ss); + + bufferlist bl; + bl.append(ss.str()); + + call(new RGWPostRawRESTResourceCR (cct, dest_conn, http_manager, + obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl)); + } + + if (retcode < 0) { + ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; + return set_cr_error(retcode); + } + { + /* + * If one of the following fails we cannot abort upload, as we cannot + * extract the upload id. If one of these fail it's very likely that that's + * the least of our problem. + */ + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; + return set_cr_error(-EIO); + } + + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl; + return set_cr_error(-EIO); + } + + try { + RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl; + return set_cr_error(-EIO); + } + } + + ldout(cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl; + + return set_cr_done(); + } + + return 0; + } +}; + + +class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + RGWRESTConn *dest_conn; + const rgw_obj dest_obj; + const rgw_raw_obj status_obj; + + string upload_id; + + public: + + RGWLCStreamAbortMultipartUploadCR(CephContext *_cct, + RGWHTTPManager *_http_manager, + RGWRESTConn *_dest_conn, + const rgw_obj& _dest_obj, + const rgw_raw_obj& _status_obj, + const string& _upload_id) : RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + status_obj(_status_obj), + upload_id(_upload_id) {} + + int operate() override { + reenter(this) { + yield call(new RGWLCAbortMultipartCR(cct, http_manager, dest_conn, dest_obj, upload_id)); + if (retcode < 0) { + ldout(cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; + /* ignore error, best effort */ + } +#ifdef TODO_STATUS_OBJ + yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj)); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; + /* ignore error, best effort */ + } +#endif + return set_cr_done(); + } + + return 0; + } +}; + +class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + RGWRESTConn *source_conn; + rgw_obj src_obj; + rgw_obj dest_obj; + + uint64_t obj_size; + string src_etag; + rgw_rest_obj rest_obj; + + rgw_lc_multipart_upload_info status; + + map new_attrs; + + rgw_lc_multipart_part_info *pcur_part_info{nullptr}; + + int ret_err{0}; + + rgw_raw_obj status_obj; + + public: + RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} + + int operate() override { + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, + tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, + tier_ctx.acl_mappings, + tier_ctx.target_storage_class); + bool init_multipart{false}; + + rgw_obj& obj = tier_ctx.obj; + obj_size = tier_ctx.o.meta.size; + + rgw_bucket target_bucket; + target_bucket.name = tier_ctx.target_bucket_name; + string target_obj_name = obj.key.name; // cross check with aws module + rgw_obj dest_obj(target_bucket, target_obj_name); + std::shared_ptr in_crf; + rgw_rest_obj rest_obj; + + reenter(this) { +#ifdef TODO_STATUS_OBJ + yield call(new RGWSimpleRadosReadCR(tier_ctx.async_rados, tier_ctx.store->svc()->sysobj, + status_obj, &status, false)); + + if (retcode < 0 && retcode != -ENOENT) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; + return retcode; + } + + if (retcode >= 0) { + /* check here that mtime and size did not change */ + if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size || + status.src_properties.etag != src_properties.etag) { + yield call(new RGWLCStreamAbortMultipartUploadCR( tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id)); + retcode = -ENOENT; + } + } + + if (retcode == -ENOENT) { + } +#endif + if (!init_multipart) { + in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); + + in_crf->init(); + + rest_obj = in_crf->get_rest_obj(); + + RGWLCStreamPutCRF::init_send_attrs(tier_ctx.cct, rest_obj, obj_properties, &new_attrs); + + yield call(new RGWLCInitMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, obj_size, std::move(new_attrs), &status.upload_id)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + init_multipart = true; + status.obj_size = obj_size; +#define MULTIPART_MAX_PARTS 10000 + uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS; + uint64_t min_conf_size = tier_ctx.multipart_min_part_size; + + if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) { + min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE; + } + + status.part_size = std::max(min_conf_size, min_part_size); + status.num_parts = (obj_size + status.part_size - 1) / status.part_size; + status.cur_part = 1; + } + + for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) { + ldout(tier_ctx.cct, 20) << "status.cur_part = "<(sync_env->async_rados, sync_env->store->svc()->sysobj, status_obj, status)); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; + /* continue with upload anyway */ + } +#endif + ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; + } + + yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts)); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; + ret_err = retcode; + yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id)); + return set_cr_error(ret_err); + } + +#ifdef TODO_STATUS_OBJ + /* remove status obj */ + yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj)); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; + /* ignore error, best effort */ + } +#endif + return set_cr_done(); + } + return 0; + } +}; + +int RGWLCCloudTierCR::operate() { + reenter(this) { + + yield { + // xxx: find if bucket is already created + ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl; + bufferlist bl; + call(new RGWPutRawRESTResourceCR (tier_ctx.cct, tier_ctx.conn.get(), + tier_ctx.http_manager, + tier_ctx.target_bucket_name, nullptr, bl, &out_bl)); + } + if (retcode < 0 ) { + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl; + return set_cr_error(retcode); + } + + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl; + return set_cr_error(retcode); + } + + try { + RGWXMLDecoder::decode_xml("Error", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl; + return set_cr_error(retcode); + } + + if ((result.code != "BucketAlreadyOwnedByYou") && + (result.code != "BucketAlreadyExists")) { + return set_cr_error(retcode); + } + } + + bucket_created = true; + + yield { + uint64_t size = tier_ctx.o.meta.size; + uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold; + + if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) { + multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE; + } + + if (size < multipart_sync_threshold) { + call (new RGWLCStreamObjToCloudPlainCR(tier_ctx)); + } else { + call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx)); + + } + } + + if (retcode < 0) { + return set_cr_error(retcode); + } + + return set_cr_done(); + } //reenter + + return 0; +} + diff --git a/src/rgw/rgw_lc_tier.h b/src/rgw/rgw_lc_tier.h new file mode 100644 index 0000000000000..0a4da62f6e1fb --- /dev/null +++ b/src/rgw/rgw_lc_tier.h @@ -0,0 +1,169 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_LC_TIER_H +#define CEPH_RGW_LC_TIER_H + +#include "rgw_lc.h" +#include "rgw_cr_rados.h" +#include "rgw_rest_conn.h" +#include "rgw_cr_rest.h" +#include "rgw_coroutine.h" +#include "rgw_rados.h" +#include "rgw_zone.h" + +#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) +#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) + +struct RGWLCCloudTierCtx { + CephContext *cct; + + /* Source */ + rgw_bucket_dir_entry& o; + rgw::sal::RGWRadosStore *store; + RGWBucketInfo& bucket_info; + + rgw_obj obj; + RGWObjectCtx& rctx; + + /* Remote */ + std::shared_ptr conn; + string target_bucket_name; + string target_storage_class; + RGWHTTPManager *http_manager; + + map acl_mappings; + uint64_t multipart_min_part_size; + uint64_t multipart_sync_threshold; + + RGWLCCloudTierCtx(CephContext* _cct, rgw_bucket_dir_entry& _o, + rgw::sal::RGWRadosStore* _store, RGWBucketInfo &_binfo, rgw_obj _obj, + RGWObjectCtx& _rctx, std::shared_ptr _conn, string _bucket, + string _storage_class, RGWHTTPManager *_http) + : cct(_cct), o(_o), store(_store), bucket_info(_binfo), + obj(_obj), rctx(_rctx), conn(_conn), target_bucket_name(_bucket), + target_storage_class(_storage_class), http_manager(_http) {} +}; + +class RGWLCCloudTierCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + bufferlist out_bl; + int retcode; + bool bucket_created = false; + struct CreateBucketResult { + string code; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Code", code, obj); + } + } result; + + public: + RGWLCCloudTierCR(RGWLCCloudTierCtx& _tier_ctx) : + RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} + + int operate() override; +}; + +struct rgw_lc_multipart_part_info { + int part_num{0}; + uint64_t ofs{0}; + uint64_t size{0}; + string etag; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(part_num, bl); + encode(ofs, bl); + encode(size, bl); + encode(etag, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(part_num, bl); + decode(ofs, bl); + decode(size, bl); + decode(etag, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_lc_multipart_part_info) + +struct rgw_lc_obj_properties { + ceph::real_time mtime; + string etag; + uint64_t versioned_epoch{0}; + map& target_acl_mappings; + string target_storage_class; + + rgw_lc_obj_properties(ceph::real_time _mtime, string _etag, + uint64_t _versioned_epoch, map& _t_acl_mappings, + string _t_storage_class) : + mtime(_mtime), etag(_etag), + versioned_epoch(_versioned_epoch), + target_acl_mappings(_t_acl_mappings), + target_storage_class(_t_storage_class) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(mtime, bl); + encode(etag, bl); + encode(versioned_epoch, bl); + encode(target_acl_mappings, bl); + encode(target_storage_class, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(mtime, bl); + decode(etag, bl); + decode(versioned_epoch, bl); + decode(target_acl_mappings, bl); + decode(target_storage_class, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_lc_obj_properties) + +struct rgw_lc_multipart_upload_info { + string upload_id; + uint64_t obj_size; + uint32_t part_size{0}; + uint32_t num_parts{0}; + + int cur_part{0}; + uint64_t cur_ofs{0}; + + std::map parts; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(upload_id, bl); + encode(obj_size, bl); + encode(part_size, bl); + encode(num_parts, bl); + encode(cur_part, bl); + encode(cur_ofs, bl); + encode(parts, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(upload_id, bl); + decode(obj_size, bl); + decode(part_size, bl); + decode(num_parts, bl); + decode(cur_part, bl); + decode(cur_ofs, bl); + decode(parts, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info) + +#endif diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index 1d013ae5a97c0..578b4f81bf3f2 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -2081,8 +2081,23 @@ void RGWZoneGroupMap::decode(bufferlist::const_iterator& bl) { } } +static int conf_to_uint64(const JSONFormattable& config, const string& key, uint64_t *pval) +{ + string sval; + if (config.find(key, &sval)) { + string err; + uint64_t val = strict_strtoll(sval.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + *pval = val; + } + return 0; +} + int RGWZoneGroupPlacementTier::update_params(const JSONFormattable& config) { + int r = -1; if (config.exists("endpoint")) { endpoint = config["endpoint"]; @@ -2108,6 +2123,21 @@ int RGWZoneGroupPlacementTier::update_params(const JSONFormattable& config) if (config.exists("secret")) { key.key = config["secret"]; } + + if (config.exists("multipart_sync_threshold")) { + r = conf_to_uint64(config, "multipart_sync_threshold", &multipart_sync_threshold); + if (r < 0) { + multipart_sync_threshold = DEFAULT_MULTIPART_SYNC_PART_SIZE; + } + } + + if (config.exists("multipart_min_part_size")) { + r = conf_to_uint64(config, "multipart_min_part_size", &multipart_min_part_size); + if (r < 0) { + multipart_min_part_size = DEFAULT_MULTIPART_SYNC_PART_SIZE; + } + } + if (config.exists("acls")) { const JSONFormattable& cc = config["acls"]; if (cc.is_array()) { @@ -2149,6 +2179,12 @@ int RGWZoneGroupPlacementTier::clear_params(const JSONFormattable& config) if (config.exists("secret")) { key.key.clear(); } + if (config.exists("multipart_sync_threshold")) { + multipart_sync_threshold = DEFAULT_MULTIPART_SYNC_PART_SIZE; + } + if (config.exists("multipart_min_part_size")) { + multipart_min_part_size = DEFAULT_MULTIPART_SYNC_PART_SIZE; + } if (config.exists("acls")) { const JSONFormattable& cc = config["acls"]; if (cc.is_array()) { diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index c145ca1b8423b..886e63ed93b7b 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -748,6 +748,7 @@ struct RGWTierACLMapping { WRITE_CLASS_ENCODER(RGWTierACLMapping) struct RGWZoneGroupPlacementTier { +#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) std::string storage_class; std::string tier_type; std::string endpoint; @@ -759,6 +760,9 @@ struct RGWZoneGroupPlacementTier { string target_path; map acl_mappings; + uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE}; + uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE}; + int update_params(const JSONFormattable& config); int clear_params(const JSONFormattable& config); @@ -773,6 +777,8 @@ struct RGWZoneGroupPlacementTier { encode(tier_storage_class, bl); encode(target_path, bl); encode(acl_mappings, bl); + encode(multipart_sync_threshold, bl); + encode(multipart_min_part_size, bl); ENCODE_FINISH(bl); } @@ -792,6 +798,8 @@ struct RGWZoneGroupPlacementTier { decode(tier_storage_class, bl); decode(target_path, bl); decode(acl_mappings, bl); + decode(multipart_sync_threshold, bl); + decode(multipart_min_part_size, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; -- 2.39.5