From: Soumya Koduri
Date: Wed, 23 Jun 2021 18:00:11 +0000 (+0530)
Subject: rgw/CloudTransition: Replace Coroutines with RGWRestConn APIs
X-Git-Tag: v17.1.0~411^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=44317eacf09183fba542de21ff47ce20c3a7df57;p=ceph-ci.git

rgw/CloudTransition: Replace Coroutines with RGWRestConn APIs

To avoid the overhead of using coroutines during lifecycle transition,
RGWRESTStream* APIs are used to transition objects to the remote cloud.

Also handled a few optimizations and cleanups, stated below:

* Store the list of cloud target buckets as part of LCWorker instead of
  making it global. This list is maintained for the duration of
  RGWLC::process(), post which it is discarded.

* Refactor code to remove coroutine-based class definitions which are no
  longer needed and use direct function calls instead.

* Check for cloud-transitioned objects using tier-type and return an error
  if they are accessed in the RGWGetObj, RGWCopyObj and RGWPutObj ops.

Signed-off-by: Soumya Koduri
---

diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc
index c270067b2b3..0bd169f99e7 100644
--- a/src/rgw/rgw_cr_rest.cc
+++ b/src/rgw/rgw_cr_rest.cc
@@ -349,99 +349,3 @@ int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) {
   return 0;
 }
 
-RGWStreamReadCRF::RGWStreamReadCRF(std::unique_ptr* obj,
-    RGWObjectCtx& obj_ctx) : read_op((*obj)->get_read_op(&obj_ctx)) {}
-RGWStreamReadCRF::~RGWStreamReadCRF() {}
-
-RGWStreamWriteCR::RGWStreamWriteCR(CephContext* _cct, RGWHTTPManager* _mgr,
-    shared_ptr& _in_crf,
-    shared_ptr& _out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
-    in_crf(_in_crf), out_crf(_out_crf) {}
-RGWStreamWriteCR::~RGWStreamWriteCR() { }
-
-int RGWStreamWriteCR::operate(const DoutPrefixProvider* dpp) {
-  reenter(this) {
-    ret = in_crf->init();
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl;
-      return set_cr_error(ret);
-    }
-    in_crf->get_range(ofs, end);
-    rest_obj = in_crf->get_rest_obj();
-
-    do {
-      bl.clear();
-      yield {
-        ret = in_crf->read(ofs, end, bl);
-        if (ret < 0) {
-          ldout(cct, 0) << "ERROR: fail to read object data, ret = " << ret << dendl;
-          return set_cr_error(ret);
-        }
-      }
-      if (retcode < 0) {
-        ldout(cct, 20) << __func__ << ": read_op.read() retcode=" << retcode << dendl;
-        return set_cr_error(ret);
-      }
-
-      read_len = bl.length();
-
-      if (bl.length() == 0) {
-        break;
-      }
-
-      if (!sent_attrs) {
-        ret = out_crf->init();
-        if (ret < 0) {
-          ldout(cct, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl;
-          return set_cr_error(ret);
-        }
-
-        out_crf->send_ready(dpp, rest_obj);
-        ret = out_crf->send();
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-        sent_attrs = true;
-      }
-
-      total_read += bl.length();
-
-      do {
-        yield {
-          ret = out_crf->write(bl, &need_retry);
-          if (ret < 0) {
-            return set_cr_error(ret);
-          }
-        }
-
-        if (retcode < 0) {
-          ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
-          return set_cr_error(ret);
-        }
-      } while (need_retry);
-
-      ofs += read_len;
-
-    } while (ofs <= end);
-
-    do {
-      /* Ensure out_crf is initialized */
-      if (!sent_attrs) {
-        break;
-      }
-
-      /* This has to be under yield. Otherwise sometimes this loop
-       * never finishes, infinitely waiting for req to be done.
- */ - yield { - int ret = out_crf->drain_writes(&need_retry); - if (ret < 0) { - return set_cr_error(ret); - } - } - } while (need_retry); - - return set_cr_done(); - } - return 0; -} diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index e09c554f7aa..cb103aeb834 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -9,7 +9,7 @@ #include "rgw_coroutine.h" #include "rgw_rest_conn.h" -#include "rgw_sal.h" + struct rgw_rest_obj { rgw_obj_key key; @@ -588,64 +588,3 @@ public: int operate(const DoutPrefixProvider *dpp) override; }; - -class RGWStreamReadCRF { -public: - std::unique_ptr read_op; - off_t ofs; - off_t end; - rgw_rest_obj rest_obj; - std::unique_ptr* obj; - - RGWStreamReadCRF(std::unique_ptr* obj, RGWObjectCtx& obj_ctx); - virtual ~RGWStreamReadCRF(); - - virtual int init() {return 0; } - virtual int init_rest_obj() {return 0;} - - int set_range(off_t _ofs, off_t _end) { - ofs = _ofs; - end = _end; - - return 0; - } - - int get_range(off_t &_ofs, off_t &_end) { - _ofs = ofs; - _end = end; - - return 0; - } - - rgw_rest_obj get_rest_obj() { - return rest_obj; - } - - virtual int read(off_t ofs, off_t end, bufferlist &bl) {return 0;}; - -}; - -class RGWStreamWriteCR : public RGWCoroutine { - CephContext *cct; - RGWHTTPManager *http_manager; - string url; - std::shared_ptr in_crf; - std::shared_ptr out_crf; - bufferlist bl; - bool need_retry{false}; - bool sent_attrs{false}; - uint64_t total_read{0}; - int ret{0}; - off_t ofs; - off_t end; - uint64_t read_len = 0; - rgw_rest_obj rest_obj; - -public: - RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr, - std::shared_ptr& _in_crf, - std::shared_ptr& _out_crf); - ~RGWStreamWriteCR(); - - int operate(const DoutPrefixProvider *dpp) override; -}; diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index c9eb675f8c5..fc7e2455ed9 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -223,6 +223,7 @@ void *RGWLC::LCWorker::entry() { << r << dendl; } ldpp_dout(dpp, 2) << "life cycle: stop" << dendl; + cloud_targets.clear(); // clear cloud targets } if (lc->going_down()) break; @@ -684,18 +685,8 @@ public: static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002; static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; - static constexpr uint32_t FLAG_HTTP_MGR = 0x0008; private: - class C_WorkQTimerCtx: public Context { - WorkQ* wq; - public: - C_WorkQTimerCtx(WorkQ* _wq): wq(_wq) {} - void finish(int r) override { - wq->stop_http_manager(); - } - }; - const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; RGWLC::LCWorker* wk; uint32_t qmax; @@ -705,28 +696,14 @@ private: uint32_t flags; vector items; work_f f; - std::unique_ptr crs; - std::unique_ptr http_manager; - bool is_http_mgr_started{false}; - ceph::mutex timer_mtx; - SafeTimer timer; - int timer_wait_sec = 200; //seconds - C_WorkQTimerCtx* timer_ctx = nullptr; public: WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax) - : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf), - timer_mtx(ceph::make_mutex("WorkQTimerMutex")), - timer((CephContext*)(wk->cct), timer_mtx) + : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf) { create(thr_name().c_str()); - timer.init(); } - ~WorkQ() { - timer.shutdown(); - } - std::string thr_name() { return std::string{"wp_thrd: "} + std::to_string(wk->ix) + ", " + std::to_string(ix); @@ -736,51 +713,6 @@ public: f = _f; } - RGWCoroutinesManager* get_crs() { return crs.get(); } - RGWHTTPManager* get_http_manager() { return http_manager.get(); } - - int 
start_http_manager(rgw::sal::Store* store) { - int ret = 0; - - if (is_http_mgr_started) - return 0; - - /* http_mngr */ - if(!crs) { - crs.reset(new RGWCoroutinesManager(store->ctx(), store->get_cr_registry())); - } - if (!http_manager) { - http_manager.reset(new RGWHTTPManager(store->ctx(), crs.get()->get_completion_mgr())); - } - - ret = http_manager->start(); - if (ret < 0) { - dout(5) << "RGWLC:: http_manager->start() failed ret = " - << ret << dendl; - return ret; - } - - is_http_mgr_started = true; - flags |= FLAG_HTTP_MGR; - - return ret; - } - - int stop_http_manager() { - if (!is_http_mgr_started) { - return 0; - } - - http_manager.reset(); - crs.reset(); - - is_http_mgr_started = false; - flags &= ~FLAG_HTTP_MGR; - timer.cancel_all_events(); - timer_ctx = nullptr; - return 0; - } - void enqueue(WorkItem&& item) { unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && @@ -788,10 +720,6 @@ public: flags |= FLAG_EWAIT_SYNC; cv.wait_for(uniq, 200ms); } - if (timer_ctx && (flags & FLAG_HTTP_MGR)) { - timer.cancel_all_events(); - timer_ctx = nullptr; - } items.push_back(item); if (flags & FLAG_DWAIT_SYNC) { flags &= ~FLAG_DWAIT_SYNC; @@ -816,10 +744,6 @@ private: if (flags & FLAG_EDRAIN_SYNC) { flags &= ~FLAG_EDRAIN_SYNC; } - if ((flags & FLAG_HTTP_MGR) && !timer_ctx) { - timer_ctx = new C_WorkQTimerCtx(this); - timer.add_event_after(timer_wait_sec, timer_ctx); - } flags |= FLAG_DWAIT_SYNC; cv.wait_for(uniq, 200ms); } @@ -1329,28 +1253,26 @@ public: } /* find out if the the storage class is remote cloud */ - int get_tier_target(const RGWZoneGroup &zonegroup, rgw_placement_rule& rule, - string& storage_class, RGWZoneGroupPlacementTier &tier) { + int get_tier_target(const RGWZoneGroup &zonegroup, const rgw_placement_rule& rule, + RGWZoneGroupPlacementTier &tier) { std::map::const_iterator titer; titer = zonegroup.placement_targets.find(rule.name); if (titer == zonegroup.placement_targets.end()) { return -ENOENT; } - if (storage_class.empty()) { - storage_class = rule.storage_class; - } - const auto& target_rule = titer->second; std::map::const_iterator ttier; - ttier = target_rule.tier_targets.find(storage_class); + ttier = target_rule.tier_targets.find(rule.storage_class); if (ttier != target_rule.tier_targets.end()) { tier = ttier->second; + } else { // not found + return -ENOENT; } return 0; } - int delete_tier_obj(lc_op_ctx& oc, RGWLCCloudTierCtx& tier_ctx) { + int delete_tier_obj(lc_op_ctx& oc) { int ret = 0; /* If bucket is versioned, create delete_marker for current version @@ -1387,28 +1309,28 @@ public: } attrs = oc.obj->get_attrs(); - (*tier_ctx.obj)->set_atomic(&tier_ctx.rctx); - RGWObjState *s = tier_ctx.rctx.get_state((*tier_ctx.obj)->get_obj()); - std::unique_ptr obj_op(oc.obj->get_write_op(&oc.rctx)); - - obj_op->params.modify_tail = true; - obj_op->params.flags = PUT_OBJ_CREATE; - obj_op->params.category = RGWObjCategory::CloudTiered; - obj_op->params.delete_at = real_time(); + rgw::sal::RadosStore *rados = static_cast(oc.store); + RGWRados::Object op_target(rados->getRados(), oc.bucket->get_info(), oc.rctx, oc.obj->get_obj()); + RGWRados::Object::Write obj_op(&op_target); + + obj_op.meta.modify_tail = true; + obj_op.meta.flags = PUT_OBJ_CREATE; + obj_op.meta.category = RGWObjCategory::CloudTiered; + obj_op.meta.delete_at = real_time(); bufferlist blo; - blo.append(""); - obj_op->params.data = &blo; - obj_op->params.if_match = NULL; - obj_op->params.if_nomatch = NULL; - obj_op->params.user_data = NULL; - obj_op->params.zones_trace = NULL; - 
obj_op->params.delete_at = real_time(); - obj_op->params.olh_epoch = tier_ctx.o.versioned_epoch; + obj_op.meta.data = &blo; + obj_op.meta.if_match = NULL; + obj_op.meta.if_nomatch = NULL; + obj_op.meta.user_data = NULL; + obj_op.meta.zones_trace = NULL; + obj_op.meta.delete_at = real_time(); + obj_op.meta.olh_epoch = tier_ctx.o.versioned_epoch; RGWObjManifest *pmanifest; + RGWObjManifest manifest; - pmanifest = &(*s->manifest); + pmanifest = &manifest; RGWObjTier tier_config; tier_config.name = oc.tier.storage_class; tier_config.tier_placement = oc.tier; @@ -1421,19 +1343,13 @@ public: rgw_placement_rule target_placement; target_placement.inherit_from(tier_ctx.bucket_info.placement_rule); target_placement.storage_class = oc.tier.storage_class; - pmanifest->set_head(target_placement, (*tier_ctx.obj)->get_obj(), 0); + pmanifest->set_head(target_placement, tier_ctx.obj->get_obj(), 0); - pmanifest->set_tail_placement(target_placement, (*tier_ctx.obj)->get_obj().bucket); + pmanifest->set_tail_placement(target_placement, tier_ctx.obj->get_obj().bucket); - /* should the obj_size also be set to '0' or is it needed - * to keep track of original size before transition. - * But unless obj_size is set to '0', obj_iters cannot - * be reset I guess. For regular transitioned objects - * obj_size remains the same even when object is moved to other - * storage class. So maybe better to keep it the same way. - */ + pmanifest->set_obj_size(0); - obj_op->params.manifest = pmanifest; + obj_op.meta.manifest = pmanifest; /* update storage class */ bufferlist bl; @@ -1443,15 +1359,7 @@ public: attrs.erase(RGW_ATTR_ID_TAG); attrs.erase(RGW_ATTR_TAIL_TAG); - obj_op->params.attrs = &attrs; - - r = obj_op->prepare(null_yield); - if (r < 0) { - return r; - } - - - r = obj_op->write_meta(oc.dpp, tier_ctx.o.meta.size, 0, null_yield); + r = obj_op.write_meta(oc.dpp, 0, 0, attrs, null_yield); if (r < 0) { return r; } @@ -1460,8 +1368,6 @@ public: } int transition_obj_to_cloud(lc_op_ctx& oc) { - std::shared_ptr conn; - /* init */ string id = "cloudid"; string endpoint = oc.tier.t.s3.endpoint; @@ -1482,65 +1388,48 @@ public: boost::algorithm::to_lower(bucket_name); } - conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, region, host_style)); - - int ret = oc.wq->start_http_manager(oc.store); - if (ret < 0) { - ldpp_dout(oc.dpp, 0) << "failed in start_http_manager() ret=" << ret << dendl; - return ret; - } - - RGWCoroutinesManager* crs = oc.wq->get_crs(); - RGWHTTPManager* http_manager = oc.wq->get_http_manager(); - - if (!crs || !http_manager) { - /* maybe race..return and retry */ - ldpp_dout(oc.dpp, 0) << " http_manager and crs not initialized" << dendl; - return -1; - } + /* Create RGW REST connection */ + S3RESTConn conn(oc.cct, oc.store, id, { endpoint }, key, region, host_style); RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(), - &oc.obj, oc.rctx, conn, bucket_name, - oc.tier.t.s3.target_storage_class, http_manager); + oc.obj.get(), oc.rctx, conn, bucket_name, + oc.tier.t.s3.target_storage_class); tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings; tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size; tier_ctx.multipart_sync_threshold = oc.tier.t.s3.multipart_sync_threshold; tier_ctx.storage_class = oc.tier.storage_class; - bool al_tiered = false; + // check if target_path is already created + std::set& cloud_targets = oc.env.worker->get_cloud_targets(); + std::pair::iterator, bool> it; - /* Since multiple zones may try to transition the same object to 
the cloud, - * verify if the object is already transitioned. And since its just a best - * effort, do not bail out in case of any errors. - */ - ret = crs->run(oc.dpp, new RGWLCCloudCheckCR(tier_ctx, &al_tiered)); - - if (ret < 0) { - ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudCheckCR() ret=" << ret << dendl; - } + it = cloud_targets.insert(bucket_name); + tier_ctx.target_bucket_created = !(it.second); + + ldpp_dout(oc.dpp, 0) << "Transitioning object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ")" << dendl; + + /* Transition object to cloud end point */ + int ret = rgw_cloud_tier_transfer_object(tier_ctx); - if (al_tiered) { - ldout(tier_ctx.cct, 20) << "Object (" << oc.o.key << ") is already tiered" << dendl; - return 0; - } else { - ret = crs->run(oc.dpp, new RGWLCCloudTierCR(tier_ctx)); - } - if (ret < 0) { - ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudTierCR() ret=" << ret << dendl; + ldpp_dout(oc.dpp, 0) << "ERROR: failed to transfer object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ") ret=" << ret << dendl; return ret; + + if (!tier_ctx.target_bucket_created) { + cloud_targets.erase(it.first); + } } if (delete_object) { - ret = delete_tier_obj(oc, tier_ctx); + ret = delete_tier_obj(oc); if (ret < 0) { - ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object failed ret=" << ret << dendl; + ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl; return ret; } } else { ret = update_tier_obj(oc, tier_ctx); if (ret < 0) { - ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object failed ret=" << ret << dendl; + ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object(" << oc.o.key << ") failed ret=" << ret << dendl; return ret; } } @@ -1551,6 +1440,13 @@ public: int process(lc_op_ctx& oc) { auto& o = oc.o; int r; + + if (oc.o.meta.category == RGWObjCategory::CloudTiered) { + /* Skip objects which are already cloud tiered. */ + ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl; + return 0; + } + std::string tier_type = ""; const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup(); @@ -1558,16 +1454,10 @@ public: target_placement.inherit_from(oc.bucket->get_placement_rule()); target_placement.storage_class = transition.storage_class; - r = get_tier_target(zonegroup, target_placement, target_placement.storage_class, oc.tier); + r = get_tier_target(zonegroup, target_placement, oc.tier); if (!r && oc.tier.tier_type == "cloud-s3") { - ldpp_dout(oc.dpp, 20) << "Found cloud s3 tier: " << target_placement.storage_class << dendl; - if (oc.o.meta.category == RGWObjCategory::CloudTiered) { - /* Skip objects which are already cloud tiered. */ - ldpp_dout(oc.dpp, 20) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl; - return 0; - } - + ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl; if (!oc.o.is_current() && !pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, oc.dpp)) { /* Skip objects which has object lock enabled. */ diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index d66ad3a504a..ac1612f46aa 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -480,6 +480,10 @@ public: std::mutex lock; std::condition_variable cond; WorkPool* workpool{nullptr}; + /* save the target bucket names created as part of object transition + * to cloud. 
This list is maintained for the duration of each RGWLC::process() + * post which it is discarded. */ + std::set cloud_targets; public: @@ -493,6 +497,7 @@ public: void stop(); bool should_work(utime_t& now); int schedule_next_start_time(utime_t& start, utime_t& now); + std::set& get_cloud_targets() { return cloud_targets; } ~LCWorker(); friend class RGWRados; diff --git a/src/rgw/rgw_lc_tier.cc b/src/rgw/rgw_lc_tier.cc index d7d62cbf6a6..704b7dee2e2 100644 --- a/src/rgw/rgw_lc_tier.cc +++ b/src/rgw/rgw_lc_tier.cc @@ -19,13 +19,61 @@ #include #include -#include - #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw using namespace std; +struct rgw_lc_multipart_part_info { + int part_num{0}; + uint64_t ofs{0}; + uint64_t size{0}; + std::string etag; +}; + +struct rgw_lc_obj_properties { + ceph::real_time mtime; + std::string etag; + uint64_t versioned_epoch{0}; + std::map& target_acl_mappings; + std::string target_storage_class; + + rgw_lc_obj_properties(ceph::real_time _mtime, std::string _etag, + uint64_t _versioned_epoch, std::map& _t_acl_mappings, + std::string _t_storage_class) : + mtime(_mtime), etag(_etag), + versioned_epoch(_versioned_epoch), + target_acl_mappings(_t_acl_mappings), + target_storage_class(_t_storage_class) {} +}; + +struct rgw_lc_multipart_upload_info { + std::string upload_id; + uint64_t obj_size; + ceph::real_time mtime; + std::string etag; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(upload_id, bl); + encode(obj_size, bl); + encode(mtime, bl); + encode(etag, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(upload_id, bl); + decode(obj_size, bl); + decode(mtime, bl); + decode(etag, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info) + static inline string get_key_instance(const rgw_obj_key& key) { if (!key.instance.empty() && @@ -51,10 +99,92 @@ static inline string obj_to_aws_path(const rgw_obj& obj) return path; } +static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store, + const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status) +{ + int ret = 0; + rgw::sal::RadosStore *rados = dynamic_cast(store); + + if (!rados) { + ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl; + return -1; + } + + auto& pool = status_obj->pool; + const auto oid = status_obj->oid; + auto obj_ctx = rados->svc()->sysobj->init_obj_ctx(); + bufferlist bl; + + ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, nullptr, nullptr, + null_yield, dpp); + + if (ret < 0) { + return ret; + } + + if (bl.length() > 0) { + try { + auto p = bl.cbegin(); + status->decode(p); + } catch (buffer::error& e) { + ldpp_dout(dpp, 10) << "failed to decode status obj: " + << e.what() << dendl; + return -EIO; + } + } else { + return -EIO; + } + + return 0; +} + +static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store, + const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status) +{ + int ret = 0; + rgw::sal::RadosStore *rados = dynamic_cast(store); + + if (!rados) { + ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." 
<< dendl; + return -1; + } + + auto& pool = status_obj->pool; + const auto oid = status_obj->oid; + auto obj_ctx = rados->svc()->sysobj->init_obj_ctx(); + bufferlist bl; + status->encode(bl); + + ret = rgw_put_system_obj(dpp, obj_ctx, pool, oid, bl, true, nullptr, + real_time{}, null_yield); + + return ret; +} + +static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store, + const rgw_raw_obj *status_obj) +{ + int ret = 0; + rgw::sal::RadosStore *rados = dynamic_cast(store); + + if (!rados) { + ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl; + return -1; + } + + auto& pool = status_obj->pool; + const auto oid = status_obj->oid; + auto sysobj = rados->svc()->sysobj; + + ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield); + + return ret; +} + static std::set keep_headers = { "CONTENT_TYPE", - "CONTENT_ENCODING", - "CONTENT_DISPOSITION", - "CONTENT_LANGUAGE" }; + "CONTENT_ENCODING", + "CONTENT_DISPOSITION", + "CONTENT_LANGUAGE" }; /* * mapping between rgw object attrs and output http fields @@ -75,7 +205,7 @@ static std::set keep_headers = { "CONTENT_TYPE", }; */ static void init_headers(map& attrs, - map& headers) + map& headers) { for (auto& kv : attrs) { const char * name = kv.first.c_str(); @@ -83,14 +213,11 @@ static void init_headers(map& attrs, if (aiter != std::end(rgw_to_http_attrs)) { headers[aiter->second] = rgw_bl_str(kv.second); - } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) { - // this attr has an extra length prefix from encode() in prior versions - headers["X-Object-Meta-Static-Large-Object"] = "True"; } else if (strncmp(name, RGW_ATTR_META_PREFIX, sizeof(RGW_ATTR_META_PREFIX)-1) == 0) { name += sizeof(RGW_ATTR_META_PREFIX) - 1; string sname(name); - string name_prefix = "X-Object-Meta-"; + string name_prefix = RGW_ATTR_META_PREFIX; char full_name_buf[name_prefix.size() + sname.size() + 1]; snprintf(full_name_buf, sizeof(full_name_buf), "%.*s%.*s", static_cast(name_prefix.length()), @@ -99,71 +226,104 @@ static void init_headers(map& attrs, sname.data()); headers[full_name_buf] = rgw_bl_str(kv.second); } else if (strcmp(name,RGW_ATTR_CONTENT_TYPE) == 0) { - /* Verify if its right way to copy this field */ headers["CONTENT_TYPE"] = rgw_bl_str(kv.second); } } } -int RGWLCStreamGetCRF::init(const DoutPrefixProvider *dpp) { - /* init input connection */ - req_params.get_op = false; /* Need only headers */ - req_params.prepend_metadata = true; - req_params.rgwx_stat = true; - req_params.sync_manifest = true; - req_params.skip_decrypt = true; +/* Read object or just head from remote endpoint. For now initializes only headers, + * but can be extended to fetch etag, mtime etc if needed. 
+ */ +static int cloud_tier_get_object(RGWLCCloudTierCtx& tier_ctx, bool head, + std::map& headers) { + RGWRESTConn::get_obj_params req_params; + RGWBucketInfo b; + std::string target_obj_name; + int ret = 0; + std::unique_ptr dest_bucket; + std::unique_ptr dest_obj; + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class); + std::string etag; + RGWRESTStreamRWRequest *in_req; + + b.bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + tier_ctx.obj->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += get_key_instance(tier_ctx.obj->get_key()); + } - string etag; - real_time set_mtime; + ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << ret << dendl; + return ret; + } - int ret = conn->get_obj(dpp, dest_obj, req_params, true /* send */, &in_req); - if (ret < 0) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; - return ret; - } + dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); + if (!dest_obj) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; + return -1; + } + /* init input connection */ + req_params.get_op = !head; + req_params.prepend_metadata = true; + req_params.rgwx_stat = true; + req_params.sync_manifest = true; + req_params.skip_decrypt = true; + + ret = tier_ctx.conn.get_obj(tier_ctx.dpp, dest_obj.get(), req_params, true /* send */, &in_req); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: " << __func__ << "(): conn.get_obj() returned ret=" << ret << dendl; + return ret; + } - /* fetch only headers */ - ret = conn->complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield); - if (ret < 0 && ret != -ENOENT) { - ldout(cct, 20) << "ERROR: " << __func__ << "(): conn->complete_request() returned ret=" << ret << dendl; - return ret; - } - return 0; + /* fetch headers */ + ret = tier_ctx.conn.complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(tier_ctx.dpp, 20) << "ERROR: " << __func__ << "(): conn.complete_request() returned ret=" << ret << dendl; + return ret; } + return 0; +} -int RGWLCStreamGetCRF::is_already_tiered() { - char buf[32]; - map attrs = headers; +static bool is_already_tiered(const DoutPrefixProvider *dpp, + std::map& headers, + ceph::real_time& mtime) { + char buf[32]; + map attrs = headers; - for (const auto& a : attrs) { - ldout(cct, 20) << "GetCrf attr[" << a.first << "] = " << a.second <params.lastmod = &read_mtime; - return read_op->read(ofs, end, bl, y, dpp); + int ret = read_op->prepare(y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl; + return ret; } -}; - -class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF -{ - CephContext *cct; - RGWHTTPManager *http_manager; - rgw_lc_obj_properties obj_properties; - std::shared_ptr conn; - rgw::sal::Object* dest_obj; - string etag; + if (read_mtime != mtime) { + /* raced */ + return -ECANCELED; + } - public: - RGWLCStreamPutCRF(CephContext *_cct, - RGWCoroutinesEnv *_env, - RGWCoroutine *_caller, - RGWHTTPManager *_http_manager, - const rgw_lc_obj_properties& _obj_properties, - std::shared_ptr 
_conn, - rgw::sal::Object* _dest_obj) : - RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager), - cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) { - } + attrs = obj->get_attrs(); + obj_size = obj->get_obj_size(); + ret = init_rest_obj(); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl; + return ret; + } - int init() override { - /* init output connection */ - RGWRESTStreamS3PutObj *out_req{nullptr}; + if (!multipart) { + set_range(0, obj_size - 1); + } else { + set_range(m_part_off, m_part_end); + } + return 0; +} - if (multipart.is_multipart) { - char buf[32]; - snprintf(buf, sizeof(buf), "%d", multipart.part_num); - rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, - { "partNumber", buf }, - { nullptr, nullptr } }; - conn->put_obj_send_init(dest_obj, params, &out_req); - } else { - conn->put_obj_send_init(dest_obj, nullptr, &out_req); - } +int RGWLCStreamRead::init_rest_obj() { + /* Initialize rgw_rest_obj. + * Reference: do_decode_rest_obj + * Check how to copy headers content */ + rest_obj.init(obj->get_key()); - set_req(out_req); + if (!multipart) { + rest_obj.content_len = obj_size; + } else { + rest_obj.content_len = m_part_size; + } - return RGWStreamWriteHTTPResourceCRF::init(); + /* For mulitpart attrs are sent as part of InitMultipartCR itself */ + if (multipart) { + return 0; } - static bool keep_attr(const string& h) { - return (keep_headers.find(h) != keep_headers.end() || - boost::algorithm::starts_with(h, "X_AMZ_")); + /* + * XXX: verify if its right way to copy attrs into rest obj + */ + init_headers(attrs, rest_obj.attrs); + + rest_obj.acls.set_ctx(cct); + const auto aiter = attrs.find(RGW_ATTR_ACL); + if (aiter != attrs.end()) { + bufferlist& bl = aiter->second; + auto bliter = bl.cbegin(); + try { + rest_obj.acls.decode(bliter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl; + return -EIO; + } + } else { + ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl; } + return 0; +} - static void init_send_attrs(CephContext *cct, const rgw_rest_obj& rest_obj, - const rgw_lc_obj_properties& obj_properties, - map *attrs) { +int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) { + int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield); + return ret; +} - map& acl_mappings(obj_properties.target_acl_mappings); - string target_storage_class = obj_properties.target_storage_class; +int RGWLCCloudStreamPut::init() { + /* init output connection */ + if (multipart.is_multipart) { + char buf[32]; + snprintf(buf, sizeof(buf), "%d", multipart.part_num); + rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, + { "partNumber", buf }, + { nullptr, nullptr } }; + conn.put_obj_send_init(dest_obj, params, &out_req); + } else { + conn.put_obj_send_init(dest_obj, nullptr, &out_req); + } - attrs->clear(); + return 0; +} - for (auto& hi : rest_obj.attrs) { - if (keep_attr(hi.first)) { - attrs->insert(hi); - } - } +bool RGWLCCloudStreamPut::keep_attr(const string& h) { + return (keep_headers.find(h) != keep_headers.end() || + boost::algorithm::starts_with(h, "X_AMZ_")); +} - const auto acl = rest_obj.acls.get_acl(); +void RGWLCCloudStreamPut::init_send_attrs(const DoutPrefixProvider *dpp, + const rgw_rest_obj& rest_obj, + const rgw_lc_obj_properties& obj_properties, + std::map& attrs) { - map > access_map; + map& 
acl_mappings(obj_properties.target_acl_mappings); + const std::string& target_storage_class = obj_properties.target_storage_class; - if (!acl_mappings.empty()) { - for (auto& grant : acl.get_grant_map()) { - auto& orig_grantee = grant.first; - auto& perm = grant.second; + attrs.clear(); - string grantee; + for (auto& hi : rest_obj.attrs) { + if (keep_attr(hi.first)) { + attrs.insert(hi); + } + } - const auto& am = acl_mappings; + const auto acl = rest_obj.acls.get_acl(); - const auto iter = am.find(orig_grantee); - if (iter == am.end()) { - ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; - continue; - } + map > access_map; - grantee = iter->second.dest_id; - - string type; - - switch (iter->second.type) { - case ACL_TYPE_CANON_USER: - type = "id"; - break; - case ACL_TYPE_EMAIL_USER: - type = "emailAddress"; - break; - case ACL_TYPE_GROUP: - type = "uri"; - break; - default: - continue; - } + if (!acl_mappings.empty()) { + for (auto& grant : acl.get_grant_map()) { + auto& orig_grantee = grant.first; + auto& perm = grant.second; - string tv = type + "=" + grantee; + string grantee; - int flags = perm.get_permission().get_permissions(); - if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { - access_map[flags].push_back(tv); - continue; - } + const auto& am = acl_mappings; - for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { - if (flags & i) { - access_map[i].push_back(tv); - } - } + const auto iter = am.find(orig_grantee); + if (iter == am.end()) { + ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; + continue; } - } - for (const auto& aiter : access_map) { - int grant_type = aiter.first; + grantee = iter->second.dest_id; - string header_str("x-amz-grant-"); + string type; - switch (grant_type) { - case RGW_PERM_READ: - header_str.append("read"); - break; - case RGW_PERM_WRITE: - header_str.append("write"); - break; - case RGW_PERM_READ_ACP: - header_str.append("read-acp"); + switch (iter->second.type) { + case ACL_TYPE_CANON_USER: + type = "id"; break; - case RGW_PERM_WRITE_ACP: - header_str.append("write-acp"); + case ACL_TYPE_EMAIL_USER: + type = "emailAddress"; break; - case RGW_PERM_FULL_CONTROL: - header_str.append("full-control"); + case ACL_TYPE_GROUP: + type = "uri"; break; + default: + continue; } - string s; + string tv = type + "=" + grantee; - for (const auto& viter : aiter.second) { - if (!s.empty()) { - s.append(", "); - } - s.append(viter); + int flags = perm.get_permission().get_permissions(); + if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { + access_map[flags].push_back(tv); + continue; } - ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; - - (*attrs)[header_str] = s; + for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { + if (flags & i) { + access_map[i].push_back(tv); + } + } } + } - /* Copy target storage class */ - if (!target_storage_class.empty()) { - (*attrs)["x-amz-storage-class"] = target_storage_class; - } else { - (*attrs)["x-amz-storage-class"] = "STANDARD"; + for (const auto& aiter : access_map) { + int grant_type = aiter.first; + + string header_str("x-amz-grant-"); + + switch (grant_type) { + case RGW_PERM_READ: + header_str.append("read"); + break; + case RGW_PERM_WRITE: + header_str.append("write"); + break; + case RGW_PERM_READ_ACP: + header_str.append("read-acp"); + break; + case RGW_PERM_WRITE_ACP: + header_str.append("write-acp"); + break; + case RGW_PERM_FULL_CONTROL: + 
header_str.append("full-control"); + break; } - /* New attribute to specify its transitioned from RGW */ - (*attrs)["x-amz-meta-rgwx-source"] = "rgw"; + string s; - char buf[32]; - snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch); - (*attrs)["x-amz-meta-rgwx-versioned-epoch"] = buf; - - utime_t ut(obj_properties.mtime); - snprintf(buf, sizeof(buf), "%lld.%09lld", - (long long)ut.sec(), - (long long)ut.nsec()); - - (*attrs)["x-amz-meta-rgwx-source-mtime"] = buf; - (*attrs)["x-amz-meta-rgwx-source-etag"] = obj_properties.etag; - (*attrs)["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; - if (!rest_obj.key.instance.empty()) { - (*attrs)["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; - } - for (const auto& a : (*attrs)) { - ldout(cct, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second < new_attrs; - if (!multipart.is_multipart) { - init_send_attrs(cct, rest_obj, obj_properties, &new_attrs); - } + /* New attribute to specify its transitioned from RGW */ + attrs["x-amz-meta-rgwx-source"] = "rgw"; - r->set_send_length(rest_obj.content_len); + char buf[32]; + snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch); + attrs["x-amz-meta-rgwx-versioned-epoch"] = buf; - RGWAccessControlPolicy policy; + utime_t ut(obj_properties.mtime); + snprintf(buf, sizeof(buf), "%lld.%09lld", + (long long)ut.sec(), + (long long)ut.nsec()); - r->send_ready(dpp, conn->get_key(), new_attrs, policy); + attrs["x-amz-meta-rgwx-source-mtime"] = buf; + attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag; + attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; + if (!rest_obj.key.instance.empty()) { + attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; + } + for (const auto& a : attrs) { + ldpp_dout(dpp, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <& headers) { - for (const auto& h : headers) { - if (h.first == "ETAG") { - etag = h.second; - } - } +void RGWLCCloudStreamPut::send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) { + auto r = static_cast(out_req); + + std::map new_attrs; + if (!multipart.is_multipart) { + init_send_attrs(dpp, rest_obj, obj_properties, new_attrs); } - bool get_etag(string *petag) { - if (etag.empty()) { - return false; + r->set_send_length(rest_obj.content_len); + + RGWAccessControlPolicy policy; + + r->send_ready(dpp, conn.get_key(), new_attrs, policy); +} + +void RGWLCCloudStreamPut::handle_headers(const map& headers) { + for (const auto& h : headers) { + if (h.first == "ETAG") { + etag = h.second; } - *petag = etag; - return true; } -}; +} +bool RGWLCCloudStreamPut::get_etag(string *petag) { + if (etag.empty()) { + return false; + } + *petag = etag; + return true; +} + +void RGWLCCloudStreamPut::set_multipart(const string& upload_id, int part_num, uint64_t part_size) { + multipart.is_multipart = true; + multipart.upload_id = upload_id; + multipart.part_num = part_num; + multipart.part_size = part_size; +} +int RGWLCCloudStreamPut::send() { + int ret = RGWHTTP::send(out_req); + return ret; +} -class RGWLCStreamObjToCloudPlainCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; +RGWGetDataCB *RGWLCCloudStreamPut::get_cb() { + return out_req->get_out_cb(); +} - std::shared_ptr in_crf; - std::shared_ptr out_crf; +int RGWLCCloudStreamPut::complete_request() { + int ret = conn.complete_request(out_req, etag, &obj_properties.mtime, null_yield); + return ret; +} - std::unique_ptr dest_bucket; - std::unique_ptr dest_obj; +/* Read local copy and write to 
Cloud endpoint */ +static int cloud_tier_transfer_object(const DoutPrefixProvider* dpp, + RGWLCStreamRead* readf, RGWLCCloudStreamPut* writef) { + std::string url; + bufferlist bl; + bool sent_attrs{false}; + int ret{0}; + off_t ofs; + off_t end; - rgw_lc_obj_properties obj_properties; - RGWBucketInfo b; - string target_obj_name; + ret = readf->init(); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl; + return ret; + } + readf->get_range(ofs, end); + rgw_rest_obj& rest_obj = readf->get_rest_obj(); + if (!sent_attrs) { + ret = writef->init(); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl; + return ret; + } - rgw::sal::Object *o; + writef->send_ready(dpp, rest_obj); + ret = writef->send(); + if (ret < 0) { + return ret; + } + sent_attrs = true; + } - public: - RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx) - : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), - obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, - tier_ctx.target_storage_class){} - - int operate(const DoutPrefixProvider *dpp) { - - reenter(this) { - b.bucket.name = tier_ctx.target_bucket_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } + ret = readf->read(ofs, end, writef->get_cb()); - retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl; - return retcode; - } - - dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); - if (!dest_obj) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; - return -1; - } + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: fail to read from in_crf, ret = " << ret << dendl; + return ret; + } - o = dest_obj.get(); + ret = writef->complete_request(); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: fail to complete request, ret = " << ret << dendl; + return ret; + } - // tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL + return 0; +} - /* Prepare Read from source */ - in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, dpp, - tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); +static int cloud_tier_plain_transfer(RGWLCCloudTierCtx& tier_ctx) { + int ret; + std::unique_ptr dest_bucket; + std::unique_ptr dest_obj; - out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this, - (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn, o)); + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class); + RGWBucketInfo b; + std::string target_obj_name; - /* actual Read & Write */ - yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf)); - if (retcode < 0) { - return set_cr_error(retcode); - } + b.bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + tier_ctx.obj->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += get_key_instance(tier_ctx.obj->get_key()); + } - return set_cr_done(); - } + ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); + if (ret < 
0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl; + return ret; + } - return 0; + dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); + if (!dest_obj) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; + return -1; } -}; -class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; + tier_ctx.obj->set_atomic(&tier_ctx.rctx); + + /* Prepare Read from source */ + /* TODO: Define readf, writef as stack variables. For some reason, + * when used as stack variables (esp., readf), the transition seems to + * be taking lot of time eventually erroring out at times. + */ + std::shared_ptr readf; + readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp, + tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); - string upload_id; + std::shared_ptr writef; + writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn, + dest_obj.get())); - rgw_lc_multipart_part_info part_info; + /* actual Read & Write */ + ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get()); - string *petag; - std::shared_ptr in_crf; - std::shared_ptr out_crf; + return ret; +} +static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx, + const std::string& upload_id, + const rgw_lc_multipart_part_info& part_info, + std::string *petag) { + int ret; std::unique_ptr dest_bucket; std::unique_ptr dest_obj; - rgw_lc_obj_properties obj_properties; + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class); RGWBucketInfo b; - string target_obj_name; + std::string target_obj_name; off_t end; - public: - RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, const string& _upload_id, - const rgw_lc_multipart_part_info& _part_info, - string *_petag) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), - upload_id(_upload_id), part_info(_part_info), petag(_petag), - obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, - tier_ctx.target_storage_class){} - - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - b.bucket.name = tier_ctx.target_bucket_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } - - retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl; - return retcode; - } - - dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); - if (!dest_obj) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; - return -1; - } + b.bucket.name = tier_ctx.target_bucket_name; + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + tier_ctx.obj->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += get_key_instance(tier_ctx.obj->get_key()); + } - // tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL + ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << 
tier_ctx.target_bucket_name << " , ret = " << ret << dendl; + return ret; + } - /* Prepare Read from source */ - in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, - tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); + dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); + if (!dest_obj) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; + return -1; + } - end = part_info.ofs + part_info.size - 1; - std::static_pointer_cast(in_crf)->set_multipart(part_info.size, part_info.ofs, end); + tier_ctx.obj->set_atomic(&tier_ctx.rctx); - /* Prepare write */ - out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this, - (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn, - dest_obj.get())); + /* TODO: Define readf, writef as stack variables. For some reason, + * when used as stack variables (esp., readf), the transition seems to + * be taking lot of time eventually erroring out at times. */ + std::shared_ptr readf; + readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp, + tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); - out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); + std::shared_ptr writef; + writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn, + dest_obj.get())); - /* actual Read & Write */ - yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf)); - if (retcode < 0) { - return set_cr_error(retcode); - } + /* Prepare Read from source */ + end = part_info.ofs + part_info.size - 1; + readf->set_multipart(part_info.size, part_info.ofs, end); - if (!(static_cast(out_crf.get()))->get_etag(petag)) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to get etag from PUT request" << dendl; - return set_cr_error(-EIO); - } + /* Prepare write */ + writef->set_multipart(upload_id, part_info.part_num, part_info.size); - return set_cr_done(); - } + /* actual Read & Write */ + ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get()); + if (ret < 0) { + return ret; + } - return 0; + if (!(writef->get_etag(petag))) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl; + return -EIO; } -}; -class RGWLCAbortMultipartCR : public RGWCoroutine { - CephContext *cct; - RGWHTTPManager *http_manager; - RGWRESTConn *dest_conn; - rgw_obj dest_obj; + return 0; +} - string upload_id; +static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp, + RGWRESTConn& dest_conn, const rgw_obj& dest_obj, + const std::string& upload_id) { + int ret; + bufferlist out_bl; + bufferlist bl; + rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; - public: - RGWLCAbortMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager, - RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, - const string& _upload_id) : RGWCoroutine(_cct), - cct(_cct), http_manager(_http_manager), - dest_conn(_dest_conn), dest_obj(_dest_obj), - upload_id(_upload_id) {} - - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - - yield { - rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; - bufferlist bl; - call(new RGWDeleteRESTResourceCR(cct, dest_conn, http_manager, - obj_to_aws_path(dest_obj), params)); - } + string resource = obj_to_aws_path(dest_obj); + ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr, + out_bl, &bl, nullptr, null_yield); - if (retcode < 0) 
{ - ldout(cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl; - return set_cr_error(retcode); - } - return set_cr_done(); - } - - return 0; + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (ret=" << ret << ")" << dendl; + return ret; } -}; - -class RGWLCInitMultipartCR : public RGWCoroutine { - CephContext *cct; - RGWHTTPManager *http_manager; - RGWRESTConn *dest_conn; - rgw_obj dest_obj; - uint64_t obj_size; - map attrs; + return 0; +} +static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp, + RGWRESTConn& dest_conn, const rgw_obj& dest_obj, + uint64_t obj_size, std::map& attrs, + std::string& upload_id) { bufferlist out_bl; - - string *upload_id; + bufferlist bl; struct InitMultipartResult { - string bucket; - string key; - string upload_id; + std::string bucket; + std::string key; + std::string upload_id; void decode_xml(XMLObj *obj) { RGWXMLDecoder::decode_xml("Bucket", bucket, obj); @@ -699,81 +914,67 @@ class RGWLCInitMultipartCR : public RGWCoroutine { } } result; - public: - RGWLCInitMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager, - RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, - uint64_t _obj_size, const map& _attrs, - string *_upload_id) : RGWCoroutine(_cct), cct(_cct), - http_manager(_http_manager), dest_conn(_dest_conn), - dest_obj(_dest_obj), obj_size(_obj_size), - attrs(_attrs), upload_id(_upload_id) {} - - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - - yield { - rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} }; - bufferlist bl; - call(new RGWPostRawRESTResourceCR (cct, dest_conn, http_manager, - obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl)); - } + int ret; + rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} }; - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; - return set_cr_error(retcode); - } - { - /* - * If one of the following fails we cannot abort upload, as we cannot - * extract the upload id. If one of these fail it's very likely that that's - * the least of our problem. - */ - RGWXMLDecoder::XMLParser parser; - if (!parser.init()) { - ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; - return set_cr_error(-EIO); - } + string resource = obj_to_aws_path(dest_obj); - if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { - string str(out_bl.c_str(), out_bl.length()); - ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl; - return set_cr_error(-EIO); - } + ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs, + out_bl, &bl, nullptr, null_yield); - try { - RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true); - } catch (RGWXMLDecoder::err& err) { - string str(out_bl.c_str(), out_bl.length()); - ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl; - return set_cr_error(-EIO); - } - } + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; + return ret; + } + /* + * If one of the following fails we cannot abort upload, as we cannot + * extract the upload id. If one of these fail it's very likely that that's + * the least of our problem. 
+ */ + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; + return -EIO; + } - ldout(cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl; + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldpp_dout(dpp, 5) << "ERROR: failed to parse xml initmultipart: " << str << dendl; + return -EIO; + } - *upload_id = result.upload_id; + try { + RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl; + return -EIO; + } - return set_cr_done(); - } + ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl; - return 0; - } -}; + upload_id = result.upload_id; -class RGWLCCompleteMultipartCR : public RGWCoroutine { - CephContext *cct; - RGWHTTPManager *http_manager; - RGWRESTConn *dest_conn; - rgw_obj dest_obj; + return 0; +} - bufferlist out_bl; +static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp, + RGWRESTConn& dest_conn, const rgw_obj& dest_obj, + std::string& upload_id, + const std::map& parts) { + rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; + + stringstream ss; + XMLFormatter formatter; + int ret; - string upload_id; + bufferlist bl, out_bl; + string resource = obj_to_aws_path(dest_obj); struct CompleteMultipartReq { - map parts; + std::map parts; - explicit CompleteMultipartReq(const map& _parts) : parts(_parts) {} + explicit CompleteMultipartReq(const std::map& _parts) : parts(_parts) {} void dump_xml(Formatter *f) const { for (const auto& p : parts) { @@ -783,13 +984,13 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine { f->close_section(); }; } - } req_enc; + } req_enc(parts); struct CompleteMultipartResult { - string location; - string bucket; - string key; - string etag; + std::string location; + std::string bucket; + std::string key; + std::string etag; void decode_xml(XMLObj *obj) { RGWXMLDecoder::decode_xml("Location", bucket, obj); @@ -799,396 +1000,332 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine { } } result; - public: - RGWLCCompleteMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager, - RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, - string _upload_id, const map& _parts) : - RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager), - dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id), - req_enc(_parts) {} - - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - - yield { - rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; - stringstream ss; - XMLFormatter formatter; + encode_xml("CompleteMultipartUpload", req_enc, &formatter); - encode_xml("CompleteMultipartUpload", req_enc, &formatter); + formatter.flush(ss); + bl.append(ss.str()); - formatter.flush(ss); + ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr, + out_bl, &bl, nullptr, null_yield); - bufferlist bl; - bl.append(ss.str()); - call(new RGWPostRawRESTResourceCR (cct, dest_conn, http_manager, - obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl)); - } - - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to 
initialize multipart upload for dest object=" << dest_obj << dendl; - return set_cr_error(retcode); - } - { - /* - * If one of the following fails we cannot abort upload, as we cannot - * extract the upload id. If one of these fail it's very likely that that's - * the least of our problem. - */ - RGWXMLDecoder::XMLParser parser; - if (!parser.init()) { - ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; - return set_cr_error(-EIO); - } - - if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { - string str(out_bl.c_str(), out_bl.length()); - ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl; - return set_cr_error(-EIO); - } - - try { - RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true); - } catch (RGWXMLDecoder::err& err) { - string str(out_bl.c_str(), out_bl.length()); - ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl; - return set_cr_error(-EIO); - } - } - - ldout(cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl; - - return set_cr_done(); - } + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl; + return ret; + } + /* + * If one of the following fails we cannot abort upload, as we cannot + * extract the upload id. If one of these fail it's very likely that that's + * the least of our problem. + */ + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; + return -EIO; + } - return 0; + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldpp_dout(dpp, 5) << "ERROR: failed to parse xml Completemultipart: " << str << dendl; + return -EIO; } -}; + try { + RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl; + return -EIO; + } -class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; - const rgw_obj dest_obj; - const rgw_raw_obj status_obj; + ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl; - string upload_id; + return ret; +} - public: +static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx& tier_ctx, + const rgw_obj& dest_obj, const rgw_raw_obj& status_obj, + const std::string& upload_id) { + int ret; - RGWLCStreamAbortMultipartUploadCR(RGWLCCloudTierCtx& _tier_ctx, - const rgw_obj& _dest_obj, const rgw_raw_obj& _status_obj, - const string& _upload_id) : RGWCoroutine(_tier_ctx.cct), - tier_ctx(_tier_ctx), dest_obj(_dest_obj), status_obj(_status_obj), - upload_id(_upload_id) {} - - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id)); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; - /* ignore error, best effort */ - } - yield call(new RGWRadosRemoveCR(dynamic_cast(tier_ctx.store), status_obj)); - if 
(retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; - /* ignore error, best effort */ - } - return set_cr_done(); - } + ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id); - return 0; + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl; + /* ignore error, best effort */ } -}; + /* remove status obj */ + ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl; + // ignore error, best effort + } + return 0; +} -class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; - RGWRESTConn *source_conn; +static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { rgw_obj src_obj; rgw_obj dest_obj; uint64_t obj_size; - string src_etag; + std::string src_etag; rgw_rest_obj rest_obj; rgw_lc_multipart_upload_info status; - std::shared_ptr in_crf; - map new_attrs; + std::map new_attrs; rgw_raw_obj status_obj; - rgw_lc_obj_properties obj_properties; RGWBucketInfo b; - string target_obj_name; + std::string target_obj_name; rgw_bucket target_bucket; - rgw::sal::RadosStore *rados; - - public: - RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) - : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), - obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, - tier_ctx.target_storage_class){} - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { + int ret; - obj_size = tier_ctx.o.meta.size; + rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, + tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, + tier_ctx.target_storage_class); - target_bucket.name = tier_ctx.target_bucket_name; + uint32_t part_size{0}; + uint32_t num_parts{0}; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } - dest_obj.init(target_bucket, target_obj_name); + int cur_part{0}; + uint64_t cur_ofs{0}; + std::map parts; - status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool, - "lc_multipart_" + (*tier_ctx.obj)->get_oid()); + obj_size = tier_ctx.o.meta.size; - rados = dynamic_cast(tier_ctx.store); + target_bucket.name = tier_ctx.target_bucket_name; - if (!rados) { - ldout(tier_ctx.cct, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." 
<< dendl; - return -1; - } + target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + + tier_ctx.obj->get_name(); + if (!tier_ctx.o.is_current()) { + target_obj_name += get_key_instance(tier_ctx.obj->get_key()); + } + dest_obj.init(target_bucket, target_obj_name); - yield call(new RGWSimpleRadosReadCR(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, - status_obj, &status, false)); + status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool, + "lc_multipart_" + tier_ctx.obj->get_oid()); - if (retcode < 0 && retcode != -ENOENT) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; - return retcode; - } + ret = read_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status); - if (retcode >= 0) { - /* check here that mtime and size did not change */ - if (status.mtime != obj_properties.mtime || status.obj_size != obj_size || - status.etag != obj_properties.etag) { - yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); - retcode = -ENOENT; - } - } + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl; + return ret; + } - if (retcode == -ENOENT) { - in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); + if (ret >= 0) { + // check here that mtime and size did not change + if (status.mtime != obj_properties.mtime || status.obj_size != obj_size || + status.etag != obj_properties.etag) { + cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id); + ret = -ENOENT; + } + } - in_crf->init(); + if (ret == -ENOENT) { + RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime); - rest_obj = in_crf->get_rest_obj(); + readf.init(); - RGWLCStreamPutCRF::init_send_attrs(tier_ctx.cct, rest_obj, obj_properties, &new_attrs); + rest_obj = readf.get_rest_obj(); - yield call(new RGWLCInitMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, obj_size, std::move(new_attrs), &status.upload_id)); - if (retcode < 0) { - return set_cr_error(retcode); - } + RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs); - status.obj_size = obj_size; - status.mtime = obj_properties.mtime; - status.etag = obj_properties.etag; -#define MULTIPART_MAX_PARTS 10000 - uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS; - uint64_t min_conf_size = tier_ctx.multipart_min_part_size; + ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id); + if (ret < 0) { + return ret; + } - if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) { - min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE; - } + status.obj_size = obj_size; + status.mtime = obj_properties.mtime; + status.etag = obj_properties.etag; - status.part_size = std::max(min_conf_size, min_part_size); - status.num_parts = (obj_size + status.part_size - 1) / status.part_size; - status.cur_part = 1; - status.cur_ofs = 0; - } + ret = put_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status); - for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) { - ldout(tier_ctx.cct, 20) << "status.cur_part = "<(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, status_obj, status)); + for (; (uint32_t)cur_part <= num_parts; ++cur_part) { + 
ldpp_dout(tier_ctx.dpp, 20) << "cur_part = "<< cur_part << ", info.ofs = " << cur_ofs << ", info.size = " << part_size << ", obj size = " << obj_size<< ", num_parts:" << num_parts << dendl; + rgw_lc_multipart_part_info& cur_part_info = parts[cur_part]; + cur_part_info.part_num = cur_part; + cur_part_info.ofs = cur_ofs; + cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; - /* continue with upload anyway */ - } - ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << status.parts[status.cur_part].etag << dendl; - } + cur_ofs += cur_part_info.size; - yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts)); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; - yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); - return set_cr_error(retcode); - } + ret = cloud_tier_send_multipart_part(tier_ctx, + status.upload_id, + cur_part_info, + &cur_part_info.etag); - /* remove status obj */ - yield call(new RGWRadosRemoveCR(rados, status_obj)); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; - /* ignore error, best effort */ - } - return set_cr_done(); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to send multipart part of obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-ret) << ")" << dendl; + cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id); + return ret; } - return 0; + } -}; -int RGWLCCloudCheckCR::operate(const DoutPrefixProvider *dpp) { - /* Check if object has already been transitioned */ - reenter(this) { - b.bucket.name = tier_ctx.target_bucket_name; - target_obj_name = tier_ctx.bucket_info.bucket.name + "/" + - (*tier_ctx.obj)->get_name(); - if (!tier_ctx.o.is_current()) { - target_obj_name += get_key_instance((*tier_ctx.obj)->get_key()); - } + ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl; + cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id); + return ret; + } - retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << retcode << dendl; - return ret; - } - - dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name)); - if (!dest_obj) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl; - return -1; - } + /* remove status obj */ + ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << 
tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl; + // ignore error, best effort + } + return 0; +} - get_crf.reset(new RGWLCStreamGetCRF(tier_ctx.cct, get_env(), this, tier_ctx.http_manager, obj_properties, - tier_ctx.conn, dest_obj.get())); - - /* Having yield here doesn't seem to wait for init2() to fetch the headers - * before calling is_already_tiered() below - */ - yield { - retcode = get_crf->init(dpp); - if (retcode < 0) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , retcode = " << retcode << dendl; - return set_cr_error(ret); - } - } - if (retcode < 0) { - ldout(tier_ctx.cct, 20) << __func__ << ": get_crf()->init retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - if (get_crf.get()->is_already_tiered()) { - *already_tiered = true; - ldout(tier_ctx.cct, 20) << "is_already_tiered true" << dendl; - return set_cr_done(); - } +/* Check if object has already been transitioned */ +static int cloud_tier_check_object(RGWLCCloudTierCtx& tier_ctx, bool& already_tiered) { + int ret; + std::map headers; - ldout(tier_ctx.cct, 20) << "is_already_tiered false..going with out_crf writing" << dendl; + /* Fetch Head object */ + ret = cloud_tier_get_object(tier_ctx, true, headers); - return set_cr_done(); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , ret = " << ret << dendl; + return ret; + } + + already_tiered = is_already_tiered(tier_ctx.dpp, headers, tier_ctx.o.meta.mtime); + + if (already_tiered) { + ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered true" << dendl; + } else { + ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered false..going with out_crf writing" << dendl; } - return 0; -} -map , utime_t> target_buckets; + return ret; +} -int RGWLCCloudTierCR::operate(const DoutPrefixProvider *dpp) { +static int cloud_tier_create_bucket(RGWLCCloudTierCtx& tier_ctx) { + bufferlist out_bl; + int ret = 0; pair key(tier_ctx.storage_class, tier_ctx.target_bucket_name); - bool bucket_created = false; + struct CreateBucketResult { + std::string code; - reenter(this) { + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Code", code, obj); + } + } result; - if (target_buckets.find(key) != target_buckets.end()) { - utime_t t = target_buckets[key]; + ldpp_dout(tier_ctx.dpp, 30) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl; + bufferlist bl; + string resource = tier_ctx.target_bucket_name; - utime_t now = ceph_clock_now(); + ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr, + out_bl, &bl, nullptr, null_yield); - if (now - t < (2 * cct->_conf->rgw_lc_debug_interval)) { /* not expired */ - bucket_created = true; - } + if (ret < 0 ) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", ret:" << ret << dendl; + return ret; + } + if (out_bl.length() > 0) { + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl; + return -EIO; } - if (!bucket_created){ - yield { - ldout(tier_ctx.cct,10) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl; - bufferlist bl; - call(new RGWPutRawRESTResourceCR (tier_ctx.cct, tier_ctx.conn.get(), - tier_ctx.http_manager, - tier_ctx.target_bucket_name, nullptr, bl, &out_bl)); - 
} - if (retcode < 0 ) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", retcode:" << retcode << dendl; - return set_cr_error(retcode); - } - if (out_bl.length() > 0) { - RGWXMLDecoder::XMLParser parser; - if (!parser.init()) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl; - return set_cr_error(-EIO); - } + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldpp_dout(tier_ctx.dpp, 5) << "ERROR: failed to parse xml createbucket: " << str << dendl; + return -EIO; + } - if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { - string str(out_bl.c_str(), out_bl.length()); - ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl; - return set_cr_error(-EIO); - } + try { + RGWXMLDecoder::decode_xml("Error", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldpp_dout(tier_ctx.dpp, 5) << "ERROR: unexpected xml: " << str << dendl; + return -EIO; + } - try { - RGWXMLDecoder::decode_xml("Error", result, &parser, true); - } catch (RGWXMLDecoder::err& err) { - string str(out_bl.c_str(), out_bl.length()); - ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl; - return set_cr_error(-EIO); - } + if (result.code != "BucketAlreadyOwnedByYou") { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl; + return -EIO; + } + } - if (result.code != "BucketAlreadyOwnedByYou") { - ldout(tier_ctx.cct, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl; - return set_cr_error(-EIO); - } - } + return 0; +} + +int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx& tier_ctx) { + int ret = 0; + + /* If run first time attempt to create the target bucket */ + if (!tier_ctx.target_bucket_created) { + ret = cloud_tier_create_bucket(tier_ctx); - target_buckets[key] = ceph_clock_now(); + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket on the cloud endpoint ret=" << ret << dendl; + return ret; } + tier_ctx.target_bucket_created = true; + } - yield { - uint64_t size = tier_ctx.o.meta.size; - uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold; + /* Since multiple zones may try to transition the same object to the cloud, + * verify if the object is already transitioned. And since its just a best + * effort, do not bail out in case of any errors. 
+ */ + bool already_tiered = false; + ret = cloud_tier_check_object(tier_ctx, already_tiered); - if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) { - multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE; - } + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to check object on the cloud endpoint ret=" << ret << dendl; + } - if (size < multipart_sync_threshold) { - call (new RGWLCStreamObjToCloudPlainCR(tier_ctx)); - } else { - tier_ctx.is_multipart_upload = true; - call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx)); + if (already_tiered) { + ldpp_dout(tier_ctx.dpp, 20) << "Object (" << tier_ctx.o.key << ") is already tiered" << dendl; + return 0; + } - } - } + uint64_t size = tier_ctx.o.meta.size; + uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold; - if (retcode < 0) { - return set_cr_error(retcode); - } + if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) { + multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE; + } - return set_cr_done(); - } //reenter + if (size < multipart_sync_threshold) { + ret = cloud_tier_plain_transfer(tier_ctx); + } else { + tier_ctx.is_multipart_upload = true; + ret = cloud_tier_multipart_transfer(tier_ctx); + } - return 0; -} + if (ret < 0) { + ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to transition object ret=" << ret << dendl; + } + return ret; +} diff --git a/src/rgw/rgw_lc_tier.h b/src/rgw/rgw_lc_tier.h index 91053f6ea76..86df479e828 100644 --- a/src/rgw/rgw_lc_tier.h +++ b/src/rgw/rgw_lc_tier.h @@ -5,13 +5,11 @@ #define CEPH_RGW_LC_TIER_H #include "rgw_lc.h" -#include "rgw_cr_rados.h" #include "rgw_rest_conn.h" -#include "rgw_cr_rest.h" -#include "rgw_coroutine.h" #include "rgw_rados.h" #include "rgw_zone.h" #include "rgw_sal_rados.h" +#include "rgw_cr_rest.h" #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) @@ -24,209 +22,34 @@ struct RGWLCCloudTierCtx { rgw_bucket_dir_entry& o; rgw::sal::Store *store; RGWBucketInfo& bucket_info; - string storage_class; + std::string storage_class; - std::unique_ptr* obj; + rgw::sal::Object *obj; RGWObjectCtx& rctx; /* Remote */ - std::shared_ptr conn; - string target_bucket_name; - string target_storage_class; - RGWHTTPManager *http_manager; + RGWRESTConn& conn; + std::string target_bucket_name; + std::string target_storage_class; - map acl_mappings; + std::map acl_mappings; uint64_t multipart_min_part_size; uint64_t multipart_sync_threshold; bool is_multipart_upload{false}; + bool target_bucket_created{true}; RGWLCCloudTierCtx(CephContext* _cct, const DoutPrefixProvider *_dpp, - rgw_bucket_dir_entry& _o, rgw::sal::Store* _store, - RGWBucketInfo &_binfo, std::unique_ptr* _obj, - RGWObjectCtx& _rctx, std::shared_ptr _conn, string _bucket, - string _storage_class, RGWHTTPManager *_http) - : cct(_cct), dpp(_dpp), o(_o), store(_store), bucket_info(_binfo), - obj(_obj), rctx(_rctx), conn(_conn), target_bucket_name(_bucket), - target_storage_class(_storage_class), http_manager(_http) {} -}; - -struct rgw_lc_multipart_part_info { - int part_num{0}; - uint64_t ofs{0}; - uint64_t size{0}; - string etag; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(part_num, bl); - encode(ofs, bl); - encode(size, bl); - encode(etag, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(part_num, bl); - decode(ofs, bl); - decode(size, bl); - decode(etag, bl); - DECODE_FINISH(bl); - } + rgw_bucket_dir_entry& _o, 
rgw::sal::Store *_store, + RGWBucketInfo &_binfo, rgw::sal::Object *_obj, + RGWObjectCtx& _rctx, RGWRESTConn& _conn, std::string& _bucket, + std::string& _storage_class) : + cct(_cct), dpp(_dpp), o(_o), store(_store), bucket_info(_binfo), + obj(_obj), rctx(_rctx), conn(_conn), target_bucket_name(_bucket), + target_storage_class(_storage_class) {} }; -WRITE_CLASS_ENCODER(rgw_lc_multipart_part_info) - -struct rgw_lc_obj_properties { - ceph::real_time mtime; - string etag; - uint64_t versioned_epoch{0}; - map& target_acl_mappings; - string target_storage_class; - - rgw_lc_obj_properties(ceph::real_time _mtime, string _etag, - uint64_t _versioned_epoch, map& _t_acl_mappings, - string _t_storage_class) : - mtime(_mtime), etag(_etag), - versioned_epoch(_versioned_epoch), - target_acl_mappings(_t_acl_mappings), - target_storage_class(_t_storage_class) {} - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(mtime, bl); - encode(etag, bl); - encode(versioned_epoch, bl); - encode(target_acl_mappings, bl); - encode(target_storage_class, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(mtime, bl); - decode(etag, bl); - decode(versioned_epoch, bl); - decode(target_acl_mappings, bl); - decode(target_storage_class, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(rgw_lc_obj_properties) - -struct rgw_lc_multipart_upload_info { - string upload_id; - uint64_t obj_size; - ceph::real_time mtime; - string etag; - uint32_t part_size{0}; - uint32_t num_parts{0}; - - int cur_part{0}; - uint64_t cur_ofs{0}; - - std::map parts; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(upload_id, bl); - encode(obj_size, bl); - encode(mtime, bl); - encode(etag, bl); - encode(part_size, bl); - encode(num_parts, bl); - encode(cur_part, bl); - encode(cur_ofs, bl); - encode(parts, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(upload_id, bl); - decode(obj_size, bl); - decode(mtime, bl); - decode(etag, bl); - decode(part_size, bl); - decode(num_parts, bl); - decode(cur_part, bl); - decode(cur_ofs, bl); - decode(parts, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info) - -class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF -{ - RGWRESTConn::get_obj_params req_params; - - CephContext *cct; - RGWHTTPManager *http_manager; - rgw_lc_obj_properties obj_properties; - std::shared_ptr conn; - rgw::sal::Object* dest_obj; - string etag; - RGWRESTStreamRWRequest *in_req; - map headers; - - public: - RGWLCStreamGetCRF(CephContext *_cct, - RGWCoroutinesEnv *_env, - RGWCoroutine *_caller, - RGWHTTPManager *_http_manager, - const rgw_lc_obj_properties& _obj_properties, - std::shared_ptr _conn, - rgw::sal::Object* _dest_obj) : - RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()), - cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), - conn(_conn), dest_obj(_dest_obj) {} - int init(const DoutPrefixProvider *dpp); - int is_already_tiered(); -}; - -class RGWLCCloudTierCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; - bufferlist out_bl; - int retcode; - struct CreateBucketResult { - string code; - - void decode_xml(XMLObj *obj) { - RGWXMLDecoder::decode_xml("Code", code, obj); - } - } result; - - public: - RGWLCCloudTierCR(RGWLCCloudTierCtx& _tier_ctx) : - RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {} - - int operate(const DoutPrefixProvider *dpp) override; 
-}; - -class RGWLCCloudCheckCR : public RGWCoroutine { - RGWLCCloudTierCtx& tier_ctx; - bufferlist bl; - bool need_retry{false}; - int retcode; - bool *already_tiered; - rgw_lc_obj_properties obj_properties; - RGWBucketInfo b; - string target_obj_name; - int ret = 0; - std::unique_ptr dest_bucket; - std::unique_ptr dest_obj; - std::unique_ptr get_crf; - - public: - RGWLCCloudCheckCR(RGWLCCloudTierCtx& _tier_ctx, bool *_al_ti) : - RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), already_tiered(_al_ti), - obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag, - tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, - tier_ctx.target_storage_class){} - - int operate(const DoutPrefixProvider *dpp) override; -}; +/* Transition object to cloud endpoint */ +int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx& tier_ctx); #endif diff --git a/src/rgw/rgw_obj_manifest.h b/src/rgw/rgw_obj_manifest.h index 03bfefad5c2..a3d82a66808 100644 --- a/src/rgw/rgw_obj_manifest.h +++ b/src/rgw/rgw_obj_manifest.h @@ -149,7 +149,7 @@ struct RGWObjManifestRule { WRITE_CLASS_ENCODER(RGWObjManifestRule) struct RGWObjTier { - string name; + std::string name; RGWZoneGroupPlacementTier tier_placement; bool is_multipart_upload{false}; @@ -193,7 +193,7 @@ protected: std::string tail_instance; /* tail object's instance */ - string tier_type; + std::string tier_type; RGWObjTier tier_config; void convert_to_explicit(const DoutPrefixProvider *dpp, const RGWZoneGroup& zonegroup, const RGWZoneParams& zone_params); @@ -452,7 +452,7 @@ public: return tier_type; } - inline void set_tier_type(string value) { + inline void set_tier_type(std::string value) { /* Only "cloud-s3" tier-type is supported for now */ if (value == "cloud-s3") { tier_type = value; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 751b3e72f51..20ad0d985b8 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2210,7 +2210,7 @@ void RGWGetObj::execute(optional_yield y) if (attr_iter != attrs.end() && get_type() == RGW_OP_GET_OBJ && get_data) { RGWObjManifest m; decode(m, attr_iter->second); - if (m.get_tier_type() == "cloud") { + if (m.get_tier_type() == "cloud-s3") { /* XXX: Instead send presigned redirect or read-through */ op_ret = -ERR_INVALID_OBJECT_STATE; ldpp_dout(this, 0) << "ERROR: Cannot get cloud tiered object. Failing with " @@ -3988,6 +3988,18 @@ void RGWPutObj::execute(optional_yield y) ldpp_dout(this, 0) << "ERROR: get copy source obj state returned with error" << op_ret << dendl; return; } + bufferlist bl; + if (astate->get_attr(RGW_ATTR_MANIFEST, bl)) { + RGWObjManifest m; + decode(m, bl); + if (m.get_tier_type() == "cloud-s3") { + op_ret = -ERR_INVALID_OBJECT_STATE; + ldpp_dout(this, 0) << "ERROR: Cannot copy cloud tiered object. Failing with " + << op_ret << dendl; + return; + } + } + if (!astate->exists){ op_ret = -ENOENT; return; @@ -5419,6 +5431,20 @@ void RGWCopyObj::execute(optional_yield y) if (op_ret < 0) { return; } + + /* Check if the src object is cloud-tiered */ + bufferlist bl; + if (astate->get_attr(RGW_ATTR_MANIFEST, bl)) { + RGWObjManifest m; + decode(m, bl); + if (m.get_tier_type() == "cloud-s3") { + op_ret = -ERR_INVALID_OBJECT_STATE; + ldpp_dout(this, 0) << "ERROR: Cannot copy cloud tiered object. 
Failing with " + << op_ret << dendl; + return; + } + } + obj_size = astate->size; if (!s->system_request) { // no quota enforcement for system requests diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 7214babb9b9..caf6ffa35c8 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -221,7 +221,8 @@ public: class RGWRESTStreamSendRequest : public RGWRESTStreamRWRequest { public: - RGWRESTStreamSendRequest(CephContext *_cct, const string& method, const string& _url, + RGWRESTStreamSendRequest(CephContext *_cct, const std::string& method, + const std::string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params, std::optional _api_name, HostStyle _host_style = PathStyle) : RGWRESTStreamRWRequest(_cct, method, _url, _cb, _headers, _params, _api_name, _host_style) {} diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 726bc58a5aa..afcc86db11c 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -376,12 +376,12 @@ int RGWRESTConn::get_resource(const DoutPrefixProvider *dpp, return req.complete_request(y); } -int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const string& method, - const string& resource, rgw_http_param_pair *extra_params, - map *extra_headers, bufferlist& bl, +int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const std::string& method, + const std::string& resource, rgw_http_param_pair *extra_params, + std::map *extra_headers, bufferlist& bl, bufferlist *send_data, RGWHTTPManager *mgr, optional_yield y) { - string url; + std::string url; int ret = get_url(url); if (ret < 0) return ret; @@ -398,7 +398,7 @@ int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const string& meth RGWRESTStreamSendRequest req(cct, method, url, &cb, NULL, ¶ms, api_name, host_style); - map headers; + std::map headers; if (extra_headers) { headers.insert(extra_headers->begin(), extra_headers->end()); } diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 0dfd0c7415c..cae8120705c 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -201,10 +201,10 @@ public: optional_yield y); int send_resource(const DoutPrefixProvider *dpp, - const string& method, - const string& resource, + const std::string& method, + const std::string& resource, rgw_http_param_pair *extra_params, - map* extra_headers, + std::map* extra_headers, bufferlist& bl, bufferlist *send_data, RGWHTTPManager *mgr, diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 05eb34db31a..9a0066d9ad1 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -687,13 +687,13 @@ struct RGWTierACLMapping { RGWTierACLMapping() = default; RGWTierACLMapping(ACLGranteeTypeEnum t, - const string& s, - const string& d) : type(t), + const std::string& s, + const std::string& d) : type(t), source_id(s), dest_id(d) {} void init(const JSONFormattable& config) { - const string& t = config["type"]; + const std::string& t = config["type"]; if (t == "email") { type = ACL_TYPE_EMAIL_USER; @@ -735,11 +735,11 @@ struct RGWZoneGroupPlacementTierS3 { RGWAccessKey key; std::string region; HostStyle host_style{PathStyle}; - string target_storage_class; + std::string target_storage_class; /* Should below be bucket/zone specific?? */ - string target_path; - map acl_mappings; + std::string target_path; + std::map acl_mappings; uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE}; uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
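
For context, below is a minimal, purely illustrative sketch of how a caller could drive the new non-coroutine entry point rgw_cloud_tier_transfer_object() declared in rgw_lc_tier.h above. It is not part of this patch: the function name, the surrounding variables, and the way the tier connection and tunables are obtained are assumptions for illustration only; the real call site lives in the lifecycle transition code and is not reproduced here.

// Illustrative sketch only -- not part of the patch. All names below other
// than RGWLCCloudTierCtx, rgw_cloud_tier_transfer_object() and
// DEFAULT_MULTIPART_SYNC_PART_SIZE are hypothetical placeholders.
static int transition_obj_to_cloud_sketch(const DoutPrefixProvider *dpp,
                                          CephContext *cct,
                                          rgw_bucket_dir_entry& o,
                                          rgw::sal::Store *store,
                                          RGWBucketInfo& bucket_info,
                                          rgw::sal::Object *obj,
                                          RGWObjectCtx& rctx,
                                          RGWRESTConn& tier_conn,            /* connection to the cloud endpoint (assumed) */
                                          std::string& target_bucket,        /* cloud target bucket name (assumed) */
                                          std::string& target_storage_class) /* cloud storage class (assumed) */
{
  /* Build the per-object tier context; the constructor signature matches the
   * one added to RGWLCCloudTierCtx in rgw_lc_tier.h in this patch. */
  RGWLCCloudTierCtx tier_ctx(cct, dpp, o, store, bucket_info, obj, rctx,
                             tier_conn, target_bucket, target_storage_class);

  /* Tunables would normally come from the zonegroup placement tier config;
   * the defaults from rgw_lc_tier.h are used here for illustration. */
  tier_ctx.multipart_min_part_size = DEFAULT_MULTIPART_SYNC_PART_SIZE;
  tier_ctx.multipart_sync_threshold = DEFAULT_MULTIPART_SYNC_PART_SIZE;

  /* Setting target_bucket_created to false makes the first transfer attempt
   * a PUT-bucket on the cloud endpoint (see cloud_tier_create_bucket()). */
  tier_ctx.target_bucket_created = false;

  /* Single synchronous call; internally it checks whether the object is
   * already tiered and then chooses plain vs. multipart transfer based on
   * multipart_sync_threshold. */
  int ret = rgw_cloud_tier_transfer_object(tier_ctx);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to transition obj=" << obj
                      << " to cloud, ret=" << ret << dendl;
  }
  return ret;
}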