]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/CloudTransition: Store the status of multipart uploads
authorSoumya Koduri <skoduri@redhat.com>
Tue, 18 Aug 2020 07:02:22 +0000 (12:32 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Thu, 18 Nov 2021 07:22:47 +0000 (12:52 +0530)
Store the status of multipart upload parts to verify if the object
hasn't changed during the transition and if yes, abort the upload.

Also avoid re-creating target buckets -

Its not ideal to try creating target bucket for every object
transition to cloud. To avoid it caching the bucket creations in
a map with an expiry period set to '2*lc_debug_interval' for each
entry.

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc_tier.cc
src/rgw/rgw_lc_tier.h

index dc33345ef157fb80e780b6f6c159bc7fd75da6d8..578e4453354330813e4d5f257428dd40f6aa3e4c 100644 (file)
@@ -1389,6 +1389,7 @@ public:
     tier_ctx.acl_mappings = oc.tier.acl_mappings;
     tier_ctx.multipart_min_part_size = oc.tier.multipart_min_part_size;
     tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold;
+    tier_ctx.storage_class = oc.tier.storage_class;
 
     ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
     http_manager.stop();
index e1582c4eaf2832d52b1cd589bdda2595fd08d446..bb9c64516654a8833cf49258a31257e3c609b63e 100644 (file)
@@ -13,6 +13,7 @@
 #include "rgw_zone.h"
 #include "rgw_common.h"
 #include "rgw_rest.h"
+#include "svc_zone.h"
 
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string.hpp>
@@ -787,9 +788,7 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine {
 
 
 class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  RGWRESTConn *dest_conn;
+  RGWLCCloudTierCtx& tier_ctx;
   const rgw_obj dest_obj;
   const rgw_raw_obj status_obj;
 
@@ -797,31 +796,25 @@ class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine {
 
   public:
 
-  RGWLCStreamAbortMultipartUploadCR(CephContext *_cct,
-      RGWHTTPManager *_http_manager,
-      RGWRESTConn *_dest_conn,
+  RGWLCStreamAbortMultipartUploadCR(RGWLCCloudTierCtx& _tier_ctx,
       const rgw_obj& _dest_obj,
       const rgw_raw_obj& _status_obj,
-      const string& _upload_id) : RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager),
-  dest_conn(_dest_conn),
-  dest_obj(_dest_obj),
-  status_obj(_status_obj),
-  upload_id(_upload_id) {}
+      const string& _upload_id) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
+                                  dest_obj(_dest_obj), status_obj(_status_obj),
+                                  upload_id(_upload_id) {}
 
   int operate() override {
     reenter(this) {
-      yield call(new RGWLCAbortMultipartCR(cct, http_manager, dest_conn, dest_obj, upload_id));
+      yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id));
       if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
         /* ignore error, best effort */
       }
-#ifdef TODO_STATUS_OBJ
       yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
       if (retcode < 0) {
         ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
         /* ignore error, best effort */
       }
-#endif
       return set_cr_done();
     }
 
@@ -858,7 +851,6 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
         tier_ctx.o.versioned_epoch,
         tier_ctx.acl_mappings,
         tier_ctx.target_storage_class);
-    bool init_multipart{false};
 
     rgw_obj& obj = tier_ctx.obj;
     obj_size = tier_ctx.o.meta.size;
@@ -870,9 +862,11 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
     std::shared_ptr<RGWStreamReadCRF> in_crf;
     rgw_rest_obj rest_obj;
 
+    status_obj = rgw_raw_obj(tier_ctx.store->svc()->zone->get_zone_params().log_pool,
+                  "lc_multipart_" + obj.get_oid());
+
     reenter(this) {
-#ifdef TODO_STATUS_OBJ
-      yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(tier_ctx.async_rados, tier_ctx.store->svc()->sysobj,
+      yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj,
             status_obj, &status, false));
 
       if (retcode < 0 && retcode != -ENOENT) {
@@ -882,17 +876,14 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
 
       if (retcode >= 0) {
         /* check here that mtime and size did not change */
-        if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
-            status.src_properties.etag != src_properties.etag) {
-          yield call(new RGWLCStreamAbortMultipartUploadCR( tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+        if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
+            status.etag != obj_properties.etag) {
+          yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
           retcode = -ENOENT;
         }
       }
 
       if (retcode == -ENOENT) {
-      }
-#endif
-      if (!init_multipart) {
         in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
 
         in_crf->init();
@@ -906,8 +897,9 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
           return set_cr_error(retcode);
         }
 
-        init_multipart = true;
         status.obj_size = obj_size;
+        status.mtime = obj_properties.mtime;
+        status.etag = obj_properties.etag;
 #define MULTIPART_MAX_PARTS 10000
         uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
         uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
@@ -942,17 +934,15 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
         if (retcode < 0) {
           ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
           ret_err = retcode;
-          yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+          yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
           return set_cr_error(ret_err);
         }
 
-#ifdef TODO_STATUS_OBJ
-        yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(sync_env->async_rados, sync_env->store->svc()->sysobj, status_obj, status));
+        yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj, status_obj, status));
         if (retcode < 0) {
           ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
           /* continue with upload anyway */
         }
-#endif
         ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
       }
 
@@ -960,64 +950,80 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
       if (retcode < 0) {
         ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
         ret_err = retcode;
-        yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+        yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
         return set_cr_error(ret_err);
       }
 
-#ifdef TODO_STATUS_OBJ
       /* remove status obj */
       yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
       if (retcode < 0) {
         ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
         /* ignore error, best effort */
       }
-#endif
       return set_cr_done();
     }
     return 0;
   }
 };
 
+map <pair<string, string>, utime_t> target_buckets;
+
 int RGWLCCloudTierCR::operate() {
+  pair<string, string> key(tier_ctx.storage_class, tier_ctx.target_bucket_name);
+  bool bucket_created = false;
+  
   reenter(this) {
 
-    yield {
-      // xxx: find if bucket is already created
-      ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl;
-      bufferlist bl;
-      call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
+    if (target_buckets.find(key) != target_buckets.end()) {
+      utime_t t = target_buckets[key];
+
+      utime_t now = ceph_clock_now();
+
+      if (now - t <  (2 * cct->_conf->rgw_lc_debug_interval)) { /* not expired */
+        bucket_created = true;
+      }
+    }
+
+    if (!bucket_created){
+      yield {
+        ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl;
+        bufferlist bl;
+        call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
             tier_ctx.http_manager,
             tier_ctx.target_bucket_name, nullptr, bl, &out_bl));
-    }
-    if (retcode < 0 ) {
-      RGWXMLDecoder::XMLParser parser;
-      if (!parser.init()) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
-        return set_cr_error(retcode);
       }
+      if (retcode < 0 ) {
+        RGWXMLDecoder::XMLParser parser;
+        if (!parser.init()) {
+          ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
+          return set_cr_error(retcode);
+        }
 
-      if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
-        string str(out_bl.c_str(), out_bl.length());
-        ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
-        return set_cr_error(retcode);
-      }
+        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+          return set_cr_error(retcode);
+        }
 
-      try {
-        RGWXMLDecoder::decode_xml("Error", result, &parser, true);
-      } catch (RGWXMLDecoder::err& err) {
-        string str(out_bl.c_str(), out_bl.length());
-        ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
-        return set_cr_error(retcode);
-      }
+        try {
+          RGWXMLDecoder::decode_xml("Error", result, &parser, true);
+        } catch (RGWXMLDecoder::err& err) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+          return set_cr_error(retcode);
+        }
 
-      if ((result.code != "BucketAlreadyOwnedByYou") &&
-          (result.code != "BucketAlreadyExists")) {
-        return set_cr_error(retcode);
+        if (result.code != "BucketAlreadyOwnedByYou") {
+          return set_cr_error(retcode);
+        }
       }
-    }
 
-    bucket_created = true;
+      target_buckets[key] = ceph_clock_now();
+    }
 
+    /* XXX: even if target_bucket doesnt exist and transition fails, this
+     * co-routine is still returning success..
+     */
     yield {
       uint64_t size = tier_ctx.o.meta.size;
       uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
index 0a4da62f6e1fb9ac37f5c174f08cb3b74435e67a..070920c7c8299dca00282f43e8062a60e67c66e6 100644 (file)
@@ -22,6 +22,7 @@ struct RGWLCCloudTierCtx {
   rgw_bucket_dir_entry& o;
   rgw::sal::RGWRadosStore *store;
   RGWBucketInfo& bucket_info;
+  string storage_class;
 
   rgw_obj obj;
   RGWObjectCtx& rctx;
@@ -49,7 +50,6 @@ class RGWLCCloudTierCR : public RGWCoroutine {
   RGWLCCloudTierCtx& tier_ctx;
   bufferlist out_bl;
   int retcode;
-  bool bucket_created = false;
   struct CreateBucketResult {
     string code;
 
@@ -132,6 +132,8 @@ WRITE_CLASS_ENCODER(rgw_lc_obj_properties)
 struct rgw_lc_multipart_upload_info {
   string upload_id;
   uint64_t obj_size;
+  ceph::real_time mtime;
+  string etag;
   uint32_t part_size{0};
   uint32_t num_parts{0};
 
@@ -144,6 +146,8 @@ struct rgw_lc_multipart_upload_info {
     ENCODE_START(1, 1, bl);
     encode(upload_id, bl);
     encode(obj_size, bl);
+    encode(mtime, bl);
+    encode(etag, bl);
     encode(part_size, bl);
     encode(num_parts, bl);
     encode(cur_part, bl);
@@ -156,6 +160,8 @@ struct rgw_lc_multipart_upload_info {
     DECODE_START(1, bl);
     decode(upload_id, bl);
     decode(obj_size, bl);
+    decode(mtime, bl);
+    decode(etag, bl);
     decode(part_size, bl);
     decode(num_parts, bl);
     decode(cur_part, bl);