From aa362e889933ba1418071c18e92120883600db3e Mon Sep 17 00:00:00 2001 From: Soumya Koduri Date: Mon, 22 Mar 2021 16:14:12 +0530 Subject: [PATCH] rgw/CloudTransition: Include aws region name for remote endpoint With commit#81ad226, aws auth v4 requires region name for remote endpoint connection. Include the same in the tier parameters. & misc fixes Signed-off-by: Soumya Koduri --- doc/radosgw/cloud-transition.rst | 22 +- doc/radosgw/placement.rst | 2 + src/rgw/rgw_acl.cc | 30 --- src/rgw/rgw_acl.h | 3 - src/rgw/rgw_cr_rest.cc | 35 ++- src/rgw/rgw_cr_rest.h | 16 +- src/rgw/rgw_json_enc.cc | 2 + src/rgw/rgw_lc.cc | 134 ++++++++-- src/rgw/rgw_lc_tier.cc | 404 ++++++++++++++----------------- src/rgw/rgw_lc_tier.h | 113 ++++++--- src/rgw/rgw_obj_manifest.h | 21 +- src/rgw/rgw_zone.cc | 8 +- src/rgw/rgw_zone.h | 38 +-- 13 files changed, 452 insertions(+), 376 deletions(-) diff --git a/doc/radosgw/cloud-transition.rst b/doc/radosgw/cloud-transition.rst index 85f983579846a..b144fa4820e48 100644 --- a/doc/radosgw/cloud-transition.rst +++ b/doc/radosgw/cloud-transition.rst @@ -2,9 +2,9 @@ Cloud Transition ================ -This feature enables data transition to a remote cloud service as part of `Lifecycle Configuration `__ via `Storage Classes `__. The transition is unidirectional; data cannot be transitioned back from the remote zone. The goal of this feature is to enable data transition to multiple cloud providers. The currently supported cloud providers are those that are compatible with AWS (S3). +This feature enables data transition to a remote cloud service as part of `Lifecycle Configuration `__ via :ref:`storage_classes`. The transition is unidirectional; data cannot be transitioned back from the remote zone. The goal of this feature is to enable data transition to multiple cloud providers. The currently supported cloud providers are those that are compatible with AWS (S3). 
-Special storage class of tier type ``cloud-s3`` is used to configure the remote cloud S3 object store service to which the data needs to be transitioned to. These are defined in terms of zonegroup placement targets and unlike regular storage classes, do not need a data pool. Any additions or modifications need period commit to get reflected. +Special storage class of tier type ``cloud-s3`` is used to configure the remote cloud S3 object store service to which the data needs to be transitioned. These are defined in terms of zonegroup placement targets and unlike regular storage classes, do not need a data pool. User credentials for the remote cloud object store service need to be configured. Note that source ACLs will not be preserved. It is possible to map permissions of specific source users to specific destination users. @@ -19,6 +19,7 @@ Cloud Storage Class Configuration "access_key": , "secret": , "endpoint": , + "region": , "host_style": , "acls": [ { "type": , "source_id": , @@ -46,6 +47,10 @@ The secret key for the remote cloud S3 service. URL of remote cloud S3 service endpoint. +* ``region`` (string) + +The remote cloud S3 service region name. + * ``host_style`` (path | virtual) Type of host style to be used when accessing remote cloud S3 endpoint (default: ``path``). @@ -115,9 +120,14 @@ Minimum parts size to use when transitioning objects using multipart upload. How to Configure ~~~~~~~~~~~~~~~~ -See `Adding a Storage Class `__ for how to configure storage-class for a zonegroup. The cloud transition requires a creation of a special storage class with tier type defined as ``cloud-s3`` +See :ref:`adding_a_storage_class` for how to configure storage-class for a zonegroup. The cloud transition requires a creation of a special storage class with tier type defined as ``cloud-s3`` -Note: Once a storage class is created of ``--tier-type=cloud-s3``, it cannot be later modified to any other storage class type. +.. 
note:: If you have not done any previous `Multisite Configuration`_, + a ``default`` zone and zonegroup are created for you, and changes + to the zone/zonegroup will not take effect until the Ceph Object + Gateways are restarted. If you have created a realm for multisite, + the zone/zonegroup changes will take effect once the changes are + committed with ``radosgw-admin period update --commit``. :: @@ -169,6 +179,8 @@ For example: ] +.. note:: Once a storage class is created of ``--tier-type=cloud-s3``, it cannot be later modified to any other storage class type. + The tier configuration can be then done using the following command :: @@ -340,3 +352,5 @@ Future Work * Federation between RGW and Cloud services. * Support transition to other cloud provideres (like Azure). + +.. _`Multisite Configuration`: ../multisite diff --git a/doc/radosgw/placement.rst b/doc/radosgw/placement.rst index c255cccb3553f..595fde2947040 100644 --- a/doc/radosgw/placement.rst +++ b/doc/radosgw/placement.rst @@ -123,6 +123,8 @@ Then provide the zone placement info for that target: --index-pool default.rgw.temporary.index \ --data-extra-pool default.rgw.temporary.non-ec +.. 
_adding_a_storage_class: + Adding a Storage Class ---------------------- diff --git a/src/rgw/rgw_acl.cc b/src/rgw/rgw_acl.cc index 86a7a96fd70a7..413b88a9b76d9 100644 --- a/src/rgw/rgw_acl.cc +++ b/src/rgw/rgw_acl.cc @@ -76,36 +76,6 @@ bool operator!=(const RGWAccessControlPolicy& lhs, return !(lhs == rhs); } -string get_acl_type_str (const ACLGranteeTypeEnum& type) { - string s; - switch (type) { - case ACL_TYPE_EMAIL_USER: - s = "email"; - break; - case ACL_TYPE_GROUP: - s = "uri"; - break; - default: - s = "id"; - break; - } - - return s; -} - -ACLGranteeTypeEnum get_acl_type (const string& t) { - ACLGranteeTypeEnum type; - if (t == "email") { - type = ACL_TYPE_EMAIL_USER; - } else if (t == "uri") { - type = ACL_TYPE_GROUP; - } else { - type = ACL_TYPE_CANON_USER; - } - - return type; -} - void RGWAccessControlList::_add_grant(ACLGrant *grant) { ACLPermission& perm = grant->get_permission(); diff --git a/src/rgw/rgw_acl.h b/src/rgw/rgw_acl.h index b5d9676906bb1..6fc16c7800326 100644 --- a/src/rgw/rgw_acl.h +++ b/src/rgw/rgw_acl.h @@ -46,9 +46,6 @@ enum ACLGroupTypeEnum { ACL_GROUP_AUTHENTICATED_USERS = 2, }; -string get_acl_type_str (const ACLGranteeTypeEnum& type); -ACLGranteeTypeEnum get_acl_type (const string& t); - class ACLPermission { protected: diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 56688e3e6404b..c270067b2b3eb 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -349,23 +349,17 @@ int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) { return 0; } -RGWStreamReadCRF::RGWStreamReadCRF(std::unique_ptr* obj, +RGWStreamReadCRF::RGWStreamReadCRF(std::unique_ptr* obj, RGWObjectCtx& obj_ctx) : read_op((*obj)->get_read_op(&obj_ctx)) {} RGWStreamReadCRF::~RGWStreamReadCRF() {} -RGWStreamWriteCR::RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr, +RGWStreamWriteCR::RGWStreamWriteCR(CephContext* _cct, RGWHTTPManager* _mgr, shared_ptr& _in_crf, shared_ptr& _out_crf) : RGWCoroutine(_cct), cct(_cct), 
http_manager(_mgr), in_crf(_in_crf), out_crf(_out_crf) {} RGWStreamWriteCR::~RGWStreamWriteCR() { } -int RGWStreamWriteCR::operate() { - off_t ofs; - off_t end; - int ret; - uint64_t read_len = 0; - rgw_rest_obj rest_obj; - +int RGWStreamWriteCR::operate(const DoutPrefixProvider* dpp) { reenter(this) { ret = in_crf->init(); if (ret < 0) { @@ -377,10 +371,12 @@ int RGWStreamWriteCR::operate() { do { bl.clear(); - ret = in_crf->read(ofs, end, bl); - if (ret < 0) { - ldout(cct, 0) << "ERROR: fail to read object data, ret = " << ret << dendl; - return set_cr_error(ret); + yield { + ret = in_crf->read(ofs, end, bl); + if (ret < 0) { + ldout(cct, 0) << "ERROR: fail to read object data, ret = " << ret << dendl; + return set_cr_error(ret); + } } if (retcode < 0) { ldout(cct, 20) << __func__ << ": read_op.read() retcode=" << retcode << dendl; @@ -400,7 +396,7 @@ int RGWStreamWriteCR::operate() { return set_cr_error(ret); } - out_crf->send_ready(rest_obj); + out_crf->send_ready(dpp, rest_obj); ret = out_crf->send(); if (ret < 0) { return set_cr_error(ret); @@ -411,12 +407,11 @@ int RGWStreamWriteCR::operate() { total_read += bl.length(); do { - /* Cant do yield here as read_op doesnt work well with yield and results - * in deadlock. 
- */ - ret = out_crf->write(bl, &need_retry); - if (ret < 0) { - return set_cr_error(ret); + yield { + ret = out_crf->write(bl, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } } if (retcode < 0) { diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 399bc6470b31d..e09c554f7aafe 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -9,8 +9,7 @@ #include "rgw_coroutine.h" #include "rgw_rest_conn.h" -#include "rgw_rados.h" - +#include "rgw_sal.h" struct rgw_rest_obj { rgw_obj_key key; @@ -592,13 +591,13 @@ public: class RGWStreamReadCRF { public: - std::unique_ptr read_op; + std::unique_ptr read_op; off_t ofs; off_t end; rgw_rest_obj rest_obj; - std::unique_ptr* obj; + std::unique_ptr* obj; - RGWStreamReadCRF(std::unique_ptr* obj, RGWObjectCtx& obj_ctx); + RGWStreamReadCRF(std::unique_ptr* obj, RGWObjectCtx& obj_ctx); virtual ~RGWStreamReadCRF(); virtual int init() {return 0; } @@ -637,11 +636,16 @@ class RGWStreamWriteCR : public RGWCoroutine { bool sent_attrs{false}; uint64_t total_read{0}; int ret{0}; + off_t ofs; + off_t end; + uint64_t read_len = 0; + rgw_rest_obj rest_obj; + public: RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr, std::shared_ptr& _in_crf, std::shared_ptr& _out_crf); ~RGWStreamWriteCR(); - int operate() override; + int operate(const DoutPrefixProvider *dpp) override; }; diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 7d18d1f2495e5..5c3ae67350cf3 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -1465,6 +1465,7 @@ void RGWZoneGroupPlacementTierS3::dump(Formatter *f) const encode_json("endpoint", endpoint, f); encode_json("access_key", key.id, f); encode_json("secret", key.key, f); + encode_json("region", region, f); string s = (host_style == PathStyle ? 
"path" : "virtual"); encode_json("host_style", s, f); encode_json("target_storage_class", target_storage_class, f); @@ -1490,6 +1491,7 @@ void RGWZoneGroupPlacementTierS3::decode_json(JSONObj *obj) JSONDecoder::decode_json("endpoint", endpoint, obj); JSONDecoder::decode_json("access_key", key.id, obj); JSONDecoder::decode_json("secret", key.key, obj); + JSONDecoder::decode_json("region", region, obj); string s; JSONDecoder::decode_json("host_style", s, obj); if (s != "virtual") { diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 7015b1f34aa07..c9eb675f8c512 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -684,8 +684,18 @@ public: static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002; static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; + static constexpr uint32_t FLAG_HTTP_MGR = 0x0008; private: + class C_WorkQTimerCtx: public Context { + WorkQ* wq; + public: + C_WorkQTimerCtx(WorkQ* _wq): wq(_wq) {} + void finish(int r) override { + wq->stop_http_manager(); + } + }; + const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; RGWLC::LCWorker* wk; uint32_t qmax; @@ -695,14 +705,28 @@ private: uint32_t flags; vector items; work_f f; + std::unique_ptr crs; + std::unique_ptr http_manager; + bool is_http_mgr_started{false}; + ceph::mutex timer_mtx; + SafeTimer timer; + int timer_wait_sec = 200; //seconds + C_WorkQTimerCtx* timer_ctx = nullptr; public: WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax) - : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf) + : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf), + timer_mtx(ceph::make_mutex("WorkQTimerMutex")), + timer((CephContext*)(wk->cct), timer_mtx) { create(thr_name().c_str()); + timer.init(); } + ~WorkQ() { + timer.shutdown(); + } + std::string thr_name() { return std::string{"wp_thrd: "} + std::to_string(wk->ix) + ", " + std::to_string(ix); @@ -712,6 +736,51 @@ public: f = _f; } + RGWCoroutinesManager* get_crs() { return 
crs.get(); } + RGWHTTPManager* get_http_manager() { return http_manager.get(); } + + int start_http_manager(rgw::sal::Store* store) { + int ret = 0; + + if (is_http_mgr_started) + return 0; + + /* http_mngr */ + if(!crs) { + crs.reset(new RGWCoroutinesManager(store->ctx(), store->get_cr_registry())); + } + if (!http_manager) { + http_manager.reset(new RGWHTTPManager(store->ctx(), crs.get()->get_completion_mgr())); + } + + ret = http_manager->start(); + if (ret < 0) { + dout(5) << "RGWLC:: http_manager->start() failed ret = " + << ret << dendl; + return ret; + } + + is_http_mgr_started = true; + flags |= FLAG_HTTP_MGR; + + return ret; + } + + int stop_http_manager() { + if (!is_http_mgr_started) { + return 0; + } + + http_manager.reset(); + crs.reset(); + + is_http_mgr_started = false; + flags &= ~FLAG_HTTP_MGR; + timer.cancel_all_events(); + timer_ctx = nullptr; + return 0; + } + void enqueue(WorkItem&& item) { unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && @@ -719,6 +788,10 @@ public: flags |= FLAG_EWAIT_SYNC; cv.wait_for(uniq, 200ms); } + if (timer_ctx && (flags & FLAG_HTTP_MGR)) { + timer.cancel_all_events(); + timer_ctx = nullptr; + } items.push_back(item); if (flags & FLAG_DWAIT_SYNC) { flags &= ~FLAG_DWAIT_SYNC; @@ -743,6 +816,10 @@ private: if (flags & FLAG_EDRAIN_SYNC) { flags &= ~FLAG_EDRAIN_SYNC; } + if ((flags & FLAG_HTTP_MGR) && !timer_ctx) { + timer_ctx = new C_WorkQTimerCtx(this); + timer.add_event_after(timer_wait_sec, timer_ctx); + } flags |= FLAG_DWAIT_SYNC; cv.wait_for(uniq, 200ms); } @@ -1280,10 +1357,10 @@ public: */ if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { ret = remove_expired_obj(oc.dpp, oc, false); - ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << "s versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << 
" versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } else { ret = remove_expired_obj(oc.dpp, oc, true); - ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "s versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } return ret; } @@ -1295,7 +1372,7 @@ public: real_time read_mtime; - std::unique_ptr read_op(oc.obj->get_read_op(&oc.rctx)); + std::unique_ptr read_op(oc.obj->get_read_op(&oc.rctx)); read_op->params.lastmod = &read_mtime; @@ -1313,7 +1390,7 @@ public: (*tier_ctx.obj)->set_atomic(&tier_ctx.rctx); RGWObjState *s = tier_ctx.rctx.get_state((*tier_ctx.obj)->get_obj()); - std::unique_ptr obj_op(oc.obj->get_write_op(&oc.rctx)); + std::unique_ptr obj_op(oc.obj->get_write_op(&oc.rctx)); obj_op->params.modify_tail = true; obj_op->params.flags = PUT_OBJ_CREATE; @@ -1337,7 +1414,7 @@ public: tier_config.tier_placement = oc.tier; tier_config.is_multipart_upload = tier_ctx.is_multipart_upload; - pmanifest->set_tier_type("cloud"); + pmanifest->set_tier_type("cloud-s3"); pmanifest->set_tier_config(tier_config); /* check if its necessary */ @@ -1355,7 +1432,6 @@ public: * obj_size remains the same even when object is moved to other * storage class. So maybe better to keep it the same way. 
*/ - //pmanifest->set_obj_size(0); obj_op->params.manifest = pmanifest; @@ -1370,6 +1446,10 @@ public: obj_op->params.attrs = &attrs; r = obj_op->prepare(null_yield); + if (r < 0) { + return r; + } + r = obj_op->write_meta(oc.dpp, tier_ctx.o.meta.size, 0, null_yield); if (r < 0) { @@ -1384,8 +1464,9 @@ public: /* init */ string id = "cloudid"; - string endpoint=oc.tier.t.s3.endpoint; + string endpoint = oc.tier.t.s3.endpoint; RGWAccessKey key = oc.tier.t.s3.key; + string region = oc.tier.t.s3.region; HostStyle host_style = oc.tier.t.s3.host_style; string bucket_name = oc.tier.t.s3.target_path; const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup(); @@ -1401,21 +1482,26 @@ public: boost::algorithm::to_lower(bucket_name); } - conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, host_style)); + conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, region, host_style)); - /* http_mngr */ - RGWCoroutinesManager crs(oc.store->ctx(), oc.store->get_cr_registry()); - RGWHTTPManager http_manager(oc.store->ctx(), crs.get_completion_mgr()); - - int ret = http_manager.start(); + int ret = oc.wq->start_http_manager(oc.store); if (ret < 0) { - ldpp_dout(oc.dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; + ldpp_dout(oc.dpp, 0) << "failed in start_http_manager() ret=" << ret << dendl; return ret; } + RGWCoroutinesManager* crs = oc.wq->get_crs(); + RGWHTTPManager* http_manager = oc.wq->get_http_manager(); + + if (!crs || !http_manager) { + /* maybe race..return and retry */ + ldpp_dout(oc.dpp, 0) << " http_manager and crs not initialized" << dendl; + return -1; + } + RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(), &oc.obj, oc.rctx, conn, bucket_name, - oc.tier.t.s3.target_storage_class, &http_manager); + oc.tier.t.s3.target_storage_class, http_manager); tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings; tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size; 
tier_ctx.multipart_sync_threshold = oc.tier.t.s3.multipart_sync_threshold; @@ -1427,7 +1513,7 @@ public: * verify if the object is already transitioned. And since its just a best * effort, do not bail out in case of any errors. */ - ret = crs.run(new RGWLCCloudCheckCR(tier_ctx, &al_tiered)); + ret = crs->run(oc.dpp, new RGWLCCloudCheckCR(tier_ctx, &al_tiered)); if (ret < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudCheckCR() ret=" << ret << dendl; @@ -1435,14 +1521,14 @@ public: if (al_tiered) { ldout(tier_ctx.cct, 20) << "Object (" << oc.o.key << ") is already tiered" << dendl; - http_manager.stop(); return 0; } else { - ret = crs.run(new RGWLCCloudTierCR(tier_ctx)); + ret = crs->run(oc.dpp, new RGWLCCloudTierCR(tier_ctx)); } if (ret < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudTierCR() ret=" << ret << dendl; + return ret; } if (delete_object) { @@ -1458,7 +1544,6 @@ public: return ret; } } - http_manager.stop(); return 0; } @@ -1490,6 +1575,14 @@ public: return 0; } + /* Allow transition for only RadosStore */ + rgw::sal::RadosStore *rados = dynamic_cast(oc.store); + + if (!rados) { + ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is not on RadosStore. 
Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl; + return -1; + } + r = transition_obj_to_cloud(oc); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")" @@ -1823,6 +1916,7 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, sleep(5); continue; } + if (ret < 0) return 0; ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] diff --git a/src/rgw/rgw_lc_tier.cc b/src/rgw/rgw_lc_tier.cc index 805604438d823..d7d62cbf6a621 100644 --- a/src/rgw/rgw_lc_tier.cc +++ b/src/rgw/rgw_lc_tier.cc @@ -26,7 +26,7 @@ using namespace std; -static string get_key_instance(const rgw_obj_key& key) +static inline string get_key_instance(const rgw_obj_key& key) { if (!key.instance.empty() && !key.have_null_instance()) { @@ -35,7 +35,7 @@ static string get_key_instance(const rgw_obj_key& key) return ""; } -static string get_key_oid(const rgw_obj_key& key) +static inline string get_key_oid(const rgw_obj_key& key) { string oid = key.name; if (!key.instance.empty() && @@ -45,7 +45,7 @@ static string get_key_oid(const rgw_obj_key& key) return oid; } -static string obj_to_aws_path(const rgw_obj& obj) +static inline string obj_to_aws_path(const rgw_obj& obj) { string path = obj.bucket.name + "/" + get_key_oid(obj.key); return path; @@ -77,7 +77,7 @@ static std::set keep_headers = { "CONTENT_TYPE", static void init_headers(map& attrs, map& headers) { - for (auto kv : attrs) { + for (auto& kv : attrs) { const char * name = kv.first.c_str(); const auto aiter = rgw_to_http_attrs.find(name); @@ -105,32 +105,7 @@ static void init_headers(map& attrs, } } -class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF -{ - RGWRESTConn::get_obj_params req_params; - - CephContext *cct; - RGWHTTPManager *http_manager; - rgw_lc_obj_properties obj_properties; - std::shared_ptr conn; - rgw::sal::RGWObject* dest_obj; - string etag; - RGWRESTStreamRWRequest *in_req; - map headers; - - 
public: - RGWLCStreamGetCRF(CephContext *_cct, - RGWCoroutinesEnv *_env, - RGWCoroutine *_caller, - RGWHTTPManager *_http_manager, - const rgw_lc_obj_properties& _obj_properties, - std::shared_ptr _conn, - rgw::sal::RGWObject* _dest_obj) : - RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()), - cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), - conn(_conn), dest_obj(_dest_obj) {} - - int init() { +int RGWLCStreamGetCRF::init(const DoutPrefixProvider *dpp) { /* init input connection */ req_params.get_op = false; /* Need only headers */ req_params.prepend_metadata = true; @@ -141,15 +116,14 @@ class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF string etag; real_time set_mtime; - int ret = conn->get_obj(dest_obj, req_params, true /* send */, &in_req); + int ret = conn->get_obj(dpp, dest_obj, req_params, true /* send */, &in_req); if (ret < 0) { ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; return ret; } /* fetch only headers */ - ret = conn->complete_request(in_req, nullptr, nullptr, - nullptr, nullptr, &headers, null_yield); + ret = conn->complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield); if (ret < 0 && ret != -ENOENT) { ldout(cct, 20) << "ERROR: " << __func__ << "(): conn->complete_request() returned ret=" << ret << dendl; return ret; @@ -157,11 +131,11 @@ class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF return 0; } - int is_already_tiered() { +int RGWLCStreamGetCRF::is_already_tiered() { char buf[32]; map attrs = headers; - for (auto a : attrs) { + for (const auto& a : attrs) { ldout(cct, 20) << "GetCrf attr[" << a.first << "] = " << a.second < attrs; uint64_t obj_size; - std::unique_ptr* obj; + std::unique_ptr* obj; const real_time &mtime; bool multipart; @@ -200,7 +173,7 @@ class RGWLCStreamReadCRF : public RGWStreamReadCRF public: RGWLCStreamReadCRF(CephContext *_cct, const DoutPrefixProvider *_dpp, 
- RGWObjectCtx& obj_ctx, std::unique_ptr* _obj, + RGWObjectCtx& obj_ctx, std::unique_ptr* _obj, const real_time &_mtime) : RGWStreamReadCRF(_obj, obj_ctx), cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime) {} @@ -272,7 +245,7 @@ class RGWLCStreamReadCRF : public RGWStreamReadCRF init_headers(attrs, rest_obj.attrs); rest_obj.acls.set_ctx(cct); - auto aiter = attrs.find(RGW_ATTR_ACL); + const auto aiter = attrs.find(RGW_ATTR_ACL); if (aiter != attrs.end()) { bufferlist& bl = aiter->second; auto bliter = bl.cbegin(); @@ -302,7 +275,7 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF RGWHTTPManager *http_manager; rgw_lc_obj_properties obj_properties; std::shared_ptr conn; - rgw::sal::RGWObject* dest_obj; + rgw::sal::Object* dest_obj; string etag; public: @@ -312,7 +285,7 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF RGWHTTPManager *_http_manager, const rgw_lc_obj_properties& _obj_properties, std::shared_ptr _conn, - rgw::sal::RGWObject* _dest_obj) : + rgw::sal::Object* _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager), cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) { } @@ -350,17 +323,15 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF map& acl_mappings(obj_properties.target_acl_mappings); string target_storage_class = obj_properties.target_storage_class; - auto& new_attrs = *attrs; - - new_attrs.clear(); + attrs->clear(); for (auto& hi : rest_obj.attrs) { if (keep_attr(hi.first)) { - new_attrs.insert(hi); + attrs->insert(hi); } } - auto acl = rest_obj.acls.get_acl(); + const auto acl = rest_obj.acls.get_acl(); map > access_map; @@ -373,7 +344,7 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF const auto& am = acl_mappings; - auto iter = am.find(orig_grantee); + const auto iter = am.find(orig_grantee); if (iter == am.end()) { ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. 
ignoring" << dendl; continue; @@ -413,7 +384,7 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF } } - for (auto aiter : access_map) { + for (const auto& aiter : access_map) { int grant_type = aiter.first; string header_str("x-amz-grant-"); @@ -438,7 +409,7 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF string s; - for (auto viter : aiter.second) { + for (const auto& viter : aiter.second) { if (!s.empty()) { s.append(", "); } @@ -447,37 +418,40 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; - new_attrs[header_str] = s; + (*attrs)[header_str] = s; } /* Copy target storage class */ if (!target_storage_class.empty()) { - new_attrs["x-amz-storage-class"] = target_storage_class; + (*attrs)["x-amz-storage-class"] = target_storage_class; } else { - new_attrs["x-amz-storage-class"] = "STANDARD"; + (*attrs)["x-amz-storage-class"] = "STANDARD"; } /* New attribute to specify its transitioned from RGW */ - new_attrs["x-amz-meta-rgwx-source"] = "rgw"; + (*attrs)["x-amz-meta-rgwx-source"] = "rgw"; char buf[32]; snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch); - new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf; + (*attrs)["x-amz-meta-rgwx-versioned-epoch"] = buf; utime_t ut(obj_properties.mtime); snprintf(buf, sizeof(buf), "%lld.%09lld", (long long)ut.sec(), (long long)ut.nsec()); - new_attrs["x-amz-meta-rgwx-source-mtime"] = buf; - new_attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag; - new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; + (*attrs)["x-amz-meta-rgwx-source-mtime"] = buf; + (*attrs)["x-amz-meta-rgwx-source-etag"] = obj_properties.etag; + (*attrs)["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; if (!rest_obj.key.instance.empty()) { - new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; + (*attrs)["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; + } 
+ for (const auto& a : (*attrs)) { + ldout(cct, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <(req); map new_attrs; @@ -489,11 +463,11 @@ class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF RGWAccessControlPolicy policy; - r->send_ready(conn->get_key(), new_attrs, policy, false); + r->send_ready(dpp, conn->get_key(), new_attrs, policy); } void handle_headers(const map& headers) { - for (auto h : headers) { + for (const auto& h : headers) { if (h.first == "ETAG") { etag = h.second; } @@ -517,51 +491,50 @@ class RGWLCStreamObjToCloudPlainCR : public RGWCoroutine { std::shared_ptr in_crf; std::shared_ptr out_crf; - std::unique_ptr dest_bucket; - std::unique_ptr dest_obj; + std::unique_ptr dest_bucket; + std::unique_ptr dest_obj; + + rgw_lc_obj_properties obj_properties; + RGWBucketInfo b; + string target_obj_name; + + rgw::sal::Object *o; public: RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx) - : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} - - int operate() override { - rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, - tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, - tier_ctx.acl_mappings, - tier_ctx.target_storage_class); + : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), + obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class){} - string target_obj_name; - RGWBucketInfo b; - int reterr = 0; + int operate(const DoutPrefixProvider *dpp) { - b.bucket.name = tier_ctx.target_bucket_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } + reenter(this) { + b.bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + (*tier_ctx.obj)->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += 
get_key_instance((*tier_ctx.obj)->get_key()); + } - reterr = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); - if (reterr < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << reterr << dendl; - return reterr; - } + retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl; + return retcode; + } - dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); - if (!dest_obj) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; - return -1; - } - - rgw::sal::RGWObject *o = static_cast(dest_obj.get()); + dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); + if (!dest_obj) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; + return -1; + } + o = dest_obj.get(); - reenter(this) { // tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL /* Prepare Read from source */ - in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, + in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this, @@ -591,45 +564,44 @@ class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine { std::shared_ptr in_crf; std::shared_ptr out_crf; - std::unique_ptr dest_bucket; - std::unique_ptr dest_obj; + std::unique_ptr dest_bucket; + std::unique_ptr dest_obj; + + rgw_lc_obj_properties obj_properties; + RGWBucketInfo b; + string target_obj_name; + off_t end; public: RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, const string& _upload_id, const rgw_lc_multipart_part_info& _part_info, string *_petag) : RGWCoroutine(_tier_ctx.cct), 
tier_ctx(_tier_ctx), - upload_id(_upload_id), part_info(_part_info), petag(_petag) {} - - int operate() override { - ldout(cct, 0) << "In CloudMultipartPartCR XXXXXXXXXXXXXXXXXXX" << dendl; - rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, - tier_ctx.target_storage_class); - string target_obj_name; - off_t end; - RGWBucketInfo b; - int reterr = 0; + upload_id(_upload_id), part_info(_part_info), petag(_petag), + obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class){} - b.bucket.name = tier_ctx.target_bucket_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + b.bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + (*tier_ctx.obj)->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); + } - reterr = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); - if (reterr < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << reterr << dendl; - return reterr; - } + retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl; + return retcode; + } - dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); - if (!dest_obj) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; - return -1; - } + dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); + if 
(!dest_obj) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; + return -1; + } - reenter(this) { // tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL /* Prepare Read from source */ @@ -642,7 +614,7 @@ class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine { /* Prepare write */ out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this, (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn, - static_cast(dest_obj.get()))); + dest_obj.get())); out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); @@ -680,7 +652,7 @@ class RGWLCAbortMultipartCR : public RGWCoroutine { dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id) {} - int operate() override { + int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { @@ -736,7 +708,7 @@ class RGWLCInitMultipartCR : public RGWCoroutine { dest_obj(_dest_obj), obj_size(_obj_size), attrs(_attrs), upload_id(_upload_id) {} - int operate() override { + int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { @@ -804,7 +776,7 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine { explicit CompleteMultipartReq(const map& _parts) : parts(_parts) {} void dump_xml(Formatter *f) const { - for (auto p : parts) { + for (const auto& p : parts) { f->open_object_section("Part"); encode_xml("PartNumber", p.first, f); encode_xml("ETag", p.second.etag, f); @@ -835,7 +807,7 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine { dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id), req_enc(_parts) {} - int operate() override { + int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { @@ -901,7 +873,6 @@ class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine { const rgw_raw_obj status_obj; string upload_id; - int ret = -1; public: @@ -911,17 +882,16 @@ class RGWLCStreamAbortMultipartUploadCR : 
public RGWCoroutine { tier_ctx(_tier_ctx), dest_obj(_dest_obj), status_obj(_status_obj), upload_id(_upload_id) {} - int operate() override { + int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id)); if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; /* ignore error, best effort */ } - ret = tier_ctx.store->delete_system_obj(status_obj.pool, status_obj.oid, nullptr, null_yield); - - if (ret < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << ret << dendl; + yield call(new RGWRadosRemoveCR(dynamic_cast(tier_ctx.store), status_obj)); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; /* ignore error, best effort */ } return set_cr_done(); @@ -946,59 +916,54 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { map new_attrs; - int ret_err{0}; - rgw_raw_obj status_obj; - bufferlist bl; + + rgw_lc_obj_properties obj_properties; + RGWBucketInfo b; + string target_obj_name; + rgw_bucket target_bucket; + rgw::sal::RadosStore *rados; public: - RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} + RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) + : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), + obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class){} - int operate() override { - rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, - tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, - tier_ctx.acl_mappings, - tier_ctx.target_storage_class); + int operate(const 
DoutPrefixProvider *dpp) override { + reenter(this) { - //rgw_obj& obj = (*tier_ctx.obj)->get_obj(); - obj_size = tier_ctx.o.meta.size; + obj_size = tier_ctx.o.meta.size; - rgw_bucket target_bucket; - target_bucket.name = tier_ctx.target_bucket_name; + target_bucket.name = tier_ctx.target_bucket_name; - string target_obj_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } - rgw_obj dest_obj(target_bucket, target_obj_name); - rgw_rest_obj rest_obj; - - reenter(this) { + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + (*tier_ctx.obj)->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); + } + dest_obj.init(target_bucket, target_obj_name); - status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool, - "lc_multipart_" + (*tier_ctx.obj)->get_oid()); + status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool, + "lc_multipart_" + (*tier_ctx.obj)->get_oid()); - ret_err = tier_ctx.store->get_system_obj(tier_ctx.dpp, status_obj.pool, - status_obj.oid, bl, NULL, NULL, null_yield); + rados = dynamic_cast(tier_ctx.store); - if (ret_err < 0 && ret_err != -ENOENT) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << ret_err << dendl; - return retcode; + if (!rados) { + ldout(tier_ctx.cct, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." 
<< dendl; + return -1; } - if (ret_err >= 0) { - auto iter = bl.cbegin(); - try { - decode(status, iter); - } catch (buffer::error& err) { - return -EIO; - } + yield call(new RGWSimpleRadosReadCR(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, + status_obj, &status, false)); + + if (retcode < 0 && retcode != -ENOENT) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; + return retcode; } - if (ret_err >= 0) { + if (retcode >= 0) { /* check here that mtime and size did not change */ if (status.mtime != obj_properties.mtime || status.obj_size != obj_size || status.etag != obj_properties.etag) { @@ -1007,7 +972,7 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { } } - if (ret_err == -ENOENT) { + if (retcode == -ENOENT) { in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); in_crf->init(); @@ -1056,16 +1021,14 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; - ret_err = retcode; yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); - return set_cr_error(ret_err); + return set_cr_error(retcode); } - encode(status, bl); - ret_err = tier_ctx.store->put_system_obj(status_obj.pool, status_obj.oid, - bl, false, NULL, real_time(), null_yield, NULL); - if (ret_err < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << ret_err << dendl; + yield call(new RGWSimpleRadosWriteCR(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, status_obj, status)); + + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload 
state, retcode=" << retcode << dendl; /* continue with upload anyway */ } ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << status.parts[status.cur_part].etag << dendl; @@ -1074,16 +1037,14 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts)); if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; - ret_err = retcode; yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); - return set_cr_error(ret_err); + return set_cr_error(retcode); } /* remove status obj */ - ret_err = tier_ctx.store->delete_system_obj(status_obj.pool, status_obj.oid, nullptr, null_yield); - - if (ret_err < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-ret_err) << ")" << dendl; + yield call(new RGWRadosRemoveCR(rados, status_obj)); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; /* ignore error, best effort */ } return set_cr_done(); @@ -1092,55 +1053,46 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { } }; -int RGWLCCloudCheckCR::operate() { +int RGWLCCloudCheckCR::operate(const DoutPrefixProvider *dpp) { /* Check if object has already been transitioned */ - rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, - tier_ctx.target_storage_class); - - 
RGWBucketInfo b; - string target_obj_name; - int reterr = 0; - - b.bucket.name = tier_ctx.target_bucket_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); + reenter(this) { + b.bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + (*tier_ctx.obj)->get_name(); if (!tier_ctx.o.is_current()) { target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); } - std::unique_ptr dest_bucket; - std::unique_ptr dest_obj; - - - reterr = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); - if (reterr < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << reterr << dendl; - return reterr; - } + retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << retcode << dendl; + return ret; + } - dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); - if (!dest_obj) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; - return -1; - } + dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); + if (!dest_obj) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; + return -1; + } - std::shared_ptr get_crf; - get_crf.reset(new RGWLCStreamGetCRF((CephContext *)(tier_ctx.cct), get_env(), this, - (RGWHTTPManager*)(tier_ctx.http_manager), - obj_properties, tier_ctx.conn, static_cast(dest_obj.get()))); - int ret = 0; + get_crf.reset(new RGWLCStreamGetCRF(tier_ctx.cct, get_env(), this, tier_ctx.http_manager, obj_properties, + tier_ctx.conn, dest_obj.get())); - reenter(this) { /* Having yield here doesn't seem to wait for init2() to fetch the headers * before calling is_already_tiered() below */ - ret = get_crf->init(); - if (ret < 0) { 
- ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , ret = " << ret << dendl; - return set_cr_error(ret); + yield { + retcode = get_crf->init(dpp); + if (retcode < 0) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , retcode = " << retcode << dendl; + return set_cr_error(ret); + } + } + if (retcode < 0) { + ldout(tier_ctx.cct, 20) << __func__ << ": get_crf()->init retcode=" << retcode << dendl; + return set_cr_error(retcode); } - if ((static_cast(get_crf.get()))->is_already_tiered()) { + if (get_crf.get()->is_already_tiered()) { *already_tiered = true; ldout(tier_ctx.cct, 20) << "is_already_tiered true" << dendl; return set_cr_done(); @@ -1155,7 +1107,7 @@ int RGWLCCloudCheckCR::operate() { map , utime_t> target_buckets; -int RGWLCCloudTierCR::operate() { +int RGWLCCloudTierCR::operate(const DoutPrefixProvider *dpp) { pair key(tier_ctx.storage_class, tier_ctx.target_bucket_name); bool bucket_created = false; diff --git a/src/rgw/rgw_lc_tier.h b/src/rgw/rgw_lc_tier.h index eb323f2a6ed32..91053f6ea76cc 100644 --- a/src/rgw/rgw_lc_tier.h +++ b/src/rgw/rgw_lc_tier.h @@ -11,6 +11,7 @@ #include "rgw_coroutine.h" #include "rgw_rados.h" #include "rgw_zone.h" +#include "rgw_sal_rados.h" #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) @@ -21,11 +22,11 @@ struct RGWLCCloudTierCtx { /* Source */ rgw_bucket_dir_entry& o; - rgw::sal::RGWStore *store; + rgw::sal::Store *store; RGWBucketInfo& bucket_info; string storage_class; - std::unique_ptr* obj; + std::unique_ptr* obj; RGWObjectCtx& rctx; /* Remote */ @@ -41,8 +42,8 @@ struct RGWLCCloudTierCtx { bool is_multipart_upload{false}; RGWLCCloudTierCtx(CephContext* _cct, const DoutPrefixProvider *_dpp, - rgw_bucket_dir_entry& _o, rgw::sal::RGWStore* _store, - RGWBucketInfo &_binfo, std::unique_ptr* _obj, + rgw_bucket_dir_entry& _o, rgw::sal::Store* _store, + 
RGWBucketInfo &_binfo, std::unique_ptr* _obj, RGWObjectCtx& _rctx, std::shared_ptr _conn, string _bucket, string _storage_class, RGWHTTPManager *_http) : cct(_cct), dpp(_dpp), o(_o), store(_store), bucket_info(_binfo), @@ -50,39 +51,6 @@ struct RGWLCCloudTierCtx { target_storage_class(_storage_class), http_manager(_http) {} }; -class RGWLCCloudTierCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; - bufferlist out_bl; - int retcode; - struct CreateBucketResult { - string code; - - void decode_xml(XMLObj *obj) { - RGWXMLDecoder::decode_xml("Code", code, obj); - } - } result; - - public: - RGWLCCloudTierCR(RGWLCCloudTierCtx& _tier_ctx) : - RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} - - int operate() override; -}; - -class RGWLCCloudCheckCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; - bufferlist bl; - bool need_retry{false}; - int retcode; - bool *already_tiered; - - public: - RGWLCCloudCheckCR(RGWLCCloudTierCtx& _tier_ctx, bool *_al_ti) : - RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), already_tiered(_al_ti) {} - - int operate() override; -}; - struct rgw_lc_multipart_part_info { int part_num{0}; uint64_t ofs{0}; @@ -190,4 +158,75 @@ struct rgw_lc_multipart_upload_info { }; WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info) +class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF +{ + RGWRESTConn::get_obj_params req_params; + + CephContext *cct; + RGWHTTPManager *http_manager; + rgw_lc_obj_properties obj_properties; + std::shared_ptr conn; + rgw::sal::Object* dest_obj; + string etag; + RGWRESTStreamRWRequest *in_req; + map headers; + + public: + RGWLCStreamGetCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWHTTPManager *_http_manager, + const rgw_lc_obj_properties& _obj_properties, + std::shared_ptr _conn, + rgw::sal::Object* _dest_obj) : + RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()), + cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), 
+ conn(_conn), dest_obj(_dest_obj) {} + int init(const DoutPrefixProvider *dpp); + int is_already_tiered(); +}; + +class RGWLCCloudTierCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + bufferlist out_bl; + int retcode; + struct CreateBucketResult { + string code; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Code", code, obj); + } + } result; + + public: + RGWLCCloudTierCR(RGWLCCloudTierCtx& _tier_ctx) : + RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} + + int operate(const DoutPrefixProvider *dpp) override; +}; + +class RGWLCCloudCheckCR : public RGWCoroutine { + RGWLCCloudTierCtx& tier_ctx; + bufferlist bl; + bool need_retry{false}; + int retcode; + bool *already_tiered; + rgw_lc_obj_properties obj_properties; + RGWBucketInfo b; + string target_obj_name; + int ret = 0; + std::unique_ptr dest_bucket; + std::unique_ptr dest_obj; + std::unique_ptr get_crf; + + public: + RGWLCCloudCheckCR(RGWLCCloudTierCtx& _tier_ctx, bool *_al_ti) : + RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), already_tiered(_al_ti), + obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class){} + + int operate(const DoutPrefixProvider *dpp) override; +}; + #endif diff --git a/src/rgw/rgw_obj_manifest.h b/src/rgw/rgw_obj_manifest.h index c54fd36f367ba..03bfefad5c277 100644 --- a/src/rgw/rgw_obj_manifest.h +++ b/src/rgw/rgw_obj_manifest.h @@ -152,7 +152,6 @@ struct RGWObjTier { string name; RGWZoneGroupPlacementTier tier_placement; bool is_multipart_upload{false}; - /* XXX: Add any multipart upload details */ RGWObjTier(): name("none") {} @@ -449,22 +448,20 @@ public: return max_head_size; } - const string get_tier_type() { + const std::string& get_tier_type() { return tier_type; } - void set_tier_type(string value) { - /* Only "cloud" tier-type is supported for now */ - if (value == "cloud") { + inline void set_tier_type(string value) { + /* Only "cloud-s3" tier-type is 
supported for now */ + if (value == "cloud-s3") { tier_type = value; - } else { - tier_type = "none"; } } - void set_tier_config(RGWObjTier t) { - /* Set only if tier_type set to "cloud" */ - if (tier_type != "cloud") + inline void set_tier_config(RGWObjTier t) { + /* Set only if tier_type set to "cloud-s3" */ + if (tier_type != "cloud-s3") return; tier_config.name = t.name; @@ -472,8 +469,8 @@ public: tier_config.is_multipart_upload = t.is_multipart_upload; } - const void get_tier_config(RGWObjTier* t) { - if (tier_type != "cloud") + inline const void get_tier_config(RGWObjTier* t) { + if (tier_type != "cloud-s3") return; t->name = tier_config.name; diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index 3c288d4285d09..a10f2476fadce 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -2081,7 +2081,7 @@ void RGWZoneGroupMap::decode(bufferlist::const_iterator& bl) { } } -static int conf_to_uint64(const JSONFormattable& config, const string& key, uint64_t *pval) +static inline int conf_to_uint64(const JSONFormattable& config, const string& key, uint64_t *pval) { string sval; if (config.find(key, &sval)) { @@ -2125,6 +2125,9 @@ int RGWZoneGroupPlacementTierS3::update_params(const JSONFormattable& config) if (config.exists("target_path")) { target_path = config["target_path"]; } + if (config.exists("region")) { + region = config["region"]; + } if (config.exists("host_style")) { string s; s = config["host_style"]; @@ -2198,6 +2201,9 @@ int RGWZoneGroupPlacementTierS3::clear_params(const JSONFormattable& config) if (config.exists("target_path")) { target_path.clear(); } + if (config.exists("region")) { + region.clear(); + } if (config.exists("host_style")) { /* default */ host_style = PathStyle; diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 2376f8bdb7a79..05eb34db31a1d 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -695,16 +695,21 @@ struct RGWTierACLMapping { void init(const JSONFormattable& config) { const string& t = 
config["type"]; - type = get_acl_type(t); + if (t == "email") { + type = ACL_TYPE_EMAIL_USER; + } else if (t == "uri") { + type = ACL_TYPE_GROUP; + } else { + type = ACL_TYPE_CANON_USER; + } + source_id = config["source_id"]; dest_id = config["dest_id"]; } void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - - string s = get_acl_type_str(type); - encode(s, bl); + encode((uint32_t)type, bl); encode(source_id, bl); encode(dest_id, bl); ENCODE_FINISH(bl); @@ -712,10 +717,9 @@ struct RGWTierACLMapping { void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - string s; - decode(s, bl); - - type = get_acl_type(s); + uint32_t it; + decode(it, bl); + type = (ACLGranteeTypeEnum)it; decode(source_id, bl); decode(dest_id, bl); DECODE_FINISH(bl); @@ -729,6 +733,7 @@ struct RGWZoneGroupPlacementTierS3 { #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) std::string endpoint; RGWAccessKey key; + std::string region; HostStyle host_style{PathStyle}; string target_storage_class; @@ -746,8 +751,8 @@ struct RGWZoneGroupPlacementTierS3 { ENCODE_START(1, 1, bl); encode(endpoint, bl); encode(key, bl); - string s = (host_style == PathStyle ? "path" : "virtual"); - encode(s, bl); + encode(region, bl); + encode((uint32_t)host_style, bl); encode(target_storage_class, bl); encode(target_path, bl); encode(acl_mappings, bl); @@ -760,13 +765,12 @@ struct RGWZoneGroupPlacementTierS3 { DECODE_START(1, bl); decode(endpoint, bl); decode(key, bl); - string s; - decode(s, bl); - if (s != "virtual") { - host_style = PathStyle; - } else { - host_style = VirtualStyle; - } + decode(region, bl); + + uint32_t it; + decode(it, bl); + host_style = (HostStyle)it; + decode(target_storage_class, bl); decode(target_path, bl); decode(acl_mappings, bl); -- 2.39.5