From 6e83d24ddef8ac9b3fad49d15246497fc5f0b960 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 15 Jan 2018 18:01:48 -0800 Subject: [PATCH] rgw: aws sync: use configurable target path Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync_module_aws.cc | 154 ++++++++++++++++++++++++++------- 1 file changed, 125 insertions(+), 29 deletions(-) diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index fb744fb7df8d3..560e0ffc956b3 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -17,30 +17,24 @@ #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) -// TODO: have various bucket naming schemes at a global/user and a bucket level +static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}"; -static string aws_bucket_name(const RGWBucketInfo& bucket_info, const string& bucket_suffix, bool user_buckets=false){ - string bucket_name="rgwx" + bucket_info.zonegroup; - if (user_buckets){ - bucket_name+=bucket_info.owner.tenant + bucket_info.owner.id; - } - bucket_name.erase(std::remove(bucket_name.begin(),bucket_name.end(),'-')); - if (!bucket_suffix.empty()) { - bucket_name = bucket_name + "-" + bucket_suffix; +static string get_key_oid(const rgw_obj_key& key) +{ + string oid = key.name; + if (!key.instance.empty() && + !key.have_null_instance()) { + oid += string(":") + key.instance; } - return bucket_name; -} - -static string aws_object_name(const RGWBucketInfo& bucket_info, const rgw_obj_key&key, bool user_buckets=false){ - string object_name; - object_name += bucket_info.owner.to_str() + "/"; - object_name += bucket_info.bucket.name + "/" + key.name; - return object_name; + return oid; } static string obj_to_aws_path(const rgw_obj& obj) { - return obj.bucket.name + "/" + obj.key.name; + string path = obj.bucket.name + "/" + get_key_oid(obj.key); + + + return path; } /* @@ -79,6 +73,19 @@ static string obj_to_aws_path(const rgw_obj& obj) ] } +target path optional variables: + +(evaluated at init) 
+sid: sync instance id, randomly generated by sync process on first sync initialization +zonegroup: zonegroup name +zonegroup_id: zonegroup id +zone: zone name +zone_id: zone id + +(evaluated when syncing) +bucket: bucket name +owner: bucket owner + */ struct AWSSyncConfig_Connection { @@ -158,10 +165,15 @@ struct AWSSyncConfig_S3 { struct AWSSyncConfig_Default { std::shared_ptr conn; + string target_path; void init(const JSONFormattable& config) { conn = make_shared(); conn->init(config["connection"]); + target_path = config["target_path"]; + if (target_path.empty()) { + target_path = default_target_path; + } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { @@ -169,6 +181,7 @@ struct AWSSyncConfig_Default { if (conn) { conn->dump_conf(cct, jf); } + encode_json("target_path", target_path, &jf); } }; @@ -184,6 +197,26 @@ struct AWSSyncConfig_Target { } }; +static void find_and_replace(const string& src, const string& find, const string& replace, string *dest) +{ + string s = src; + + size_t pos = s.find(find); + while (pos != string::npos) { + size_t next_ofs = pos + find.size(); + s = s.substr(0, pos) + replace + s.substr(next_ofs); + pos = s.find(find, next_ofs); + } + + *dest = s; +} + +static void apply_meta_param(const string& src, const string& param, const string& val, string *dest) +{ + string s = string("${") + param + "}"; + find_and_replace(src, s, val, dest); +} + struct AWSSyncConfig { AWSSyncConfig_Default default_conf; @@ -236,10 +269,6 @@ struct AWSSyncConfig { } AWSSyncConfig() {} - AWSSyncConfig(const AWSSyncConfig& c) : default_conf(c.default_conf), - connections(c.connections), - bucket_suffix(c.bucket_suffix), - s3(c.s3) {} int init(CephContext *cct, const JSONFormattable& config) { if (config.exists("default")) { @@ -304,6 +333,27 @@ struct AWSSyncConfig { return 0; } + void expand_target(RGWDataSyncEnv *sync_env, const string& sid, const string& path, string *dest) { + apply_meta_param(path, "sid", sid, dest); + + RGWZoneGroup& 
zg = sync_env->store->get_zonegroup(); + apply_meta_param(path, "zonegroup", zg.get_name(), dest); + apply_meta_param(path, "zonegroup_id", zg.get_id(), dest); + + RGWZone& zone = sync_env->store->get_zone(); + apply_meta_param(path, "zone", zone.name, dest); + apply_meta_param(path, "zone_id", zone.id, dest); + } + + void update_config(RGWDataSyncEnv *sync_env, const string& sid) { + expand_target(sync_env, sid, default_conf.target_path, &default_conf.target_path); + ldout(sync_env->cct, 20) << "updated target: (default) -> " << default_conf.target_path << dendl; + for (auto& t : explicit_targets) { + expand_target(sync_env, sid, t.second.path, &t.second.path); + ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second.path << dendl; + } + } + void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection config(jf, "config"); default_conf.dump_conf(cct, jf); @@ -318,13 +368,54 @@ struct AWSSyncConfig { { // targets Formatter::ArraySection as(jf, "targets"); - for (auto t : explicit_targets) { + for (auto& t : explicit_targets) { Formatter::ObjectSection target_section(jf, "target"); encode_json("name", t.first, &jf); t.second.dump_conf(cct, jf); } } } + + string get_path(const RGWBucketInfo& bucket_info, + const rgw_obj_key& obj) const { + string bucket_str; + string owner; + if (!bucket_info.owner.tenant.empty()) { + bucket_str = owner = bucket_info.owner.tenant + "-"; + owner += bucket_info.owner.id; + } + bucket_str += bucket_info.bucket.name; + if (!bucket_suffix.empty()) { + bucket_str = bucket_str + "-" + bucket_suffix; + } + const Target *target{nullptr}; + const string *path{nullptr}; + if (find_target(bucket_info.bucket, &target)) { + path = &target->path; + } + if (!path || path->empty()) { + path = &default_conf.target_path; + } + + string new_path; + apply_meta_param(*path, "bucket", bucket_str, &new_path); + apply_meta_param(new_path, "owner", owner, &new_path); + + new_path += string("/") + 
get_key_oid(obj); + + return new_path; + } + + void get_target(const RGWBucketInfo& bucket_info, + const rgw_obj_key& obj, + string *bucket_name, + string *obj_name) const { + string path = get_path(bucket_info, obj); + size_t pos = path.find('/'); + + *bucket_name = path.substr(0, pos); + *obj_name = path.substr(pos + 1); + } }; struct AWSSyncInstanceEnv { @@ -338,8 +429,11 @@ struct AWSSyncInstanceEnv { void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) { char buf[32]; - snprintf(buf, sizeof(buf), "s3-%llu", (unsigned long long)instance_id); + snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id); id = buf; + + conf.update_config(sync_env, id); + auto& conn = conf.default_conf.conn; if (conn) { default_conn.reset(new S3RESTConn(sync_env->cct, @@ -365,7 +459,8 @@ struct AWSSyncInstanceEnv { int get_conn(RGWDataSyncEnv *sync_env, const rgw_bucket& bucket, RGWRESTConn **connection) const { const AWSSyncConfig::Target *target; - if (!conf.find_target(bucket, &target)) { + if (!conf.find_target(bucket, &target) || + target->connection_id.empty()) { ldout(sync_env->cct, 20) << "Couldn't find configured target connection for bucket " << bucket.name << ", using default connection" << dendl; *connection = default_conn.get(); @@ -1075,6 +1170,7 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { bufferlist res; unordered_map bucket_created; string target_bucket_name; + string target_obj_name; rgw_rest_obj rest_obj; int ret{0}; @@ -1127,7 +1223,7 @@ public: return set_cr_error(-EINVAL); } - target_bucket_name = aws_bucket_name(bucket_info, instance.conf.bucket_suffix); + instance.conf.get_target(bucket_info, key, &target_bucket_name, &target_obj_name); ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conn); if (ret < 0) { @@ -1137,7 +1233,7 @@ public: if (bucket_created.find(target_bucket_name) == bucket_created.end()){ yield { - ldout(sync_env->cct,0) << "AWS: creating bucket" << target_bucket_name << dendl; + 
ldout(sync_env->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl; bufferlist bl; call(new RGWPutRawRESTResourceCR (sync_env->cct, dest_conn, sync_env->http_manager, @@ -1179,7 +1275,7 @@ public: rgw_bucket target_bucket; target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for uri resolution */ - rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key)); + rgw_obj dest_obj(target_bucket, target_obj_name); rgw_sync_aws_src_obj_properties src_properties; @@ -1243,7 +1339,7 @@ public: ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { - string path = aws_bucket_name(bucket_info, instance.conf.bucket_suffix) + "/" + aws_object_name(bucket_info, key); + string path = instance.conf.get_path(bucket_info, key); ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl; ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conn); if (ret < 0) { -- 2.39.5