#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
-// TODO: have various bucket naming schemes at a global/user and a bucket level
+static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}";
-static string aws_bucket_name(const RGWBucketInfo& bucket_info, const string& bucket_suffix, bool user_buckets=false){
- string bucket_name="rgwx" + bucket_info.zonegroup;
- if (user_buckets){
- bucket_name+=bucket_info.owner.tenant + bucket_info.owner.id;
- }
- bucket_name.erase(std::remove(bucket_name.begin(),bucket_name.end(),'-'));
- if (!bucket_suffix.empty()) {
- bucket_name = bucket_name + "-" + bucket_suffix;
+static string get_key_oid(const rgw_obj_key& key)
+{
+ string oid = key.name;
+ if (!key.instance.empty() &&
+ !key.have_null_instance()) {
+ oid += string(":") + key.instance;
}
- return bucket_name;
-}
-
-static string aws_object_name(const RGWBucketInfo& bucket_info, const rgw_obj_key&key, bool user_buckets=false){
- string object_name;
- object_name += bucket_info.owner.to_str() + "/";
- object_name += bucket_info.bucket.name + "/" + key.name;
- return object_name;
+ return oid;
}
static string obj_to_aws_path(const rgw_obj& obj)
{
- return obj.bucket.name + "/" + obj.key.name;
+ string path = obj.bucket.name + "/" + get_key_oid(obj.key);
+
+
+ return path;
}
/*
]
}
+target path optional variables:
+
+(evaluated at init)
+sid: sync instance id, randomly generated by sync process on first sync initalization
+zonegroup: zonegroup name
+zonegroup_id: zonegroup name
+zone: zone name
+zone_id: zone name
+
+(evaluated when syncing)
+bucket: bucket name
+owner: bucket owner
+
*/
struct AWSSyncConfig_Connection {
struct AWSSyncConfig_Default {
std::shared_ptr<AWSSyncConfig_Connection> conn;
+ string target_path;
void init(const JSONFormattable& config) {
conn = make_shared<AWSSyncConfig_Connection>();
conn->init(config["connection"]);
+ target_path = config["target_path"];
+ if (target_path.empty()) {
+ target_path = default_target_path;
+ }
}
void dump_conf(CephContext *cct, JSONFormatter& jf) const {
if (conn) {
conn->dump_conf(cct, jf);
}
+ encode_json("target_path", target_path, &jf);
}
};
}
};
+static void find_and_replace(const string& src, const string& find, const string& replace, string *dest)
+{
+ string s = src;
+
+ size_t pos = s.find(find);
+ while (pos != string::npos) {
+ size_t next_ofs = pos + find.size();
+ s = s.substr(0, pos) + replace + s.substr(next_ofs);
+ pos = s.find(find, next_ofs);
+ }
+
+ *dest = s;
+}
+
+static void apply_meta_param(const string& src, const string& param, const string& val, string *dest)
+{
+ string s = string("${") + param + "}";
+ find_and_replace(src, s, val, dest);
+}
+
struct AWSSyncConfig {
AWSSyncConfig_Default default_conf;
}
AWSSyncConfig() {}
- AWSSyncConfig(const AWSSyncConfig& c) : default_conf(c.default_conf),
- connections(c.connections),
- bucket_suffix(c.bucket_suffix),
- s3(c.s3) {}
int init(CephContext *cct, const JSONFormattable& config) {
if (config.exists("default")) {
return 0;
}
+ void expand_target(RGWDataSyncEnv *sync_env, const string& sid, const string& path, string *dest) {
+ apply_meta_param(path, "sid", sid, dest);
+
+ RGWZoneGroup& zg = sync_env->store->get_zonegroup();
+ apply_meta_param(path, "zonegroup", zg.get_name(), dest);
+ apply_meta_param(path, "zonegroup_id", zg.get_id(), dest);
+
+ RGWZone& zone = sync_env->store->get_zone();
+ apply_meta_param(path, "zone", zone.name, dest);
+ apply_meta_param(path, "zone_id", zone.id, dest);
+ }
+
+ void update_config(RGWDataSyncEnv *sync_env, const string& sid) {
+ expand_target(sync_env, sid, default_conf.target_path, &default_conf.target_path);
+ ldout(sync_env->cct, 20) << "updated target: (default) -> " << default_conf.target_path << dendl;
+ for (auto& t : explicit_targets) {
+ expand_target(sync_env, sid, t.second.path, &t.second.path);
+ ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second.path << dendl;
+ }
+ }
+
void dump_conf(CephContext *cct, JSONFormatter& jf) const {
Formatter::ObjectSection config(jf, "config");
default_conf.dump_conf(cct, jf);
{ // targets
Formatter::ArraySection as(jf, "targets");
- for (auto t : explicit_targets) {
+ for (auto& t : explicit_targets) {
Formatter::ObjectSection target_section(jf, "target");
encode_json("name", t.first, &jf);
t.second.dump_conf(cct, jf);
}
}
}
+
+ string get_path(const RGWBucketInfo& bucket_info,
+ const rgw_obj_key& obj) const {
+ string bucket_str;
+ string owner;
+ if (!bucket_info.owner.tenant.empty()) {
+ bucket_str = owner = bucket_info.owner.tenant + "-";
+ owner += bucket_info.owner.id;
+ }
+ bucket_str += bucket_info.bucket.name;
+ if (!bucket_suffix.empty()) {
+ bucket_str = bucket_str + "-" + bucket_suffix;
+ }
+ const Target *target{nullptr};
+ const string *path{nullptr};
+ if (find_target(bucket_info.bucket, &target)) {
+ path = &target->path;
+ }
+ if (!path || path->empty()) {
+ path = &default_conf.target_path;
+ }
+
+ string new_path;
+ apply_meta_param(*path, "bucket", bucket_str, &new_path);
+ apply_meta_param(new_path, "owner", owner, &new_path);
+
+ new_path += string("/") + get_key_oid(obj);
+
+ return new_path;
+ }
+
+ void get_target(const RGWBucketInfo& bucket_info,
+ const rgw_obj_key& obj,
+ string *bucket_name,
+ string *obj_name) const {
+ string path = get_path(bucket_info, obj);
+ size_t pos = path.find('/');
+
+ *bucket_name = path.substr(0, pos);
+ *obj_name = path.substr(pos + 1);
+ }
};
struct AWSSyncInstanceEnv {
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
char buf[32];
- snprintf(buf, sizeof(buf), "s3-%llu", (unsigned long long)instance_id);
+ snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
id = buf;
+
+ conf.update_config(sync_env, id);
+
auto& conn = conf.default_conf.conn;
if (conn) {
default_conn.reset(new S3RESTConn(sync_env->cct,
int get_conn(RGWDataSyncEnv *sync_env, const rgw_bucket& bucket, RGWRESTConn **connection) const {
const AWSSyncConfig::Target *target;
- if (!conf.find_target(bucket, &target)) {
+ if (!conf.find_target(bucket, &target) ||
+ target->connection_id.empty()) {
ldout(sync_env->cct, 20) << "Couldn't find configured target connection for bucket " << bucket.name << ", using default connection" << dendl;
*connection = default_conn.get();
bufferlist res;
unordered_map <string, bool> bucket_created;
string target_bucket_name;
+ string target_obj_name;
rgw_rest_obj rest_obj;
int ret{0};
return set_cr_error(-EINVAL);
}
- target_bucket_name = aws_bucket_name(bucket_info, instance.conf.bucket_suffix);
+ instance.conf.get_target(bucket_info, key, &target_bucket_name, &target_obj_name);
ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conn);
if (ret < 0) {
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
yield {
- ldout(sync_env->cct,0) << "AWS: creating bucket" << target_bucket_name << dendl;
+ ldout(sync_env->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl;
bufferlist bl;
call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn,
sync_env->http_manager,
rgw_bucket target_bucket;
target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
uri resolution */
- rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
+ rgw_obj dest_obj(target_bucket, target_obj_name);
rgw_sync_aws_src_obj_properties src_properties;
ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- string path = aws_bucket_name(bucket_info, instance.conf.bucket_suffix) + "/" + aws_object_name(bucket_info, key);
+ string path = instance.conf.get_path(bucket_info, key);
ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conn);
if (ret < 0) {