From: Soumya Koduri Date: Tue, 18 Aug 2020 07:02:22 +0000 (+0530) Subject: rgw/CloudTransition: Store the status of multipart uploads X-Git-Tag: v17.1.0~411^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6333c0e50c2ed500e95cbc7ae5e26de6ffc8db29;p=ceph.git rgw/CloudTransition: Store the status of multipart uploads Store the status of multipart upload parts to verify if the object hasn't changed during the transition and if yes, abort the upload. Also avoid re-creating target buckets - It's not ideal to try creating target bucket for every object transition to cloud. To avoid it, cache the bucket creations in a map with an expiry period set to '2*lc_debug_interval' for each entry. Signed-off-by: Soumya Koduri --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index dc33345ef157..578e44533543 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -1389,6 +1389,7 @@ public: tier_ctx.acl_mappings = oc.tier.acl_mappings; tier_ctx.multipart_min_part_size = oc.tier.multipart_min_part_size; tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold; + tier_ctx.storage_class = oc.tier.storage_class; ret = crs.run(new RGWLCCloudTierCR(tier_ctx)); http_manager.stop(); diff --git a/src/rgw/rgw_lc_tier.cc b/src/rgw/rgw_lc_tier.cc index e1582c4eaf28..bb9c64516654 100644 --- a/src/rgw/rgw_lc_tier.cc +++ b/src/rgw/rgw_lc_tier.cc @@ -13,6 +13,7 @@ #include "rgw_zone.h" #include "rgw_common.h" #include "rgw_rest.h" +#include "svc_zone.h" #include #include @@ -787,9 +788,7 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine { class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine { - CephContext *cct; - RGWHTTPManager *http_manager; - RGWRESTConn *dest_conn; + RGWLCCloudTierCtx& tier_ctx; const rgw_obj dest_obj; const rgw_raw_obj status_obj; @@ -797,31 +796,25 @@ class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine { public: - RGWLCStreamAbortMultipartUploadCR(CephContext *_cct, - RGWHTTPManager 
*_http_manager, - RGWRESTConn *_dest_conn, + RGWLCStreamAbortMultipartUploadCR(RGWLCCloudTierCtx& _tier_ctx, const rgw_obj& _dest_obj, const rgw_raw_obj& _status_obj, - const string& _upload_id) : RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager), - dest_conn(_dest_conn), - dest_obj(_dest_obj), - status_obj(_status_obj), - upload_id(_upload_id) {} + const string& _upload_id) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), + dest_obj(_dest_obj), status_obj(_status_obj), + upload_id(_upload_id) {} int operate() override { reenter(this) { - yield call(new RGWLCAbortMultipartCR(cct, http_manager, dest_conn, dest_obj, upload_id)); + yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id)); if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; + ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; /* ignore error, best effort */ } -#ifdef TODO_STATUS_OBJ yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj)); if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; /* ignore error, best effort */ } -#endif return set_cr_done(); } @@ -858,7 +851,6 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings, tier_ctx.target_storage_class); - bool init_multipart{false}; rgw_obj& obj = tier_ctx.obj; obj_size = tier_ctx.o.meta.size; @@ -870,9 +862,11 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { std::shared_ptr in_crf; rgw_rest_obj rest_obj; + status_obj = rgw_raw_obj(tier_ctx.store->svc()->zone->get_zone_params().log_pool, + "lc_multipart_" + obj.get_oid()); + reenter(this) { -#ifdef TODO_STATUS_OBJ - yield call(new 
RGWSimpleRadosReadCR(tier_ctx.async_rados, tier_ctx.store->svc()->sysobj, + yield call(new RGWSimpleRadosReadCR(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj, status_obj, &status, false)); if (retcode < 0 && retcode != -ENOENT) { @@ -882,17 +876,14 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { if (retcode >= 0) { /* check here that mtime and size did not change */ - if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size || - status.src_properties.etag != src_properties.etag) { - yield call(new RGWLCStreamAbortMultipartUploadCR( tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id)); + if (status.mtime != obj_properties.mtime || status.obj_size != obj_size || + status.etag != obj_properties.etag) { + yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); retcode = -ENOENT; } } if (retcode == -ENOENT) { - } -#endif - if (!init_multipart) { in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime)); in_crf->init(); @@ -906,8 +897,9 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { return set_cr_error(retcode); } - init_multipart = true; status.obj_size = obj_size; + status.mtime = obj_properties.mtime; + status.etag = obj_properties.etag; #define MULTIPART_MAX_PARTS 10000 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS; uint64_t min_conf_size = tier_ctx.multipart_min_part_size; @@ -942,17 +934,15 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new 
RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id)); + yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } -#ifdef TODO_STATUS_OBJ - yield call(new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->store->svc()->sysobj, status_obj, status)); + yield call(new RGWSimpleRadosWriteCR(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj, status_obj, status)); if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; /* continue with upload anyway */ } -#endif ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; } @@ -960,64 +950,80 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine { if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id)); + yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } -#ifdef TODO_STATUS_OBJ /* remove status obj */ yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj)); if (retcode < 0) { ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; /* ignore error, best effort */ } -#endif return set_cr_done(); } return 0; } }; +map , utime_t> target_buckets; + int RGWLCCloudTierCR::operate() { + pair key(tier_ctx.storage_class, 
tier_ctx.target_bucket_name); + bool bucket_created = false; + reenter(this) { - yield { - // xxx: find if bucket is already created - ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl; - bufferlist bl; - call(new RGWPutRawRESTResourceCR (tier_ctx.cct, tier_ctx.conn.get(), + if (target_buckets.find(key) != target_buckets.end()) { + utime_t t = target_buckets[key]; + + utime_t now = ceph_clock_now(); + + if (now - t < (2 * cct->_conf->rgw_lc_debug_interval)) { /* not expired */ + bucket_created = true; + } + } + + if (!bucket_created){ + yield { + ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl; + bufferlist bl; + call(new RGWPutRawRESTResourceCR (tier_ctx.cct, tier_ctx.conn.get(), tier_ctx.http_manager, tier_ctx.target_bucket_name, nullptr, bl, &out_bl)); - } - if (retcode < 0 ) { - RGWXMLDecoder::XMLParser parser; - if (!parser.init()) { - ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl; - return set_cr_error(retcode); } + if (retcode < 0 ) { + RGWXMLDecoder::XMLParser parser; + if (!parser.init()) { + ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl; + return set_cr_error(retcode); + } - if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { - string str(out_bl.c_str(), out_bl.length()); - ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl; - return set_cr_error(retcode); - } + if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { + string str(out_bl.c_str(), out_bl.length()); + ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl; + return set_cr_error(retcode); + } - try { - RGWXMLDecoder::decode_xml("Error", result, &parser, true); - } catch (RGWXMLDecoder::err& err) { - string str(out_bl.c_str(), out_bl.length()); - ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " 
<< str << dendl; - return set_cr_error(retcode); - } + try { + RGWXMLDecoder::decode_xml("Error", result, &parser, true); + } catch (RGWXMLDecoder::err& err) { + string str(out_bl.c_str(), out_bl.length()); + ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl; + return set_cr_error(retcode); + } - if ((result.code != "BucketAlreadyOwnedByYou") && - (result.code != "BucketAlreadyExists")) { - return set_cr_error(retcode); + if (result.code != "BucketAlreadyOwnedByYou") { + return set_cr_error(retcode); + } } - } - bucket_created = true; + target_buckets[key] = ceph_clock_now(); + } + /* XXX: even if target_bucket doesnt exist and transition fails, this + * co-routine is still returning success.. + */ yield { uint64_t size = tier_ctx.o.meta.size; uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold; diff --git a/src/rgw/rgw_lc_tier.h b/src/rgw/rgw_lc_tier.h index 0a4da62f6e1f..070920c7c829 100644 --- a/src/rgw/rgw_lc_tier.h +++ b/src/rgw/rgw_lc_tier.h @@ -22,6 +22,7 @@ struct RGWLCCloudTierCtx { rgw_bucket_dir_entry& o; rgw::sal::RGWRadosStore *store; RGWBucketInfo& bucket_info; + string storage_class; rgw_obj obj; RGWObjectCtx& rctx; @@ -49,7 +50,6 @@ class RGWLCCloudTierCR : public RGWCoroutine { RGWLCCloudTierCtx& tier_ctx; bufferlist out_bl; int retcode; - bool bucket_created = false; struct CreateBucketResult { string code; @@ -132,6 +132,8 @@ WRITE_CLASS_ENCODER(rgw_lc_obj_properties) struct rgw_lc_multipart_upload_info { string upload_id; uint64_t obj_size; + ceph::real_time mtime; + string etag; uint32_t part_size{0}; uint32_t num_parts{0}; @@ -144,6 +146,8 @@ struct rgw_lc_multipart_upload_info { ENCODE_START(1, 1, bl); encode(upload_id, bl); encode(obj_size, bl); + encode(mtime, bl); + encode(etag, bl); encode(part_size, bl); encode(num_parts, bl); encode(cur_part, bl); @@ -156,6 +160,8 @@ struct rgw_lc_multipart_upload_info { DECODE_START(1, bl); decode(upload_id, bl); decode(obj_size, bl); + decode(mtime, bl); + 
decode(etag, bl); decode(part_size, bl); decode(num_parts, bl); decode(cur_part, bl);