#define dout_subsys ceph_subsys_rgw
+
+#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
+
// TODO: have various bucket naming schemes at a global/user and a bucket level
static string aws_bucket_name(const RGWBucketInfo& bucket_info, bool user_buckets=false){
struct AWSSyncConfig {
string s3_endpoint;
RGWAccessKey key;
+
+ uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+ uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
};
struct AWSSyncInstanceEnv {
class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
+ const AWSSyncConfig& conf;
RGWRESTConn *source_conn;
RGWRESTConn *dest_conn;
rgw_obj src_obj;
public:
RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
+ const AWSSyncConfig& _conf,
RGWRESTConn *_source_conn,
const rgw_obj& _src_obj,
RGWRESTConn *_dest_conn,
uint64_t _obj_size,
const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
+ conf(_conf),
source_conn(_source_conn),
dest_conn(_dest_conn),
src_obj(_src_obj),
status.obj_size = obj_size;
status.src_properties = src_properties;
-#warning flexible part size needed
- status.part_size = 5 * 1024 * 1024;
+#define MULTIPART_MAX_PARTS 10000
+ uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
+ status.part_size = std::max(conf.multipart_min_part_size, min_part_size);
status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
status.cur_part = 1;
}
uint32_t src_zone_short_id{0};
uint64_t src_pg_ver{0};
- static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024;
-
public:
RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info,
src_properties.zone_short_id = src_zone_short_id;
src_properties.pg_ver = src_pg_ver;
- if (size < multipart_threshold) {
+ if (size < instance.conf.multipart_sync_threshold) {
call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
src_properties,
instance.conn.get(), dest_obj));
} else {
- call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, instance.conn.get(),
+ call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, instance.conn.get(),
dest_obj, size, src_properties));
}
}
}
};
+static int conf_to_uint64(CephContext *cct, const map<string, string, ltstr_nocase>& config, const string& key, uint64_t *pval)
+{
+ auto i = config.find(key);
+ if (i != config.end()) {
+ string err;
+ uint64_t val = strict_strtoll(i->second.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << i->first << ": " << i->second << dendl;
+ return -EINVAL;
+ }
+ *pval = val;
+ }
+ return 0;
+}
+
int RGWAWSSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance){
AWSSyncConfig conf;
auto i = config.find("s3_endpoint");
conf.key = RGWAccessKey(access_key, secret);
+ int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &conf.multipart_sync_threshold);
+ if (r < 0) {
+ return r;
+ }
+
+ r = conf_to_uint64(cct, config, "multipart_min_part_size", &conf.multipart_min_part_size);
+ if (r < 0) {
+ return r;
+ }
+#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
+ if (conf.multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+ conf.multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+ }
+
instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
return 0;
}