From: Yehuda Sadeh Date: Fri, 12 Jan 2018 22:57:24 +0000 (-0800) Subject: rgw: aws sync: new config structure, support multiple connections X-Git-Tag: v13.1.0~270^2~31 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=ce1e91541fdad0b526374ad328fe9b407bff536d;p=ceph.git rgw: aws sync: new config structure, support multiple connections Signed-off-by: Yehuda Sadeh --- diff --git a/src/common/ceph_json.cc b/src/common/ceph_json.cc index 89252b272b6da..803a97a6cd3dd 100644 --- a/src/common/ceph_json.cc +++ b/src/common/ceph_json.cc @@ -815,6 +815,15 @@ int JSONFormattable::erase(const string& name) return 0; } +void JSONFormattable::derive_from(const JSONFormattable& parent) +{ + for (auto& o : parent.obj) { + if (obj.find(o.first) == obj.end()) { + obj[o.first] = o.second; + } + } +} + void encode_json(const char *name, const JSONFormattable& v, Formatter *f) { switch (v.type) { diff --git a/src/common/ceph_json.h b/src/common/ceph_json.h index 308abe867cf8f..cbd6e1bf2266e 100644 --- a/src/common/ceph_json.h +++ b/src/common/ceph_json.h @@ -586,6 +586,8 @@ struct JSONFormattable { int set(const string& name, const string& val); int erase(const string& name); + + void derive_from(const JSONFormattable& jf); }; WRITE_CLASS_ENCODER(JSONFormattable) diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index c027ef7c79bf6..396d49950e14a 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -43,23 +43,189 @@ static string obj_to_aws_path(const rgw_obj& obj) return obj.bucket.name + "/" + obj.key.name; } -struct AWSSyncConfig { - string s3_endpoint; - string bucket_suffix; +/* + + json configuration definition: + + { + default = { + "connection": { + "access_key": , + "secret": , + "endpoint": , + "host_style" + }, + "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path, + # final object name will be target_path + "/" + obj + }, + "connections": [ + { + "connection_id": , + "access_key": , + "secret": , + "endpoint": , + } ... ], + "targets": [ + { + "source_bucket": , # can specify either specific bucket name (foo), or prefix (foo*) + "target_path": , # (override default) + "connection_id": # (override default) + } ... ], + "acl_mapping": [ # list of source uids and how they map into destination uids in the dest objects acls + { + "source_id": , + "dest_id": + } ... + ] + } + +*/ + +struct AWSSyncConfig_Connection { + string connection_id; + string endpoint; RGWAccessKey key; - HostStyle host_style; + HostStyle host_style{PathStyle}; + + void init(const JSONFormattable& config) { + connection_id = config["connectionn_id"]; + endpoint = config["endpoint"]; + key = RGWAccessKey(config["access_key"], config["secret"]); + string host_style_str = config["host_style"]; + if (host_style_str != "virtual") { + host_style = PathStyle; + } else { + host_style = VirtualStyle; + } + } +}; + +static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval) +{ + string sval; + if (config.find(key, &sval)) { + string err; + uint64_t val = strict_strtoll(sval.c_str(), 10, &err); + if (!err.empty()) { + ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl; + return -EINVAL; + } + *pval = val; + } + return 0; +} +struct AWSSyncConfig_S3 { uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE}; uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE}; + + int init(CephContext *cct, const JSONFormattable& config) { + int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold); + if (r < 0) { + return r; + } + + r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size); + if (r < 0) { + return r; + } +#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) + if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) { + multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE; + } + return 0; + } +}; + +struct AWSSyncConfig_Default { + std::shared_ptr conn; + + void init(const JSONFormattable& config) { + conn = make_shared(); + conn->init(config["connection"]); + } +}; + +struct AWSSyncConfig { + AWSSyncConfig_Default default_conf; + + map connections; + + string bucket_suffix; + + AWSSyncConfig_S3 s3; + + AWSSyncConfig() {} + AWSSyncConfig(const AWSSyncConfig& c) : default_conf(c.default_conf), + connections(c.connections), + bucket_suffix(c.bucket_suffix), + s3(c.s3) {} + + int init(CephContext *cct, const JSONFormattable& config) { + if (config.exists("default")) { + default_conf.init(config["default"]); + } + + auto& default_conn = config["default"]["connection"]; + + for (auto conn : config["connections"].array()) { + auto new_conn = conn; + new_conn.derive_from(default_conn); + + connections[new_conn["connection_id"]].init(new_conn); + } + + bucket_suffix = config["bucket_suffix"]; + + int r = s3.init(cct, config["s3"]); + if (r < 0) { + return r; + } + + return 0; + } }; struct AWSSyncInstanceEnv { AWSSyncConfig conf; string id; - std::unique_ptr conn; + std::unique_ptr default_conn; + + map > connections; AWSSyncInstanceEnv(const AWSSyncConfig& _conf) : conf(_conf) {} + + void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) { + char buf[32]; + snprintf(buf, sizeof(buf), "s3-%llu", (unsigned long long)instance_id); + id = buf; + auto& conn = conf.default_conf.conn; + if (conn) { + default_conn.reset(new S3RESTConn(sync_env->cct, + sync_env->store, + id, + { conn->endpoint }, + conn->key, + conn->host_style)); + } + + for (auto i : conf.connections) { + auto& c = i.second; + connections[c.connection_id].reset(new S3RESTConn(sync_env->cct, + sync_env->store, + id, + { c.endpoint }, + c.key, + c.host_style)); + + } + } + + RGWRESTConn *get_conn(const rgw_bucket& bucket) const { +#warning FIXME + return default_conn.get(); + } }; class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF @@ -663,7 +829,7 @@ public: status.src_properties = src_properties; #define MULTIPART_MAX_PARTS 10000 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS; - status.part_size = std::max(conf.multipart_min_part_size, min_part_size); + status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size); status.num_parts = (obj_size + status.part_size - 1) / status.part_size; status.cur_part = 1; } @@ -748,7 +914,8 @@ int decode_attr(map& attrs, const char *attr_name, T *result // maybe use Fetch Remote Obj instead? class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { const AWSSyncInstanceEnv& instance; - RGWRESTConn *source_conn; + RGWRESTConn *source_conn{nullptr}; + RGWRESTConn *dest_conn{nullptr}; bufferlist res; unordered_map bucket_created; string target_bucket_name; @@ -805,11 +972,14 @@ public: } target_bucket_name = aws_bucket_name(bucket_info, instance.conf.bucket_suffix); + + dest_conn = instance.get_conn(bucket_info.bucket); + if (bucket_created.find(target_bucket_name) == bucket_created.end()){ yield { ldout(sync_env->cct,0) << "AWS: creating bucket" << target_bucket_name << dendl; bufferlist bl; - call(new RGWPutRawRESTResourceCR (sync_env->cct, instance.conn.get(), + call(new RGWPutRawRESTResourceCR (sync_env->cct, dest_conn, sync_env->http_manager, target_bucket_name, nullptr, bl, &out_bl)); } @@ -858,12 +1028,12 @@ public: src_properties.zone_short_id = src_zone_short_id; src_properties.pg_ver = src_pg_ver; - if (size < instance.conf.multipart_sync_threshold) { + if (size < instance.conf.s3.multipart_sync_threshold) { call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, src_properties, - instance.conn.get(), dest_obj)); + dest_conn, dest_obj)); } else { - call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, instance.conn.get(), + call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, dest_conn, dest_obj, size, src_properties)); } } @@ -895,7 +1065,8 @@ public: }; class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncEnv *sync_env{nullptr}; + RGWRESTConn *dest_conn{nullptr}; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; @@ -913,7 +1084,8 @@ public: yield { string path = aws_bucket_name(bucket_info, instance.conf.bucket_suffix) + "/" + aws_object_name(bucket_info, key); ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl; - call(new RGWDeleteRESTResourceCR(sync_env->cct, instance.conn.get(), + dest_conn = instance.get_conn(bucket_info.bucket); + call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn, sync_env->http_manager, path, nullptr /* params */)); } @@ -938,13 +1110,7 @@ public: } void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) { - instance.id = string("s3:") + instance.conf.s3_endpoint; - instance.conn.reset(new S3RESTConn(cct, - sync_env->store, - instance.id, - { instance.conf.s3_endpoint }, - instance.conf.key, - instance.conf.host_style)); + instance.init(sync_env, instance_id); } ~RGWAWSDataSyncModule() {} @@ -977,54 +1143,14 @@ public: } }; -static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval) -{ - string sval; - if (config.find(key, &sval)) { - string err; - uint64_t val = strict_strtoll(sval.c_str(), 10, &err); - if (!err.empty()) { - ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl; - return -EINVAL; - } - *pval = val; - } - return 0; -} - int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){ AWSSyncConfig conf; - conf.s3_endpoint = config["s3_endpoint"]; - - string host_style_str = config["host_style"]; - if (host_style_str != "virtual") { - conf.host_style = PathStyle; - } else { - conf.host_style = VirtualStyle; - } - - conf.bucket_suffix = config["bucket_suffix"]; - - string access_key = config["access_key"]; - string secret = config["secret"]; - - conf.key = RGWAccessKey(access_key, secret); - - int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &conf.multipart_sync_threshold); + int r = conf.init(cct, config); if (r < 0) { return r; } - r = conf_to_uint64(cct, config, "multipart_min_part_size", &conf.multipart_min_part_size); - if (r < 0) { - return r; - } -#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) - if (conf.multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) { - conf.multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE; - } - instance->reset(new RGWAWSSyncModuleInstance(cct, conf)); return 0; }