]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync: configurable multipart threshold, part size
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 30 Oct 2017 23:48:05 +0000 (16:48 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_aws.cc

index 4833b95a11a67defe050a03f17b594db0a7c885f..73fb938b2e6dc23b8c0b25c26ef4895e7c4ec949 100644 (file)
@@ -14,6 +14,9 @@
 
 #define dout_subsys ceph_subsys_rgw
 
+
+#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
+
 // TODO: have various bucket naming schemes at a global/user and a bucket level
 
 static string aws_bucket_name(const RGWBucketInfo& bucket_info, bool user_buckets=false){
@@ -41,6 +44,9 @@ static string obj_to_aws_path(const rgw_obj& obj)
 struct AWSSyncConfig {
   string s3_endpoint;
   RGWAccessKey key;
+
+  uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+  uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
 };
 
 struct AWSSyncInstanceEnv {
@@ -591,6 +597,7 @@ public:
 
 class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
+  const AWSSyncConfig& conf;
   RGWRESTConn *source_conn;
   RGWRESTConn *dest_conn;
   rgw_obj src_obj;
@@ -610,6 +617,7 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
 
 public:
   RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
+                                const AWSSyncConfig& _conf,
                                 RGWRESTConn *_source_conn,
                                 const rgw_obj& _src_obj,
                                 RGWRESTConn *_dest_conn,
@@ -617,6 +625,7 @@ public:
                                 uint64_t _obj_size,
                                 const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
+                                                   conf(_conf),
                                                    source_conn(_source_conn),
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
@@ -656,8 +665,9 @@ public:
 
         status.obj_size = obj_size;
         status.src_properties = src_properties;
-#warning flexible part size needed
-        status.part_size = 5 * 1024 * 1024;
+#define MULTIPART_MAX_PARTS 10000
+        uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
+        status.part_size = std::max(conf.multipart_min_part_size, min_part_size);
         status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
         status.cur_part = 1;
       }
@@ -753,8 +763,6 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   uint32_t src_zone_short_id{0};
   uint64_t src_pg_ver{0};
 
-  static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024;
-
 public:
   RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                             RGWBucketInfo& _bucket_info,
@@ -825,12 +833,12 @@ public:
         src_properties.zone_short_id = src_zone_short_id;
         src_properties.pg_ver = src_pg_ver;
 
-        if (size < multipart_threshold) {
+        if (size < instance.conf.multipart_sync_threshold) {
           call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
                                                  src_properties,
                                                  instance.conn.get(), dest_obj));
         } else {
-          call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, instance.conn.get(),
+          call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, instance.conn.get(),
                                                      dest_obj, size, src_properties));
         }
       }
@@ -943,6 +951,21 @@ public:
   }
 };
 
+static int conf_to_uint64(CephContext *cct, const map<string, string, ltstr_nocase>& config, const string& key, uint64_t *pval)
+{
+  auto i = config.find(key);
+  if (i != config.end()) {
+    string err;
+    uint64_t val = strict_strtoll(i->second.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << i->first << ": " << i->second << dendl;
+      return -EINVAL;
+    }
+    *pval = val;
+  }
+  return 0;
+}
+
 int RGWAWSSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config,  RGWSyncModuleInstanceRef *instance){
   AWSSyncConfig conf;
   auto i = config.find("s3_endpoint");
@@ -962,6 +985,20 @@ int RGWAWSSyncModule::create_instance(CephContext *cct, map<string, string, ltst
 
   conf.key = RGWAccessKey(access_key, secret);
 
+  int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &conf.multipart_sync_threshold);
+  if (r < 0) {
+    return r;
+  }
+
+  r = conf_to_uint64(cct, config, "multipart_min_part_size", &conf.multipart_min_part_size);
+  if (r < 0) {
+    return r;
+  }
+#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
+  if (conf.multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+    conf.multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+  }
+
   instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
   return 0;
 }