]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync, store temp per-object sync info
author Yehuda Sadeh <yehuda@redhat.com>
Wed, 25 Oct 2017 00:06:01 +0000 (17:06 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
When doing a multipart object sync, we need to store the object's info
so that we can either continue if the upload is interrupted, or abort
the older upload_id in case the object changed.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_sync_module_aws.cc
src/rgw/rgw_sync_module_aws.h

index 5a350da2a5f28de465f1ab4db6364d040944fc1c..807f1d81a0040cb497241ef1b71a0cea341fc3f7 100644 (file)
@@ -35,6 +35,7 @@ static string datalog_sync_status_oid_prefix = "datalog.sync-status";
 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
 static string bucket_status_oid_prefix = "bucket.sync-status";
+/* Prefix used by obj_status_oid() for per-object sync status oids.
+ * NOTE(review): the value is identical to bucket_status_oid_prefix -- presumably
+ * sharing the prefix is intended; confirm it was not a copy/paste slip. */
+static string object_status_oid_prefix = "bucket.sync-status";
 
 class RGWSyncDebugLogger {
   CephContext *cct;
@@ -3184,6 +3185,13 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }
 
+/*
+ * Compose the oid of the per-object sync status object, laid out as:
+ *   <object_status_oid_prefix>.<source_zone>:<bucket key>:<object name>:<object instance>
+ */
+string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
+                                                  const rgw_obj& obj)
+{
+  string oid(object_status_oid_prefix);
+  oid.append(".").append(source_zone);
+  oid.append(":").append(obj.bucket.get_key());
+  oid.append(":").append(obj.key.name);
+  oid.append(":").append(obj.key.instance);
+  return oid;
+}
+
 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
   static constexpr int max_concurrent_shards = 16;
   RGWRados *const store;
index 1c4a6a531b8c4e974122d741e9d45b903ed81bd1..088486e13c4fbeedc65d6d1c135f6067f3071a98 100644 (file)
@@ -531,6 +531,7 @@ public:
   int init_sync_status();
 
   static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
+  static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
 
   int read_sync_status();
   int run();
index 9413566f10261fb0eb6fee4e7127164c75b41060..f89364f777dfc72d4bade054fbce3a09681c9d34 100644 (file)
@@ -5,6 +5,7 @@
 #include "rgw_sync_module.h"
 #include "rgw_data_sync.h"
 #include "rgw_sync_module_aws.h"
+#include "rgw_cr_rados.h"
 #include "rgw_rest_conn.h"
 #include "rgw_cr_rest.h"
 #include "rgw_acl.h"
@@ -224,13 +225,6 @@ public:
   }
 };
 
-struct multipart_part_info {
-  int part_num;
-  uint64_t ofs;
-  uint64_t size;
-  string etag;
-};
-
 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -240,7 +234,7 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
 
   string upload_id;
 
-  multipart_part_info part_info;
+  rgw_sync_aws_multipart_part_info part_info;
 
   std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
   std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
@@ -254,7 +248,7 @@ public:
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj,
                                 const string& _upload_id,
-                                const multipart_part_info& _part_info,
+                                const rgw_sync_aws_multipart_part_info& _part_info,
                                 string *_petag) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    source_conn(_source_conn),
@@ -434,9 +428,9 @@ class RGWAWSCompleteMultipartCR : public RGWCoroutine {
   string upload_id;
 
   struct CompleteMultipartReq {
-    map<int, multipart_part_info> parts;
+    map<int, rgw_sync_aws_multipart_part_info> parts;
 
-    CompleteMultipartReq(const map<int, multipart_part_info>& _parts) : parts(_parts) {}
+    CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}
 
     void dump_xml(Formatter *f) const {
       for (auto p : parts) {
@@ -467,7 +461,7 @@ public:
                         RGWRESTConn *_dest_conn,
                         const rgw_obj& _dest_obj,
                         string _upload_id,
-                        const map<int, multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
+                        const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    dest_conn(_dest_conn),
                                                    dest_obj(_dest_obj),
@@ -533,6 +527,46 @@ public:
   }
 };
 
+
+/*
+ * Coroutine that cleans up after a multipart upload: it runs
+ * RGWAWSAbortMultipartCR for upload_id on dest_obj and then removes the
+ * sync status object. Both steps are best effort: failures are logged and
+ * ignored, and the coroutine always finishes via set_cr_done().
+ */
+class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *dest_conn;       /* connection used for the abort request */
+  const rgw_obj dest_obj;       /* destination object of the upload */
+  const rgw_raw_obj status_obj; /* status object removed after the abort */
+
+  string upload_id;             /* id of the upload to abort */
+
+public:
+
+  /* All arguments are stored (objects and upload id are copied) for use by operate(). */
+  RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncEnv *_sync_env,
+                                RGWRESTConn *_dest_conn,
+                                const rgw_obj& _dest_obj,
+                                const rgw_raw_obj& _status_obj,
+                                const string& _upload_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+                                                            dest_conn(_dest_conn),
+                                                            dest_obj(_dest_obj),
+                                                            status_obj(_status_obj),
+                                                            upload_id(_upload_id) {}
+
+  int operate() {
+    reenter(this) {
+      /* step 1: abort the upload at the destination */
+      yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
+        /* ignore error, best effort */
+      }
+      /* step 2: drop the status object, regardless of the abort result */
+      yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
+        /* ignore error, best effort */
+      }
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
 class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -543,20 +577,14 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   uint64_t obj_size;
   ceph::real_time mtime;
 
-  string upload_id;
-
-  uint32_t part_size;
-  int num_parts;
-
-  int cur_part{0};
-  uint64_t cur_ofs{0};
+  rgw_sync_aws_multipart_upload_info status;
 
-  map<int, multipart_part_info> parts;
-
-  multipart_part_info *pcur_part_info{nullptr};
+  rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
 
   int ret_err{0};
 
+  rgw_raw_obj status_obj;
+
 public:
   RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
                                 RGWRESTConn *_source_conn,
@@ -571,65 +599,93 @@ public:
                                                    src_obj(_src_obj),
                                                    dest_obj(_dest_obj),
                                                    obj_size(_obj_size),
-                                                   mtime(_mtime) {
-#warning flexible part size needed
-    part_size = 5 * 1024 * 1024;
-
-    num_parts = (obj_size + part_size - 1) / part_size;
+                                                   mtime(_mtime),
+                                                   status_obj(sync_env->store->get_zone_params().log_pool,
+                                                              RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) {
   }
 
 
   int operate() {
     reenter(this) {
-      yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, obj_size, mtime, &upload_id));
-      if (retcode < 0) {
-        return set_cr_error(retcode);
+      /* Load any previously saved upload state for this object. The trailing
+       * 'false' presumably makes a missing status object surface as -ENOENT
+       * rather than an empty result -- TODO confirm against RGWSimpleRadosReadCR. */
+      yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(sync_env->async_rados, sync_env->store,
+                                                                 status_obj, &status, false));
+
+      if (retcode < 0 && retcode != -ENOENT) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
+        /* fail the coroutine the same way every other error path in operate() does,
+         * instead of returning the raw code without marking the coroutine as errored */
+        return set_cr_error(retcode);
+      }
+
+      if (retcode >= 0) {
+        /* check here that mtime and size did not change */
+
+        if (status.mtime != mtime || status.obj_size != obj_size) {
+          yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
+          retcode = -ENOENT;
+        }
       }
 
-      for (cur_part = 1; cur_part <= num_parts; ++cur_part) {
+      if (retcode == -ENOENT) {
+        /* No usable saved state (none stored, or it was just discarded because the
+         * object changed): start a new upload. Pass the current obj_size/mtime --
+         * the fields in 'status' are unset or stale at this point. */
+        yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, obj_size, mtime, &status.upload_id));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+
+        status.obj_size = obj_size;
+        status.mtime = mtime;
+#warning flexible part size needed
+        status.part_size = 5 * 1024 * 1024;
+        status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
+        status.cur_part = 1;
+        /* restart from the beginning: drop offset and part list that may have been
+         * loaded from the state of an aborted upload */
+        status.cur_ofs = 0;
+        status.parts.clear();
+      }
+
+      for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
         yield {
-          multipart_part_info& cur_part_info = parts[cur_part];
-          cur_part_info.part_num = cur_part;
-          cur_part_info.ofs = cur_ofs;
-          cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
+          rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
+          cur_part_info.part_num = status.cur_part;
+          cur_part_info.ofs = status.cur_ofs;
+          cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
 
           pcur_part_info = &cur_part_info;
 
-          cur_ofs += part_size;
+          status.cur_ofs += status.part_size;
 
           call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
                                                              source_conn, src_obj,
                                                              dest_conn, dest_obj,
-                                                             upload_id,
+                                                             status.upload_id,
                                                              cur_part_info,
                                                              &cur_part_info.etag));
         }
 
         if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
           ret_err = retcode;
-          yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
-          if (retcode < 0) {
-            ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
-          }
+          yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
           return set_cr_error(ret_err);
         }
 
-        ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << cur_part << " etag=" << pcur_part_info->etag << dendl;
-
+        /* Persist progress after each part so an interrupted upload can be picked up later.
+         * NOTE(review): at this point status.cur_part still holds the part that was just
+         * sent (the for loop increments it only afterwards) while status.cur_ofs has
+         * already been advanced past it; a resume from this saved state would apparently
+         * re-send this part number at the next part's offset -- confirm. */
+        yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(sync_env->async_rados, sync_env->store, status_obj, status));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
+          /* continue with upload anyway */
+        }
+        ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
       }
 
-      yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, upload_id, parts));
+      yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, status.upload_id, status.parts));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
         ret_err = retcode;
-        yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
-        if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
-        }
+        yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
         return set_cr_error(ret_err);
       }
 
+      /* upload completed: the saved per-object state is no longer needed, remove it */
+      yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
+      if (retcode < 0) {
+        /* report the operation that actually failed (status object removal, not an abort) */
+        ldout(sync_env->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
+        /* ignore error, best effort */
+      }
       return set_cr_done();
     }
 
index 1f80eeaf196244e65a40660160e10e2f77cfb4ac..5180da5f7cdf666e17c37a5014f28a6d0ad458a3 100644 (file)
@@ -3,6 +3,72 @@
 
 #include "rgw_sync_module.h"
 
+/*
+ * Description of a single part of a multipart upload. Encodable so that it can
+ * be stored as part of rgw_sync_aws_multipart_upload_info.
+ */
+struct rgw_sync_aws_multipart_part_info {
+  int part_num{0};   /* part number within the upload */
+  uint64_t ofs{0};   /* offset of the part within the object */
+  uint64_t size{0};  /* size of the part */
+  string etag;       /* etag recorded for the part */
+
+  /* field order below is the encoded format (version 1); keep encode/decode in sync */
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(part_num, bl);
+    encode(ofs, bl);
+    encode(size, bl);
+    encode(etag, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    decode(part_num, bl);
+    decode(ofs, bl);
+    decode(size, bl);
+    decode(etag, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_sync_aws_multipart_part_info)
+
+/*
+ * State of a multipart upload of one object. Encodable so that it can be
+ * written to / read from a status object and the upload state tracked
+ * across interruptions.
+ */
+struct rgw_sync_aws_multipart_upload_info {
+  string upload_id;        /* id of the multipart upload */
+  uint64_t obj_size{0};    /* object size; zero-initialized like the other scalar
+                            * members so a default-constructed instance never
+                            * exposes an indeterminate value */
+  ceph::real_time mtime;   /* object mtime */
+  uint32_t part_size{0};   /* size of each part (the last one may be smaller) */
+  uint32_t num_parts{0};   /* total number of parts */
+
+  int cur_part{0};         /* current part number */
+  uint64_t cur_ofs{0};     /* current offset within the object */
+
+  std::map<int, rgw_sync_aws_multipart_part_info> parts; /* per-part info, keyed by part number */
+
+  /* field order below is the encoded format (version 1); keep encode/decode in sync */
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(upload_id, bl);
+    encode(obj_size, bl);
+    encode(mtime, bl);
+    encode(part_size, bl);
+    encode(num_parts, bl);
+    encode(cur_part, bl);
+    encode(cur_ofs, bl);
+    encode(parts, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    decode(upload_id, bl);
+    decode(obj_size, bl);
+    decode(mtime, bl);
+    decode(part_size, bl);
+    decode(num_parts, bl);
+    decode(cur_part, bl);
+    decode(cur_ofs, bl);
+    decode(parts, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_sync_aws_multipart_upload_info)
+
 class RGWAWSSyncModule : public RGWSyncModule {
  public:
   RGWAWSSyncModule() {}